diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/lint.yml index 8da88945..1cc293dc 100644 --- a/.gitea/workflows/lint.yml +++ b/.gitea/workflows/lint.yml @@ -10,15 +10,15 @@ jobs: runs-on: ubuntu-latest steps: - name: setup-go - uses: https://gitea.com/actions/setup-go@v3 + uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.21 - name: checkout - uses: https://gitea.com/actions/checkout@v3 + uses: actions/checkout@v3 - name: deps run: go get -v -d ./... - name: lint uses: https://github.com/golangci/golangci-lint-action@v3.4.0 continue-on-error: true with: - version: v1.52 \ No newline at end of file + version: v1.52 diff --git a/.gitea/workflows/pr.yml b/.gitea/workflows/pr.yml index ba947f0c..b3e67b06 100644 --- a/.gitea/workflows/pr.yml +++ b/.gitea/workflows/pr.yml @@ -10,14 +10,14 @@ jobs: runs-on: ubuntu-latest steps: - name: checkout - uses: https://gitea.com/actions/checkout@v3 + uses: actions/checkout@v3 - name: setup-go - uses: https://gitea.com/actions/setup-go@v3 + uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.21 - name: deps run: go get -v -t -d ./... - name: test env: INTEGRATION_TESTS: yes - run: go test -mod readonly -v ./... \ No newline at end of file + run: go test -mod readonly -v ./... diff --git a/.gitignore b/.gitignore index 500d68ca..c2fff381 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Develop tools /.vscode/ /.idea/ +.idea +.vscode # Binaries for programs and plugins *.exe @@ -13,6 +15,7 @@ _obj _test _build +.DS_Store # Architecture specific extensions/prefixes *.[568vq] diff --git a/broker/broker.go b/broker/broker.go index f95de4b4..80585c4c 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -4,19 +4,22 @@ package broker // import "go.unistack.org/micro/v3/broker" import ( "context" "errors" + "time" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/metadata" ) // DefaultBroker default memory broker -var DefaultBroker = NewBroker() +var DefaultBroker Broker = NewBroker() var ( // ErrNotConnected returns when broker used but not connected yet ErrNotConnected = errors.New("broker not connected") // ErrDisconnected returns when broker disconnected ErrDisconnected = errors.New("broker disconnected") + // DefaultGracefulTimeout + DefaultGracefulTimeout = 5 * time.Second ) // Broker is an interface used for asynchronous messaging. diff --git a/broker/memory.go b/broker/memory/memory.go similarity index 80% rename from broker/memory.go rename to broker/memory/memory.go index cd723118..d5609e1e 100644 --- a/broker/memory.go +++ b/broker/memory/memory.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" maddr "go.unistack.org/micro/v3/util/addr" @@ -15,7 +16,7 @@ import ( type memoryBroker struct { subscribers map[string][]*memorySubscriber addr string - opts Options + opts broker.Options sync.RWMutex connected bool } @@ -24,20 +25,20 @@ type memoryEvent struct { err error message interface{} topic string - opts Options + opts broker.Options } type memorySubscriber struct { ctx context.Context exit chan bool - handler Handler - batchhandler BatchHandler + handler broker.Handler + batchhandler broker.BatchHandler id string topic string - opts SubscribeOptions + opts broker.SubscribeOptions } -func (m *memoryBroker) Options() Options { +func (m *memoryBroker) Options() broker.Options { return m.opts } @@ -46,6 +47,12 @@ func (m *memoryBroker) Address() string { } func (m *memoryBroker) Connect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + m.Lock() defer m.Unlock() @@ -70,6 +77,12 @@ func (m *memoryBroker) Connect(ctx context.Context) error { } func (m *memoryBroker) Disconnect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + m.Lock() defer m.Unlock() @@ -81,27 +94,27 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error { return nil } -func (m *memoryBroker) Init(opts ...Option) error { +func (m *memoryBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&m.opts) } return nil } -func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { +func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { msg.Header.Set(metadata.HeaderTopic, topic) - return m.publish(ctx, []*Message{msg}, opts...) + return m.publish(ctx, []*broker.Message{msg}, opts...) } -func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { return m.publish(ctx, msgs, opts...) } -func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { m.RLock() if !m.connected { m.RUnlock() - return ErrNotConnected + return broker.ErrNotConnected } m.RUnlock() @@ -111,9 +124,9 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub case <-ctx.Done(): return ctx.Err() default: - options := NewPublishOptions(opts...) + options := broker.NewPublishOptions(opts...) - msgTopicMap := make(map[string]Events) + msgTopicMap := make(map[string]broker.Events) for _, v := range msgs { p := &memoryEvent{opts: m.opts} @@ -188,11 +201,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub return nil } -func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, ErrNotConnected + return nil, broker.ErrNotConnected } m.RUnlock() @@ -201,7 +214,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler return nil, err } - options := NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) sub := &memorySubscriber{ exit: make(chan bool, 1), @@ -233,11 +246,11 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler return sub, nil } -func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, ErrNotConnected + return nil, broker.ErrNotConnected } m.RUnlock() @@ -246,7 +259,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand return nil, err } - options := NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) sub := &memorySubscriber{ exit: make(chan bool, 1), @@ -290,12 +303,12 @@ func (m *memoryEvent) Topic() string { return m.topic } -func (m *memoryEvent) Message() *Message { +func (m *memoryEvent) Message() *broker.Message { switch v := m.message.(type) { - case *Message: + case *broker.Message: return v case []byte: - msg := &Message{} + msg := &broker.Message{} if err := m.opts.Codec.Unmarshal(v, msg); err != nil { if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err) @@ -320,7 +333,7 @@ func (m *memoryEvent) SetError(err error) { m.err = err } -func (m *memorySubscriber) Options() SubscribeOptions { +func (m *memorySubscriber) Options() broker.SubscribeOptions { return m.opts } @@ -334,9 +347,9 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) Broker { +func NewBroker(opts ...broker.Option) broker.Broker { return &memoryBroker{ - opts: NewOptions(opts...), + opts: broker.NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber), } } diff --git a/broker/memory_test.go b/broker/memory/memory_test.go similarity index 88% rename from broker/memory_test.go rename to broker/memory/memory_test.go index af949ede..e558ef10 100644 --- a/broker/memory_test.go +++ b/broker/memory/memory_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/metadata" ) @@ -19,7 +20,7 @@ func TestMemoryBatchBroker(t *testing.T) { topic := "test" count := 10 - fn := func(evts Events) error { + fn := func(evts broker.Events) error { return evts.Ack() } @@ -28,9 +29,9 @@ func TestMemoryBatchBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } - msgs := make([]*Message, 0, count) + msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &Message{ + message := &broker.Message{ Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", @@ -65,7 +66,7 @@ func TestMemoryBroker(t *testing.T) { topic := "test" count := 10 - fn := func(p Event) error { + fn := func(p broker.Event) error { return nil } @@ -74,9 +75,9 @@ func TestMemoryBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } - msgs := make([]*Message, 0, count) + msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &Message{ + message := &broker.Message{ Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", diff --git a/broker/noop.go b/broker/noop.go new file mode 100644 index 00000000..29142673 --- /dev/null +++ b/broker/noop.go @@ -0,0 +1,82 @@ +package broker + +import ( + "context" + "strings" +) + +type NoopBroker struct { + opts Options +} + +func NewBroker(opts ...Option) *NoopBroker { + b := &NoopBroker{opts: NewOptions(opts...)} + return b +} + +func (b *NoopBroker) Name() string { + return b.opts.Name +} + +func (b *NoopBroker) String() string { + return "noop" +} + +func (b *NoopBroker) Options() Options { + return b.opts +} + +func (b *NoopBroker) Init(opts ...Option) error { + for _, opt := range opts { + opt(&b.opts) + } + return nil +} + +func (b *NoopBroker) Connect(_ context.Context) error { + return nil +} + +func (b *NoopBroker) Disconnect(_ context.Context) error { + return nil +} + +func (b *NoopBroker) Address() string { + return strings.Join(b.opts.Addrs, ",") +} + +func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error { + return nil +} + +func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error { + return nil +} + +type NoopSubscriber struct { + ctx context.Context + topic string + handler Handler + batchHandler BatchHandler + opts SubscribeOptions +} + +func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { + return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil +} + +func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil +} + +func (s *NoopSubscriber) Options() SubscribeOptions { + return s.opts +} + +func (s *NoopSubscriber) Topic() string { + return s.topic +} + +func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error { + return nil +} diff --git a/broker/options.go b/broker/options.go index 0e4a8c75..c5e91263 100644 --- a/broker/options.go +++ b/broker/options.go @@ -9,6 +9,7 @@ import ( "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/register" + "go.unistack.org/micro/v3/sync" "go.unistack.org/micro/v3/tracer" ) @@ -36,17 +37,22 @@ type Options struct { Name string // Addrs holds the broker address Addrs []string + + Wait *sync.WaitGroup + + GracefulTimeout time.Duration } // NewOptions create new Options func NewOptions(opts ...Option) Options { options := Options{ - Register: register.DefaultRegister, - Logger: logger.DefaultLogger, - Context: context.Background(), - Meter: meter.DefaultMeter, - Codec: codec.DefaultCodec, - Tracer: tracer.DefaultTracer, + Register: register.DefaultRegister, + Logger: logger.DefaultLogger, + Context: context.Background(), + Meter: meter.DefaultMeter, + Codec: codec.DefaultCodec, + Tracer: tracer.DefaultTracer, + GracefulTimeout: DefaultGracefulTimeout, } for _, o := range opts { o(&options) diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 00000000..26549e62 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,41 @@ +package cluster + +import ( + "context" + + "go.unistack.org/micro/v3/metadata" +) + +// Message sent to member in cluster +type Message interface { + // Header returns message headers + Header() metadata.Metadata + // Body returns broker message may be []byte slice or some go struct or interface + Body() interface{} +} + +type Node interface { + // Name returns node name + Name() string + // Address returns node address + Address() string + // Metadata returns node metadata + Metadata() metadata.Metadata +} + +// Cluster interface used for cluster communication across nodes +type Cluster interface { + // Join is used to take an existing members and performing state sync + Join(ctx context.Context, addr ...string) error + // Leave broadcast a leave message and stop listeners + Leave(ctx context.Context) error + // Ping is used to probe live status of the node + Ping(ctx context.Context, node Node, payload []byte) error + // Members returns the cluster members + Members() ([]Node, error) + // Broadcast send message for all members in cluster, if filter is not nil, nodes may be filtered + // by key/value pairs + Broadcast(ctx context.Context, msg Message, filter ...string) error + // Unicast send message to single member in cluster + Unicast(ctx context.Context, node Node, msg Message) error +} diff --git a/config/config.go b/config/config.go index ea00fd74..44c18f76 100644 --- a/config/config.go +++ b/config/config.go @@ -13,7 +13,7 @@ type Validator interface { } // DefaultConfig default config -var DefaultConfig = NewConfig() +var DefaultConfig Config = NewConfig() // DefaultWatcherMinInterval default min interval for poll changes var DefaultWatcherMinInterval = 5 * time.Second diff --git a/config/default.go b/config/default.go index 12840450..c03fb812 100644 --- a/config/default.go +++ b/config/default.go @@ -7,8 +7,8 @@ import ( "strings" "time" + "dario.cat/mergo" "github.com/google/uuid" - "github.com/imdario/mergo" mid "go.unistack.org/micro/v3/util/id" rutil "go.unistack.org/micro/v3/util/reflect" mtime "go.unistack.org/micro/v3/util/time" diff --git a/go.mod b/go.mod index 65e6a1f5..5dd808cb 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module go.unistack.org/micro/v3 -go 1.19 +go 1.20 require ( + dario.cat/mergo v1.0.0 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/google/uuid v1.3.0 - github.com/imdario/mergo v0.3.15 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index 66b82504..5f3145b9 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -7,8 +9,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= -github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= diff --git a/logger/logger.go b/logger/logger.go index 425bedfe..71dcc2d2 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -3,12 +3,15 @@ package logger // import "go.unistack.org/micro/v3/logger" import ( "context" - "os" ) +type ContextAttrFunc func(ctx context.Context) []interface{} + +var DefaultContextAttrFuncs []ContextAttrFunc + var ( // DefaultLogger variable - DefaultLogger = NewLogger(WithLevel(ParseLevel(os.Getenv("MICRO_LOG_LEVEL")))) + DefaultLogger Logger = NewLogger() // DefaultLevel used by logger DefaultLevel = InfoLevel // DefaultCallerSkipCount used by logger @@ -57,7 +60,9 @@ type Logger interface { Log(ctx context.Context, level Level, args ...interface{}) // Logf logs message with needed level Logf(ctx context.Context, level Level, msg string, args ...interface{}) - // String returns the name of logger + // Name returns broker instance name + Name() string + // String returns the type of logger String() string } @@ -65,76 +70,106 @@ type Logger interface { type Field interface{} // Info writes msg to default logger on info level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Info(ctx context.Context, args ...interface{}) { - DefaultLogger.Info(ctx, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Info(ctx, args...) } // Error writes msg to default logger on error level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Error(ctx context.Context, args ...interface{}) { - DefaultLogger.Error(ctx, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Error(ctx, args...) } // Debug writes msg to default logger on debug level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Debug(ctx context.Context, args ...interface{}) { - DefaultLogger.Debug(ctx, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Debug(ctx, args...) } // Warn writes msg to default logger on warn level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Warn(ctx context.Context, args ...interface{}) { - DefaultLogger.Warn(ctx, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Warn(ctx, args...) } // Trace writes msg to default logger on trace level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Trace(ctx context.Context, args ...interface{}) { - DefaultLogger.Trace(ctx, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Trace(ctx, args...) } // Fatal writes msg to default logger on fatal level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Fatal(ctx context.Context, args ...interface{}) { - DefaultLogger.Fatal(ctx, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Fatal(ctx, args...) } // Infof writes formatted msg to default logger on info level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Infof(ctx context.Context, msg string, args ...interface{}) { - DefaultLogger.Infof(ctx, msg, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Infof(ctx, msg, args...) } // Errorf writes formatted msg to default logger on error level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Errorf(ctx context.Context, msg string, args ...interface{}) { - DefaultLogger.Errorf(ctx, msg, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Errorf(ctx, msg, args...) } // Debugf writes formatted msg to default logger on debug level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Debugf(ctx context.Context, msg string, args ...interface{}) { - DefaultLogger.Debugf(ctx, msg, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Debugf(ctx, msg, args...) } // Warnf writes formatted msg to default logger on warn level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Warnf(ctx context.Context, msg string, args ...interface{}) { - DefaultLogger.Warnf(ctx, msg, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Warnf(ctx, msg, args...) } // Tracef writes formatted msg to default logger on trace level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Tracef(ctx context.Context, msg string, args ...interface{}) { - DefaultLogger.Tracef(ctx, msg, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Tracef(ctx, msg, args...) } // Fatalf writes formatted msg to default logger on fatal level +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Fatalf(ctx context.Context, msg string, args ...interface{}) { - DefaultLogger.Fatalf(ctx, msg, args...) + DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Fatalf(ctx, msg, args...) } // V returns true if passed level enabled in default logger +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func V(level Level) bool { return DefaultLogger.V(level) } // Init initialize logger +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Init(opts ...Option) error { return DefaultLogger.Init(opts...) } // Fields create logger with specific fields +// +// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations func Fields(fields ...interface{}) Logger { return DefaultLogger.Fields(fields...) } diff --git a/logger/noop.go b/logger/noop.go index 04f0515b..17f3608b 100644 --- a/logger/noop.go +++ b/logger/noop.go @@ -13,11 +13,15 @@ func NewLogger(opts ...Option) Logger { return &noopLogger{opts: options} } -func (l *noopLogger) V(lvl Level) bool { +func (l *noopLogger) V(_ Level) bool { return false } -func (l *noopLogger) Level(lvl Level) { +func (l *noopLogger) Level(_ Level) { +} + +func (l *noopLogger) Name() string { + return l.opts.Name } func (l *noopLogger) Init(opts ...Option) error { @@ -35,7 +39,7 @@ func (l *noopLogger) Clone(opts ...Option) Logger { return nl } -func (l *noopLogger) Fields(attrs ...interface{}) Logger { +func (l *noopLogger) Fields(_ ...interface{}) Logger { return l } diff --git a/logger/options.go b/logger/options.go index 895583ff..5bc96417 100644 --- a/logger/options.go +++ b/logger/options.go @@ -3,10 +3,12 @@ package logger import ( "context" "io" + "log/slog" "os" + "time" ) -// Option func +// Option func signature type Option func(*Options) // Options holds logger options @@ -15,31 +17,65 @@ type Options struct { Out io.Writer // Context holds exernal options Context context.Context - // Fields holds additional metadata - Fields []interface{} // Name holds the logger name Name string - // The logging level the logger should log - Level Level + // Fields holds additional metadata + Fields []interface{} // CallerSkipCount number of frmaes to skip CallerSkipCount int + // ContextAttrFuncs contains funcs that executed before log func on context + ContextAttrFuncs []ContextAttrFunc + // TimeKey is the key used for the time of the log call + TimeKey string + // LevelKey is the key used for the level of the log call + LevelKey string + // ErroreKey is the key used for the error of the log call + ErrorKey string + // MessageKey is the key used for the message of the log call + MessageKey string + // SourceKey is the key used for the source file and line of the log call + SourceKey string + // StacktraceKey is the key used for the stacktrace + StacktraceKey string + // AddStacktrace controls writing of stacktaces on error + AddStacktrace bool + // AddSource enabled writing source file and position in log + AddSource bool + // The logging level the logger should log + Level Level + // TimeFunc used to obtain current time + TimeFunc func() time.Time } // NewOptions creates new options struct func NewOptions(opts ...Option) Options { options := Options{ - Level: DefaultLevel, - Fields: make([]interface{}, 0, 6), - Out: os.Stderr, - CallerSkipCount: DefaultCallerSkipCount, - Context: context.Background(), + Level: DefaultLevel, + Fields: make([]interface{}, 0, 6), + Out: os.Stderr, + CallerSkipCount: DefaultCallerSkipCount, + Context: context.Background(), + ContextAttrFuncs: DefaultContextAttrFuncs, + AddSource: true, + TimeFunc: time.Now, } + + WithMicroKeys()(&options) + for _, o := range opts { o(&options) } + return options } +// WithContextAttrFuncs appends default funcs for the context arrts filler +func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option { + return func(o *Options) { + o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...) + } +} + // WithFields set default fields for the logger func WithFields(fields ...interface{}) Option { return func(o *Options) { @@ -61,6 +97,20 @@ func WithOutput(out io.Writer) Option { } } +// WitAddStacktrace controls writing stacktrace on error +func WithAddStacktrace(v bool) Option { + return func(o *Options) { + o.AddStacktrace = v + } +} + +// WitAddSource controls writing source file and pos in log +func WithAddSource(v bool) Option { + return func(o *Options) { + o.AddSource = v + } +} + // WithCallerSkipCount set frame count to skip func WithCallerSkipCount(c int) Option { return func(o *Options) { @@ -82,6 +132,57 @@ func WithName(n string) Option { } } +// WithTimeFunc sets the func to obtain current time +func WithTimeFunc(fn func() time.Time) Option { + return func(o *Options) { + o.TimeFunc = fn + } +} + +func WithZapKeys() Option { + return func(o *Options) { + o.TimeKey = "@timestamp" + o.LevelKey = "level" + o.MessageKey = "msg" + o.SourceKey = "caller" + o.StacktraceKey = "stacktrace" + o.ErrorKey = "error" + } +} + +func WithZerologKeys() Option { + return func(o *Options) { + o.TimeKey = "time" + o.LevelKey = "level" + o.MessageKey = "message" + o.SourceKey = "caller" + o.StacktraceKey = "stacktrace" + o.ErrorKey = "error" + } +} + +func WithSlogKeys() Option { + return func(o *Options) { + o.TimeKey = slog.TimeKey + o.LevelKey = slog.LevelKey + o.MessageKey = slog.MessageKey + o.SourceKey = slog.SourceKey + o.StacktraceKey = "stacktrace" + o.ErrorKey = "error" + } +} + +func WithMicroKeys() Option { + return func(o *Options) { + o.TimeKey = "timestamp" + o.LevelKey = "level" + o.MessageKey = "msg" + o.SourceKey = "caller" + o.StacktraceKey = "stacktrace" + o.ErrorKey = "error" + } +} + func WithIncCallerSkipCount(n int) Option { return func(o *Options) { o.CallerSkipCount += n diff --git a/logger/slog/options.go b/logger/slog/options.go deleted file mode 100644 index 58e277bc..00000000 --- a/logger/slog/options.go +++ /dev/null @@ -1,27 +0,0 @@ -package slog - -import "go.unistack.org/micro/v3/logger" - -type sourceKey struct{} - -func WithSourceKey(v string) logger.Option { - return logger.SetOption(sourceKey{}, v) -} - -type timeKey struct{} - -func WithTimeKey(v string) logger.Option { - return logger.SetOption(timeKey{}, v) -} - -type messageKey struct{} - -func WithMessageKey(v string) logger.Option { - return logger.SetOption(messageKey{}, v) -} - -type levelKey struct{} - -func WithLevelKey(v string) logger.Option { - return logger.SetOption(levelKey{}, v) -} diff --git a/logger/slog/slog.go b/logger/slog/slog.go index 6e12b65e..3c699cff 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -5,21 +5,16 @@ import ( "fmt" "log/slog" "os" + "regexp" "runtime" "strconv" "sync" - "time" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/tracer" ) -var ( - DefaultSourceKey string = slog.SourceKey - DefaultTimeKey string = slog.TimeKey - DefaultMessageKey string = slog.MessageKey - DefaultLevelKey string = slog.LevelKey -) +var reTrace = regexp.MustCompile(`.*/slog/logger\.go.*\n`) var ( traceValue = slog.StringValue("trace") @@ -35,15 +30,15 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { case slog.SourceKey: source := a.Value.Any().(*slog.Source) a.Value = slog.StringValue(source.File + ":" + strconv.Itoa(source.Line)) - a.Key = s.sourceKey + a.Key = s.opts.SourceKey case slog.TimeKey: - a.Key = s.timeKey + a.Key = s.opts.TimeKey case slog.MessageKey: - a.Key = s.messageKey + a.Key = s.opts.MessageKey case slog.LevelKey: level := a.Value.Any().(slog.Level) lvl := slogToLoggerLevel(level) - a.Key = s.levelKey + a.Key = s.opts.LevelKey switch { case lvl < logger.DebugLevel: a.Value = traceValue @@ -66,56 +61,33 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { } type slogLogger struct { - slog *slog.Logger - leveler *slog.LevelVar - levelKey string - messageKey string - sourceKey string - timeKey string - opts logger.Options - mu sync.RWMutex + leveler *slog.LevelVar + handler slog.Handler + opts logger.Options + mu sync.RWMutex } func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger { s.mu.RLock() options := s.opts + s.mu.RUnlock() for _, o := range opts { o(&options) } l := &slogLogger{ - opts: options, - levelKey: s.levelKey, - messageKey: s.messageKey, - sourceKey: s.sourceKey, - timeKey: s.timeKey, - } - - if v, ok := l.opts.Context.Value(levelKey{}).(string); ok && v != "" { - l.levelKey = v - } - if v, ok := l.opts.Context.Value(messageKey{}).(string); ok && v != "" { - l.messageKey = v - } - if v, ok := l.opts.Context.Value(sourceKey{}).(string); ok && v != "" { - l.sourceKey = v - } - if v, ok := l.opts.Context.Value(timeKey{}).(string); ok && v != "" { - l.timeKey = v + opts: options, } l.leveler = new(slog.LevelVar) handleOpt := &slog.HandlerOptions{ - ReplaceAttr: s.renameAttr, + ReplaceAttr: l.renameAttr, Level: l.leveler, - AddSource: true, + AddSource: l.opts.AddSource, } l.leveler.Set(loggerToSlogLevel(l.opts.Level)) - handler := slog.NewJSONHandler(options.Out, handleOpt) - l.slog = slog.New(handler).With(options.Fields...) - - s.mu.RUnlock() + l.handler = slog.New(slog.NewJSONHandler(options.Out, handleOpt)).With(options.Fields...).Handler() return l } @@ -134,61 +106,44 @@ func (s *slogLogger) Options() logger.Options { func (s *slogLogger) Fields(attrs ...interface{}) logger.Logger { s.mu.RLock() - nl := &slogLogger{ - opts: s.opts, - levelKey: s.levelKey, - messageKey: s.messageKey, - sourceKey: s.sourceKey, - timeKey: s.timeKey, - } - nl.leveler = new(slog.LevelVar) - nl.leveler.Set(s.leveler.Level()) - - handleOpt := &slog.HandlerOptions{ - ReplaceAttr: nl.renameAttr, - Level: nl.leveler, - AddSource: true, - } - - handler := slog.NewJSONHandler(s.opts.Out, handleOpt) - nl.slog = slog.New(handler).With(attrs...) - + level := s.leveler.Level() + options := s.opts s.mu.RUnlock() - return nl + l := &slogLogger{opts: options} + l.leveler = new(slog.LevelVar) + l.leveler.Set(level) + + handleOpt := &slog.HandlerOptions{ + ReplaceAttr: l.renameAttr, + Level: l.leveler, + AddSource: l.opts.AddSource, + } + + l.handler = slog.New(slog.NewJSONHandler(l.opts.Out, handleOpt)).With(attrs...).Handler() + + return l } func (s *slogLogger) Init(opts ...logger.Option) error { s.mu.Lock() - for _, o := range opts { - o(&s.opts) + + if len(s.opts.ContextAttrFuncs) == 0 { + s.opts.ContextAttrFuncs = logger.DefaultContextAttrFuncs } - if v, ok := s.opts.Context.Value(levelKey{}).(string); ok && v != "" { - s.levelKey = v - } - if v, ok := s.opts.Context.Value(messageKey{}).(string); ok && v != "" { - s.messageKey = v - } - if v, ok := s.opts.Context.Value(sourceKey{}).(string); ok && v != "" { - s.sourceKey = v - } - if v, ok := s.opts.Context.Value(timeKey{}).(string); ok && v != "" { - s.timeKey = v + for _, o := range opts { + o(&s.opts) } s.leveler = new(slog.LevelVar) handleOpt := &slog.HandlerOptions{ ReplaceAttr: s.renameAttr, Level: s.leveler, - AddSource: true, + AddSource: s.opts.AddSource, } s.leveler.Set(loggerToSlogLevel(s.opts.Level)) - handler := slog.NewJSONHandler(s.opts.Out, handleOpt) - s.slog = slog.New(handler).With(s.opts.Fields...) - - slog.SetDefault(s.slog) - + s.handler = slog.New(slog.NewJSONHandler(s.opts.Out, handleOpt)).With(s.opts.Fields...).Handler() s.mu.Unlock() return nil @@ -200,9 +155,37 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interfa } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), loggerToSlogLevel(lvl), fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + if s.opts.AddStacktrace && lvl == logger.ErrorLevel { + stackInfo := make([]byte, 1024*1024) + if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 { + traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1) + if len(traceLines) != 0 { + attrs = append(attrs, slog.String(s.opts.StacktraceKey, traceLines[len(traceLines)-1])) + } + } + } + r.Add(attrs[1:]...) + r.Attrs(func(a slog.Attr) bool { + if a.Key == s.opts.ErrorKey { + if span, ok := tracer.SpanFromContext(ctx); ok { + span.SetStatus(tracer.SpanStatusError, a.Value.String()) + return false + } + } + return true + }) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) { @@ -211,9 +194,37 @@ func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, att } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), loggerToSlogLevel(lvl), fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + if s.opts.AddStacktrace && lvl == logger.ErrorLevel { + stackInfo := make([]byte, 1024*1024) + if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 { + traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1) + if len(traceLines) != 0 { + attrs = append(attrs, (slog.String(s.opts.StacktraceKey, traceLines[len(traceLines)-1]))) + } + } + } + r.Add(attrs[1:]...) + r.Attrs(func(a slog.Attr) bool { + if a.Key == s.opts.ErrorKey { + if span, ok := tracer.SpanFromContext(ctx); ok { + span.SetStatus(tracer.SpanStatusError, a.Value.String()) + return false + } + } + return true + }) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) { @@ -222,9 +233,19 @@ func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) { } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelInfo, fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelInfo, fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) { @@ -233,9 +254,19 @@ func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{} } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelInfo, fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelInfo, msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) { @@ -244,9 +275,19 @@ func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) { } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelDebug, fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug, fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) { @@ -255,9 +296,19 @@ func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{ } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelDebug, fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug, msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) { @@ -266,9 +317,19 @@ func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) { } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelDebug-1, fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug-1, fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) { @@ -277,9 +338,19 @@ func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{ } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelDebug-1, fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug-1, msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) { @@ -288,10 +359,29 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) { } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelError, fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError, fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + if s.opts.AddStacktrace { + stackInfo := make([]byte, 1024*1024) + if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 { + traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1) + if len(traceLines) != 0 { + attrs = append(attrs, slog.String("stacktrace", traceLines[len(traceLines)-1])) + } + } + } + r.Add(attrs[1:]...) r.Attrs(func(a slog.Attr) bool { - if a.Key == "error" { + if a.Key == s.opts.ErrorKey { if span, ok := tracer.SpanFromContext(ctx); ok { span.SetStatus(tracer.SpanStatusError, a.Value.String()) return false @@ -299,7 +389,7 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) { } return true }) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) { @@ -308,10 +398,29 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{ } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelError, fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError, msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + if s.opts.AddStacktrace { + stackInfo := make([]byte, 1024*1024) + if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 { + traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1) + if len(traceLines) != 0 { + attrs = append(attrs, slog.String("stacktrace", traceLines[len(traceLines)-1])) + } + } + } + r.Add(attrs...) r.Attrs(func(a slog.Attr) bool { - if a.Key == "error" { + if a.Key == s.opts.ErrorKey { if span, ok := tracer.SpanFromContext(ctx); ok { span.SetStatus(tracer.SpanStatusError, a.Value.String()) return false @@ -319,7 +428,7 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{ } return true }) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) { @@ -328,9 +437,19 @@ func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) { } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelError+1, fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError+1, fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) os.Exit(1) } @@ -340,9 +459,19 @@ func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{ } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelError+1, fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError+1, msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs...) + _ = s.handler.Handle(ctx, r) os.Exit(1) } @@ -352,9 +481,19 @@ func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) { } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelWarn, fmt.Sprintf("%s", attrs[0]), pcs[0]) - // r.Add(attrs[1:]...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelWarn, fmt.Sprintf("%s", attrs[0]), pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) { @@ -363,9 +502,23 @@ func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{} } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelWarn, fmt.Sprintf(msg, attrs...), pcs[0]) - // r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelWarn, msg, pcs[0]) + for _, fn := range s.opts.ContextAttrFuncs { + attrs = append(attrs, fn(ctx)...) + } + + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + r.Add(attrs[1:]...) + _ = s.handler.Handle(ctx, r) +} + +func (s *slogLogger) Name() string { + return s.opts.Name } func (s *slogLogger) String() string { @@ -374,24 +527,9 @@ func (s *slogLogger) String() string { func NewLogger(opts ...logger.Option) logger.Logger { s := &slogLogger{ - opts: logger.NewOptions(opts...), - sourceKey: DefaultSourceKey, - timeKey: DefaultTimeKey, - messageKey: DefaultMessageKey, - levelKey: DefaultLevelKey, - } - if v, ok := s.opts.Context.Value(levelKey{}).(string); ok && v != "" { - s.levelKey = v - } - if v, ok := s.opts.Context.Value(messageKey{}).(string); ok && v != "" { - s.messageKey = v - } - if v, ok := s.opts.Context.Value(sourceKey{}).(string); ok && v != "" { - s.sourceKey = v - } - if v, ok := s.opts.Context.Value(timeKey{}).(string); ok && v != "" { - s.timeKey = v + opts: logger.NewOptions(opts...), } + return s } diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index a6846c6f..93c63c79 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -3,12 +3,47 @@ package slog import ( "bytes" "context" + "fmt" "log" "testing" "go.unistack.org/micro/v3/logger" ) +func TestError(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), logger.WithAddStacktrace(true)) + if err := l.Init(); err != nil { + t.Fatal(err) + } + + l.Error(ctx, "message", fmt.Errorf("error message")) + if !bytes.Contains(buf.Bytes(), []byte(`"stacktrace":"`)) { + t.Fatalf("logger stacktrace not works, buf contains: %s", buf.Bytes()) + } + if !bytes.Contains(buf.Bytes(), []byte(`"error":"`)) { + t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) + } +} + +func TestErrorf(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), logger.WithAddStacktrace(true)) + if err := l.Init(); err != nil { + t.Fatal(err) + } + + l.Errorf(ctx, "message", fmt.Errorf("error message")) + if !bytes.Contains(buf.Bytes(), []byte(`"stacktrace":"`)) { + t.Fatalf("logger stacktrace not works, buf contains: %s", buf.Bytes()) + } + if !bytes.Contains(buf.Bytes(), []byte(`"error":"`)) { + t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) + } +} + func TestContext(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) diff --git a/metadata/metadata.go b/metadata/metadata.go index 4718f420..bf7a8d3a 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -98,11 +98,12 @@ func (md Metadata) Del(keys ...string) { } // Copy makes a copy of the metadata -func Copy(md Metadata) Metadata { +func Copy(md Metadata, exclude ...string) Metadata { nmd := New(len(md)) for key, val := range md { nmd.Set(key, val) } + nmd.Del(exclude...) return nmd } diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index eccea7a7..a5381ede 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -190,3 +190,14 @@ func TestMetadataContext(t *testing.T) { t.Errorf("Expected metadata length 1 got %d", i) } } + +func TestCopy(t *testing.T) { + md := New(2) + md.Set("key1", "val1", "key2", "val2") + nmd := Copy(md, "key2") + if len(nmd) != 1 { + t.Fatal("Copy exclude not works") + } else if nmd["Key1"] != "val1" { + t.Fatal("Copy exclude not works") + } +} diff --git a/meter/meter.go b/meter/meter.go index 71da6914..887b8236 100644 --- a/meter/meter.go +++ b/meter/meter.go @@ -1,5 +1,5 @@ // Package meter is for instrumentation -package meter // import "go.unistack.org/micro/v3/meter" +package meter import ( "io" @@ -11,7 +11,7 @@ import ( var ( // DefaultMeter is the default meter - DefaultMeter = NewMeter() + DefaultMeter Meter = NewMeter() // DefaultAddress data will be made available on this host:port DefaultAddress = ":9090" // DefaultPath the meter endpoint where the Meter data will be made available diff --git a/micro.go b/micro.go new file mode 100644 index 00000000..20295601 --- /dev/null +++ b/micro.go @@ -0,0 +1,94 @@ +package micro + +import ( + "reflect" + + "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/flow" + "go.unistack.org/micro/v3/fsm" + "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/register" + "go.unistack.org/micro/v3/resolver" + "go.unistack.org/micro/v3/router" + "go.unistack.org/micro/v3/selector" + "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v3/store" + "go.unistack.org/micro/v3/sync" + "go.unistack.org/micro/v3/tracer" +) + +func As(b any, target any) bool { + if b == nil { + return false + } + if target == nil { + return false + } + val := reflect.ValueOf(target) + typ := val.Type() + if typ.Kind() != reflect.Ptr || val.IsNil() { + return false + } + targetType := typ.Elem() + if targetType.Kind() != reflect.Interface { + switch { + case targetType.Implements(brokerType): + break + case targetType.Implements(loggerType): + break + case targetType.Implements(clientType): + break + case targetType.Implements(serverType): + break + case targetType.Implements(codecType): + break + case targetType.Implements(flowType): + break + case targetType.Implements(fsmType): + break + case targetType.Implements(meterType): + break + case targetType.Implements(registerType): + break + case targetType.Implements(resolverType): + break + case targetType.Implements(selectorType): + break + case targetType.Implements(storeType): + break + case targetType.Implements(syncType): + break + case targetType.Implements(serviceType): + break + case targetType.Implements(routerType): + break + default: + return false + } + } + if reflect.TypeOf(b).AssignableTo(targetType) { + val.Elem().Set(reflect.ValueOf(b)) + return true + } + return false +} + +var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem() +var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem() +var clientType = reflect.TypeOf((*client.Client)(nil)).Elem() +var serverType = reflect.TypeOf((*server.Server)(nil)).Elem() +var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem() +var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem() +var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem() +var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem() +var registerType = reflect.TypeOf((*register.Register)(nil)).Elem() +var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem() +var routerType = reflect.TypeOf((*router.Router)(nil)).Elem() +var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem() +var storeType = reflect.TypeOf((*store.Store)(nil)).Elem() +var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem() +var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem() +var serviceType = reflect.TypeOf((*Service)(nil)).Elem() diff --git a/micro_test.go b/micro_test.go new file mode 100644 index 00000000..d2a20121 --- /dev/null +++ b/micro_test.go @@ -0,0 +1,115 @@ +package micro + +import ( + "context" + "fmt" + "reflect" + "testing" + + "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/fsm" +) + +func TestAs(t *testing.T) { + var b *bro + broTarget := &bro{name: "kafka"} + fsmTarget := &fsmT{name: "fsm"} + + testCases := []struct { + b any + target any + match bool + want any + }{ + { + broTarget, + &b, + true, + broTarget, + }, + { + nil, + &b, + false, + nil, + }, + { + fsmTarget, + &b, + false, + nil, + }, + } + for i, tc := range testCases { + name := fmt.Sprintf("%d:As(Errorf(..., %v), %v)", i, tc.b, tc.target) + // Clear the target pointer, in case it was set in a previous test. + rtarget := reflect.ValueOf(tc.target) + rtarget.Elem().Set(reflect.Zero(reflect.TypeOf(tc.target).Elem())) + t.Run(name, func(t *testing.T) { + match := As(tc.b, tc.target) + if match != tc.match { + t.Fatalf("match: got %v; want %v", match, tc.match) + } + if !match { + return + } + if got := rtarget.Elem().Interface(); got != tc.want { + t.Fatalf("got %#v, want %#v", got, tc.want) + } + }) + } +} + +type bro struct { + name string +} + +func (p *bro) Name() string { return p.name } + +func (p *bro) Init(opts ...broker.Option) error { return nil } + +// Options returns broker options +func (p *bro) Options() broker.Options { return broker.Options{} } + +// Address return configured address +func (p *bro) Address() string { return "" } + +// Connect connects to broker +func (p *bro) Connect(ctx context.Context) error { return nil } + +// Disconnect disconnect from broker +func (p *bro) Disconnect(ctx context.Context) error { return nil } + +// Publish message, msg can be single broker.Message or []broker.Message +func (p *bro) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { + return nil +} + +// BatchPublish messages to broker with multiple topics +func (p *bro) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + return nil +} + +// BatchSubscribe subscribes to topic messages via handler +func (p *bro) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return nil, nil +} + +// Subscribe subscribes to topic message via handler +func (p *bro) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return nil, nil +} + +// String type of broker +func (p *bro) String() string { return p.name } + +type fsmT struct { + name string +} + +func (f *fsmT) Start(ctx context.Context, a interface{}, o ...Option) (interface{}, error) { + return nil, nil +} +func (f *fsmT) Current() string { return f.name } +func (f *fsmT) Reset() {} +func (f *fsmT) State(s string, sf fsm.StateFunc) {} diff --git a/register/memory.go b/register/memory/memory.go similarity index 77% rename from register/memory.go rename to register/memory/memory.go index 85e8b468..b674bddd 100644 --- a/register/memory.go +++ b/register/memory/memory.go @@ -6,6 +6,7 @@ import ( "time" "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/util/id" ) @@ -16,7 +17,7 @@ var ( type node struct { LastSeen time.Time - *Node + *register.Node TTL time.Duration } @@ -25,23 +26,23 @@ type record struct { Version string Metadata map[string]string Nodes map[string]*node - Endpoints []*Endpoint + Endpoints []*register.Endpoint } type memory struct { sync.RWMutex records map[string]services watchers map[string]*watcher - opts Options + opts register.Options } // services is a KV map with service name as the key and a map of records as the value type services map[string]map[string]*record // NewRegister returns an initialized in-memory register -func NewRegister(opts ...Option) Register { +func NewRegister(opts ...register.Option) register.Register { r := &memory{ - opts: NewOptions(opts...), + opts: register.NewOptions(opts...), records: make(map[string]services), watchers: make(map[string]*watcher), } @@ -75,7 +76,7 @@ func (m *memory) ttlPrune() { } } -func (m *memory) sendEvent(r *Result) { +func (m *memory) sendEvent(r *register.Result) { m.RLock() watchers := make([]*watcher, 0, len(m.watchers)) for _, w := range m.watchers { @@ -106,7 +107,7 @@ func (m *memory) Disconnect(ctx context.Context) error { return nil } -func (m *memory) Init(opts ...Option) error { +func (m *memory) Init(opts ...register.Option) error { for _, o := range opts { o(&m.opts) } @@ -118,15 +119,15 @@ func (m *memory) Init(opts ...Option) error { return nil } -func (m *memory) Options() Options { +func (m *memory) Options() register.Options { return m.opts } -func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOption) error { +func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error { m.Lock() defer m.Unlock() - options := NewRegisterOptions(opts...) + options := register.NewRegisterOptions(opts...) // get the services for this domain from the register srvs, ok := m.records[options.Domain] @@ -153,7 +154,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio m.opts.Logger.Debugf(m.opts.Context, "Register added new service: %s, version: %s", s.Name, s.Version) } m.records[options.Domain] = srvs - go m.sendEvent(&Result{Action: "create", Service: s}) + go m.sendEvent(®ister.Result{Action: "create", Service: s}) } var addedNodes bool @@ -176,7 +177,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio // add the node srvs[s.Name][s.Version].Nodes[n.ID] = &node{ - Node: &Node{ + Node: ®ister.Node{ ID: n.ID, Address: n.Address, Metadata: metadata, @@ -192,7 +193,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debugf(m.opts.Context, "Register added new node to service: %s, version: %s", s.Name, s.Version) } - go m.sendEvent(&Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: "update", Service: s}) } else { // refresh TTL and timestamp for _, n := range s.Nodes { @@ -208,11 +209,11 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio return nil } -func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterOption) error { +func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error { m.Lock() defer m.Unlock() - options := NewDeregisterOptions(opts...) + options := register.NewDeregisterOptions(opts...) // domain is set in metadata so it can be passed to watchers if s.Metadata == nil { @@ -252,7 +253,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // is cleanup if len(version.Nodes) > 0 { m.records[options.Domain][s.Name][s.Version] = version - go m.sendEvent(&Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: "update", Service: s}) return nil } @@ -260,7 +261,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // register and exit if len(versions) == 1 { delete(m.records[options.Domain], s.Name) - go m.sendEvent(&Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s", s.Name) @@ -270,7 +271,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // there are other versions of the service running, so only remove this version of it delete(m.records[options.Domain][s.Name], s.Version) - go m.sendEvent(&Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s, version: %s", s.Name, s.Version) } @@ -278,20 +279,20 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO return nil } -func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) { - options := NewLookupOptions(opts...) +func (m *memory) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) { + options := register.NewLookupOptions(opts...) // if it's a wildcard domain, return from all domains - if options.Domain == WildcardDomain { + if options.Domain == register.WildcardDomain { m.RLock() recs := m.records m.RUnlock() - var services []*Service + var services []*register.Service for domain := range recs { - srvs, err := m.LookupService(ctx, name, append(opts, LookupDomain(domain))...) - if err == ErrNotFound { + srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...) + if err == register.ErrNotFound { continue } else if err != nil { return nil, err @@ -300,7 +301,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO } if len(services) == 0 { - return nil, ErrNotFound + return nil, register.ErrNotFound } return services, nil } @@ -311,17 +312,17 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO // check the domain exists services, ok := m.records[options.Domain] if !ok { - return nil, ErrNotFound + return nil, register.ErrNotFound } // check the service exists versions, ok := services[name] if !ok || len(versions) == 0 { - return nil, ErrNotFound + return nil, register.ErrNotFound } // serialize the response - result := make([]*Service, len(versions)) + result := make([]*register.Service, len(versions)) var i int @@ -333,19 +334,19 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO return result, nil } -func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { - options := NewListOptions(opts...) +func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) { + options := register.NewListOptions(opts...) // if it's a wildcard domain, list from all domains - if options.Domain == WildcardDomain { + if options.Domain == register.WildcardDomain { m.RLock() recs := m.records m.RUnlock() - var services []*Service + var services []*register.Service for domain := range recs { - srvs, err := m.ListServices(ctx, append(opts, ListDomain(domain))...) + srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...) if err != nil { return nil, err } @@ -361,11 +362,11 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi // ensure the domain exists services, ok := m.records[options.Domain] if !ok { - return make([]*Service, 0), nil + return make([]*register.Service, 0), nil } // serialize the result, each version counts as an individual service - var result []*Service + var result []*register.Service for _, service := range services { for _, version := range service { @@ -376,16 +377,16 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi return result, nil } -func (m *memory) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { +func (m *memory) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) { id, err := id.New() if err != nil { return nil, err } - wo := NewWatchOptions(opts...) + wo := register.NewWatchOptions(opts...) // construct the watcher w := &watcher{ exit: make(chan bool), - res: make(chan *Result), + res: make(chan *register.Result), id: id, wo: wo, } @@ -406,13 +407,13 @@ func (m *memory) String() string { } type watcher struct { - res chan *Result + res chan *register.Result exit chan bool - wo WatchOptions + wo register.WatchOptions id string } -func (m *watcher) Next() (*Result, error) { +func (m *watcher) Next() (*register.Result, error) { for { select { case r := <-m.res: @@ -429,15 +430,15 @@ func (m *watcher) Next() (*Result, error) { if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 { domain = r.Service.Metadata["domain"] } else { - domain = DefaultDomain + domain = register.DefaultDomain } // only send the event if watching the wildcard or this specific domain - if m.wo.Domain == WildcardDomain || m.wo.Domain == domain { + if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain { return r, nil } case <-m.exit: - return nil, ErrWatcherStopped + return nil, register.ErrWatcherStopped } } } @@ -451,7 +452,7 @@ func (m *watcher) Stop() { } } -func serviceToRecord(s *Service, ttl time.Duration) *record { +func serviceToRecord(s *register.Service, ttl time.Duration) *record { metadata := make(map[string]string, len(s.Metadata)) for k, v := range s.Metadata { metadata[k] = v @@ -466,7 +467,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record { } } - endpoints := make([]*Endpoint, len(s.Endpoints)) + endpoints := make([]*register.Endpoint, len(s.Endpoints)) for i, e := range s.Endpoints { endpoints[i] = e } @@ -480,7 +481,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record { } } -func recordToService(r *record, domain string) *Service { +func recordToService(r *record, domain string) *register.Service { metadata := make(map[string]string, len(r.Metadata)) for k, v := range r.Metadata { metadata[k] = v @@ -489,14 +490,14 @@ func recordToService(r *record, domain string) *Service { // set the domain in metadata so it can be determined when a wildcard query is performed metadata["domain"] = domain - endpoints := make([]*Endpoint, len(r.Endpoints)) + endpoints := make([]*register.Endpoint, len(r.Endpoints)) for i, e := range r.Endpoints { md := make(map[string]string, len(e.Metadata)) for k, v := range e.Metadata { md[k] = v } - endpoints[i] = &Endpoint{ + endpoints[i] = ®ister.Endpoint{ Name: e.Name, Request: e.Request, Response: e.Response, @@ -504,7 +505,7 @@ func recordToService(r *record, domain string) *Service { } } - nodes := make([]*Node, len(r.Nodes)) + nodes := make([]*register.Node, len(r.Nodes)) i := 0 for _, n := range r.Nodes { md := make(map[string]string, len(n.Metadata)) @@ -512,7 +513,7 @@ func recordToService(r *record, domain string) *Service { md[k] = v } - nodes[i] = &Node{ + nodes[i] = ®ister.Node{ ID: n.ID, Address: n.Address, Metadata: md, @@ -520,7 +521,7 @@ func recordToService(r *record, domain string) *Service { i++ } - return &Service{ + return ®ister.Service{ Name: r.Name, Version: r.Version, Metadata: metadata, diff --git a/register/memory_test.go b/register/memory/memory_test.go similarity index 82% rename from register/memory_test.go rename to register/memory/memory_test.go index 75b2c798..2210b3f6 100644 --- a/register/memory_test.go +++ b/register/memory/memory_test.go @@ -6,14 +6,16 @@ import ( "sync" "testing" "time" + + "go.unistack.org/micro/v3/register" ) -var testData = map[string][]*Service{ +var testData = map[string][]*register.Service{ "foo": { { Name: "foo", Version: "1.0.0", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.0-123", Address: "localhost:9999", @@ -27,7 +29,7 @@ var testData = map[string][]*Service{ { Name: "foo", Version: "1.0.1", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.1-321", Address: "localhost:6666", @@ -37,7 +39,7 @@ var testData = map[string][]*Service{ { Name: "foo", Version: "1.0.3", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.3-345", Address: "localhost:8888", @@ -49,7 +51,7 @@ var testData = map[string][]*Service{ { Name: "bar", Version: "default", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "bar-1.0.0-123", Address: "localhost:9999", @@ -63,7 +65,7 @@ var testData = map[string][]*Service{ { Name: "bar", Version: "latest", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "bar-1.0.1-321", Address: "localhost:6666", @@ -78,7 +80,7 @@ func TestMemoryRegistry(t *testing.T) { ctx := context.TODO() m := NewRegister() - fn := func(k string, v []*Service) { + fn := func(k string, v []*register.Service) { services, err := m.LookupService(ctx, k) if err != nil { t.Errorf("Unexpected error getting service %s: %v", k, err) @@ -155,8 +157,8 @@ func TestMemoryRegistry(t *testing.T) { for _, v := range testData { for _, service := range v { services, err := m.LookupService(ctx, service.Name) - if err != ErrNotFound { - t.Errorf("Expected error: %v, got: %v", ErrNotFound, err) + if err != register.ErrNotFound { + t.Errorf("Expected error: %v, got: %v", register.ErrNotFound, err) } if len(services) != 0 { t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services)) @@ -171,7 +173,7 @@ func TestMemoryRegistryTTL(t *testing.T) { for _, v := range testData { for _, service := range v { - if err := m.Register(ctx, service, RegisterTTL(time.Millisecond)); err != nil { + if err := m.Register(ctx, service, register.RegisterTTL(time.Millisecond)); err != nil { t.Fatal(err) } } @@ -200,7 +202,7 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) { ctx := context.TODO() for _, v := range testData { for _, service := range v { - if err := m.Register(ctx, service, RegisterTTL(waitTime/2)); err != nil { + if err := m.Register(ctx, service, register.RegisterTTL(waitTime/2)); err != nil { t.Fatal(err) } } @@ -249,34 +251,34 @@ func TestMemoryWildcard(t *testing.T) { m := NewRegister() ctx := context.TODO() - testSrv := &Service{Name: "foo", Version: "1.0.0"} + testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} - if err := m.Register(ctx, testSrv, RegisterDomain("one")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil { t.Fatalf("Register err: %v", err) } - if err := m.Register(ctx, testSrv, RegisterDomain("two")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil { t.Fatalf("Register err: %v", err) } - if recs, err := m.ListServices(ctx, ListDomain("one")); err != nil { + if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.ListServices(ctx, ListDomain("*")); err != nil { + if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("one")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("*")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) @@ -284,7 +286,7 @@ func TestMemoryWildcard(t *testing.T) { } func TestWatcher(t *testing.T) { - testSrv := &Service{Name: "foo", Version: "1.0.0"} + testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} ctx := context.TODO() m := NewRegister() diff --git a/register/noop.go b/register/noop.go new file mode 100644 index 00000000..4fb692d3 --- /dev/null +++ b/register/noop.go @@ -0,0 +1,72 @@ +package register + +import "context" + +type noop struct { + opts Options +} + +func NewRegister(opts ...Option) Register { + return &noop{ + opts: NewOptions(opts...), + } +} + +func (n *noop) Name() string { + return n.opts.Name +} + +func (n *noop) Init(opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } + return nil +} + +func (n *noop) Options() Options { + return n.opts +} + +func (n *noop) Connect(ctx context.Context) error { + return nil +} + +func (n *noop) Disconnect(ctx context.Context) error { + return nil +} + +func (n *noop) Register(ctx context.Context, service *Service, option ...RegisterOption) error { + return nil +} + +func (n *noop) Deregister(ctx context.Context, service *Service, option ...DeregisterOption) error { + return nil +} + +func (n *noop) LookupService(ctx context.Context, s string, option ...LookupOption) ([]*Service, error) { + return nil, nil +} + +func (n *noop) ListServices(ctx context.Context, option ...ListOption) ([]*Service, error) { + return nil, nil +} + +func (n *noop) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + wOpts := NewWatchOptions(opts...) + + return &watcher{wo: wOpts}, nil +} + +func (n *noop) String() string { + return "noop" +} + +type watcher struct { + wo WatchOptions +} + +func (m *watcher) Next() (*Result, error) { + return nil, nil +} + +func (m *watcher) Stop() {} diff --git a/register/register.go b/register/register.go index 30055da1..bfe0078a 100644 --- a/register/register.go +++ b/register/register.go @@ -18,7 +18,7 @@ var DefaultDomain = "micro" var ( // DefaultRegister is the global default register - DefaultRegister = NewRegister() + DefaultRegister Register = NewRegister() // ErrNotFound returned when LookupService is called and no services found ErrNotFound = errors.New("service not found") // ErrWatcherStopped returned when when watcher is stopped diff --git a/resolver/resolver.go b/resolver/resolver.go index 7ba1f6cf..1e3e91e6 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -1,5 +1,5 @@ // Package resolver resolves network names to addresses -package resolver // import "go.unistack.org/micro/v3/resolver" +package resolver // Resolver is network resolver. It's used to find network nodes // via the name to connect to. This is done based on Network.Name(). diff --git a/router/router.go b/router/router.go index 2509b7c5..43874196 100644 --- a/router/router.go +++ b/router/router.go @@ -7,7 +7,7 @@ import ( var ( // DefaultRouter is the global default router - DefaultRouter = NewRouter() + DefaultRouter Router = NewRouter() // DefaultNetwork is default micro network DefaultNetwork = "micro" // ErrRouteNotFound is returned when no route was found in the routing table diff --git a/semconv/broker.go b/semconv/broker.go new file mode 100644 index 00000000..4c949225 --- /dev/null +++ b/semconv/broker.go @@ -0,0 +1,22 @@ +package semconv + +var ( + // PublishMessageDurationSeconds specifies meter metric name + PublishMessageDurationSeconds = "publish_message_duration_seconds" + // PublishMessageLatencyMicroseconds specifies meter metric name + PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" + // PublishMessageTotal specifies meter metric name + PublishMessageTotal = "publish_message_total" + // PublishMessageInflight specifies meter metric name + PublishMessageInflight = "publish_message_inflight" + // SubscribeMessageDurationSeconds specifies meter metric name + SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" + // SubscribeMessageLatencyMicroseconds specifies meter metric name + SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" + // SubscribeMessageTotal specifies meter metric name + SubscribeMessageTotal = "subscribe_message_total" + // SubscribeMessageInflight specifies meter metric name + SubscribeMessageInflight = "subscribe_message_inflight" + // BrokerGroupLag specifies broker lag + BrokerGroupLag = "broker_group_lag" +) diff --git a/semconv/cache.go b/semconv/cache.go new file mode 100644 index 00000000..f12739c1 --- /dev/null +++ b/semconv/cache.go @@ -0,0 +1,12 @@ +package semconv + +var ( + // CacheRequestDurationSeconds specifies meter metric name + CacheRequestDurationSeconds = "cache_request_duration_seconds" + // ClientRequestLatencyMicroseconds specifies meter metric name + CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds" + // CacheRequestTotal specifies meter metric name + CacheRequestTotal = "cache_request_total" + // CacheRequestInflight specifies meter metric name + CacheRequestInflight = "cache_request_inflight" +) diff --git a/semconv/client.go b/semconv/client.go new file mode 100644 index 00000000..96671471 --- /dev/null +++ b/semconv/client.go @@ -0,0 +1,12 @@ +package semconv + +var ( + // ClientRequestDurationSeconds specifies meter metric name + ClientRequestDurationSeconds = "client_request_duration_seconds" + // ClientRequestLatencyMicroseconds specifies meter metric name + ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" + // ClientRequestTotal specifies meter metric name + ClientRequestTotal = "client_request_total" + // ClientRequestInflight specifies meter metric name + ClientRequestInflight = "client_request_inflight" +) diff --git a/semconv/server.go b/semconv/server.go new file mode 100644 index 00000000..fedb7074 --- /dev/null +++ b/semconv/server.go @@ -0,0 +1,12 @@ +package semconv + +var ( + // ServerRequestDurationSeconds specifies meter metric name + ServerRequestDurationSeconds = "server_request_duration_seconds" + // ServerRequestLatencyMicroseconds specifies meter metric name + ServerRequestLatencyMicroseconds = "server_request_latency_microseconds" + // ServerRequestTotal specifies meter metric name + ServerRequestTotal = "server_request_total" + // ServerRequestInflight specifies meter metric name + ServerRequestInflight = "server_request_inflight" +) diff --git a/server/options.go b/server/options.go index 8ab8c0d2..e320ff86 100644 --- a/server/options.go +++ b/server/options.go @@ -15,6 +15,7 @@ import ( "go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" + msync "go.unistack.org/micro/v3/sync" "go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/util/id" ) @@ -47,7 +48,7 @@ type Options struct { // Listener may be passed if already created Listener net.Listener // Wait group - Wait *sync.WaitGroup + Wait *msync.WaitGroup // TLSConfig specifies tls.Config for secure serving TLSConfig *tls.Config // Metadata holds the server metadata @@ -86,6 +87,8 @@ type Options struct { DeregisterAttempts int // Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper Hooks options.Hooks + // GracefulTimeout timeout for graceful stop server + GracefulTimeout time.Duration } // NewOptions returns new options struct with default or passed values @@ -108,6 +111,7 @@ func NewOptions(opts ...Option) Options { Version: DefaultVersion, ID: id.Must(), Namespace: DefaultNamespace, + GracefulTimeout: DefaultGracefulTimeout, } for _, o := range opts { @@ -279,7 +283,7 @@ func Wait(wg *sync.WaitGroup) Option { if wg == nil { wg = new(sync.WaitGroup) } - o.Wait = wg + o.Wait = msync.WrapWaitGroup(wg) } } @@ -321,6 +325,13 @@ func Listener(l net.Listener) Option { // HandlerOption func type HandlerOption func(*HandlerOptions) +// GracefulTimeout duration +func GracefulTimeout(td time.Duration) Option { + return func(o *Options) { + o.GracefulTimeout = td + } +} + // HandlerOptions struct type HandlerOptions struct { // Context holds external options diff --git a/server/server.go b/server/server.go index ef710d79..f6ea3956 100644 --- a/server/server.go +++ b/server/server.go @@ -11,7 +11,7 @@ import ( ) // DefaultServer default server -var DefaultServer = NewServer() +var DefaultServer Server = NewServer() var ( // DefaultAddress will be used if no address passed, use secure localhost @@ -34,6 +34,8 @@ var ( DefaultMaxMsgRecvSize = 1024 * 1024 * 4 // 4Mb // DefaultMaxMsgSendSize holds default max send size DefaultMaxMsgSendSize = 1024 * 1024 * 4 // 4Mb + // DefaultGracefulTimeout default time for graceful stop + DefaultGracefulTimeout = 5 * time.Second ) // Server is a simple micro server abstraction diff --git a/service.go b/service.go index 693f8d40..22bcb74e 100644 --- a/service.go +++ b/service.go @@ -72,8 +72,8 @@ func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...se } type service struct { - sync.RWMutex opts Options + sync.RWMutex } // NewService creates and returns a new Service based on the packages within. @@ -376,71 +376,15 @@ func (s *service) Run() error { return s.Stop() } -func getNameIndex(n string, ifaces interface{}) int { - switch values := ifaces.(type) { - case []router.Router: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []register.Register: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []store.Store: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []tracer.Tracer: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []server.Server: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []config.Config: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []meter.Meter: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []broker.Broker: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - case []client.Client: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - /* - case []logger.Logger: - for idx, iface := range values { - if iface.Name() == n { - return idx - } - } - */ - } +type Namer interface { + Name() string +} +func getNameIndex[T Namer](n string, ifaces []T) int { + for idx, iface := range ifaces { + if iface.Name() == n { + return idx + } + } return 0 } diff --git a/service_test.go b/service_test.go index 1a23de60..b6bca118 100644 --- a/service_test.go +++ b/service_test.go @@ -22,13 +22,14 @@ func TestClient(t *testing.T) { c2 := client.NewClient(client.Name("test2")) svc := NewService(Client(c1, c2)) + if err := svc.Init(); err != nil { t.Fatal(err) } x1 := svc.Client("test2") if x1.Name() != "test2" { - t.Fatal("invalid client") + t.Fatalf("invalid client %#+v", svc.Options().Clients) } } @@ -40,12 +41,11 @@ func (ti *testItem) Name() string { return ti.name } -func TestGetNameIndex(t *testing.T) { - item1 := &testItem{name: "first"} - item2 := &testItem{name: "second"} - items := []interface{}{item1, item2} - if idx := getNameIndex("second", items); idx != 1 { - t.Fatalf("getNameIndex func error, item not found") +func Test_getNameIndex(t *testing.T) { + items := []*testItem{{name: "test1"}, {name: "test2"}} + idx := getNameIndex("test2", items) + if items[idx].Name() != "test2" { + t.Fatal("getNameIndex wrong") } } diff --git a/store/options.go b/store/options.go index 586969c3..6bbee1a4 100644 --- a/store/options.go +++ b/store/options.go @@ -144,6 +144,10 @@ type ReadOptions struct { Context context.Context // Namespace holds namespace Namespace string + // Name holds mnemonic name + Name string + // Timeout specifies max timeout for operation + Timeout time.Duration } // NewReadOptions fills ReadOptions struct with opts slice @@ -158,6 +162,20 @@ func NewReadOptions(opts ...ReadOption) ReadOptions { // ReadOption sets values in ReadOptions type ReadOption func(r *ReadOptions) +// ReadTimeout pass timeout to ReadOptions +func ReadTimeout(td time.Duration) ReadOption { + return func(o *ReadOptions) { + o.Timeout = td + } +} + +// ReadName pass name to ReadOptions +func ReadName(name string) ReadOption { + return func(o *ReadOptions) { + o.Name = name + } +} + // ReadContext pass context.Context to ReadOptions func ReadContext(ctx context.Context) ReadOption { return func(o *ReadOptions) { @@ -180,6 +198,10 @@ type WriteOptions struct { Metadata metadata.Metadata // Namespace holds namespace Namespace string + // Name holds mnemonic name + Name string + // Timeout specifies max timeout for operation + Timeout time.Duration // TTL specifies key TTL TTL time.Duration } @@ -224,12 +246,30 @@ func WriteNamespace(ns string) WriteOption { } } +// WriteName pass name to WriteOptions +func WriteName(name string) WriteOption { + return func(o *WriteOptions) { + o.Name = name + } +} + +// WriteTimeout pass timeout to WriteOptions +func WriteTimeout(td time.Duration) WriteOption { + return func(o *WriteOptions) { + o.Timeout = td + } +} + // DeleteOptions configures an individual Delete operation type DeleteOptions struct { // Context holds external options Context context.Context // Namespace holds namespace Namespace string + // Name holds mnemonic name + Name string + // Timeout specifies max timeout for operation + Timeout time.Duration } // NewDeleteOptions fills DeleteOptions struct with opts slice @@ -258,14 +298,32 @@ func DeleteNamespace(ns string) DeleteOption { } } +// DeleteName pass name to DeleteOptions +func DeleteName(name string) DeleteOption { + return func(o *DeleteOptions) { + o.Name = name + } +} + +// DeleteTimeout pass timeout to DeleteOptions +func DeleteTimeout(td time.Duration) DeleteOption { + return func(o *DeleteOptions) { + o.Timeout = td + } +} + // ListOptions configures an individual List operation type ListOptions struct { Context context.Context Prefix string Suffix string Namespace string - Limit uint - Offset uint + // Name holds mnemonic name + Name string + Limit uint + Offset uint + // Timeout specifies max timeout for operation + Timeout time.Duration } // NewListOptions fills ListOptions struct with opts slice @@ -322,12 +380,23 @@ func ListNamespace(ns string) ListOption { } } +// ListTimeout pass timeout to ListOptions +func ListTimeout(td time.Duration) ListOption { + return func(o *ListOptions) { + o.Timeout = td + } +} + // ExistsOptions holds options for Exists method type ExistsOptions struct { // Context holds external options Context context.Context // Namespace contains namespace Namespace string + // Name holds mnemonic name + Name string + // Timeout specifies max timeout for operation + Timeout time.Duration } // ExistsOption specifies Exists call options @@ -358,6 +427,20 @@ func ExistsNamespace(ns string) ExistsOption { } } +// ExistsName pass name to exist options +func ExistsName(name string) ExistsOption { + return func(o *ExistsOptions) { + o.Name = name + } +} + +// ExistsTimeout timeout to ListOptions +func ExistsTimeout(td time.Duration) ExistsOption { + return func(o *ExistsOptions) { + o.Timeout = td + } +} + /* // WrapStore adds a store Wrapper to a list of options passed into the store func WrapStore(w Wrapper) Option { diff --git a/store/store.go b/store/store.go index 251c2ba0..896c1350 100644 --- a/store/store.go +++ b/store/store.go @@ -12,7 +12,7 @@ var ( // ErrInvalidKey is returned when a key has empty or have invalid format ErrInvalidKey = errors.New("invalid key") // DefaultStore is the global default store - DefaultStore = NewStore() + DefaultStore Store = NewStore() // DefaultSeparator is the gloabal default key parts separator DefaultSeparator = "/" ) diff --git a/sync/waitgroup.go b/sync/waitgroup.go new file mode 100644 index 00000000..3124d948 --- /dev/null +++ b/sync/waitgroup.go @@ -0,0 +1,69 @@ +package sync + +import ( + "context" + "sync" +) + +type WaitGroup struct { + wg *sync.WaitGroup + c int + mu sync.Mutex +} + +func WrapWaitGroup(wg *sync.WaitGroup) *WaitGroup { + g := &WaitGroup{ + wg: wg, + } + return g +} + +func NewWaitGroup() *WaitGroup { + var wg sync.WaitGroup + return WrapWaitGroup(&wg) +} + +func (g *WaitGroup) Add(n int) { + g.mu.Lock() + g.c += n + g.wg.Add(n) + g.mu.Unlock() +} + +func (g *WaitGroup) Done() { + g.mu.Lock() + g.c += -1 + g.wg.Add(-1) + g.mu.Unlock() +} + +func (g *WaitGroup) Wait() { + g.wg.Wait() +} + +func (g *WaitGroup) WaitContext(ctx context.Context) { + done := make(chan struct{}) + go func() { + g.wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + g.mu.Lock() + g.wg.Add(-g.c) + <-done + g.wg.Add(g.c) + g.mu.Unlock() + return + case <-done: + return + } +} + +func (g *WaitGroup) Waiters() int { + g.mu.Lock() + c := g.c + g.mu.Unlock() + return c +} diff --git a/sync/waitgroup_test.go b/sync/waitgroup_test.go new file mode 100644 index 00000000..c3f6f1b7 --- /dev/null +++ b/sync/waitgroup_test.go @@ -0,0 +1,37 @@ +package sync + +import ( + "context" + "testing" + "time" +) + +func TestWaitGroupContext(t *testing.T) { + wg := NewWaitGroup() + _ = t + wg.Add(1) + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + wg.WaitContext(ctx) +} + +func TestWaitGroupReuse(t *testing.T) { + wg := NewWaitGroup() + defer func() { + if wg.Waiters() != 0 { + t.Fatal("lost goroutines") + } + }() + + wg.Add(1) + defer wg.Done() + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + wg.WaitContext(ctx) + + wg.Add(1) + defer wg.Done() + ctx, cancel = context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + wg.WaitContext(ctx) +} diff --git a/tracer/memory/memory.go b/tracer/memory/memory.go new file mode 100644 index 00000000..4216d863 --- /dev/null +++ b/tracer/memory/memory.go @@ -0,0 +1,139 @@ +package memory + +import ( + "context" + "time" + + "go.unistack.org/micro/v3/tracer" + "go.unistack.org/micro/v3/util/id" +) + +var _ tracer.Tracer = (*Tracer)(nil) + +type Tracer struct { + opts tracer.Options + spans []tracer.Span +} + +func (t *Tracer) Spans() []tracer.Span { + return t.spans +} + +func (t *Tracer) Start(ctx context.Context, name string, opts ...tracer.SpanOption) (context.Context, tracer.Span) { + options := tracer.NewSpanOptions(opts...) + span := &Span{ + name: name, + ctx: ctx, + tracer: t, + kind: options.Kind, + startTime: time.Now(), + } + span.spanID.s, _ = id.New() + span.traceID.s, _ = id.New() + if span.ctx == nil { + span.ctx = context.Background() + } + t.spans = append(t.spans, span) + return tracer.NewSpanContext(ctx, span), span +} + +func (t *Tracer) Flush(_ context.Context) error { + return nil +} + +func (t *Tracer) Init(opts ...tracer.Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *Tracer) Name() string { + return t.opts.Name +} + +type noopStringer struct { + s string +} + +func (s noopStringer) String() string { + return s.s +} + +type Span struct { + ctx context.Context + tracer tracer.Tracer + name string + statusMsg string + startTime time.Time + finishTime time.Time + traceID noopStringer + spanID noopStringer + events []*Event + labels []interface{} + logs []interface{} + kind tracer.SpanKind + status tracer.SpanStatus +} + +func (s *Span) Finish(_ ...tracer.SpanOption) { + s.finishTime = time.Now() +} + +func (s *Span) Context() context.Context { + return s.ctx +} + +func (s *Span) Tracer() tracer.Tracer { + return s.tracer +} + +type Event struct { + name string + labels []interface{} +} + +func (s *Span) AddEvent(name string, opts ...tracer.EventOption) { + options := tracer.NewEventOptions(opts...) + s.events = append(s.events, &Event{name: name, labels: options.Labels}) +} + +func (s *Span) SetName(name string) { + s.name = name +} + +func (s *Span) AddLogs(kv ...interface{}) { + s.logs = append(s.logs, kv...) +} + +func (s *Span) AddLabels(kv ...interface{}) { + s.labels = append(s.labels, kv...) +} + +func (s *Span) Kind() tracer.SpanKind { + return s.kind +} + +func (s *Span) TraceID() string { + return s.traceID.String() +} + +func (s *Span) SpanID() string { + return s.spanID.String() +} + +func (s *Span) Status() (tracer.SpanStatus, string) { + return s.status, s.statusMsg +} + +func (s *Span) SetStatus(st tracer.SpanStatus, msg string) { + s.status = st + s.statusMsg = msg +} + +// NewTracer returns new memory tracer +func NewTracer(opts ...tracer.Option) *Tracer { + return &Tracer{ + opts: tracer.NewOptions(opts...), + } +} diff --git a/tracer/memory/memory_test.go b/tracer/memory/memory_test.go new file mode 100644 index 00000000..7a6dddd6 --- /dev/null +++ b/tracer/memory/memory_test.go @@ -0,0 +1,38 @@ +package memory + +import ( + "bytes" + "context" + "fmt" + "strings" + "testing" + + "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/logger/slog" + "go.unistack.org/micro/v3/tracer" +) + +func TestLoggerWithTracer(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + logger.DefaultLogger = slog.NewLogger(logger.WithOutput(buf)) + + if err := logger.Init(); err != nil { + t.Fatal(err) + } + var span tracer.Span + tr := NewTracer() + ctx, span = tr.Start(ctx, "test1") + + logger.Error(ctx, "my test error", fmt.Errorf("error")) + + if !strings.Contains(buf.String(), span.TraceID()) { + t.Fatalf("log does not contains trace id: %s", buf.Bytes()) + } + + _, _ = tr.Start(ctx, "test2") + + for _, s := range tr.Spans() { + _ = s + } +} diff --git a/tracer/noop.go b/tracer/noop.go index 8548dbe8..de4756cc 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -2,6 +2,8 @@ package tracer import ( "context" + + "go.unistack.org/micro/v3/util/id" ) var _ Tracer = (*noopTracer)(nil) @@ -24,6 +26,8 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) labels: options.Labels, kind: options.Kind, } + span.spanID.s, _ = id.New() + span.traceID.s, _ = id.New() if span.ctx == nil { span.ctx = context.Background() } @@ -31,6 +35,14 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) return NewSpanContext(ctx, span), span } +type noopStringer struct { + s string +} + +func (s noopStringer) String() string { + return s.s +} + func (t *noopTracer) Flush(ctx context.Context) error { return nil } @@ -56,6 +68,8 @@ type noopSpan struct { tracer Tracer name string statusMsg string + traceID noopStringer + spanID noopStringer events []*noopEvent labels []interface{} logs []interface{} @@ -63,7 +77,15 @@ type noopSpan struct { status SpanStatus } -func (s *noopSpan) Finish(opts ...SpanOption) { +func (s *noopSpan) TraceID() string { + return s.traceID.String() +} + +func (s *noopSpan) SpanID() string { + return s.spanID.String() +} + +func (s *noopSpan) Finish(_ ...SpanOption) { } func (s *noopSpan) Context() context.Context { diff --git a/tracer/options.go b/tracer/options.go index 4c1c0f74..a243f03e 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -100,13 +100,13 @@ type EventOption func(o *EventOptions) func WithEventLabels(kv ...interface{}) EventOption { return func(o *EventOptions) { - o.Labels = kv + o.Labels = append(o.Labels, kv...) } } func WithSpanLabels(kv ...interface{}) SpanOption { return func(o *SpanOptions) { - o.Labels = kv + o.Labels = append(o.Labels, kv...) } } @@ -159,7 +159,8 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions { // NewOptions returns default options func NewOptions(opts ...Option) Options { options := Options{ - Logger: logger.DefaultLogger, + Logger: logger.DefaultLogger, + Context: context.Background(), } for _, o := range opts { o(&options) diff --git a/tracer/tracer.go b/tracer/tracer.go index fffd1cf4..caf56960 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -3,12 +3,32 @@ package tracer // import "go.unistack.org/micro/v3/tracer" import ( "context" - "fmt" - "sort" + + "go.unistack.org/micro/v3/logger" ) // DefaultTracer is the global default tracer -var DefaultTracer = NewTracer() +var DefaultTracer Tracer = NewTracer() + +var ( + // TraceIDKey is the key used for the trace id in the log call + TraceIDKey = "trace-id" + // SpanIDKey is the key used for the span id in the log call + SpanIDKey = "span-id" +) + +func init() { + logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, + func(ctx context.Context) []interface{} { + if span, ok := SpanFromContext(ctx); ok { + return []interface{}{ + TraceIDKey, span.TraceID(), + SpanIDKey, span.SpanID(), + } + } + return nil + }) +} // Tracer is an interface for distributed tracing type Tracer interface { @@ -43,38 +63,8 @@ type Span interface { AddLogs(kv ...interface{}) // Kind returns span kind Kind() SpanKind -} - -// sort labels alphabeticaly by label name -type byKey []interface{} - -func (k byKey) Len() int { return len(k) / 2 } -func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } -func (k byKey) Swap(i, j int) { - k[i*2], k[j*2] = k[j*2], k[i*2] - k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] -} - -func UniqLabels(labels []interface{}) []interface{} { - if len(labels)%2 == 1 { - labels = labels[:len(labels)-1] - } - - if len(labels) > 2 { - sort.Sort(byKey(labels)) - - idx := 0 - for { - if labels[idx] == labels[idx+2] { - copy(labels[idx:], labels[idx+2:]) - labels = labels[:len(labels)-2] - } else { - idx += 2 - } - if idx+2 >= len(labels) { - break - } - } - } - return labels + // TraceID returns trace id + TraceID() string + // SpanID returns span id + SpanID() string } diff --git a/util/reflect/reflect.go b/util/reflect/reflect.go index 87479554..f0cda32c 100644 --- a/util/reflect/reflect.go +++ b/util/reflect/reflect.go @@ -1,6 +1,7 @@ -package reflect // import "go.unistack.org/micro/v3/util/reflect" +package reflect import ( + "encoding/json" "errors" "fmt" "reflect" @@ -45,15 +46,23 @@ func SliceAppend(b bool) Option { // Merge merges map[string]interface{} to destination struct func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error { - var err error - var sval reflect.Value - var fname string - options := Options{} for _, o := range opts { o(&options) } + if unmarshaler, ok := dst.(json.Unmarshaler); ok { + buf, err := json.Marshal(mp) + if err == nil { + err = unmarshaler.UnmarshalJSON(buf) + } + return err + } + + var err error + var sval reflect.Value + var fname string + dviface := reflect.ValueOf(dst) if dviface.Kind() == reflect.Ptr { dviface = dviface.Elem() diff --git a/util/sort/sort.go b/util/sort/sort.go new file mode 100644 index 00000000..d80794b1 --- /dev/null +++ b/util/sort/sort.go @@ -0,0 +1,40 @@ +package sort + +import ( + "fmt" + "sort" +) + +// sort labels alphabeticaly by label name +type byKey []interface{} + +func (k byKey) Len() int { return len(k) / 2 } +func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } +func (k byKey) Swap(i, j int) { + k[i*2], k[j*2] = k[j*2], k[i*2] + k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] +} + +func Uniq(labels []interface{}) []interface{} { + if len(labels)%2 == 1 { + labels = labels[:len(labels)-1] + } + + if len(labels) > 2 { + sort.Sort(byKey(labels)) + + idx := 0 + for { + if labels[idx] == labels[idx+2] { + copy(labels[idx:], labels[idx+2:]) + labels = labels[:len(labels)-2] + } else { + idx += 2 + } + if idx+2 >= len(labels) { + break + } + } + } + return labels +} diff --git a/util/time/duration_test.go b/util/time/duration_test.go index 80b7289c..12447510 100644 --- a/util/time/duration_test.go +++ b/util/time/duration_test.go @@ -35,8 +35,8 @@ func TestUnmarshalJSON(t *testing.T) { err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v) if err != nil { t.Fatal(err) - } else if v.TTL != 31536000000000000 { - t.Fatalf("invalid duration %v != 31536000000000000", v.TTL) + } else if v.TTL != 31622400000000000 { + t.Fatalf("invalid duration %v != 31622400000000000", v.TTL) } } @@ -55,7 +55,7 @@ func TestParseDuration(t *testing.T) { if err != nil { t.Fatalf("ParseDuration error: %v", err) } - if td.String() != "8760h0m0s" { - t.Fatalf("ParseDuration 1y != 8760h0m0s : %s", td.String()) + if td.String() != "8784h0m0s" { + t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String()) } } diff --git a/util/xpool/pool.go b/util/xpool/pool.go new file mode 100644 index 00000000..1ffe4293 --- /dev/null +++ b/util/xpool/pool.go @@ -0,0 +1,25 @@ +package pool + +import "sync" + +type Pool[T any] struct { + p *sync.Pool +} + +func NewPool[T any](fn func() T) Pool[T] { + return Pool[T]{ + p: &sync.Pool{ + New: func() interface{} { + return fn() + }, + }, + } +} + +func (p Pool[T]) Get() T { + return p.p.Get().(T) +} + +func (p Pool[T]) Put(t T) { + p.p.Put(t) +} diff --git a/util/xpool/pool_test.go b/util/xpool/pool_test.go new file mode 100644 index 00000000..8e7a9b81 --- /dev/null +++ b/util/xpool/pool_test.go @@ -0,0 +1,27 @@ +package pool + +import ( + "bytes" + "strings" + "testing" +) + +func TestBytes(t *testing.T) { + p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) }) + b := p.Get() + b.Write([]byte(`test`)) + if b.String() != "test" { + t.Fatal("pool not works") + } + p.Put(b) +} + +func TestStrings(t *testing.T) { + p := NewPool(func() *strings.Builder { return &strings.Builder{} }) + b := p.Get() + b.Write([]byte(`test`)) + if b.String() != "test" { + t.Fatal("pool not works") + } + p.Put(b) +}