diff --git a/.gitea/workflows/autoupdate.yml b/.gitea/workflows/autoupdate.yml index f67ff748..9244941d 100644 --- a/.gitea/workflows/autoupdate.yml +++ b/.gitea/workflows/autoupdate.yml @@ -5,8 +5,8 @@ on: - 'master' - 'v3' schedule: - - cron: '* * * * *' - #- cron: '@hourly' + #- cron: '* * * * *' + - cron: '@hourly' jobs: autoupdate: diff --git a/.gitea/workflows/coverage.yml b/.gitea/workflows/coverage.yml new file mode 100644 index 00000000..47db9227 --- /dev/null +++ b/.gitea/workflows/coverage.yml @@ -0,0 +1,30 @@ +name: Go + +on: + push: + branches: [ master, v3 ] + pull_request: + branches: [ master, v3 ] + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: setup + uses: actions/setup-go@v4 + with: + go-version: stable + + - name: coverage + run: go test -v -coverprofile coverage.out ./... + + - name: badge + uses: ncruces/go-coverage-report@main + with: + coverage-file: coverage.out + reuse-go: true + amend: true \ No newline at end of file diff --git a/.gitea/workflows/pr.yml b/.gitea/workflows/pr.yml index 520b5350..8c58ac3c 100644 --- a/.gitea/workflows/pr.yml +++ b/.gitea/workflows/pr.yml @@ -20,4 +20,4 @@ jobs: - name: test env: INTEGRATION_TESTS: yes - run: go test -mod readonly -v ./... \ No newline at end of file + run: go test -v -mod readonly -race -coverprofile=coverage.txt -covermode=atomic ./... \ No newline at end of file diff --git a/.gitignore b/.gitignore index 500d68ca..c2fff381 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Develop tools /.vscode/ /.idea/ +.idea +.vscode # Binaries for programs and plugins *.exe @@ -13,6 +15,7 @@ _obj _test _build +.DS_Store # Architecture specific extensions/prefixes *.[568vq] diff --git a/.golangci.yml b/.golangci.yml index 524fc7f8..6c81c4a9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,5 @@ run: concurrency: 4 - deadline: 5m issues-exit-code: 1 tests: true @@ -13,15 +12,13 @@ linters-settings: linters: enable: - govet - - deadcode - errcheck - govet - ineffassign - staticcheck - - structcheck - typecheck - unused - - varcheck + - spancheck - bodyclose - gci - goconst @@ -41,4 +38,5 @@ linters: - prealloc - unconvert - unparam + - unused disable-all: false diff --git a/broker/broker.go b/broker/broker.go index 53099b7a..7e8dd9ae 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -4,6 +4,7 @@ package broker // import "go.unistack.org/micro/v4/broker" import ( "context" "errors" + "time" "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/options" @@ -19,6 +20,8 @@ var ( ErrDisconnected = errors.New("broker disconnected") // ErrInvalidMessage returns when message has nvalid format ErrInvalidMessage = errors.New("broker message has invalid format") + // DefaultGracefulTimeout + DefaultGracefulTimeout = 5 * time.Second ) // Broker is an interface used for asynchronous messaging. diff --git a/broker/memory_test.go b/broker/memory_test.go index d8bc0a9c..aa8e8211 100644 --- a/broker/memory_test.go +++ b/broker/memory_test.go @@ -37,10 +37,10 @@ func TestMemoryBatchBroker(t *testing.T) { msgs := make([]Message, 0, count) for i := 0; i < count; i++ { message := &memoryMessage{ - header: map[string]string{ - metadata.HeaderTopic: topic, - "foo": "bar", - "id": fmt.Sprintf("%d", i), + header: metadata.Metadata{ + metadata.HeaderTopic: []string{topic}, + "foo": []string{"bar"}, + "id": []string{fmt.Sprintf("%d", i)}, }, body: []byte(`"hello world"`), } @@ -83,10 +83,10 @@ func TestMemoryBroker(t *testing.T) { msgs := make([]Message, 0, count) for i := 0; i < count; i++ { message := &memoryMessage{ - header: map[string]string{ - metadata.HeaderTopic: topic, - "foo": "bar", - "id": fmt.Sprintf("%d", i), + header: metadata.Metadata{ + metadata.HeaderTopic: []string{topic}, + "foo": []string{"bar"}, + "id": []string{fmt.Sprintf("%d", i)}, }, body: []byte(`"hello world"`), } diff --git a/broker/options.go b/broker/options.go index baa72ee9..aa3a7272 100644 --- a/broker/options.go +++ b/broker/options.go @@ -11,6 +11,7 @@ import ( "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/register" + "go.unistack.org/micro/v4/sync" "go.unistack.org/micro/v4/tracer" ) @@ -36,36 +37,41 @@ type Options struct { Name string // Address holds the broker address Address []string + + Wait *sync.WaitGroup + + GracefulTimeout time.Duration } // NewOptions create new Options func NewOptions(opts ...options.Option) Options { - options := Options{ - Register: register.DefaultRegister, - Logger: logger.DefaultLogger, - Context: context.Background(), - Meter: meter.DefaultMeter, - Codecs: make(map[string]codec.Codec), - Tracer: tracer.DefaultTracer, + newOpts := Options{ + Register: register.DefaultRegister, + Logger: logger.DefaultLogger, + Context: context.Background(), + Meter: meter.DefaultMeter, + Codecs: make(map[string]codec.Codec), + Tracer: tracer.DefaultTracer, + GracefulTimeout: DefaultGracefulTimeout, } for _, o := range opts { - o(&options) + o(&newOpts) } - return options + return newOpts } // PublishOptions struct type PublishOptions struct { // Context holds external options Context context.Context - // BodyOnly flag says the message contains raw body bytes - BodyOnly bool // Message metadata usually passed as message headers Metadata metadata.Metadata // Content-Type of message for marshal ContentType string // Topic destination Topic string + // BodyOnly flag says the message contains raw body bytes + BodyOnly bool } // NewPublishOptions creates PublishOptions struct diff --git a/broker/subscriber.go b/broker/subscriber.go index c330c022..e30c6cce 100644 --- a/broker/subscriber.go +++ b/broker/subscriber.go @@ -19,8 +19,8 @@ var typeOfError = reflect.TypeOf((*error)(nil)).Elem() // Is this an exported - upper case - name? func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) + r, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(r) } // Is this type exported or a builtin? diff --git a/database/dsn.go b/database/dsn.go index 8e6627a8..d4b158fc 100644 --- a/database/dsn.go +++ b/database/dsn.go @@ -75,78 +75,80 @@ func ParseDSN(dsn string) (*Config, error) { // Find last '/' that goes before dbname foundSlash := false for i := len(dsn) - 1; i >= 0; i-- { - if dsn[i] == '/' { - foundSlash = true - var j, k int + if dsn[i] != '/' { + continue + } - // left part is empty if i <= 0 - if i > 0 { - // Find the first ':' in dsn - for j = i; j >= 0; j-- { - if dsn[j] == ':' { - cfg.Scheme = dsn[0:j] - } + foundSlash = true + var j, k int + + // left part is empty if i <= 0 + if i > 0 { + // Find the first ':' in dsn + for j = i; j >= 0; j-- { + if dsn[j] == ':' { + cfg.Scheme = dsn[0:j] } - - // [username[:password]@][host] - // Find the last '@' in dsn[:i] - for j = i; j >= 0; j-- { - if dsn[j] == '@' { - // username[:password] - // Find the second ':' in dsn[:j] - for k = 0; k < j; k++ { - if dsn[k] == ':' { - if cfg.Scheme == dsn[:k] { - continue - } - var err error - cfg.Password, err = url.PathUnescape(dsn[k+1 : j]) - if err != nil { - return nil, err - } - break - } - } - cfg.Username = dsn[len(cfg.Scheme)+3 : k] - break - } - } - - for k = j + 1; k < i; k++ { - if dsn[k] == ':' { - cfg.Host = dsn[j+1 : k] - cfg.Port = dsn[k+1 : i] - break - } - } - } - // dbname[?param1=value1&...¶mN=valueN] - // Find the first '?' in dsn[i+1:] - for j = i + 1; j < len(dsn); j++ { - if dsn[j] == '?' { - parts := strings.Split(dsn[j+1:], "&") - cfg.Params = make([]string, 0, len(parts)*2) - for _, p := range parts { - k, v, found := strings.Cut(p, "=") - if !found { - continue + // [username[:password]@][host] + // Find the last '@' in dsn[:i] + for j = i; j >= 0; j-- { + if dsn[j] == '@' { + // username[:password] + // Find the second ':' in dsn[:j] + for k = 0; k < j; k++ { + if dsn[k] == ':' { + if cfg.Scheme == dsn[:k] { + continue + } + var err error + cfg.Password, err = url.PathUnescape(dsn[k+1 : j]) + if err != nil { + return nil, err + } + break } - cfg.Params = append(cfg.Params, k, v) } - + cfg.Username = dsn[len(cfg.Scheme)+3 : k] break } } - var err error - dbname := dsn[i+1 : j] - if cfg.Database, err = url.PathUnescape(dbname); err != nil { - return nil, fmt.Errorf("invalid dbname %q: %w", dbname, err) + + for k = j + 1; k < i; k++ { + if dsn[k] == ':' { + cfg.Host = dsn[j+1 : k] + cfg.Port = dsn[k+1 : i] + break + } } - break } + + // dbname[?param1=value1&...¶mN=valueN] + // Find the first '?' in dsn[i+1:] + for j = i + 1; j < len(dsn); j++ { + if dsn[j] == '?' { + parts := strings.Split(dsn[j+1:], "&") + cfg.Params = make([]string, 0, len(parts)*2) + for _, p := range parts { + k, v, found := strings.Cut(p, "=") + if !found { + continue + } + cfg.Params = append(cfg.Params, k, v) + } + + break + } + } + var err error + dbname := dsn[i+1 : j] + if cfg.Database, err = url.PathUnescape(dbname); err != nil { + return nil, fmt.Errorf("invalid dbname %q: %w", dbname, err) + } + + break } if !foundSlash && len(dsn) > 0 { diff --git a/go.mod b/go.mod index 80403212..5a3afc27 100644 --- a/go.mod +++ b/go.mod @@ -5,17 +5,16 @@ go 1.20 require ( dario.cat/mergo v1.0.0 github.com/DATA-DOG/go-sqlmock v1.5.0 - github.com/google/uuid v1.3.1 + github.com/google/uuid v1.6.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 - golang.org/x/sync v0.3.0 - golang.org/x/sys v0.12.0 - google.golang.org/grpc v1.58.2 - google.golang.org/protobuf v1.31.0 + golang.org/x/sync v0.6.0 + golang.org/x/sys v0.16.0 + google.golang.org/grpc v1.62.1 + google.golang.org/protobuf v1.32.0 ) require ( github.com/golang/protobuf v1.5.3 // indirect - golang.org/x/net v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect ) diff --git a/go.sum b/go.sum index 1bc873a5..cef2316a 100644 --- a/go.sum +++ b/go.sum @@ -6,29 +6,28 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= -golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= -google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= -google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= +google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logger/logger.go b/logger/logger.go index 8df1523b..496e0f9d 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -48,8 +48,10 @@ type Logger interface { Fatal(ctx context.Context, msg string, attrs ...interface{}) // Log logs message with needed level Log(ctx context.Context, level Level, msg string, attrs ...interface{}) - // String returns the name of logger + // String returns the type name of logger String() string + // String returns the name of logger + Name() string } // Info writes formatted msg to default logger on info level @@ -62,8 +64,8 @@ func Error(ctx context.Context, msg string, attrs ...interface{}) { DefaultLogger.Error(ctx, msg, attrs...) } -// Debugf writes formatted msg to default logger on debug level -func Debugf(ctx context.Context, msg string, attrs ...interface{}) { +// Debug writes formatted msg to default logger on debug level +func Debug(ctx context.Context, msg string, attrs ...interface{}) { DefaultLogger.Debug(ctx, msg, attrs...) } diff --git a/logger/noop.go b/logger/noop.go index e59f932f..f2c40057 100644 --- a/logger/noop.go +++ b/logger/noop.go @@ -45,6 +45,10 @@ func (l *noopLogger) Options() Options { return l.opts } +func (l *noopLogger) Name() string { + return l.opts.Name +} + func (l *noopLogger) String() string { return "noop" } diff --git a/logger/options.go b/logger/options.go index 3018121f..5ad9145f 100644 --- a/logger/options.go +++ b/logger/options.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "reflect" + "time" "go.unistack.org/micro/v4/options" rutil "go.unistack.org/micro/v4/util/reflect" @@ -17,24 +18,34 @@ type Options struct { Out io.Writer // Context holds exernal options Context context.Context - // Attrs holds additional attributes - Attrs []interface{} - // Name holds the logger name - Name string - // The logging level the logger should log - Level Level - // CallerSkipCount number of frmaes to skip - CallerSkipCount int - // ContextAttrFuncs contains funcs that executed before log func on context - ContextAttrFuncs []ContextAttrFunc + // TimeFunc used to obtain current time + TimeFunc func() time.Time // TimeKey is the key used for the time of the log call TimeKey string + // Name holds the logger name + Name string // LevelKey is the key used for the level of the log call LevelKey string // MessageKey is the key used for the message of the log call MessageKey string + // ErrorKey is the key used for the error info + ErrorKey string // SourceKey is the key used for the source file and line of the log call SourceKey string + // StacktraceKey is the key used for the stacktrace + StacktraceKey string + // Attrs holds additional attributes + Attrs []interface{} + // ContextAttrFuncs contains funcs that executed before log func on context + ContextAttrFuncs []ContextAttrFunc + // CallerSkipCount number of frmaes to skip + CallerSkipCount int + // The logging level the logger should log + Level Level + // AddStacktrace controls writing of stacktaces on error + AddStacktrace bool + // AddSource enabled writing source file and position in log + AddSource bool } // NewOptions creates new options struct @@ -46,12 +57,14 @@ func NewOptions(opts ...options.Option) Options { CallerSkipCount: DefaultCallerSkipCount, Context: context.Background(), ContextAttrFuncs: DefaultContextAttrFuncs, + AddSource: true, + TimeFunc: time.Now, } - WithMicroKeys()(&options) + _ = WithMicroKeys()(&options) for _, o := range opts { - o(&options) + _ = o(&options) } return options } @@ -116,6 +129,12 @@ func WithZapKeys() options.Option { if err = options.Set(src, "caller", ".SourceKey"); err != nil { return err } + if err = options.Set(src, "stacktrace", ".StacktraceKey"); err != nil { + return err + } + if err = options.Set(src, "error", ".ErrorKey"); err != nil { + return err + } return nil } } @@ -135,6 +154,12 @@ func WithZerologKeys() options.Option { if err = options.Set(src, "caller", ".SourceKey"); err != nil { return err } + if err = options.Set(src, "stacktrace", ".StacktraceKey"); err != nil { + return err + } + if err = options.Set(src, "error", ".ErrorKey"); err != nil { + return err + } return nil } } @@ -154,6 +179,12 @@ func WithSlogKeys() options.Option { if err = options.Set(src, slog.SourceKey, ".SourceKey"); err != nil { return err } + if err = options.Set(src, "stacktrace", ".StacktraceKey"); err != nil { + return err + } + if err = options.Set(src, "error", ".ErrorKey"); err != nil { + return err + } return nil } } @@ -173,6 +204,12 @@ func WithMicroKeys() options.Option { if err = options.Set(src, "caller", ".SourceKey"); err != nil { return err } + if err = options.Set(src, "stacktrace", ".StacktraceKey"); err != nil { + return err + } + if err = options.Set(src, "error", ".ErrorKey"); err != nil { + return err + } return nil } } @@ -189,3 +226,17 @@ func WithIncCallerSkipCount(n int) options.Option { return nil } } + +// WithAddStacktrace controls writing stacktrace on error +func WithAddStacktrace(v bool) options.Option { + return func(src interface{}) error { + return options.Set(src, v, ".AddStacktrace") + } +} + +// WitAddSource controls writing source file and pos in log +func WithAddSource(v bool) options.Option { + return func(src interface{}) error { + return options.Set(src, v, ".AddSource") + } +} diff --git a/logger/slog/slog.go b/logger/slog/slog.go index eca419b4..3011e58d 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -4,15 +4,18 @@ import ( "context" "log/slog" "os" + "regexp" "runtime" "strconv" - "time" + "sync" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/tracer" ) +var reTrace = regexp.MustCompile(`.*/slog/logger\.go.*\n`) + var ( traceValue = slog.StringValue("trace") debugValue = slog.StringValue("debug") @@ -58,38 +61,33 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { } type slogLogger struct { - slog *slog.Logger leveler *slog.LevelVar + handler slog.Handler opts logger.Options + mu sync.RWMutex } func (s *slogLogger) Clone(opts ...options.Option) logger.Logger { + s.mu.RLock() options := s.opts + s.mu.RUnlock() for _, o := range opts { - o(&options) + _ = o(&options) } l := &slogLogger{ opts: options, } - /* - if slog, ok := s.opts.Context.Value(loggerKey{}).(*slog.Logger); ok { - l.slog = slog - return nil - } - */ - l.leveler = new(slog.LevelVar) handleOpt := &slog.HandlerOptions{ - ReplaceAttr: s.renameAttr, + ReplaceAttr: l.renameAttr, Level: l.leveler, - AddSource: true, + AddSource: l.opts.AddSource, } l.leveler.Set(loggerToSlogLevel(l.opts.Level)) - handler := slog.NewJSONHandler(options.Out, handleOpt) - l.slog = slog.New(handler).With(options.Attrs...) + l.handler = slog.New(slog.NewJSONHandler(options.Out, handleOpt)).With(options.Attrs...).Handler() return l } @@ -107,48 +105,48 @@ func (s *slogLogger) Options() logger.Options { } func (s *slogLogger) Attrs(attrs ...interface{}) logger.Logger { - nl := &slogLogger{opts: s.opts} - nl.leveler = new(slog.LevelVar) - nl.leveler.Set(s.leveler.Level()) + s.mu.RLock() + level := s.leveler.Level() + options := s.opts + s.mu.RUnlock() + + l := &slogLogger{opts: options} + l.leveler = new(slog.LevelVar) + l.leveler.Set(level) handleOpt := &slog.HandlerOptions{ - ReplaceAttr: nl.renameAttr, - Level: s.leveler, - AddSource: true, + ReplaceAttr: l.renameAttr, + Level: l.leveler, + AddSource: l.opts.AddSource, } - handler := slog.NewJSONHandler(s.opts.Out, handleOpt) - nl.slog = slog.New(handler).With(attrs...) + l.handler = slog.New(slog.NewJSONHandler(s.opts.Out, handleOpt)).With(attrs...).Handler() - return nl + return l } func (s *slogLogger) Init(opts ...options.Option) error { + s.mu.Lock() + if len(s.opts.ContextAttrFuncs) == 0 { s.opts.ContextAttrFuncs = logger.DefaultContextAttrFuncs } + for _, o := range opts { if err := o(&s.opts); err != nil { return err } } - /* - if slog, ok := s.opts.Context.Value(loggerKey{}).(*slog.Logger); ok { - s.slog = slog - return nil - } - */ - s.leveler = new(slog.LevelVar) handleOpt := &slog.HandlerOptions{ ReplaceAttr: s.renameAttr, Level: s.leveler, - AddSource: true, + AddSource: s.opts.AddSource, } s.leveler.Set(loggerToSlogLevel(s.opts.Level)) - handler := slog.NewJSONHandler(s.opts.Out, handleOpt) - s.slog = slog.New(handler).With(s.opts.Attrs...) + s.handler = slog.New(slog.NewJSONHandler(s.opts.Out, handleOpt)).With(s.opts.Attrs...).Handler() + s.mu.Unlock() return nil } @@ -159,12 +157,36 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, msg string, attr } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), loggerToSlogLevel(lvl), msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + if s.opts.AddStacktrace && lvl == logger.ErrorLevel { + stackInfo := make([]byte, 1024*1024) + if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 { + traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1) + if len(traceLines) != 0 { + attrs = append(attrs, slog.String(s.opts.StacktraceKey, traceLines[len(traceLines)-1])) + } + } + } r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + r.Attrs(func(a slog.Attr) bool { + if a.Key == s.opts.ErrorKey { + if span, ok := tracer.SpanFromContext(ctx); ok { + span.SetStatus(tracer.SpanStatusError, a.Value.String()) + return false + } + } + return true + }) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Info(ctx context.Context, msg string, attrs ...interface{}) { @@ -173,12 +195,18 @@ func (s *slogLogger) Info(ctx context.Context, msg string, attrs ...interface{}) } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelInfo, msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelInfo, msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Debug(ctx context.Context, msg string, attrs ...interface{}) { @@ -187,12 +215,18 @@ func (s *slogLogger) Debug(ctx context.Context, msg string, attrs ...interface{} } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelDebug, msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug, msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Trace(ctx context.Context, msg string, attrs ...interface{}) { @@ -201,12 +235,18 @@ func (s *slogLogger) Trace(ctx context.Context, msg string, attrs ...interface{} } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelDebug-1, msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug-1, msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}) { @@ -215,13 +255,28 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{} } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelError, msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError, msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } + if s.opts.AddStacktrace { + stackInfo := make([]byte, 1024*1024) + if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 { + traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1) + if len(traceLines) != 0 { + attrs = append(attrs, slog.String(s.opts.StacktraceKey, traceLines[len(traceLines)-1])) + } + } + } r.Add(attrs...) r.Attrs(func(a slog.Attr) bool { - if a.Key == "error" { + if a.Key == s.opts.ErrorKey { if span, ok := tracer.SpanFromContext(ctx); ok { span.SetStatus(tracer.SpanStatusError, a.Value.String()) return false @@ -229,7 +284,7 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{} } return true }) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) } func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) { @@ -238,12 +293,18 @@ func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{} } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelError+1, msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError+1, msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) os.Exit(1) } @@ -253,12 +314,22 @@ func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) } var pcs [1]uintptr runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof] - r := slog.NewRecord(time.Now(), slog.LevelWarn, msg, pcs[0]) + r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelWarn, msg, pcs[0]) for _, fn := range s.opts.ContextAttrFuncs { attrs = append(attrs, fn(ctx)...) } + for idx, attr := range attrs { + if ve, ok := attr.(error); ok && ve != nil { + attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error()) + break + } + } r.Add(attrs...) - _ = s.slog.Handler().Handle(ctx, r) + _ = s.handler.Handle(ctx, r) +} + +func (s *slogLogger) Name() string { + return s.opts.Name } func (s *slogLogger) String() string { diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index 20e9d66f..05462d94 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -3,12 +3,26 @@ package slog import ( "bytes" "context" + "fmt" "log" "testing" "go.unistack.org/micro/v4/logger" ) +func TestError(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), logger.WithAddStacktrace(true)) + if err := l.Init(); err != nil { + t.Fatal(err) + } + l.Error(ctx, "msg", fmt.Errorf("message")) + if !bytes.Contains(buf.Bytes(), []byte(`"stacktrace":"`)) { + t.Fatalf("logger stacktrace not works, buf contains: %s", buf.Bytes()) + } +} + func TestContext(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) diff --git a/logger/unwrap/unwrap.go b/logger/unwrap/unwrap.go index e13844fb..3c456f19 100644 --- a/logger/unwrap/unwrap.go +++ b/logger/unwrap/unwrap.go @@ -56,9 +56,9 @@ type Wrapper struct { s fmt.State pointers map[uintptr]int opts *Options + takeMap map[int]bool depth int ignoreNextType bool - takeMap map[int]bool protoWrapperType bool sqlWrapperType bool } diff --git a/metadata/context.go b/metadata/context.go index 1406b30d..340c1b69 100644 --- a/metadata/context.go +++ b/metadata/context.go @@ -17,11 +17,11 @@ func FromIncomingContext(ctx context.Context) (Metadata, bool) { if ctx == nil { return nil, false } - md, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata) - if !ok || md.md == nil { + md, ok := ctx.Value(mdIncomingKey{}).(Metadata) + if !ok || md == nil { return nil, false } - return md.md, ok + return md, ok } // FromOutgoingContext returns metadata from outgoing ctx @@ -30,11 +30,11 @@ func FromOutgoingContext(ctx context.Context) (Metadata, bool) { if ctx == nil { return nil, false } - md, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata) - if !ok || md.md == nil { + md, ok := ctx.Value(mdOutgoingKey{}).(Metadata) + if !ok || md == nil { return nil, false } - return md.md, ok + return md, ok } // FromContext returns metadata from the given context @@ -43,11 +43,11 @@ func FromContext(ctx context.Context) (Metadata, bool) { if ctx == nil { return nil, false } - md, ok := ctx.Value(mdKey{}).(*rawMetadata) - if !ok || md.md == nil { + md, ok := ctx.Value(mdKey{}).(Metadata) + if !ok || md == nil { return nil, false } - return md.md, ok + return md, ok } // NewContext creates a new context with the given metadata @@ -55,45 +55,16 @@ func NewContext(ctx context.Context, md Metadata) context.Context { if ctx == nil { ctx = context.Background() } - ctx = context.WithValue(ctx, mdKey{}, &rawMetadata{md}) - ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{}) - ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{}) + ctx = context.WithValue(ctx, mdKey{}, md) return ctx } -// SetOutgoingContext modify outgoing context with given metadata -func SetOutgoingContext(ctx context.Context, md Metadata) bool { - if ctx == nil { - return false - } - if omd, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); ok { - omd.md = md - return true - } - return false -} - -// SetIncomingContext modify incoming context with given metadata -func SetIncomingContext(ctx context.Context, md Metadata) bool { - if ctx == nil { - return false - } - if omd, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); ok { - omd.md = md - return true - } - return false -} - // NewIncomingContext creates a new context with incoming metadata attached func NewIncomingContext(ctx context.Context, md Metadata) context.Context { if ctx == nil { ctx = context.Background() } - ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md}) - if v, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); !ok || v == nil { - ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{}) - } + ctx = context.WithValue(ctx, mdIncomingKey{}, md) return ctx } @@ -102,41 +73,28 @@ func NewOutgoingContext(ctx context.Context, md Metadata) context.Context { if ctx == nil { ctx = context.Background() } - ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md}) - if v, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); !ok || v == nil { - ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{}) - } + ctx = context.WithValue(ctx, mdOutgoingKey{}, md) return ctx } // AppendOutgoingContext apends new md to context func AppendOutgoingContext(ctx context.Context, kv ...string) context.Context { - md, ok := Pairs(kv...) - if !ok { - return ctx - } + md := Pairs(kv...) omd, ok := FromOutgoingContext(ctx) if !ok { return NewOutgoingContext(ctx, md) } - for k, v := range md { - omd.Set(k, v) - } - return NewOutgoingContext(ctx, omd) + nmd := Merge(omd, md, true) + return NewOutgoingContext(ctx, nmd) } // AppendIncomingContext apends new md to context func AppendIncomingContext(ctx context.Context, kv ...string) context.Context { - md, ok := Pairs(kv...) - if !ok { - return ctx - } + md := Pairs(kv...) omd, ok := FromIncomingContext(ctx) if !ok { return NewIncomingContext(ctx, md) } - for k, v := range md { - omd.Set(k, v) - } - return NewIncomingContext(ctx, omd) + nmd := Merge(omd, md, true) + return NewIncomingContext(ctx, nmd) } diff --git a/metadata/context_test.go b/metadata/context_test.go index deaa020a..e7df6b06 100644 --- a/metadata/context_test.go +++ b/metadata/context_test.go @@ -24,7 +24,7 @@ func TestNewNilContext(t *testing.T) { } func TestFromContext(t *testing.T) { - ctx := context.WithValue(context.TODO(), mdKey{}, &rawMetadata{New(0)}) + ctx := context.WithValue(context.TODO(), mdKey{}, New(0)) c, ok := FromContext(ctx) if c == nil || !ok { @@ -42,7 +42,7 @@ func TestNewContext(t *testing.T) { } func TestFromIncomingContext(t *testing.T) { - ctx := context.WithValue(context.TODO(), mdIncomingKey{}, &rawMetadata{New(0)}) + ctx := context.WithValue(context.TODO(), mdIncomingKey{}, New(0)) c, ok := FromIncomingContext(ctx) if c == nil || !ok { @@ -51,7 +51,7 @@ func TestFromIncomingContext(t *testing.T) { } func TestFromOutgoingContext(t *testing.T) { - ctx := context.WithValue(context.TODO(), mdOutgoingKey{}, &rawMetadata{New(0)}) + ctx := context.WithValue(context.TODO(), mdOutgoingKey{}, New(0)) c, ok := FromOutgoingContext(ctx) if c == nil || !ok { @@ -59,36 +59,6 @@ func TestFromOutgoingContext(t *testing.T) { } } -func TestSetIncomingContext(t *testing.T) { - md := New(1) - md.Set("key", "val") - ctx := context.WithValue(context.TODO(), mdIncomingKey{}, &rawMetadata{}) - if !SetIncomingContext(ctx, md) { - t.Fatal("SetIncomingContext not works") - } - md, ok := FromIncomingContext(ctx) - if md == nil || !ok { - t.Fatal("SetIncomingContext not works") - } else if v, ok := md.Get("key"); !ok || v != "val" { - t.Fatal("SetIncomingContext not works") - } -} - -func TestSetOutgoingContext(t *testing.T) { - md := New(1) - md.Set("key", "val") - ctx := context.WithValue(context.TODO(), mdOutgoingKey{}, &rawMetadata{}) - if !SetOutgoingContext(ctx, md) { - t.Fatal("SetOutgoingContext not works") - } - md, ok := FromOutgoingContext(ctx) - if md == nil || !ok { - t.Fatal("SetOutgoingContext not works") - } else if v, ok := md.Get("key"); !ok || v != "val" { - t.Fatal("SetOutgoingContext not works") - } -} - func TestNewIncomingContext(t *testing.T) { md := New(1) md.Set("key", "val") diff --git a/metadata/metadata.go b/metadata/metadata.go index 34434f21..f846dc71 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -1,9 +1,9 @@ // Package metadata is a way of defining message headers -package metadata // import "go.unistack.org/micro/v4/metadata" +package metadata import ( "net/textproto" - "sort" + "strings" ) var ( @@ -24,47 +24,7 @@ var ( ) // Metadata is our way of representing request headers internally. -// They're used at the RPC level and translate back and forth -// from Transport headers. -type Metadata map[string]string - -type rawMetadata struct { - md Metadata -} - -// defaultMetadataSize used when need to init new Metadata -var defaultMetadataSize = 2 - -// Iterator used to iterate over metadata with order -type Iterator struct { - md Metadata - keys []string - cur int - cnt int -} - -// Next advance iterator to next element -func (iter *Iterator) Next(k, v *string) bool { - if iter.cur+1 > iter.cnt { - return false - } - - *k = iter.keys[iter.cur] - *v = iter.md[*k] - iter.cur++ - return true -} - -// Iterator returns the itarator for metadata in sorted order -func (md Metadata) Iterator() *Iterator { - iter := &Iterator{md: md, cnt: len(md)} - iter.keys = make([]string, 0, iter.cnt) - for k := range md { - iter.keys = append(iter.keys, k) - } - sort.Strings(iter.keys) - return iter -} +type Metadata map[string][]string // Get returns value from metadata by key func (md Metadata) Get(key string) (string, bool) { @@ -74,7 +34,7 @@ func (md Metadata) Get(key string) (string, bool) { // slow path val, ok = md[textproto.CanonicalMIMEHeaderKey(key)] } - return val, ok + return strings.Join(val, ","), ok } // Set is used to store value in metadata @@ -83,10 +43,19 @@ func (md Metadata) Set(kv ...string) { kv = kv[:len(kv)-1] } for idx := 0; idx < len(kv); idx += 2 { - md[textproto.CanonicalMIMEHeaderKey(kv[idx])] = kv[idx+1] + md[textproto.CanonicalMIMEHeaderKey(kv[idx])] = []string{kv[idx+1]} } } +// Append is used to append value in metadata +func (md Metadata) Append(k string, v ...string) { + if len(v) == 0 { + return + } + k = textproto.CanonicalMIMEHeaderKey(k) + md[k] = append(md[k], v...) +} + // Del is used to remove value from metadata func (md Metadata) Del(keys ...string) { for _, key := range keys { @@ -98,46 +67,52 @@ func (md Metadata) Del(keys ...string) { } // Copy makes a copy of the metadata -func Copy(md Metadata) Metadata { - nmd := New(len(md)) - for key, val := range md { - nmd.Set(key, val) +func Copy(md Metadata, exclude ...string) Metadata { + nmd := make(Metadata, len(md)) + for k, v := range md { + nmd[k] = v } + nmd.Del(exclude...) return nmd } // New return new sized metadata func New(size int) Metadata { if size == 0 { - size = defaultMetadataSize + size = 2 } return make(Metadata, size) } // Merge merges metadata to existing metadata, overwriting if specified func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata { - var ok bool nmd := Copy(omd) for key, val := range mmd { - _, ok = nmd[key] + nval, ok := nmd[key] switch { + case ok && overwrite: + nmd[key] = nval + continue case ok && !overwrite: continue - case val != "": - nmd.Set(key, val) - case ok && val == "": - nmd.Del(key) + case !ok: + for _, v := range val { + if v != "" { + nval = append(nval, v) + } + } + nmd[key] = nval } } return nmd } // Pairs from which metadata created -func Pairs(kv ...string) (Metadata, bool) { +func Pairs(kv ...string) Metadata { if len(kv)%2 == 1 { - return nil, false + kv = kv[:len(kv)-1] } - md := New(len(kv) / 2) + md := make(Metadata, len(kv)/2) md.Set(kv...) - return md, true + return md } diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index eccea7a7..5d133db0 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -33,30 +33,52 @@ func TestAppend(t *testing.T) { } func TestPairs(t *testing.T) { - md, ok := Pairs("key1", "val1", "key2", "val2") - if !ok { - t.Fatal("odd number of kv") - } - if _, ok = md.Get("key1"); !ok { + md := Pairs("key1", "val1", "key2", "val2") + + if _, ok := md.Get("key1"); !ok { t.Fatal("key1 not found") } } -func testCtx(ctx context.Context) { - md := New(2) - md.Set("Key1", "Val1_new") - md.Set("Key3", "Val3") - SetOutgoingContext(ctx, md) +func testIncomingCtx(ctx context.Context) { + if md, ok := FromIncomingContext(ctx); ok && md != nil { + md.Set("Key1", "Val1_new") + md.Set("Key3", "Val3") + } } -func TestPassing(t *testing.T) { +func testOutgoingCtx(ctx context.Context) { + if md, ok := FromOutgoingContext(ctx); ok && md != nil { + md.Set("Key1", "Val1_new") + md.Set("Key3", "Val3") + } +} + +func TestIncoming(t *testing.T) { ctx := context.TODO() md1 := New(2) md1.Set("Key1", "Val1") md1.Set("Key2", "Val2") ctx = NewIncomingContext(ctx, md1) - testCtx(ctx) + testIncomingCtx(ctx) + md, ok := FromIncomingContext(ctx) + if !ok { + t.Fatalf("missing metadata from incoming context") + } + if v, ok := md.Get("Key1"); !ok || v != "Val1_new" { + t.Fatalf("invalid metadata value %#+v", md) + } +} + +func TestOutgoing(t *testing.T) { + ctx := context.TODO() + md1 := New(2) + md1.Set("Key1", "Val1") + md1.Set("Key2", "Val2") + + ctx = NewOutgoingContext(ctx, md1) + testOutgoingCtx(ctx) md, ok := FromOutgoingContext(ctx) if !ok { t.Fatalf("missing metadata from outgoing context") @@ -68,10 +90,10 @@ func TestPassing(t *testing.T) { func TestMerge(t *testing.T) { omd := Metadata{ - "key1": "val1", + "key1": []string{"val1"}, } mmd := Metadata{ - "key2": "val2", + "key2": []string{"val2"}, } nmd := Merge(omd, mmd, true) @@ -80,21 +102,6 @@ func TestMerge(t *testing.T) { } } -func TestIterator(t *testing.T) { - md := Metadata{ - "1Last": "last", - "2First": "first", - "3Second": "second", - } - - iter := md.Iterator() - var k, v string - - for iter.Next(&k, &v) { - // fmt.Printf("k: %s, v: %s\n", k, v) - } -} - func TestMedataCanonicalKey(t *testing.T) { md := New(1) md.Set("x-request-id", "12345") @@ -134,10 +141,7 @@ func TestMetadataSet(t *testing.T) { } func TestMetadataDelete(t *testing.T) { - md := Metadata{ - "Foo": "bar", - "Baz": "empty", - } + md := Pairs("Foo", "bar", "Baz", "empty") md.Del("Baz") _, ok := md.Get("Baz") @@ -156,24 +160,19 @@ func TestNilContext(t *testing.T) { } func TestMetadataCopy(t *testing.T) { - md := Metadata{ - "Foo": "bar", - "Bar": "baz", - } + md := Pairs("Foo", "bar", "Bar", "baz") cp := Copy(md) for k, v := range md { - if cv := cp[k]; cv != v { + if cv := cp[k]; len(cv) != len(v) { t.Fatalf("Got %s:%s for %s:%s", k, cv, k, v) } } } func TestMetadataContext(t *testing.T) { - md := Metadata{ - "Foo": "bar", - } + md := Pairs("Foo", "bar") ctx := NewContext(context.TODO(), md) @@ -182,7 +181,7 @@ func TestMetadataContext(t *testing.T) { t.Errorf("Unexpected error retrieving metadata, got %t", ok) } - if emd["Foo"] != md["Foo"] { + if len(emd["Foo"]) != len(md["Foo"]) { t.Errorf("Expected key: %s val: %s, got key: %s val: %s", "Foo", md["Foo"], "Foo", emd["Foo"]) } @@ -190,3 +189,14 @@ func TestMetadataContext(t *testing.T) { t.Errorf("Expected metadata length 1 got %d", i) } } + +func TestCopy(t *testing.T) { + md := New(2) + md.Set("key1", "val1", "key2", "val2") + nmd := Copy(md, "key2") + if len(nmd) != 1 { + t.Fatal("Copy exclude not works") + } else if nmd["Key1"][0] != "val1" { + t.Fatal("Copy exclude not works") + } +} diff --git a/micro.go b/micro.go new file mode 100644 index 00000000..6163b2e1 --- /dev/null +++ b/micro.go @@ -0,0 +1,98 @@ +package micro + +import ( + "reflect" + + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/client" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/flow" + "go.unistack.org/micro/v4/fsm" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/meter" + "go.unistack.org/micro/v4/register" + "go.unistack.org/micro/v4/resolver" + "go.unistack.org/micro/v4/router" + "go.unistack.org/micro/v4/selector" + "go.unistack.org/micro/v4/server" + "go.unistack.org/micro/v4/store" + "go.unistack.org/micro/v4/sync" + "go.unistack.org/micro/v4/tracer" +) + +func As(b any, target any) bool { + if b == nil { + return false + } + if target == nil { + return false + } + val := reflect.ValueOf(target) + typ := val.Type() + if typ.Kind() != reflect.Ptr || val.IsNil() { + return false + } + targetType := typ.Elem() + if targetType.Kind() != reflect.Interface { + switch { + case targetType.Implements(brokerType): + break + case targetType.Implements(loggerType): + break + case targetType.Implements(clientType): + break + case targetType.Implements(serverType): + break + case targetType.Implements(codecType): + break + case targetType.Implements(flowType): + break + case targetType.Implements(fsmType): + break + case targetType.Implements(meterType): + break + case targetType.Implements(registerType): + break + case targetType.Implements(resolverType): + break + case targetType.Implements(selectorType): + break + case targetType.Implements(storeType): + break + case targetType.Implements(syncType): + break + case targetType.Implements(serviceType): + break + case targetType.Implements(routerType): + break + case targetType.Implements(tracerType): + break + default: + return false + } + } + if reflect.TypeOf(b).AssignableTo(targetType) { + val.Elem().Set(reflect.ValueOf(b)) + return true + } + return false +} + +var ( + brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem() + loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem() + clientType = reflect.TypeOf((*client.Client)(nil)).Elem() + serverType = reflect.TypeOf((*server.Server)(nil)).Elem() + codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem() + flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem() + fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem() + meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem() + registerType = reflect.TypeOf((*register.Register)(nil)).Elem() + resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem() + routerType = reflect.TypeOf((*router.Router)(nil)).Elem() + selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem() + storeType = reflect.TypeOf((*store.Store)(nil)).Elem() + syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem() + tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem() + serviceType = reflect.TypeOf((*Service)(nil)).Elem() +) diff --git a/micro_test.go b/micro_test.go new file mode 100644 index 00000000..c9dc1bbb --- /dev/null +++ b/micro_test.go @@ -0,0 +1,103 @@ +package micro + +import ( + "context" + "fmt" + "reflect" + "testing" + + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/fsm" + "go.unistack.org/micro/v4/options" +) + +func TestAs(t *testing.T) { + var b *bro + broTarget := &bro{name: "kafka"} + fsmTarget := &fsmT{name: "fsm"} + + testCases := []struct { + b any + target any + match bool + want any + }{ + { + broTarget, + &b, + true, + broTarget, + }, + { + nil, + &b, + false, + nil, + }, + { + fsmTarget, + &b, + false, + nil, + }, + } + for i, tc := range testCases { + name := fmt.Sprintf("%d:As(Errorf(..., %v), %v)", i, tc.b, tc.target) + // Clear the target pointer, in case it was set in a previous test. + rtarget := reflect.ValueOf(tc.target) + rtarget.Elem().Set(reflect.Zero(reflect.TypeOf(tc.target).Elem())) + t.Run(name, func(t *testing.T) { + match := As(tc.b, tc.target) + if match != tc.match { + t.Fatalf("match: got %v; want %v", match, tc.match) + } + if !match { + return + } + if got := rtarget.Elem().Interface(); got != tc.want { + t.Fatalf("got %#v, want %#v", got, tc.want) + } + }) + } +} + +type bro struct { + name string +} + +func (p *bro) Name() string { return p.name } +func (p *bro) Init(opts ...options.Option) error { return nil } + +// Options returns broker options +func (p *bro) Options() broker.Options { return broker.Options{} } + +// Address return configured address +func (p *bro) Address() string { return "" } + +// Connect connects to broker +func (p *bro) Connect(ctx context.Context) error { return nil } + +// Disconnect disconnect from broker +func (p *bro) Disconnect(ctx context.Context) error { return nil } + +// Publish message, msg can be single broker.Message or []broker.Message +func (p *bro) Publish(ctx context.Context, msg interface{}, opts ...options.Option) error { return nil } + +// Subscribe subscribes to topic message via handler +func (p *bro) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (broker.Subscriber, error) { + return nil, nil +} + +// String type of broker +func (p *bro) String() string { return p.name } + +type fsmT struct { + name string +} + +func (f *fsmT) Start(ctx context.Context, a interface{}, o ...Option) (interface{}, error) { + return nil, nil +} +func (f *fsmT) Current() string { return f.name } +func (f *fsmT) Reset() {} +func (f *fsmT) State(s string, sf fsm.StateFunc) {} diff --git a/options/options.go b/options/options.go index cda572a6..2417cff2 100644 --- a/options/options.go +++ b/options/options.go @@ -151,9 +151,41 @@ func ContentType(ct string) Option { } // Metadata pass additional metadata -func Metadata(md metadata.Metadata) Option { +func Metadata(md ...any) Option { + var result metadata.Metadata + if len(md) == 1 { + switch vt := md[0].(type) { + case metadata.Metadata: + result = metadata.Copy(vt) + case map[string]string: + result = make(metadata.Metadata, len(vt)) + for k, v := range vt { + result.Set(k, v) + } + case map[string][]string: + result = metadata.Copy(vt) + default: + result = metadata.New(0) + } + } else { + result = metadata.New(len(md) / 2) + for idx := 0; idx <= len(md)/2; idx += 2 { + k, ok := md[idx].(string) + switch vt := md[idx+1].(type) { + case string: + if ok { + result.Set(k, vt) + } + case []string: + if ok { + result.Append(k, vt...) + } + } + } + } + return func(src interface{}) error { - return Set(src, metadata.Copy(md), ".Metadata") + return Set(src, result, ".Metadata") } } diff --git a/options/options_test.go b/options/options_test.go index 7d3b6a62..9a8946a2 100644 --- a/options/options_test.go +++ b/options/options_test.go @@ -4,7 +4,9 @@ import ( "testing" "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/util/reflect" ) func TestAddress(t *testing.T) { @@ -84,3 +86,64 @@ func TestLabels(t *testing.T) { t.Fatal("failed to set labels") } } + +func TestMetadataAny(t *testing.T) { + type s struct { + Metadata metadata.Metadata + } + + testCases := []struct { + Name string + Data any + Expected metadata.Metadata + }{ + { + "strings_even", + []any{"Strkey1", []string{"val1"}, "Strkey2", []string{"val2"}}, + metadata.Pairs("Strkey1", "val1", "Strkey2", "val2"), + }, + { + "strings_odd", + []any{"key1", "val1", "key2"}, + metadata.Pairs("Key1", "val1"), + }, + { + Name: "map", + Data: map[string][]string{ + "Mapkey1": {"val1"}, + "Mapkey2": {"val2"}, + }, + Expected: metadata.Metadata{ + "Mapkey1": []string{"val1"}, + "Mapkey2": []string{"val2"}, + }, + }, + { + "metadata.Metadata", + metadata.Pairs("key1", "val1", "key2", "val2"), + metadata.Pairs("Key1", "val1", "Key2", "val2"), + }, + } + + for _, tt := range testCases { + t.Run(tt.Name, func(t *testing.T) { + src := &s{} + var opts []options.Option + switch valData := tt.Data.(type) { + case []any: + opts = append(opts, options.Metadata(valData...)) + case map[string]string, map[string][]string, metadata.Metadata: + opts = append(opts, options.Metadata(valData)) + } + + for _, o := range opts { + if err := o(src); err != nil { + t.Fatal(err) + } + if !reflect.Equal(tt.Expected, src.Metadata) { + t.Fatalf("expected: %v, actual: %v", tt.Expected, src.Metadata) + } + } + }) + } +} diff --git a/register/memory.go b/register/memory/memory.go similarity index 68% rename from register/memory.go rename to register/memory/memory.go index 8cd75935..35a12562 100644 --- a/register/memory.go +++ b/register/memory/memory.go @@ -1,10 +1,13 @@ -package register +package memory import ( "context" "sync" "time" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/register" + "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/util/id" ) @@ -16,32 +19,32 @@ var ( type node struct { LastSeen time.Time - *Node + *register.Node TTL time.Duration } type record struct { Name string Version string - Metadata map[string]string + Metadata metadata.Metadata Nodes map[string]*node - Endpoints []*Endpoint + Endpoints []*register.Endpoint } type memory struct { - sync.RWMutex records map[string]services watchers map[string]*watcher - opts Options + opts register.Options + sync.RWMutex } // services is a KV map with service name as the key and a map of records as the value type services map[string]map[string]*record // NewRegister returns an initialized in-memory register -func NewRegister(opts ...Option) Register { +func NewRegister(opts ...register.Option) register.Register { r := &memory{ - opts: NewOptions(opts...), + opts: register.NewOptions(opts...), records: make(map[string]services), watchers: make(map[string]*watcher), } @@ -75,7 +78,7 @@ func (m *memory) ttlPrune() { } } -func (m *memory) sendEvent(r *Result) { +func (m *memory) sendEvent(r *register.Result) { m.RLock() watchers := make([]*watcher, 0, len(m.watchers)) for _, w := range m.watchers { @@ -99,14 +102,24 @@ func (m *memory) sendEvent(r *Result) { } func (m *memory) Connect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } return nil } func (m *memory) Disconnect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } return nil } -func (m *memory) Init(opts ...Option) error { +func (m *memory) Init(opts ...register.Option) error { for _, o := range opts { o(&m.opts) } @@ -118,15 +131,20 @@ func (m *memory) Init(opts ...Option) error { return nil } -func (m *memory) Options() Options { +func (m *memory) Options() register.Options { return m.opts } -func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOption) error { +func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } m.Lock() defer m.Unlock() - options := NewRegisterOptions(opts...) + options := register.NewRegisterOptions(opts...) // get the services for this domain from the register srvs, ok := m.records[options.Domain] @@ -136,11 +154,11 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio // domain is set in metadata so it can be passed to watchers if s.Metadata == nil { - s.Metadata = map[string]string{"domain": options.Domain} - } else { - s.Metadata["domain"] = options.Domain + s.Metadata = metadata.New(0) } + s.Metadata.Set("domain", options.Domain) + // ensure the service name exists r := serviceToRecord(s, options.TTL) if _, ok := srvs[s.Name]; !ok { @@ -153,7 +171,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio m.opts.Logger.Debug(m.opts.Context, "register added new service: "+s.Name+", version "+s.Version) } m.records[options.Domain] = srvs - go m.sendEvent(&Result{Action: "create", Service: s}) + go m.sendEvent(®ister.Result{Action: "create", Service: s}) } var addedNodes bool @@ -164,19 +182,12 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio continue } - metadata := make(map[string]string, len(n.Metadata)) - - // make copy of metadata - for k, v := range n.Metadata { - metadata[k] = v - } - - // set the domain - metadata["domain"] = options.Domain + metadata := metadata.Copy(n.Metadata) + metadata.Set("domain", options.Domain) // add the node srvs[s.Name][s.Version].Nodes[n.ID] = &node{ - Node: &Node{ + Node: ®ister.Node{ ID: n.ID, Address: n.Address, Metadata: metadata, @@ -192,7 +203,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, "register added new node to service: "+s.Name+", version "+s.Version) } - go m.sendEvent(&Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: "update", Service: s}) } else { // refresh TTL and timestamp for _, n := range s.Nodes { @@ -208,18 +219,17 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio return nil } -func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterOption) error { +func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error { m.Lock() defer m.Unlock() - options := NewDeregisterOptions(opts...) + options := register.NewDeregisterOptions(opts...) // domain is set in metadata so it can be passed to watchers if s.Metadata == nil { - s.Metadata = map[string]string{"domain": options.Domain} - } else { - s.Metadata["domain"] = options.Domain + s.Metadata = metadata.New(0) } + s.Metadata.Set("domain", options.Domain) // if the domain doesn't exist, there is nothing to deregister services, ok := m.records[options.Domain] @@ -252,7 +262,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // is cleanup if len(version.Nodes) > 0 { m.records[options.Domain][s.Name][s.Version] = version - go m.sendEvent(&Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: "update", Service: s}) return nil } @@ -260,7 +270,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // register and exit if len(versions) == 1 { delete(m.records[options.Domain], s.Name) - go m.sendEvent(&Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, "register removed service: "+s.Name) @@ -270,7 +280,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // there are other versions of the service running, so only remove this version of it delete(m.records[options.Domain][s.Name], s.Version) - go m.sendEvent(&Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, "register removed service: "+s.Name+", version "+s.Version) } @@ -278,20 +288,20 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO return nil } -func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) { - options := NewLookupOptions(opts...) +func (m *memory) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) { + options := register.NewLookupOptions(opts...) // if it's a wildcard domain, return from all domains - if options.Domain == WildcardDomain { + if options.Domain == register.WildcardDomain { m.RLock() recs := m.records m.RUnlock() - var services []*Service + var services []*register.Service for domain := range recs { - srvs, err := m.LookupService(ctx, name, append(opts, LookupDomain(domain))...) - if err == ErrNotFound { + srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...) + if err == register.ErrNotFound { continue } else if err != nil { return nil, err @@ -300,7 +310,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO } if len(services) == 0 { - return nil, ErrNotFound + return nil, register.ErrNotFound } return services, nil } @@ -311,17 +321,17 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO // check the domain exists services, ok := m.records[options.Domain] if !ok { - return nil, ErrNotFound + return nil, register.ErrNotFound } // check the service exists versions, ok := services[name] if !ok || len(versions) == 0 { - return nil, ErrNotFound + return nil, register.ErrNotFound } // serialize the response - result := make([]*Service, len(versions)) + result := make([]*register.Service, len(versions)) var i int @@ -333,19 +343,19 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO return result, nil } -func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { - options := NewListOptions(opts...) +func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) { + options := register.NewListOptions(opts...) // if it's a wildcard domain, list from all domains - if options.Domain == WildcardDomain { + if options.Domain == register.WildcardDomain { m.RLock() recs := m.records m.RUnlock() - var services []*Service + var services []*register.Service for domain := range recs { - srvs, err := m.ListServices(ctx, append(opts, ListDomain(domain))...) + srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...) if err != nil { return nil, err } @@ -361,11 +371,11 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi // ensure the domain exists services, ok := m.records[options.Domain] if !ok { - return make([]*Service, 0), nil + return make([]*register.Service, 0), nil } // serialize the result, each version counts as an individual service - var result []*Service + var result []*register.Service for _, service := range services { for _, version := range service { @@ -376,16 +386,16 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi return result, nil } -func (m *memory) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { +func (m *memory) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) { id, err := id.New() if err != nil { return nil, err } - wo := NewWatchOptions(opts...) + wo := register.NewWatchOptions(opts...) // construct the watcher w := &watcher{ exit: make(chan bool), - res: make(chan *Result), + res: make(chan *register.Result), id: id, wo: wo, } @@ -406,13 +416,13 @@ func (m *memory) String() string { } type watcher struct { - res chan *Result + res chan *register.Result exit chan bool - wo WatchOptions + wo register.WatchOptions id string } -func (m *watcher) Next() (*Result, error) { +func (m *watcher) Next() (*register.Result, error) { for { select { case r := <-m.res: @@ -424,20 +434,28 @@ func (m *watcher) Next() (*Result, error) { continue } + if m.wo.Domain == register.WildcardDomain { + return r, nil + } + + if r.Service.Metadata == nil { + continue + } + // extract domain from service metadata var domain string - if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 { - domain = r.Service.Metadata["domain"] + if v, ok := r.Service.Metadata.Get("domain"); ok && v != "" { + domain = v } else { - domain = DefaultDomain + domain = register.DefaultDomain } // only send the event if watching the wildcard or this specific domain - if m.wo.Domain == WildcardDomain || m.wo.Domain == domain { + if m.wo.Domain == domain { return r, nil } case <-m.exit: - return nil, ErrWatcherStopped + return nil, register.ErrWatcherStopped } } } @@ -451,11 +469,8 @@ func (m *watcher) Stop() { } } -func serviceToRecord(s *Service, ttl time.Duration) *record { - metadata := make(map[string]string, len(s.Metadata)) - for k, v := range s.Metadata { - metadata[k] = v - } +func serviceToRecord(s *register.Service, ttl time.Duration) *record { + metadata := metadata.Copy(s.Metadata) nodes := make(map[string]*node, len(s.Nodes)) for _, n := range s.Nodes { @@ -466,10 +481,8 @@ func serviceToRecord(s *Service, ttl time.Duration) *record { } } - endpoints := make([]*Endpoint, len(s.Endpoints)) - for i, e := range s.Endpoints { // TODO: vtolstov use copy - endpoints[i] = e - } + endpoints := make([]*register.Endpoint, len(s.Endpoints)) + copy(endpoints, s.Endpoints) return &record{ Name: s.Name, @@ -480,23 +493,14 @@ func serviceToRecord(s *Service, ttl time.Duration) *record { } } -func recordToService(r *record, domain string) *Service { - metadata := make(map[string]string, len(r.Metadata)) - for k, v := range r.Metadata { - metadata[k] = v - } - - // set the domain in metadata so it can be determined when a wildcard query is performed - metadata["domain"] = domain - - endpoints := make([]*Endpoint, len(r.Endpoints)) +func recordToService(r *record, domain string) *register.Service { + endpoints := make([]*register.Endpoint, len(r.Endpoints)) for i, e := range r.Endpoints { - md := make(map[string]string, len(e.Metadata)) - for k, v := range e.Metadata { - md[k] = v - } + md := metadata.Copy(e.Metadata) + // set the domain in metadata so it can be determined when a wildcard query is performed + md.Set("domain", domain) - endpoints[i] = &Endpoint{ + endpoints[i] = ®ister.Endpoint{ Name: e.Name, Request: e.Request, Response: e.Response, @@ -504,26 +508,21 @@ func recordToService(r *record, domain string) *Service { } } - nodes := make([]*Node, len(r.Nodes)) + nodes := make([]*register.Node, len(r.Nodes)) i := 0 for _, n := range r.Nodes { - md := make(map[string]string, len(n.Metadata)) - for k, v := range n.Metadata { - md[k] = v - } - - nodes[i] = &Node{ + nodes[i] = ®ister.Node{ ID: n.ID, Address: n.Address, - Metadata: md, + Metadata: metadata.Copy(n.Metadata), } i++ } - return &Service{ + return ®ister.Service{ Name: r.Name, Version: r.Version, - Metadata: metadata, + Metadata: metadata.Copy(r.Metadata), Endpoints: endpoints, Nodes: nodes, } diff --git a/register/memory_test.go b/register/memory/memory_test.go similarity index 72% rename from register/memory_test.go rename to register/memory/memory_test.go index 75b2c798..c60b9adf 100644 --- a/register/memory_test.go +++ b/register/memory/memory_test.go @@ -1,19 +1,23 @@ -package register +package memory import ( "context" "fmt" + "reflect" "sync" "testing" "time" + + "go.unistack.org/micro/v4" + "go.unistack.org/micro/v4/register" ) -var testData = map[string][]*Service{ +var testData = map[string][]*register.Service{ "foo": { { Name: "foo", Version: "1.0.0", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.0-123", Address: "localhost:9999", @@ -27,7 +31,7 @@ var testData = map[string][]*Service{ { Name: "foo", Version: "1.0.1", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.1-321", Address: "localhost:6666", @@ -37,7 +41,7 @@ var testData = map[string][]*Service{ { Name: "foo", Version: "1.0.3", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.3-345", Address: "localhost:8888", @@ -49,7 +53,7 @@ var testData = map[string][]*Service{ { Name: "bar", Version: "default", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "bar-1.0.0-123", Address: "localhost:9999", @@ -63,7 +67,7 @@ var testData = map[string][]*Service{ { Name: "bar", Version: "latest", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "bar-1.0.1-321", Address: "localhost:6666", @@ -78,7 +82,7 @@ func TestMemoryRegistry(t *testing.T) { ctx := context.TODO() m := NewRegister() - fn := func(k string, v []*Service) { + fn := func(k string, v []*register.Service) { services, err := m.LookupService(ctx, k) if err != nil { t.Errorf("Unexpected error getting service %s: %v", k, err) @@ -155,8 +159,8 @@ func TestMemoryRegistry(t *testing.T) { for _, v := range testData { for _, service := range v { services, err := m.LookupService(ctx, service.Name) - if err != ErrNotFound { - t.Errorf("Expected error: %v, got: %v", ErrNotFound, err) + if err != register.ErrNotFound { + t.Errorf("Expected error: %v, got: %v", register.ErrNotFound, err) } if len(services) != 0 { t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services)) @@ -171,7 +175,7 @@ func TestMemoryRegistryTTL(t *testing.T) { for _, v := range testData { for _, service := range v { - if err := m.Register(ctx, service, RegisterTTL(time.Millisecond)); err != nil { + if err := m.Register(ctx, service, register.RegisterTTL(time.Millisecond)); err != nil { t.Fatal(err) } } @@ -200,15 +204,15 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) { ctx := context.TODO() for _, v := range testData { for _, service := range v { - if err := m.Register(ctx, service, RegisterTTL(waitTime/2)); err != nil { + if err := m.Register(ctx, service, register.RegisterTTL(waitTime/2)); err != nil { t.Fatal(err) } } } - //if len(os.Getenv("IN_TRAVIS_CI")) == 0 { + // if len(os.Getenv("IN_TRAVIS_CI")) == 0 { // t.Logf("test will wait %v, then check TTL timeouts", waitTime) - //} + // } errChan := make(chan error, concurrency) syncChan := make(chan struct{}) @@ -249,34 +253,41 @@ func TestMemoryWildcard(t *testing.T) { m := NewRegister() ctx := context.TODO() - testSrv := &Service{Name: "foo", Version: "1.0.0"} + if err := m.Init(); err != nil { + t.Fatal(err) + } - if err := m.Register(ctx, testSrv, RegisterDomain("one")); err != nil { + if err := m.Connect(ctx); err != nil { + t.Fatal(err) + } + testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} + + if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil { t.Fatalf("Register err: %v", err) } - if err := m.Register(ctx, testSrv, RegisterDomain("two")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil { t.Fatalf("Register err: %v", err) } - if recs, err := m.ListServices(ctx, ListDomain("one")); err != nil { + if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.ListServices(ctx, ListDomain("*")); err != nil { + if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("one")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("*")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) @@ -284,12 +295,16 @@ func TestMemoryWildcard(t *testing.T) { } func TestWatcher(t *testing.T) { - testSrv := &Service{Name: "foo", Version: "1.0.0"} + testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} ctx := context.TODO() m := NewRegister() - m.Init() - m.Connect(ctx) + if err := m.Init(); err != nil { + t.Fatal(err) + } + if err := m.Connect(ctx); err != nil { + t.Fatal(err) + } wc, err := m.Watch(ctx) if err != nil { t.Fatalf("cant watch: %v", err) @@ -320,3 +335,37 @@ func TestWatcher(t *testing.T) { t.Fatal("expected error on Next()") } } + +func Test_service_Register(t *testing.T) { + t.Skip() + r := NewRegister() + + type args struct { + names []string + } + tests := []struct { + name string + opts []micro.Option + args args + want register.Register + }{ + { + name: "service.Register", + opts: []micro.Option{micro.Register(r)}, + args: args{ + names: []string{"memory"}, + }, + want: r, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := micro.NewService(tt.opts...) + + if got := s.Register(tt.args.names...); !reflect.DeepEqual(got, tt.want) { + t.Errorf("service.Register() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/register/noop.go b/register/noop.go new file mode 100644 index 00000000..4fb692d3 --- /dev/null +++ b/register/noop.go @@ -0,0 +1,72 @@ +package register + +import "context" + +type noop struct { + opts Options +} + +func NewRegister(opts ...Option) Register { + return &noop{ + opts: NewOptions(opts...), + } +} + +func (n *noop) Name() string { + return n.opts.Name +} + +func (n *noop) Init(opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } + return nil +} + +func (n *noop) Options() Options { + return n.opts +} + +func (n *noop) Connect(ctx context.Context) error { + return nil +} + +func (n *noop) Disconnect(ctx context.Context) error { + return nil +} + +func (n *noop) Register(ctx context.Context, service *Service, option ...RegisterOption) error { + return nil +} + +func (n *noop) Deregister(ctx context.Context, service *Service, option ...DeregisterOption) error { + return nil +} + +func (n *noop) LookupService(ctx context.Context, s string, option ...LookupOption) ([]*Service, error) { + return nil, nil +} + +func (n *noop) ListServices(ctx context.Context, option ...ListOption) ([]*Service, error) { + return nil, nil +} + +func (n *noop) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + wOpts := NewWatchOptions(opts...) + + return &watcher{wo: wOpts}, nil +} + +func (n *noop) String() string { + return "noop" +} + +type watcher struct { + wo WatchOptions +} + +func (m *watcher) Next() (*Result, error) { + return nil, nil +} + +func (m *watcher) Stop() {} diff --git a/register/register.go b/register/register.go index 74526544..c06ccc9f 100644 --- a/register/register.go +++ b/register/register.go @@ -4,7 +4,6 @@ package register // import "go.unistack.org/micro/v4/register" import ( "context" "errors" - "go.unistack.org/micro/v4/metadata" ) diff --git a/semconv/broker.go b/semconv/broker.go index 9b61b523..8c51a8f2 100644 --- a/semconv/broker.go +++ b/semconv/broker.go @@ -17,4 +17,6 @@ var ( SubscribeMessageTotal = "subscribe_message_total" // SubscribeMessageInflight specifies meter metric name SubscribeMessageInflight = "subscribe_message_inflight" + // BrokerGroupLag specifies broker lag + BrokerGroupLag = "broker_lag" ) diff --git a/semconv/cache.go b/semconv/cache.go new file mode 100644 index 00000000..f12739c1 --- /dev/null +++ b/semconv/cache.go @@ -0,0 +1,12 @@ +package semconv + +var ( + // CacheRequestDurationSeconds specifies meter metric name + CacheRequestDurationSeconds = "cache_request_duration_seconds" + // ClientRequestLatencyMicroseconds specifies meter metric name + CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds" + // CacheRequestTotal specifies meter metric name + CacheRequestTotal = "cache_request_total" + // CacheRequestInflight specifies meter metric name + CacheRequestInflight = "cache_request_inflight" +) diff --git a/server/noop.go b/server/noop.go index 9b3355b9..fed12e07 100644 --- a/server/noop.go +++ b/server/noop.go @@ -112,8 +112,8 @@ func (n *noopServer) Register() error { } n.RUnlock() - service.Nodes[0].Metadata["protocol"] = "noop" - service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] + service.Nodes[0].Metadata.Set("protocol", "noop") + service.Nodes[0].Metadata.Set("transport", "noop") service.Endpoints = endpoints n.RLock() diff --git a/server/options.go b/server/options.go index 143f7eeb..ca61d9d7 100644 --- a/server/options.go +++ b/server/options.go @@ -13,6 +13,7 @@ import ( "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/register" + msync "go.unistack.org/micro/v4/sync" "go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/util/id" ) @@ -32,7 +33,7 @@ type Options struct { // Listener may be passed if already created Listener net.Listener // Wait group - Wait *sync.WaitGroup + Wait *msync.WaitGroup // TLSConfig specifies tls.Config for secure serving TLSConfig *tls.Config // Metadata holds the server metadata @@ -65,6 +66,8 @@ type Options struct { DeregisterAttempts int // Hooks may contains HandleWrapper or Server func wrapper Hooks options.Hooks + // GracefulTimeout timeout for graceful stop server + GracefulTimeout time.Duration } // NewOptions returns new options struct with default or passed values @@ -84,6 +87,7 @@ func NewOptions(opts ...options.Option) Options { Name: DefaultName, Version: DefaultVersion, ID: id.Must(), + GracefulTimeout: DefaultGracefulTimeout, } for _, o := range opts { @@ -143,8 +147,11 @@ func Wait(wg *sync.WaitGroup) options.Option { if wg == nil { wg = new(sync.WaitGroup) } + + wrap := msync.WrapWaitGroup(wg) + return func(src interface{}) error { - return options.Set(src, wg, ".Wait") + return options.Set(src, wrap, ".Wait") } } @@ -162,6 +169,12 @@ func Listener(nl net.Listener) options.Option { } } +func GracefulTimeout(td time.Duration) options.Option { + return func(src interface{}) error { + return options.Set(src, td, ".GracefulTimeout") + } +} + // HandleOptions struct type HandleOptions struct { // Context holds external options diff --git a/server/registry.go b/server/registry.go index 0d247dab..17fd01ff 100644 --- a/server/registry.go +++ b/server/registry.go @@ -77,8 +77,8 @@ func NewRegisterService(s Server) (*register.Service, error) { } node.Metadata = metadata.Copy(opts.Metadata) - node.Metadata["server"] = s.String() - node.Metadata["register"] = opts.Register.String() + node.Metadata.Set("server", s.String()) + node.Metadata.Set("register", opts.Register.String()) return ®ister.Service{ Name: opts.Name, diff --git a/server/server.go b/server/server.go index ffeb3c70..c5d46146 100644 --- a/server/server.go +++ b/server/server.go @@ -32,6 +32,8 @@ var ( DefaultMaxMsgRecvSize = 1024 * 1024 * 4 // 4Mb // DefaultMaxMsgSendSize holds default max send size DefaultMaxMsgSendSize = 1024 * 1024 * 4 // 4Mb + // DefaultGracefulTimeout default time for graceful stop + DefaultGracefulTimeout = 5 * time.Second ) // Server is a simple micro server abstraction diff --git a/service_test.go b/service_test.go index 582318da..08a676fe 100644 --- a/service_test.go +++ b/service_test.go @@ -1,6 +1,7 @@ package micro import ( + "go.unistack.org/micro/v4/register/memory" "reflect" "testing" @@ -425,7 +426,7 @@ func Test_service_Store(t *testing.T) { } func Test_service_Register(t *testing.T) { - r := register.NewRegister() + r := memory.NewRegister() type fields struct { opts Options } @@ -444,7 +445,7 @@ func Test_service_Register(t *testing.T) { opts: Options{Registers: []register.Register{r}}, }, args: args{ - names: []string{"noop"}, + names: []string{"memory"}, }, want: r, }, diff --git a/sync/waitgroup.go b/sync/waitgroup.go new file mode 100644 index 00000000..3124d948 --- /dev/null +++ b/sync/waitgroup.go @@ -0,0 +1,69 @@ +package sync + +import ( + "context" + "sync" +) + +type WaitGroup struct { + wg *sync.WaitGroup + c int + mu sync.Mutex +} + +func WrapWaitGroup(wg *sync.WaitGroup) *WaitGroup { + g := &WaitGroup{ + wg: wg, + } + return g +} + +func NewWaitGroup() *WaitGroup { + var wg sync.WaitGroup + return WrapWaitGroup(&wg) +} + +func (g *WaitGroup) Add(n int) { + g.mu.Lock() + g.c += n + g.wg.Add(n) + g.mu.Unlock() +} + +func (g *WaitGroup) Done() { + g.mu.Lock() + g.c += -1 + g.wg.Add(-1) + g.mu.Unlock() +} + +func (g *WaitGroup) Wait() { + g.wg.Wait() +} + +func (g *WaitGroup) WaitContext(ctx context.Context) { + done := make(chan struct{}) + go func() { + g.wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + g.mu.Lock() + g.wg.Add(-g.c) + <-done + g.wg.Add(g.c) + g.mu.Unlock() + return + case <-done: + return + } +} + +func (g *WaitGroup) Waiters() int { + g.mu.Lock() + c := g.c + g.mu.Unlock() + return c +} diff --git a/sync/waitgroup_test.go b/sync/waitgroup_test.go new file mode 100644 index 00000000..c3f6f1b7 --- /dev/null +++ b/sync/waitgroup_test.go @@ -0,0 +1,37 @@ +package sync + +import ( + "context" + "testing" + "time" +) + +func TestWaitGroupContext(t *testing.T) { + wg := NewWaitGroup() + _ = t + wg.Add(1) + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + wg.WaitContext(ctx) +} + +func TestWaitGroupReuse(t *testing.T) { + wg := NewWaitGroup() + defer func() { + if wg.Waiters() != 0 { + t.Fatal("lost goroutines") + } + }() + + wg.Add(1) + defer wg.Done() + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + wg.WaitContext(ctx) + + wg.Add(1) + defer wg.Done() + ctx, cancel = context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + wg.WaitContext(ctx) +} diff --git a/tracer/memory/memory.go b/tracer/memory/memory.go new file mode 100644 index 00000000..c98e7413 --- /dev/null +++ b/tracer/memory/memory.go @@ -0,0 +1,143 @@ +package memory + +import ( + "context" + "time" + + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/tracer" + "go.unistack.org/micro/v4/util/id" +) + +var _ tracer.Tracer = (*Tracer)(nil) + +type Tracer struct { + opts tracer.Options + spans []tracer.Span +} + +func (t *Tracer) Spans() []tracer.Span { + return t.spans +} + +func (t *Tracer) Start(ctx context.Context, name string, opts ...options.Option) (context.Context, tracer.Span) { + options := tracer.NewSpanOptions(opts...) + span := &Span{ + name: name, + ctx: ctx, + tracer: t, + kind: options.Kind, + startTime: time.Now(), + } + span.spanID.s, _ = id.New() + span.traceID.s, _ = id.New() + if span.ctx == nil { + span.ctx = context.Background() + } + t.spans = append(t.spans, span) + return tracer.NewSpanContext(ctx, span), span +} + +func (t *Tracer) Flush(_ context.Context) error { + return nil +} + +func (t *Tracer) Init(opts ...options.Option) error { + var err error + for _, o := range opts { + if err = o(&t.opts); err != nil { + return err + } + } + return nil +} + +func (t *Tracer) Name() string { + return t.opts.Name +} + +type noopStringer struct { + s string +} + +func (s noopStringer) String() string { + return s.s +} + +type Span struct { + ctx context.Context + tracer tracer.Tracer + name string + statusMsg string + startTime time.Time + finishTime time.Time + traceID noopStringer + spanID noopStringer + events []*Event + labels []interface{} + logs []interface{} + kind tracer.SpanKind + status tracer.SpanStatus +} + +func (s *Span) Finish(opts ...options.Option) { + s.finishTime = time.Now() +} + +func (s *Span) Context() context.Context { + return s.ctx +} + +func (s *Span) Tracer() tracer.Tracer { + return s.tracer +} + +type Event struct { + name string + labels []interface{} +} + +func (s *Span) AddEvent(name string, opts ...options.Option) { + options := tracer.NewEventOptions(opts...) + s.events = append(s.events, &Event{name: name, labels: options.Labels}) +} + +func (s *Span) SetName(name string) { + s.name = name +} + +func (s *Span) AddLogs(kv ...interface{}) { + s.logs = append(s.logs, kv...) +} + +func (s *Span) AddLabels(kv ...interface{}) { + s.labels = append(s.labels, kv...) +} + +func (s *Span) Kind() tracer.SpanKind { + return s.kind +} + +func (s *Span) TraceID() string { + return s.traceID.String() +} + +func (s *Span) SpanID() string { + return s.spanID.String() +} + +func (s *Span) Status() (tracer.SpanStatus, string) { + return s.status, s.statusMsg +} + +func (s *Span) SetStatus(st tracer.SpanStatus, msg string) { + s.status = st + s.statusMsg = msg +} + +// NewTracer returns new memory tracer +func NewTracer(opts ...options.Option) *Tracer { + return &Tracer{ + opts: tracer.NewOptions(opts...), + } +} diff --git a/tracer/memory/memory_test.go b/tracer/memory/memory_test.go new file mode 100644 index 00000000..e00b532e --- /dev/null +++ b/tracer/memory/memory_test.go @@ -0,0 +1,38 @@ +package memory + +import ( + "bytes" + "context" + "fmt" + "strings" + "testing" + + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/logger/slog" + "go.unistack.org/micro/v4/tracer" +) + +func TestLoggerWithTracer(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + logger.DefaultLogger = slog.NewLogger(logger.WithOutput(buf)) + + if err := logger.Init(); err != nil { + t.Fatal(err) + } + var span tracer.Span + tr := NewTracer() + ctx, span = tr.Start(ctx, "test1") + + logger.Error(ctx, "my test error", fmt.Errorf("error")) + + if !strings.Contains(buf.String(), span.TraceID()) { + t.Fatalf("log does not contains trace id: %s", buf.Bytes()) + } + + _, _ = tr.Start(ctx, "test2") + + for _, s := range tr.Spans() { + _ = s + } +} diff --git a/tracer/noop.go b/tracer/noop.go index c2b91c66..d9655177 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -24,7 +24,6 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...options.Opt name: name, ctx: ctx, tracer: t, - labels: options.Labels, kind: options.Kind, } span.spanID.s, _ = id.New() @@ -36,7 +35,7 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...options.Opt return NewSpanContext(ctx, span), span } -func (t *noopTracer) Flush(ctx context.Context) error { +func (t *noopTracer) Flush(_ context.Context) error { return nil } @@ -51,11 +50,6 @@ func (t *noopTracer) Name() string { return t.opts.Name } -type noopEvent struct { - name string - labels []interface{} -} - type noopStringer struct { s string } @@ -71,14 +65,11 @@ type noopSpan struct { statusMsg string traceID noopStringer spanID noopStringer - events []*noopEvent - labels []interface{} - logs []interface{} kind SpanKind status SpanStatus } -func (s *noopSpan) Finish(opts ...options.Option) { +func (s *noopSpan) Finish(_ ...options.Option) { } func (s *noopSpan) Context() context.Context { @@ -89,21 +80,17 @@ func (s *noopSpan) Tracer() Tracer { return s.tracer } -func (s *noopSpan) AddEvent(name string, opts ...options.Option) { - options := NewEventOptions(opts...) - s.events = append(s.events, &noopEvent{name: name, labels: options.Labels}) +func (s *noopSpan) AddEvent(_ string, _ ...options.Option) { } func (s *noopSpan) SetName(name string) { s.name = name } -func (s *noopSpan) AddLogs(kv ...interface{}) { - s.logs = append(s.logs, kv...) +func (s *noopSpan) AddLogs(_ ...interface{}) { } -func (s *noopSpan) AddLabels(kv ...interface{}) { - s.labels = append(s.labels, kv...) +func (s *noopSpan) AddLabels(_ ...interface{}) { } func (s *noopSpan) Kind() SpanKind { diff --git a/tracer/options.go b/tracer/options.go index dd185ce7..666f6f7d 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -171,7 +171,8 @@ func NewEventOptions(opts ...options.Option) EventOptions { // NewOptions returns default options func NewOptions(opts ...options.Option) Options { options := Options{ - Logger: logger.DefaultLogger, + Logger: logger.DefaultLogger, + Context: context.Background(), } for _, o := range opts { o(&options) diff --git a/tracer/tracer.go b/tracer/tracer.go index db61567a..e2634305 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -1,10 +1,8 @@ // Package tracer provides an interface for distributed tracing -package tracer // import "go.unistack.org/micro/v4/tracer" +package tracer import ( "context" - "fmt" - "sort" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/options" @@ -13,6 +11,26 @@ import ( // DefaultTracer is the global default tracer var DefaultTracer = NewTracer() +var ( + // TraceIDKey is the key used for the trace id in the log call + TraceIDKey = "trace-id" + // SpanIDKey is the key used for the span id in the log call + SpanIDKey = "span-id" +) + +func init() { + logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, + func(ctx context.Context) []interface{} { + if span, ok := SpanFromContext(ctx); ok { + return []interface{}{ + TraceIDKey, span.TraceID(), + SpanIDKey, span.SpanID(), + } + } + return nil + }) +} + // Tracer is an interface for distributed tracing type Tracer interface { // Name return tracer name @@ -51,46 +69,3 @@ type Span interface { // SpanID returns span id SpanID() string } - -func init() { - logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, func(ctx context.Context) []interface{} { - span, ok := SpanFromContext(ctx) - if !ok || span == nil { - return nil - } - return []interface{}{"trace", span.TraceID(), "span", span.SpanID()} - }) -} - -// sort labels alphabeticaly by label name -type byKey []interface{} - -func (k byKey) Len() int { return len(k) / 2 } -func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } -func (k byKey) Swap(i, j int) { - k[i*2], k[j*2] = k[j*2], k[i*2] - k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] -} - -func UniqLabels(labels []interface{}) []interface{} { - if len(labels)%2 == 1 { - labels = labels[:len(labels)-1] - } - if len(labels) > 2 { - sort.Sort(byKey(labels)) - - idx := 0 - for { - if labels[idx] == labels[idx+2] { - copy(labels[idx:], labels[idx+2:]) - labels = labels[:len(labels)-2] - } else { - idx += 2 - } - if idx+2 >= len(labels) { - break - } - } - } - return labels -} diff --git a/util/reflect/reflect.go b/util/reflect/reflect.go index 7d34b068..17a63c2f 100644 --- a/util/reflect/reflect.go +++ b/util/reflect/reflect.go @@ -1,6 +1,7 @@ -package reflect // import "go.unistack.org/micro/v4/util/reflect" +package reflect import ( + "encoding/json" "errors" "fmt" "reflect" @@ -45,15 +46,23 @@ func SliceAppend(b bool) Option { // Merge merges map[string]interface{} to destination struct func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error { - var err error - var sval reflect.Value - var fname string - options := Options{} for _, o := range opts { o(&options) } + if unmarshaler, ok := dst.(json.Unmarshaler); ok { + buf, err := json.Marshal(mp) + if err == nil { + err = unmarshaler.UnmarshalJSON(buf) + } + return err + } + + var err error + var sval reflect.Value + var fname string + dviface := reflect.ValueOf(dst) if dviface.Kind() == reflect.Ptr { dviface = dviface.Elem() @@ -532,6 +541,9 @@ func Equal(src interface{}, dst interface{}, excptFields ...string) bool { } s := srcVal.MapIndex(key) d := dstVal.MapIndex(key) + if !s.IsValid() || !d.IsValid() { + return false + } if !Equal(s.Interface(), d.Interface(), excptFields...) { return false } diff --git a/util/register/util.go b/util/register/util.go index c235920b..a73e7f19 100644 --- a/util/register/util.go +++ b/util/register/util.go @@ -109,12 +109,11 @@ func Merge(olist []*register.Service, nlist []*register.Service) []*register.Ser seen = true srv = append(srv, sp) break - } else { - sp := ®ister.Service{} - // make copy - *sp = *o - srv = append(srv, sp) } + sp := ®ister.Service{} + // make copy + *sp = *o + srv = append(srv, sp) } if !seen { srv = append(srv, Copy([]*register.Service{n})...) diff --git a/util/sort/sort.go b/util/sort/sort.go new file mode 100644 index 00000000..d80794b1 --- /dev/null +++ b/util/sort/sort.go @@ -0,0 +1,40 @@ +package sort + +import ( + "fmt" + "sort" +) + +// sort labels alphabeticaly by label name +type byKey []interface{} + +func (k byKey) Len() int { return len(k) / 2 } +func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } +func (k byKey) Swap(i, j int) { + k[i*2], k[j*2] = k[j*2], k[i*2] + k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] +} + +func Uniq(labels []interface{}) []interface{} { + if len(labels)%2 == 1 { + labels = labels[:len(labels)-1] + } + + if len(labels) > 2 { + sort.Sort(byKey(labels)) + + idx := 0 + for { + if labels[idx] == labels[idx+2] { + copy(labels[idx:], labels[idx+2:]) + labels = labels[:len(labels)-2] + } else { + idx += 2 + } + if idx+2 >= len(labels) { + break + } + } + } + return labels +} diff --git a/util/structfs/metadata_ec2.go b/util/structfs/metadata_ec2.go index 6c0f63aa..07be6e42 100644 --- a/util/structfs/metadata_ec2.go +++ b/util/structfs/metadata_ec2.go @@ -12,7 +12,7 @@ type EC2Metadata struct { InstanceType string `json:"instance-type"` LocalHostname string `json:"local-hostname"` LocalIPv4 string `json:"local-ipv4"` - kernelID int `json:"kernel-id"` + KernelID int `json:"kernel-id"` Placement string `json:"placement"` AvailabilityZone string `json:"availability-zone"` ProductCodes string `json:"product-codes"` diff --git a/util/structfs/structfs.go b/util/structfs/structfs.go index 755c691c..740b3014 100644 --- a/util/structfs/structfs.go +++ b/util/structfs/structfs.go @@ -67,9 +67,9 @@ func (fi *fileInfo) Name() string { func (fi *fileInfo) Mode() os.FileMode { if strings.HasSuffix(fi.name, "/") { - return os.FileMode(0755) | os.ModeDir + return os.FileMode(0o755) | os.ModeDir } - return os.FileMode(0644) + return os.FileMode(0o644) } func (fi *fileInfo) IsDir() bool { @@ -112,15 +112,14 @@ func (f *file) Readdir(count int) ([]os.FileInfo, error) { func (f *file) Seek(offset int64, whence int) (int64, error) { // log.Printf("seek %d %d %s\n", offset, whence, f.name) switch whence { - case os.SEEK_SET: + case io.SeekStart: f.offset = offset - case os.SEEK_CUR: + case io.SeekCurrent: f.offset += offset - case os.SEEK_END: + case io.SeekEnd: f.offset = int64(len(f.data)) + offset } return f.offset, nil - } func (f *file) Stat() (os.FileInfo, error) { diff --git a/util/structfs/structfs_test.go b/util/structfs/structfs_test.go index 7abf8edb..5c198fda 100644 --- a/util/structfs/structfs_test.go +++ b/util/structfs/structfs_test.go @@ -2,7 +2,7 @@ package structfs import ( "encoding/json" - "io/ioutil" + "io" "net/http" "reflect" "testing" @@ -82,7 +82,7 @@ func get(path string) ([]byte, error) { return nil, err } defer res.Body.Close() - return ioutil.ReadAll(res.Body) + return io.ReadAll(res.Body) } func TestAll(t *testing.T) { diff --git a/util/time/duration_test.go b/util/time/duration_test.go index 80b7289c..4172e059 100644 --- a/util/time/duration_test.go +++ b/util/time/duration_test.go @@ -35,7 +35,7 @@ func TestUnmarshalJSON(t *testing.T) { err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v) if err != nil { t.Fatal(err) - } else if v.TTL != 31536000000000000 { + } else if v.TTL != 31622400000000000 { t.Fatalf("invalid duration %v != 31536000000000000", v.TTL) } } @@ -55,7 +55,7 @@ func TestParseDuration(t *testing.T) { if err != nil { t.Fatalf("ParseDuration error: %v", err) } - if td.String() != "8760h0m0s" { + if td.String() != "8784h0m0s" { t.Fatalf("ParseDuration 1y != 8760h0m0s : %s", td.String()) } } diff --git a/util/xpool/pool.go b/util/xpool/pool.go new file mode 100644 index 00000000..1ffe4293 --- /dev/null +++ b/util/xpool/pool.go @@ -0,0 +1,25 @@ +package pool + +import "sync" + +type Pool[T any] struct { + p *sync.Pool +} + +func NewPool[T any](fn func() T) Pool[T] { + return Pool[T]{ + p: &sync.Pool{ + New: func() interface{} { + return fn() + }, + }, + } +} + +func (p Pool[T]) Get() T { + return p.p.Get().(T) +} + +func (p Pool[T]) Put(t T) { + p.p.Put(t) +} diff --git a/util/xpool/pool_test.go b/util/xpool/pool_test.go new file mode 100644 index 00000000..8e7a9b81 --- /dev/null +++ b/util/xpool/pool_test.go @@ -0,0 +1,27 @@ +package pool + +import ( + "bytes" + "strings" + "testing" +) + +func TestBytes(t *testing.T) { + p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) }) + b := p.Get() + b.Write([]byte(`test`)) + if b.String() != "test" { + t.Fatal("pool not works") + } + p.Put(b) +} + +func TestStrings(t *testing.T) { + p := NewPool(func() *strings.Builder { return &strings.Builder{} }) + b := p.Get() + b.Write([]byte(`test`)) + if b.String() != "test" { + t.Fatal("pool not works") + } + p.Put(b) +}