Compare commits

...

8 Commits

Author SHA1 Message Date
be5f9ab77f Merge pull request 'tracer: add Flush method' (#225) from traceimp into v3
Reviewed-on: #225
2023-07-04 00:26:33 +03:00
144dca0cae tracer: add Flush method
Some checks failed
pr / test (pull_request) Failing after 2m42s
lint / lint (pull_request) Failing after 1m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-07-04 00:25:41 +03:00
75173560e3 Merge pull request 'util/time: ParseDuration fix' (#222) from timefix into v3
Reviewed-on: #222
2023-05-29 14:04:41 +03:00
9b3bccd1f1 util/time: ParseDuration fix
All checks were successful
lint / lint (pull_request) Successful in 1m0s
pr / test (pull_request) Successful in 58s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-29 14:02:06 +03:00
ce125b77c1 Merge pull request 'util/time: fix duration parsing' (#219) from timefeature into v3
Reviewed-on: #219
2023-05-27 23:55:51 +03:00
2ee8d4ed46 util/time: fix duration parsing
Some checks failed
lint / lint (pull_request) Successful in 59s
pr / test (pull_request) Failing after 1m0s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-27 23:55:08 +03:00
f58781d076 Merge pull request 'server/noop: fix graceful unsubscribe' (#218) from unsubfix into v3
Reviewed-on: #218
2023-05-25 23:19:26 +03:00
e1af4aa3a4 server/noop: fix graceful unsubscribe
All checks were successful
pr / test (pull_request) Successful in 1m2s
lint / lint (pull_request) Successful in 59s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-25 23:18:47 +03:00
7 changed files with 100 additions and 74 deletions

View File

@@ -39,8 +39,6 @@ func FromOutgoingContext(ctx context.Context) (Metadata, bool) {
// FromContext returns metadata from the given context // FromContext returns metadata from the given context
// returned metadata shoud not be modified or race condition happens // returned metadata shoud not be modified or race condition happens
//
// Deprecated: use FromIncomingContext or FromOutgoingContext
func FromContext(ctx context.Context) (Metadata, bool) { func FromContext(ctx context.Context) (Metadata, bool) {
if ctx == nil { if ctx == nil {
return nil, false return nil, false
@@ -53,8 +51,6 @@ func FromContext(ctx context.Context) (Metadata, bool) {
} }
// NewContext creates a new context with the given metadata // NewContext creates a new context with the given metadata
//
// Deprecated: use NewIncomingContext or NewOutgoingContext
func NewContext(ctx context.Context, md Metadata) context.Context { func NewContext(ctx context.Context, md Metadata) context.Context {
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()

View File

@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
cx := config.Context
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
n.registered = true n.registered = true
if cacheService { if cacheService {
n.rsvc = service n.rsvc = service
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
} }
} }
if err := n.subscribe(); err != nil {
return err
}
go func() { go func() {
t := new(time.Ticker) t := new(time.Ticker)
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
return nil return nil
} }
func (n *noopServer) subscribe() error {
config := n.Options()
cx := config.Context
var err error
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (n *noopServer) Stop() error { func (n *noopServer) Stop() error {
n.RLock() n.RLock()
if !n.started { if !n.started {

View File

@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
} }
//nolint:gocyclo //nolint:gocyclo
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) { return func(ps broker.Events) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
} }
//nolint:gocyclo //nolint:gocyclo
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler { func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) { return func(p broker.Event) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
) )
var _ Tracer = (*noopTracer)(nil)
type noopTracer struct { type noopTracer struct {
opts Options opts Options
} }
@@ -21,6 +23,10 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption)
return NewSpanContext(ctx, span), span return NewSpanContext(ctx, span), span
} }
func (t *noopTracer) Flush(ctx context.Context) error {
return nil
}
func (t *noopTracer) Init(opts ...Option) error { func (t *noopTracer) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
o(&t.opts) o(&t.opts)

View File

@@ -16,6 +16,8 @@ type Tracer interface {
Init(...Option) error Init(...Option) error
// Start a trace // Start a trace
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
// Flush flushes spans
Flush(ctx context.Context) error
} }
type Span interface { type Span interface {

View File

@@ -2,7 +2,9 @@ package time
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"strconv"
"time" "time"
) )
@@ -13,39 +15,42 @@ func ParseDuration(s string) (time.Duration, error) {
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`) return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
} }
//var sb strings.Builder var p int
/* var hours int
loop:
for i, r := range s { for i, r := range s {
switch r { switch r {
case 'd': case 's', 'm':
n, err := strconv.Atoi(s[idx:i]) break loop
case 'h':
d, err := strconv.Atoi(s[p:i])
if err != nil { if err != nil {
return 0, errors.New("time: invalid duration " + s) return 0, errors.New("time: invalid duration " + s)
} }
s[idx:i] = fmt.Sprintf("%d", n*24) hours += d
default: p = i + 1
sb.WriteRune(r)
}
}
*/
var td time.Duration
var err error
switch s[len(s)-1] {
case 's', 'm', 'h':
td, err = time.ParseDuration(s)
case 'd': case 'd':
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil { d, err := strconv.Atoi(s[p:i])
td *= 24 if err != nil {
return 0, errors.New("time: invalid duration " + s)
} }
hours += d * 24
p = i + 1
case 'y': case 'y':
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil { n, err := strconv.Atoi(s[p:i])
year := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.Local) if err != nil {
days := year.YearDay() return 0, errors.New("time: invalid duration " + s)
td *= 24 * time.Duration(days) }
var d int
for j := n - 1; j >= 0; j-- {
d += time.Date(time.Now().Year()+j, time.December, 31, 0, 0, 0, 0, time.Local).YearDay()
}
hours += d * 24
p = i + 1
} }
} }
return td, err return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
} }
func (d Duration) MarshalJSON() ([]byte, error) { func (d Duration) MarshalJSON() ([]byte, error) {
@@ -62,7 +67,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
*d = Duration(time.Duration(value)) *d = Duration(time.Duration(value))
return nil return nil
case string: case string:
dv, err := time.ParseDuration(value) dv, err := ParseDuration(value)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -23,27 +23,34 @@ func TestUnmarshalJSON(t *testing.T) {
TTL Duration `json:"ttl"` TTL Duration `json:"ttl"`
} }
v := &str{} v := &str{}
var err error
err := json.Unmarshal([]byte(`{"ttl":"10ms"}`), v) err = json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else if v.TTL != 10000000 { } else if v.TTL != 10000000 {
t.Fatalf("invalid duration %v != 10000000", v.TTL) t.Fatalf("invalid duration %v != 10000000", v.TTL)
} }
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 31536000000000000 {
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
}
} }
func TestParseDuration(t *testing.T) { func TestParseDuration(t *testing.T) {
var td time.Duration var td time.Duration
var err error var err error
t.Skip()
td, err = ParseDuration("14d4h") td, err = ParseDuration("14d4h")
if err != nil { if err != nil {
t.Fatalf("ParseDuration error: %v", err) t.Fatalf("ParseDuration error: %v", err)
} }
if td.String() != "336h0m0s" { if td.String() != "340h0m0s" {
t.Fatalf("ParseDuration 14d != 336h0m0s : %s", td.String()) t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
} }
td, err = ParseDuration("1y") td, err = ParseDuration("1y")
if err != nil { if err != nil {
t.Fatalf("ParseDuration error: %v", err) t.Fatalf("ParseDuration error: %v", err)