update for latest micro logger changes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										28
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								go.mod
									
									
									
									
									
								
							| @@ -9,17 +9,29 @@ require ( | ||||
| 	github.com/twmb/franz-go v1.17.1 | ||||
| 	github.com/twmb/franz-go/pkg/kadm v1.13.0 | ||||
| 	github.com/twmb/franz-go/pkg/kmsg v1.8.0 | ||||
| 	go.opentelemetry.io/otel v1.30.0 | ||||
| 	go.unistack.org/micro/v3 v3.10.91 | ||||
| 	go.opentelemetry.io/otel v1.31.0 | ||||
| 	go.unistack.org/micro/v3 v3.10.97 | ||||
| ) | ||||
|  | ||||
| require ( | ||||
| 	github.com/klauspost/compress v1.17.9 // indirect | ||||
| 	dario.cat/mergo v1.0.0 // indirect | ||||
| 	github.com/KimMachineGun/automemlimit v0.6.1 // indirect | ||||
| 	github.com/cilium/ebpf v0.9.1 // indirect | ||||
| 	github.com/containerd/cgroups/v3 v3.0.1 // indirect | ||||
| 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect | ||||
| 	github.com/docker/go-units v0.4.0 // indirect | ||||
| 	github.com/godbus/dbus/v5 v5.0.4 // indirect | ||||
| 	github.com/klauspost/compress v1.17.11 // indirect | ||||
| 	github.com/opencontainers/runtime-spec v1.0.2 // indirect | ||||
| 	github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect | ||||
| 	github.com/pierrec/lz4/v4 v4.1.21 // indirect | ||||
| 	github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 // indirect | ||||
| 	github.com/sirupsen/logrus v1.8.1 // indirect | ||||
| 	go.uber.org/automaxprocs v1.6.0 // indirect | ||||
| 	go.unistack.org/micro-proto/v3 v3.4.1 // indirect | ||||
| 	golang.org/x/crypto v0.27.0 // indirect | ||||
| 	golang.org/x/sys v0.25.0 // indirect | ||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect | ||||
| 	google.golang.org/grpc v1.67.0 // indirect | ||||
| 	google.golang.org/protobuf v1.34.2 // indirect | ||||
| 	golang.org/x/crypto v0.28.0 // indirect | ||||
| 	golang.org/x/sys v0.26.0 // indirect | ||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect | ||||
| 	google.golang.org/grpc v1.67.1 // indirect | ||||
| 	google.golang.org/protobuf v1.35.1 // indirect | ||||
| ) | ||||
|   | ||||
							
								
								
									
										45
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								go.sum
									
									
									
									
									
								
							| @@ -1,5 +1,19 @@ | ||||
| dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= | ||||
| dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= | ||||
| github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8= | ||||
| github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY= | ||||
| github.com/cilium/ebpf v0.9.1 h1:64sn2K3UKw8NbP/blsixRpF3nXuyhz/VjRlRzvlBRu4= | ||||
| github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY= | ||||
| github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5ToenJRaE= | ||||
| github.com/containerd/cgroups/v3 v3.0.1/go.mod h1:/vtwk1VXrtoa5AaZLkypuOJgA/6DyPMZHJPGQNtlHnw= | ||||
| github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= | ||||
| github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= | ||||
| github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||||
| github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||||
| github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= | ||||
| github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= | ||||
| github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= | ||||
| github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= | ||||
| github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||||
| github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||||
| github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= | ||||
| @@ -8,10 +22,21 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N | ||||
| github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= | ||||
| github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= | ||||
| github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= | ||||
| github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= | ||||
| github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= | ||||
| github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= | ||||
| github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= | ||||
| github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= | ||||
| github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= | ||||
| github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= | ||||
| github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= | ||||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||||
| github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= | ||||
| github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= | ||||
| github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= | ||||
| github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= | ||||
| github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= | ||||
| github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= | ||||
| github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= | ||||
| github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= | ||||
| @@ -30,35 +55,55 @@ go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= | ||||
| go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= | ||||
| go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= | ||||
| go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= | ||||
| go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= | ||||
| go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= | ||||
| go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= | ||||
| go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= | ||||
| go.unistack.org/micro/v3 v3.10.59 h1:eneYXJLgyu5MZpSvyI0K17CeXvgOoUCN5dWZaPV5lI4= | ||||
| go.unistack.org/micro/v3 v3.10.59/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= | ||||
| go.unistack.org/micro/v3 v3.10.91 h1:vuJY4tXwpqimwIkEJ3TozMYNVQQs+C5QMlQWPgSY/YM= | ||||
| go.unistack.org/micro/v3 v3.10.91/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= | ||||
| go.unistack.org/micro/v3 v3.10.96 h1:njukb0DlXOJspX5mLSwcP7+cVSg/8Fy+6++mYkOpY2A= | ||||
| go.unistack.org/micro/v3 v3.10.96/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= | ||||
| go.unistack.org/micro/v3 v3.10.97 h1:8l7fv+i06/PjPrBBhRC/ZQkWGIOuHPg3jJN0vktYE78= | ||||
| go.unistack.org/micro/v3 v3.10.97/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= | ||||
| golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= | ||||
| golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= | ||||
| golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= | ||||
| golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= | ||||
| golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= | ||||
| golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= | ||||
| golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= | ||||
| golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= | ||||
| golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||||
| golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= | ||||
| golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||||
| golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= | ||||
| golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||||
| golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= | ||||
| golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||||
| golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= | ||||
| golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= | ||||
| google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= | ||||
| google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= | ||||
| google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw= | ||||
| google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= | ||||
| google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= | ||||
| google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= | ||||
| google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= | ||||
| google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= | ||||
| google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= | ||||
| google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= | ||||
| google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= | ||||
| google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= | ||||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||||
|   | ||||
							
								
								
									
										2
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -437,7 +437,7 @@ func NewBroker(opts ...broker.Option) *Broker { | ||||
| 		kgo.DialTimeout(3 * time.Second), | ||||
| 		kgo.DisableIdempotentWrite(), | ||||
| 		kgo.ProducerBatchCompression(kgo.NoCompression()), | ||||
| 		kgo.WithLogger(&mlogger{l: options.Logger.Clone(logger.WithCallerSkipCount(options.Logger.Options().CallerSkipCount + 2)), ctx: options.Context}), | ||||
| 		kgo.WithLogger(&mlogger{l: options.Logger.Clone(logger.WithAddCallerSkipCount(2)), ctx: options.Context}), | ||||
| 		kgo.SeedBrokers(kaddrs...), | ||||
| 		kgo.RetryBackoffFn(DefaultRetryBackoffFn), | ||||
| 		kgo.BlockRebalanceOnPoll(), | ||||
|   | ||||
| @@ -50,7 +50,7 @@ func TestPubSub(t *testing.T) { | ||||
| 		t.Skip() | ||||
| 	} | ||||
|  | ||||
| 	if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel), logger.WithCallerSkipCount(3)); err != nil { | ||||
| 	if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	ctx := context.Background() | ||||
|   | ||||
| @@ -28,11 +28,8 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) { | ||||
| 	default: | ||||
| 		return | ||||
| 	} | ||||
| 	if len(args) > 0 { | ||||
| 		l.l.Log(l.ctx, mlvl, append([]interface{}{msg}, args...)...) | ||||
| 	} else { | ||||
| 		l.l.Log(l.ctx, mlvl, msg) | ||||
| 	} | ||||
|  | ||||
| 	l.l.Log(l.ctx, mlvl, msg, args...) | ||||
| } | ||||
|  | ||||
| func (l *mlogger) Level() kgo.LogLevel { | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package kgo | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| @@ -136,7 +137,7 @@ func (s *Subscriber) poll(ctx context.Context) { | ||||
| 				return | ||||
| 			} | ||||
| 			fetches.EachError(func(t string, p int32, err error) { | ||||
| 				s.kopts.Logger.Fatalf(ctx, "[kgo] fetch topic %s partition %d err: %v", t, p, err) | ||||
| 				s.kopts.Logger.Fatal(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err) | ||||
| 			}) | ||||
|  | ||||
| 			fetches.EachPartition(func(p kgo.FetchTopicPartition) { | ||||
| @@ -158,7 +159,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
| 			pc := s.consumers[tp] | ||||
| 			delete(s.consumers, tp) | ||||
| 			close(pc.quit) | ||||
| 			s.kopts.Logger.Debugf(ctx, "[kgo] waiting for work to finish topic %s partition %d", topic, partition) | ||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||
| 				s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) | ||||
| 			} | ||||
| 			wg.Add(1) | ||||
| 			go func() { <-pc.done; wg.Done() }() | ||||
| 		} | ||||
| @@ -166,15 +169,19 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
| } | ||||
|  | ||||
| func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { | ||||
| 	s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost) | ||||
| 	if s.kopts.Logger.V(logger.DebugLevel) { | ||||
| 		s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] lost %#+v", lost)) | ||||
| 	} | ||||
| 	s.killConsumers(ctx, lost) | ||||
| } | ||||
|  | ||||
| func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { | ||||
| 	s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked) | ||||
| 	if s.kopts.Logger.V(logger.DebugLevel) { | ||||
| 		s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked)) | ||||
| 	} | ||||
| 	s.killConsumers(ctx, revoked) | ||||
| 	if err := c.CommitMarkedOffsets(ctx); err != nil { | ||||
| 		s.kopts.Logger.Errorf(ctx, "[kgo] revoked CommitMarkedOffsets err: %v", err) | ||||
| 		s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -203,8 +210,10 @@ func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str | ||||
|  | ||||
| func (pc *consumer) consume() { | ||||
| 	defer close(pc.done) | ||||
| 	pc.kopts.Logger.Debugf(pc.kopts.Context, "starting, topic %s partition %d", pc.topic, pc.partition) | ||||
| 	defer pc.kopts.Logger.Debugf(pc.kopts.Context, "killing, topic %s partition %d", pc.topic, pc.partition) | ||||
| 	if pc.kopts.Logger.V(logger.DebugLevel) { | ||||
| 		pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("starting, topic %s partition %d", pc.topic, pc.partition)) | ||||
| 		defer pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("killing, topic %s partition %d", pc.topic, pc.partition)) | ||||
| 	} | ||||
|  | ||||
| 	eh := pc.kopts.ErrorHandler | ||||
| 	if pc.opts.ErrorHandler != nil { | ||||
| @@ -251,7 +260,7 @@ func (pc *consumer) consume() { | ||||
| 								pc.c.MarkCommitRecords(record) | ||||
| 							} else { | ||||
| 								eventPool.Put(p) | ||||
| 								pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 								pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 								return | ||||
| 							} | ||||
| 							eventPool.Put(p) | ||||
| @@ -260,16 +269,14 @@ func (pc *consumer) consume() { | ||||
| 							pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 							continue | ||||
| 						} else { | ||||
| 							if pc.kopts.Logger.V(logger.ErrorLevel) { | ||||
| 								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err) | ||||
| 							} | ||||
| 							pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err) | ||||
| 						} | ||||
| 						te := time.Since(ts) | ||||
| 						pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() | ||||
| 						pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 						pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 						eventPool.Put(p) | ||||
| 						pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | ||||
| 						pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | ||||
| 						sp.Finish() | ||||
| 						return | ||||
| 					} | ||||
| @@ -294,7 +301,7 @@ func (pc *consumer) consume() { | ||||
| 						sp.AddEvent("error handler stop") | ||||
| 					} else { | ||||
| 						if pc.kopts.Logger.V(logger.ErrorLevel) { | ||||
| 							pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err) | ||||
| 							pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) | ||||
| 						} | ||||
| 					} | ||||
| 				} | ||||
| @@ -306,7 +313,7 @@ func (pc *consumer) consume() { | ||||
| 					pc.c.MarkCommitRecords(record) | ||||
| 				} else { | ||||
| 					eventPool.Put(p) | ||||
| 					pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 					pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 					sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") | ||||
| 					sp.Finish() | ||||
| 					return | ||||
|   | ||||
		Reference in New Issue
	
	Block a user