From f33dcd628adda7ed16d0ab7bb32802a07a1dd226 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Sun, 24 Nov 2024 01:15:07 +0300 Subject: [PATCH] Add testcase && hook logger for server --- go.mod | 24 +++++--- go.sum | 67 ++++++++++++++--------- logger/slog/slog.go | 1 + logger/slog/slog_test.go | 26 +++++++++ server/hooks.go | 44 +++++++++++++++ server/noop_test.go | 115 ++++++++++++++++++++++++++++++++++++++- 6 files changed, 241 insertions(+), 36 deletions(-) create mode 100644 server/hooks.go diff --git a/go.mod b/go.mod index 9052eded..c5d4731b 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,17 @@ require ( dario.cat/mergo v1.0.0 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/KimMachineGun/automemlimit v0.6.1 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 go.uber.org/automaxprocs v1.6.0 + go.unistack.org/micro-broker-kgo/v3 v3.8.49 go.unistack.org/micro-proto/v3 v3.4.1 - golang.org/x/sync v0.3.0 - google.golang.org/grpc v1.57.0 - google.golang.org/protobuf v1.33.0 + go.unistack.org/micro-wrapper-requestid/v3 v3.9.2 + golang.org/x/sync v0.8.0 + google.golang.org/grpc v1.67.1 + google.golang.org/protobuf v1.35.2 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -22,11 +25,16 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/docker/go-units v0.4.0 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/sirupsen/logrus v1.8.1 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect + github.com/twmb/franz-go v1.18.0 // indirect + github.com/twmb/franz-go/pkg/kadm v1.14.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect + go.opentelemetry.io/otel v1.32.0 // indirect + golang.org/x/crypto v0.29.0 // indirect + golang.org/x/sys v0.27.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect ) diff --git a/go.sum b/go.sum index 025a4af7..d5946ca2 100644 --- a/go.sum +++ b/go.sum @@ -18,14 +18,12 @@ github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzP github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -36,6 +34,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= @@ -47,32 +47,45 @@ github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7 github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= +github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= +github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.unistack.org/micro-broker-kgo/v3 v3.8.49 h1:2yqhl4KcGRa9dhJqzPnmRAkOuw2qbD9Rmk2chJbXvqc= +go.unistack.org/micro-broker-kgo/v3 v3.8.49/go.mod h1:9KdCb9kkTq3AQsJosOaT//60DGGKx9NhV9leJkT0S8M= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +go.unistack.org/micro-wrapper-requestid/v3 v3.9.2 h1:Wu0oIGJieH37xroCjlaPivuqTGmv5l5KWgVaRQWEQMY= +go.unistack.org/micro-wrapper-requestid/v3 v3.9.2/go.mod h1:cqgjdSSASTnOLjvwndP9bi0b8DbuoKbDII884R5puwU= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logger/slog/slog.go b/logger/slog/slog.go index ee1fe17a..e39b1e5c 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -69,6 +69,7 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { type slogLogger struct { leveler *slog.LevelVar handler slog.Handler + slog slog.Logger opts logger.Options mu sync.RWMutex } diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index fc381892..270213df 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -235,3 +235,29 @@ func Test_WithContextAttrFunc(t *testing.T) { t.Fatalf("logger info, buf %s", buf.Bytes()) } } + +func Test_CloneAttrInHandler(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + + log := NewLogger(logger.WithLevel(logger.DebugLevel), logger.WithOutput(buf), logger.WithAddSource(false)) + if err := log.Init(); err != nil { + t.Fatal(err) + } + log = log.Fields("key", "val") + + nlog := log.Clone(logger.WithLevel(logger.InfoLevel)) + if err := nlog.Init(); err != nil { + t.Fatal(err) + } + + nlog.Info(ctx, "new log info message") + log.Info(ctx, "original log info message") + + /* Buffer has: + + {"timestamp":"2024-11-24T01:12:30.752866543+03:00","level":"info","msg":"new log info message"} + {"timestamp":"2024-11-24T01:12:35.528170034+03:00","level":"info","msg":"original log info message","key":"val"} + */ + +} diff --git a/server/hooks.go b/server/hooks.go new file mode 100644 index 00000000..2f1f4fe0 --- /dev/null +++ b/server/hooks.go @@ -0,0 +1,44 @@ +package server + +import ( + "context" + "strings" + + "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/metadata" +) + +func NewLoggerHookSubHandler(extractKeys ...string) HookSubHandler { + return func(next FuncSubHandler) FuncSubHandler { + return func(ctx context.Context, msg Message) (err error) { + log := logger.MustContext(ctx) + attrs := []interface{}{"endpoint", msg.Topic()} + + if md, ok := metadata.FromIncomingContext(ctx); ok { + for idx := range extractKeys { + if val, ok := md.Get(extractKeys[idx]); ok { + attrs = append(attrs, strings.ToLower(extractKeys[idx]), val) + } + } + } + + log = log.Fields(attrs...) + ctx = logger.NewContext(ctx, log) + + if log.V(logger.DebugLevel) { + log.Fields( + // "payload", MarshalJSON(msg.Body()), + "headers", msg.Header(), + ).Debug(ctx, "subscriber called") + } + + if err = next(ctx, msg); err != nil { + log.Error(ctx, "subscriber finished with error", err) + } else { + log.Debug(ctx, "subscriber finished") + } + + return err + } + } +} diff --git a/server/noop_test.go b/server/noop_test.go index 2bad82e6..c6441abc 100644 --- a/server/noop_test.go +++ b/server/noop_test.go @@ -4,12 +4,20 @@ import ( "context" "fmt" "testing" + "time" + kgo "go.unistack.org/micro-broker-kgo/v3" + requestid "go.unistack.org/micro-wrapper-requestid/v3" + "go.unistack.org/micro/v3" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/logger/slog" + "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/server" + + "github.com/google/uuid" ) type TestHandler struct { @@ -21,7 +29,8 @@ type TestMessage struct { } func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error { - // fmt.Printf("msg %s\n", msg.Data) + log := logger.MustContext(ctx) + log.Info(ctx, fmt.Sprintf("%s", msg.Data)) return nil } @@ -82,3 +91,107 @@ func TestNoopSub(t *testing.T) { } }() } + +func TestConsumerAndServer(t *testing.T) { + const ( + topicTarget = "single_topic" + addressKafka = "localhost:9092" + ) + + ctx := context.Background() + logger.DefaultLogger = slog.NewLogger() + logger.DefaultLogger.Init(logger.WithLevel(logger.DebugLevel)) + rh := requestid.NewHook() + + b := kgo.NewBroker( + kgo.CommitInterval(1*time.Second), + broker.Addrs(addressKafka), + broker.Logger(logger.DefaultLogger.Clone(logger.WithLevel(logger.ParseLevel("error")))), + broker.ErrorHandler(func(event broker.Event) error { + msg := event.Message() + log := logger.DefaultLogger. + Fields("topic", event.Topic(), + "header", msg.Header, + "body", msg.Body) + err := event.Ack() + + if err != nil { + log.Fields("ack_error", err).Error(context.Background(), fmt.Sprintf("brokerHandlerErr: Ack error | %v", event.Error())) + return err + } + + log.Error(context.Background(), fmt.Sprintf("brokerHandlerErr: %v", event.Error())) + + return nil + }), + ) + + if err := b.Init(); err != nil { + t.Fatal(err) + } + + if err := b.Connect(ctx); err != nil { + t.Fatal(err) + } + + s := server.NewServer( + server.Broker(b), + server.Codec("application/octet-stream", codec.NewCodec()), + server.Hooks( + server.NewLoggerHookSubHandler(metadata.HeaderEndpoint), + server.HookSubHandler(rh.ServerSubscriber), + ), + ) + if err := s.Init(); err != nil { + t.Fatal(err) + } + + c := client.NewClient( + client.Broker(b), + client.Codec("application/octet-stream", codec.NewCodec()), + client.ContentType("application/octet-stream"), + ) + if err := c.Init(); err != nil { + t.Fatal(err) + } + h := &TestHandler{t: t} + + if err := s.Subscribe(s.NewSubscriber(topicTarget, h.SingleSubHandler, + server.SubscriberQueue("queue"), + )); err != nil { + t.Fatal(err) + } + + if err := s.Start(); err != nil { + t.Fatal(err) + } + + msgs := make([]client.Message, 0, 8) + for i := 0; i < 8; i++ { + req := &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))} + + opts := []client.MessageOption{ + client.MessageMetadata(metadata.HeaderXRequestID, uuid.NewString()), + client.MessageMetadata(metadata.HeaderEndpoint, "test_endpoint"), + client.MessageMetadata(metadata.HeaderService, "test_service"), + } + + msgs = append(msgs, c.NewMessage(topicTarget, req, opts...)) + } + + if err := c.BatchPublish(ctx, msgs); err != nil { + t.Fatal(err) + } + + msv := micro.NewService( + micro.Server(s), + micro.Client(c), + micro.Broker(b), + micro.Context(ctx), + micro.Name("test_server"), + micro.Version("latest")) + + if err := msv.Run(); err != nil { + t.Fatal(err) + } +} -- 2.45.2