Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
de6efaee0b | |||
9e0e657003 | |||
be5f9ab77f | |||
144dca0cae | |||
75173560e3 | |||
9b3bccd1f1 | |||
ce125b77c1 | |||
2ee8d4ed46 | |||
f58781d076 | |||
e1af4aa3a4 |
@@ -7,7 +7,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/imdario/mergo"
|
"github.com/imdario/mergo"
|
||||||
|
mid "go.unistack.org/micro/v3/util/id"
|
||||||
rutil "go.unistack.org/micro/v3/util/reflect"
|
rutil "go.unistack.org/micro/v3/util/reflect"
|
||||||
mtime "go.unistack.org/micro/v3/util/time"
|
mtime "go.unistack.org/micro/v3/util/time"
|
||||||
)
|
)
|
||||||
@@ -124,6 +126,20 @@ func fillValue(value reflect.Value, val string) error {
|
|||||||
}
|
}
|
||||||
value.Set(reflect.ValueOf(v))
|
value.Set(reflect.ValueOf(v))
|
||||||
case reflect.String:
|
case reflect.String:
|
||||||
|
switch val {
|
||||||
|
case "micro:generate uuid":
|
||||||
|
uid, err := uuid.NewRandom()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
val = uid.String()
|
||||||
|
case "micro:generate id":
|
||||||
|
uid, err := mid.New()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
val = uid
|
||||||
|
}
|
||||||
value.Set(reflect.ValueOf(val))
|
value.Set(reflect.ValueOf(val))
|
||||||
case reflect.Float32:
|
case reflect.Float32:
|
||||||
v, err := strconv.ParseFloat(val, 32)
|
v, err := strconv.ParseFloat(val, 32)
|
||||||
|
@@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/config"
|
"go.unistack.org/micro/v3/config"
|
||||||
|
mid "go.unistack.org/micro/v3/util/id"
|
||||||
mtime "go.unistack.org/micro/v3/util/time"
|
mtime "go.unistack.org/micro/v3/util/time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,6 +18,9 @@ type cfg struct {
|
|||||||
IntValue int `default:"99"`
|
IntValue int `default:"99"`
|
||||||
DurationValue time.Duration `default:"10s"`
|
DurationValue time.Duration `default:"10s"`
|
||||||
MDurationValue mtime.Duration `default:"10s"`
|
MDurationValue mtime.Duration `default:"10s"`
|
||||||
|
MapValue map[string]bool `default:"key1=true,key2=false"`
|
||||||
|
UUIDValue string `default:"micro:generate uuid"`
|
||||||
|
IDValue string `default:"micro:generate id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgStructValue struct {
|
type cfgStructValue struct {
|
||||||
@@ -67,6 +71,21 @@ func TestDefault(t *testing.T) {
|
|||||||
if conf.StringValue != "after_load" {
|
if conf.StringValue != "after_load" {
|
||||||
t.Fatal("AfterLoad option not working")
|
t.Fatal("AfterLoad option not working")
|
||||||
}
|
}
|
||||||
|
if len(conf.MapValue) != 2 {
|
||||||
|
t.Fatalf("map value invalid: %#+v\n", conf.MapValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.UUIDValue == "" {
|
||||||
|
t.Fatalf("uuid value empty")
|
||||||
|
} else if len(conf.UUIDValue) != 36 {
|
||||||
|
t.Fatalf("uuid value invalid: %s", conf.UUIDValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.IDValue == "" {
|
||||||
|
t.Fatalf("id value empty")
|
||||||
|
} else if len(conf.IDValue) != mid.DefaultSize {
|
||||||
|
t.Fatalf("id value invalid: %s", conf.IDValue)
|
||||||
|
}
|
||||||
_ = conf
|
_ = conf
|
||||||
// t.Logf("%#+v\n", conf)
|
// t.Logf("%#+v\n", conf)
|
||||||
}
|
}
|
||||||
|
3
go.mod
3
go.mod
@@ -3,7 +3,8 @@ module go.unistack.org/micro/v3
|
|||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/imdario/mergo v0.3.14
|
github.com/google/uuid v1.3.0
|
||||||
|
github.com/imdario/mergo v0.3.15
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
|
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
|
||||||
)
|
)
|
||||||
|
6
go.sum
6
go.sum
@@ -1,5 +1,7 @@
|
|||||||
github.com/imdario/mergo v0.3.14 h1:fOqeC1+nCuuk6PKQdg9YmosXX7Y7mHX6R/0ZldI9iHo=
|
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||||
github.com/imdario/mergo v0.3.14/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
|
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
|
||||||
|
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
|
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
|
||||||
|
@@ -39,8 +39,6 @@ func FromOutgoingContext(ctx context.Context) (Metadata, bool) {
|
|||||||
|
|
||||||
// FromContext returns metadata from the given context
|
// FromContext returns metadata from the given context
|
||||||
// returned metadata shoud not be modified or race condition happens
|
// returned metadata shoud not be modified or race condition happens
|
||||||
//
|
|
||||||
// Deprecated: use FromIncomingContext or FromOutgoingContext
|
|
||||||
func FromContext(ctx context.Context) (Metadata, bool) {
|
func FromContext(ctx context.Context) (Metadata, bool) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
return nil, false
|
return nil, false
|
||||||
@@ -53,8 +51,6 @@ func FromContext(ctx context.Context) (Metadata, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewContext creates a new context with the given metadata
|
// NewContext creates a new context with the given metadata
|
||||||
//
|
|
||||||
// Deprecated: use NewIncomingContext or NewOutgoingContext
|
|
||||||
func NewContext(ctx context.Context, md Metadata) context.Context {
|
func NewContext(ctx context.Context, md Metadata) context.Context {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
|
@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
|
|||||||
n.Lock()
|
n.Lock()
|
||||||
defer n.Unlock()
|
defer n.Unlock()
|
||||||
|
|
||||||
cx := config.Context
|
|
||||||
|
|
||||||
var sub broker.Subscriber
|
|
||||||
|
|
||||||
for sb := range n.subscribers {
|
|
||||||
if sb.Options().Context != nil {
|
|
||||||
cx = sb.Options().Context
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
if sb.Options().Batch {
|
|
||||||
// batch processing handler
|
|
||||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
|
|
||||||
} else {
|
|
||||||
// single processing handler
|
|
||||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
|
|
||||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
n.registered = true
|
n.registered = true
|
||||||
if cacheService {
|
if cacheService {
|
||||||
n.rsvc = service
|
n.rsvc = service
|
||||||
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := n.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
t := new(time.Ticker)
|
t := new(time.Ticker)
|
||||||
|
|
||||||
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *noopServer) subscribe() error {
|
||||||
|
config := n.Options()
|
||||||
|
|
||||||
|
cx := config.Context
|
||||||
|
var err error
|
||||||
|
var sub broker.Subscriber
|
||||||
|
|
||||||
|
for sb := range n.subscribers {
|
||||||
|
if sb.Options().Context != nil {
|
||||||
|
cx = sb.Options().Context
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if sb.Options().Batch {
|
||||||
|
// batch processing handler
|
||||||
|
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||||
|
} else {
|
||||||
|
// single processing handler
|
||||||
|
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
|
||||||
|
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (n *noopServer) Stop() error {
|
func (n *noopServer) Stop() error {
|
||||||
n.RLock()
|
n.RLock()
|
||||||
if !n.started {
|
if !n.started {
|
||||||
|
@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||||
return func(ps broker.Events) (err error) {
|
return func(ps broker.Events) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
|
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
@@ -4,6 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ Tracer = (*noopTracer)(nil)
|
||||||
|
|
||||||
type noopTracer struct {
|
type noopTracer struct {
|
||||||
opts Options
|
opts Options
|
||||||
}
|
}
|
||||||
@@ -21,6 +23,10 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption)
|
|||||||
return NewSpanContext(ctx, span), span
|
return NewSpanContext(ctx, span), span
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *noopTracer) Flush(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *noopTracer) Init(opts ...Option) error {
|
func (t *noopTracer) Init(opts ...Option) error {
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&t.opts)
|
o(&t.opts)
|
||||||
|
@@ -16,6 +16,8 @@ type Tracer interface {
|
|||||||
Init(...Option) error
|
Init(...Option) error
|
||||||
// Start a trace
|
// Start a trace
|
||||||
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
||||||
|
// Flush flushes spans
|
||||||
|
Flush(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Span interface {
|
type Span interface {
|
||||||
|
@@ -2,7 +2,9 @@ package time
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -13,39 +15,42 @@ func ParseDuration(s string) (time.Duration, error) {
|
|||||||
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
//var sb strings.Builder
|
var p int
|
||||||
/*
|
var hours int
|
||||||
|
loop:
|
||||||
for i, r := range s {
|
for i, r := range s {
|
||||||
switch r {
|
switch r {
|
||||||
case 'd':
|
case 's', 'm':
|
||||||
n, err := strconv.Atoi(s[idx:i])
|
break loop
|
||||||
|
case 'h':
|
||||||
|
d, err := strconv.Atoi(s[p:i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.New("time: invalid duration " + s)
|
return 0, errors.New("time: invalid duration " + s)
|
||||||
}
|
}
|
||||||
s[idx:i] = fmt.Sprintf("%d", n*24)
|
hours += d
|
||||||
default:
|
p = i + 1
|
||||||
sb.WriteRune(r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
var td time.Duration
|
|
||||||
var err error
|
|
||||||
switch s[len(s)-1] {
|
|
||||||
case 's', 'm', 'h':
|
|
||||||
td, err = time.ParseDuration(s)
|
|
||||||
case 'd':
|
case 'd':
|
||||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
d, err := strconv.Atoi(s[p:i])
|
||||||
td *= 24
|
if err != nil {
|
||||||
|
return 0, errors.New("time: invalid duration " + s)
|
||||||
}
|
}
|
||||||
|
hours += d * 24
|
||||||
|
p = i + 1
|
||||||
case 'y':
|
case 'y':
|
||||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
n, err := strconv.Atoi(s[p:i])
|
||||||
year := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.Local)
|
if err != nil {
|
||||||
days := year.YearDay()
|
return 0, errors.New("time: invalid duration " + s)
|
||||||
td *= 24 * time.Duration(days)
|
}
|
||||||
|
var d int
|
||||||
|
for j := n - 1; j >= 0; j-- {
|
||||||
|
d += time.Date(time.Now().Year()+j, time.December, 31, 0, 0, 0, 0, time.Local).YearDay()
|
||||||
|
}
|
||||||
|
hours += d * 24
|
||||||
|
p = i + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return td, err
|
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d Duration) MarshalJSON() ([]byte, error) {
|
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||||
@@ -62,7 +67,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
|
|||||||
*d = Duration(time.Duration(value))
|
*d = Duration(time.Duration(value))
|
||||||
return nil
|
return nil
|
||||||
case string:
|
case string:
|
||||||
dv, err := time.ParseDuration(value)
|
dv, err := ParseDuration(value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -23,27 +23,34 @@ func TestUnmarshalJSON(t *testing.T) {
|
|||||||
TTL Duration `json:"ttl"`
|
TTL Duration `json:"ttl"`
|
||||||
}
|
}
|
||||||
v := &str{}
|
v := &str{}
|
||||||
|
var err error
|
||||||
|
|
||||||
err := json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
err = json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if v.TTL != 10000000 {
|
} else if v.TTL != 10000000 {
|
||||||
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if v.TTL != 31536000000000000 {
|
||||||
|
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseDuration(t *testing.T) {
|
func TestParseDuration(t *testing.T) {
|
||||||
var td time.Duration
|
var td time.Duration
|
||||||
var err error
|
var err error
|
||||||
t.Skip()
|
|
||||||
td, err = ParseDuration("14d4h")
|
td, err = ParseDuration("14d4h")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ParseDuration error: %v", err)
|
t.Fatalf("ParseDuration error: %v", err)
|
||||||
}
|
}
|
||||||
if td.String() != "336h0m0s" {
|
if td.String() != "340h0m0s" {
|
||||||
t.Fatalf("ParseDuration 14d != 336h0m0s : %s", td.String())
|
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
td, err = ParseDuration("1y")
|
td, err = ParseDuration("1y")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ParseDuration error: %v", err)
|
t.Fatalf("ParseDuration error: %v", err)
|
||||||
|
Reference in New Issue
Block a user