Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
be5f9ab77f | |||
144dca0cae | |||
75173560e3 | |||
9b3bccd1f1 | |||
ce125b77c1 | |||
2ee8d4ed46 | |||
f58781d076 | |||
e1af4aa3a4 |
@@ -39,8 +39,6 @@ func FromOutgoingContext(ctx context.Context) (Metadata, bool) {
|
||||
|
||||
// FromContext returns metadata from the given context
|
||||
// returned metadata shoud not be modified or race condition happens
|
||||
//
|
||||
// Deprecated: use FromIncomingContext or FromOutgoingContext
|
||||
func FromContext(ctx context.Context) (Metadata, bool) {
|
||||
if ctx == nil {
|
||||
return nil, false
|
||||
@@ -53,8 +51,6 @@ func FromContext(ctx context.Context) (Metadata, bool) {
|
||||
}
|
||||
|
||||
// NewContext creates a new context with the given metadata
|
||||
//
|
||||
// Deprecated: use NewIncomingContext or NewOutgoingContext
|
||||
func NewContext(ctx context.Context, md Metadata) context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
|
@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
cx := config.Context
|
||||
|
||||
var sub broker.Subscriber
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
n.registered = true
|
||||
if cacheService {
|
||||
n.rsvc = service
|
||||
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := n.subscribe(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
t := new(time.Ticker)
|
||||
|
||||
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noopServer) subscribe() error {
|
||||
config := n.Options()
|
||||
|
||||
cx := config.Context
|
||||
var err error
|
||||
var sub broker.Subscriber
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noopServer) Stop() error {
|
||||
n.RLock()
|
||||
if !n.started {
|
||||
|
@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
return func(ps broker.Events) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
)
|
||||
|
||||
var _ Tracer = (*noopTracer)(nil)
|
||||
|
||||
type noopTracer struct {
|
||||
opts Options
|
||||
}
|
||||
@@ -21,6 +23,10 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption)
|
||||
return NewSpanContext(ctx, span), span
|
||||
}
|
||||
|
||||
func (t *noopTracer) Flush(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *noopTracer) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.opts)
|
||||
|
@@ -16,6 +16,8 @@ type Tracer interface {
|
||||
Init(...Option) error
|
||||
// Start a trace
|
||||
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
||||
// Flush flushes spans
|
||||
Flush(ctx context.Context) error
|
||||
}
|
||||
|
||||
type Span interface {
|
||||
|
@@ -2,7 +2,9 @@ package time
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -13,39 +15,42 @@ func ParseDuration(s string) (time.Duration, error) {
|
||||
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
||||
}
|
||||
|
||||
//var sb strings.Builder
|
||||
/*
|
||||
for i, r := range s {
|
||||
switch r {
|
||||
case 'd':
|
||||
n, err := strconv.Atoi(s[idx:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
s[idx:i] = fmt.Sprintf("%d", n*24)
|
||||
default:
|
||||
sb.WriteRune(r)
|
||||
var p int
|
||||
var hours int
|
||||
loop:
|
||||
for i, r := range s {
|
||||
switch r {
|
||||
case 's', 'm':
|
||||
break loop
|
||||
case 'h':
|
||||
d, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
}
|
||||
*/
|
||||
var td time.Duration
|
||||
var err error
|
||||
switch s[len(s)-1] {
|
||||
case 's', 'm', 'h':
|
||||
td, err = time.ParseDuration(s)
|
||||
case 'd':
|
||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
||||
td *= 24
|
||||
}
|
||||
case 'y':
|
||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
||||
year := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.Local)
|
||||
days := year.YearDay()
|
||||
td *= 24 * time.Duration(days)
|
||||
hours += d
|
||||
p = i + 1
|
||||
case 'd':
|
||||
d, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
hours += d * 24
|
||||
p = i + 1
|
||||
case 'y':
|
||||
n, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
var d int
|
||||
for j := n - 1; j >= 0; j-- {
|
||||
d += time.Date(time.Now().Year()+j, time.December, 31, 0, 0, 0, 0, time.Local).YearDay()
|
||||
}
|
||||
hours += d * 24
|
||||
p = i + 1
|
||||
}
|
||||
}
|
||||
|
||||
return td, err
|
||||
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
|
||||
}
|
||||
|
||||
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||
@@ -62,7 +67,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
|
||||
*d = Duration(time.Duration(value))
|
||||
return nil
|
||||
case string:
|
||||
dv, err := time.ParseDuration(value)
|
||||
dv, err := ParseDuration(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -23,27 +23,34 @@ func TestUnmarshalJSON(t *testing.T) {
|
||||
TTL Duration `json:"ttl"`
|
||||
}
|
||||
v := &str{}
|
||||
var err error
|
||||
|
||||
err := json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||
err = json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if v.TTL != 10000000 {
|
||||
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if v.TTL != 31536000000000000 {
|
||||
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDuration(t *testing.T) {
|
||||
var td time.Duration
|
||||
var err error
|
||||
t.Skip()
|
||||
|
||||
td, err = ParseDuration("14d4h")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
}
|
||||
if td.String() != "336h0m0s" {
|
||||
t.Fatalf("ParseDuration 14d != 336h0m0s : %s", td.String())
|
||||
if td.String() != "340h0m0s" {
|
||||
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
|
||||
}
|
||||
|
||||
td, err = ParseDuration("1y")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
|
Reference in New Issue
Block a user