WIP: Add testcase && hook logger for server #360
24
go.mod
24
go.mod
@ -6,14 +6,17 @@ require (
|
||||
dario.cat/mergo v1.0.0
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||
github.com/KimMachineGun/automemlimit v0.6.1
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
|
||||
go.uber.org/automaxprocs v1.6.0
|
||||
go.unistack.org/micro-broker-kgo/v3 v3.8.49
|
||||
go.unistack.org/micro-proto/v3 v3.4.1
|
||||
golang.org/x/sync v0.3.0
|
||||
google.golang.org/grpc v1.57.0
|
||||
google.golang.org/protobuf v1.33.0
|
||||
go.unistack.org/micro-wrapper-requestid/v3 v3.9.2
|
||||
golang.org/x/sync v0.8.0
|
||||
google.golang.org/grpc v1.67.1
|
||||
google.golang.org/protobuf v1.35.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
@ -22,11 +25,16 @@ require (
|
||||
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
|
||||
github.com/docker/go-units v0.4.0 // indirect
|
||||
github.com/godbus/dbus/v5 v5.0.4 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/klauspost/compress v1.17.11 // indirect
|
||||
github.com/opencontainers/runtime-spec v1.0.2 // indirect
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.21 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
golang.org/x/net v0.14.0 // indirect
|
||||
golang.org/x/sys v0.11.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect
|
||||
github.com/twmb/franz-go v1.18.0 // indirect
|
||||
github.com/twmb/franz-go/pkg/kadm v1.14.0 // indirect
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
|
||||
go.opentelemetry.io/otel v1.32.0 // indirect
|
||||
golang.org/x/crypto v0.29.0 // indirect
|
||||
golang.org/x/sys v0.27.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect
|
||||
)
|
||||
|
67
go.sum
67
go.sum
@ -18,14 +18,12 @@ github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzP
|
||||
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
|
||||
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
@ -36,6 +34,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
|
||||
@ -47,32 +47,45 @@ github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
|
||||
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
|
||||
github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
|
||||
github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
|
||||
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
|
||||
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
|
||||
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
|
||||
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.unistack.org/micro-broker-kgo/v3 v3.8.49 h1:2yqhl4KcGRa9dhJqzPnmRAkOuw2qbD9Rmk2chJbXvqc=
|
||||
go.unistack.org/micro-broker-kgo/v3 v3.8.49/go.mod h1:9KdCb9kkTq3AQsJosOaT//60DGGKx9NhV9leJkT0S8M=
|
||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
||||
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
|
||||
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
|
||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
go.unistack.org/micro-wrapper-requestid/v3 v3.9.2 h1:Wu0oIGJieH37xroCjlaPivuqTGmv5l5KWgVaRQWEQMY=
|
||||
go.unistack.org/micro-wrapper-requestid/v3 v3.9.2/go.mod h1:cqgjdSSASTnOLjvwndP9bi0b8DbuoKbDII884R5puwU=
|
||||
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
|
||||
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
|
||||
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
|
||||
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
|
||||
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
|
||||
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
|
||||
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
|
||||
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
|
||||
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
|
||||
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
|
||||
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
|
||||
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
|
||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
|
||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@ -69,6 +69,7 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
||||
type slogLogger struct {
|
||||
leveler *slog.LevelVar
|
||||
handler slog.Handler
|
||||
slog slog.Logger
|
||||
opts logger.Options
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
@ -235,3 +235,29 @@ func Test_WithContextAttrFunc(t *testing.T) {
|
||||
t.Fatalf("logger info, buf %s", buf.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
func Test_CloneAttrInHandler(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
log := NewLogger(logger.WithLevel(logger.DebugLevel), logger.WithOutput(buf), logger.WithAddSource(false))
|
||||
if err := log.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
log = log.Fields("key", "val")
|
||||
|
||||
|
||||
nlog := log.Clone(logger.WithLevel(logger.InfoLevel))
|
||||
devstigneev
commented
метод клон, аналогичен методу филдс причина: аттррибуты пишутся только в сам хендлер слога, которые уже не копируются https://git.unistack.org/devstigneev/micro/src/branch/log_hooks/logger/slog/slog.go#L130 метод клон, аналогичен методу филдс
причина: аттррибуты пишутся только в сам хендлер слога, которые уже не копируются
[https://git.unistack.org/devstigneev/micro/src/branch/log_hooks/logger/slog/slog.go#L130]()
|
||||
if err := nlog.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nlog.Info(ctx, "new log info message")
|
||||
log.Info(ctx, "original log info message")
|
||||
|
||||
/* Buffer has:
|
||||
|
||||
{"timestamp":"2024-11-24T01:12:30.752866543+03:00","level":"info","msg":"new log info message"}
|
||||
{"timestamp":"2024-11-24T01:12:35.528170034+03:00","level":"info","msg":"original log info message","key":"val"}
|
||||
*/
|
||||
|
||||
}
|
||||
|
44
server/hooks.go
Normal file
44
server/hooks.go
Normal file
@ -0,0 +1,44 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
func NewLoggerHookSubHandler(extractKeys ...string) HookSubHandler {
|
||||
return func(next FuncSubHandler) FuncSubHandler {
|
||||
return func(ctx context.Context, msg Message) (err error) {
|
||||
log := logger.MustContext(ctx)
|
||||
attrs := []interface{}{"endpoint", msg.Topic()}
|
||||
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
for idx := range extractKeys {
|
||||
if val, ok := md.Get(extractKeys[idx]); ok {
|
||||
attrs = append(attrs, strings.ToLower(extractKeys[idx]), val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log = log.Fields(attrs...)
|
||||
ctx = logger.NewContext(ctx, log)
|
||||
|
||||
if log.V(logger.DebugLevel) {
|
||||
log.Fields(
|
||||
// "payload", MarshalJSON(msg.Body()),
|
||||
"headers", msg.Header(),
|
||||
).Debug(ctx, "subscriber called")
|
||||
}
|
||||
|
||||
if err = next(ctx, msg); err != nil {
|
||||
log.Error(ctx, "subscriber finished with error", err)
|
||||
} else {
|
||||
log.Debug(ctx, "subscriber finished")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
@ -4,12 +4,20 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
kgo "go.unistack.org/micro-broker-kgo/v3"
|
||||
requestid "go.unistack.org/micro-wrapper-requestid/v3"
|
||||
"go.unistack.org/micro/v3"
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/logger/slog"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type TestHandler struct {
|
||||
@ -21,7 +29,8 @@ type TestMessage struct {
|
||||
}
|
||||
|
||||
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
|
||||
// fmt.Printf("msg %s\n", msg.Data)
|
||||
log := logger.MustContext(ctx)
|
||||
log.Info(ctx, fmt.Sprintf("%s", msg.Data))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -82,3 +91,107 @@ func TestNoopSub(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestConsumerAndServer(t *testing.T) {
|
||||
const (
|
||||
topicTarget = "single_topic"
|
||||
addressKafka = "localhost:9092"
|
||||
)
|
||||
|
||||
ctx := context.Background()
|
||||
logger.DefaultLogger = slog.NewLogger()
|
||||
logger.DefaultLogger.Init(logger.WithLevel(logger.DebugLevel))
|
||||
rh := requestid.NewHook()
|
||||
|
||||
b := kgo.NewBroker(
|
||||
kgo.CommitInterval(1*time.Second),
|
||||
broker.Addrs(addressKafka),
|
||||
broker.Logger(logger.DefaultLogger.Clone(logger.WithLevel(logger.ParseLevel("error")))),
|
||||
broker.ErrorHandler(func(event broker.Event) error {
|
||||
msg := event.Message()
|
||||
log := logger.DefaultLogger.
|
||||
Fields("topic", event.Topic(),
|
||||
"header", msg.Header,
|
||||
"body", msg.Body)
|
||||
err := event.Ack()
|
||||
|
||||
if err != nil {
|
||||
log.Fields("ack_error", err).Error(context.Background(), fmt.Sprintf("brokerHandlerErr: Ack error | %v", event.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Error(context.Background(), fmt.Sprintf("brokerHandlerErr: %v", event.Error()))
|
||||
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
|
||||
if err := b.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := b.Connect(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := server.NewServer(
|
||||
server.Broker(b),
|
||||
server.Codec("application/octet-stream", codec.NewCodec()),
|
||||
server.Hooks(
|
||||
server.NewLoggerHookSubHandler(metadata.HeaderEndpoint),
|
||||
server.HookSubHandler(rh.ServerSubscriber),
|
||||
),
|
||||
)
|
||||
if err := s.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
c := client.NewClient(
|
||||
client.Broker(b),
|
||||
client.Codec("application/octet-stream", codec.NewCodec()),
|
||||
client.ContentType("application/octet-stream"),
|
||||
)
|
||||
if err := c.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
h := &TestHandler{t: t}
|
||||
|
||||
if err := s.Subscribe(s.NewSubscriber(topicTarget, h.SingleSubHandler,
|
||||
server.SubscriberQueue("queue"),
|
||||
)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msgs := make([]client.Message, 0, 8)
|
||||
for i := 0; i < 8; i++ {
|
||||
req := &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))}
|
||||
|
||||
opts := []client.MessageOption{
|
||||
client.MessageMetadata(metadata.HeaderXRequestID, uuid.NewString()),
|
||||
client.MessageMetadata(metadata.HeaderEndpoint, "test_endpoint"),
|
||||
client.MessageMetadata(metadata.HeaderService, "test_service"),
|
||||
}
|
||||
|
||||
msgs = append(msgs, c.NewMessage(topicTarget, req, opts...))
|
||||
}
|
||||
|
||||
if err := c.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msv := micro.NewService(
|
||||
micro.Server(s),
|
||||
micro.Client(c),
|
||||
micro.Broker(b),
|
||||
micro.Context(ctx),
|
||||
micro.Name("test_server"),
|
||||
micro.Version("latest"))
|
||||
|
||||
if err := msv.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user
Fields просто прокидываются в handler
Если использовать опцию WithFields они будут склонированы, тк хранятся в самой опции