From 39177da1d06a76958f73bd47d65d77364582b313 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 13 Apr 2024 02:24:16 +0300 Subject: [PATCH] massive meter usage Signed-off-by: Vasiliy Tolstov --- go.mod | 19 ++++++++------ go.sum | 42 +++++++++++++++++------------ kgo.go | 32 +++++++++++----------- meter.go | 21 ++++++++++----- options.go | 15 ++++++++--- subscriber.go | 73 +++++++++++++++++++++++++++++++++++++++------------ 6 files changed, 134 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index b65849b..efc6be2 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,24 @@ module go.unistack.org/micro-broker-kgo/v3 -go 1.19 +go 1.21 + +toolchain go1.22.1 require ( github.com/google/uuid v1.6.0 github.com/twmb/franz-go v1.16.1 + github.com/twmb/franz-go/pkg/kadm v1.11.0 github.com/twmb/franz-go/pkg/kmsg v1.7.0 - go.opentelemetry.io/otel v1.24.0 - go.unistack.org/micro/v3 v3.10.46 + go.opentelemetry.io/otel v1.25.0 + go.unistack.org/micro/v3 v3.10.58 ) require ( - github.com/golang/protobuf v1.5.4 // indirect - github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect - golang.org/x/sys v0.18.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect - google.golang.org/grpc v1.62.1 // indirect + golang.org/x/crypto v0.22.0 // indirect + golang.org/x/sys v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 // indirect + google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect ) diff --git a/go.sum b/go.sum index 6e7a5f7..9ca47d8 100644 --- a/go.sum +++ b/go.sum @@ -1,32 +1,40 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= +github.com/twmb/franz-go/pkg/kadm v1.11.0 h1:FfeWJ0qadntFpAcQt8JzNXW4dijjytZNLrzJuzzzuxA= +github.com/twmb/franz-go/pkg/kadm v1.11.0/go.mod h1:qrhkdH+SWS3ivmbqOgHbpgVHamhaKcjH0UM+uOp0M1A= github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU= -go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.unistack.org/micro/v3 v3.10.58 h1:2lIQUfb3XdVstcKz7LWm7vVs/HGTzDfG9Q902FcU3xM= +go.unistack.org/micro/v3 v3.10.58/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo.go b/kgo.go index 0f84daa..ecbb7e5 100644 --- a/kgo.go +++ b/kgo.go @@ -16,6 +16,7 @@ import ( "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/tracer" mrand "go.unistack.org/micro/v3/util/rand" ) @@ -266,7 +267,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br rec := &kgo.Record{Context: ctx, Key: key} rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic) - // k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc() + k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() if options.BodyOnly || k.opts.Codec.String() == "noop" { rec.Value = msg.Body for k, v := range msg.Header { @@ -282,35 +283,35 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br } if promise != nil { - // ts := time.Now() + ts := time.Now() for _, rec := range records { k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { - // te := time.Since(ts) - // k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec() - // k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds()) - // k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds()) + te := time.Since(ts) + k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() + k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) + k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) if err != nil { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() } else { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() } promise(r, err) }) } return nil } - // ts := time.Now() + ts := time.Now() results := k.c.ProduceSync(ctx, records...) - // te := time.Since(ts) + te := time.Since(ts) for _, result := range results { - // k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds()) - // k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds()) - /// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec() + k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() if result.Err != nil { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) } else { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() } } @@ -388,6 +389,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } sub.c = c + go sub.poll(ctx) k.Lock() diff --git a/meter.go b/meter.go index 4b3871d..a6fc7af 100644 --- a/meter.go +++ b/meter.go @@ -14,14 +14,21 @@ type hookMeter struct { } var ( - _ kgo.HookBrokerConnect = &hookMeter{} - _ kgo.HookBrokerDisconnect = &hookMeter{} - _ kgo.HookBrokerRead = &hookMeter{} - _ kgo.HookBrokerThrottle = &hookMeter{} - _ kgo.HookBrokerWrite = &hookMeter{} - _ kgo.HookFetchBatchRead = &hookMeter{} + _ kgo.HookBrokerConnect = &hookMeter{} + _ kgo.HookBrokerDisconnect = &hookMeter{} + // HookBrokerE2E + _ kgo.HookBrokerRead = &hookMeter{} + _ kgo.HookBrokerThrottle = &hookMeter{} + _ kgo.HookBrokerWrite = &hookMeter{} + _ kgo.HookFetchBatchRead = &hookMeter{} + // HookFetchRecordBuffered + // HookFetchRecordUnbuffered + _ kgo.HookGroupManageError = &hookMeter{} + // HookNewClient _ kgo.HookProduceBatchWritten = &hookMeter{} - _ kgo.HookGroupManageError = &hookMeter{} + // HookProduceRecordBuffered + // HookProduceRecordPartitioned + // HookProduceRecordUnbuffered ) const ( diff --git a/options.go b/options.go index a69011b..938bd1b 100644 --- a/options.go +++ b/options.go @@ -9,8 +9,17 @@ import ( "go.unistack.org/micro/v3/client" ) -// DefaultCommitInterval specifies how fast send commit offsets to kafka -var DefaultCommitInterval = 5 * time.Second +var ( + + // DefaultCommitInterval specifies how fast send commit offsets to kafka + DefaultCommitInterval = 5 * time.Second + + // DefaultStatsInterval specifies how fast check consumer lag + DefaultStatsInterval = 5 * time.Second + + // DefaultSubscribeMaxInflight specifies how much messages keep inflight + DefaultSubscribeMaxInflight = 100 +) type subscribeContextKey struct{} @@ -82,8 +91,6 @@ func CommitInterval(td time.Duration) broker.Option { return broker.SetOption(commitIntervalKey{}, td) } -var DefaultSubscribeMaxInflight = 10 - type subscribeMaxInflightKey struct{} // SubscribeMaxInFlight max queued messages diff --git a/subscriber.go b/subscriber.go index dc85d92..96871ad 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,12 +2,16 @@ package kgo import ( "context" + "strconv" "sync" + "time" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/semconv" ) type tp struct { @@ -76,6 +80,41 @@ func (s *subscriber) poll(ctx context.Context) { maxInflight = n } } + + go func() { + ac := kadm.NewClient(s.c) + ticker := time.NewTicker(DefaultStatsInterval) + + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + dgls, err := ac.Lag(ctx, s.topic) + if err != nil || !dgls.Ok() { + continue + } + + dgl, ok := dgls[s.opts.Group] + if !ok { + continue + } + lmap, ok := dgl.Lag[s.topic] + if !ok { + continue + } + + for tp := range s.consumers { + if v, ok := lmap[tp.p]; ok { + s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(tp.p)), "lag", strconv.Itoa(int(v.Lag))) + } + } + + } + } + }() + for { select { case <-ctx.Done(): @@ -169,8 +208,8 @@ func (pc *consumer) consume() { return case p := <-pc.recs: for _, record := range p.Records { - // ts := time.Now() - // pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc() + ts := time.Now() + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() p := eventPool.Get().(*event) p.msg.Header = nil p.msg.Body = nil @@ -187,12 +226,12 @@ func (pc *consumer) consume() { p.msg.Body = record.Value } else { if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { - // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() + pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() p.err = err p.msg.Body = record.Value if eh != nil { _ = eh(p) - // pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() if p.ack { pc.c.MarkCommitRecords(record) } else { @@ -201,19 +240,19 @@ func (pc *consumer) consume() { return } eventPool.Put(p) - // te := time.Since(ts) - // pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) - // pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) + te := time.Since(ts) + pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) + pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) continue } else { if pc.kopts.Logger.V(logger.ErrorLevel) { pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err) } } - // te := time.Since(ts) - // pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() - // pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) - // pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) + te := time.Since(ts) + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() + pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) + pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) eventPool.Put(p) pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") return @@ -221,11 +260,11 @@ func (pc *consumer) consume() { } err := pc.handler(p) if err == nil { - // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc() + pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() } else { - // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() + pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() } - // pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() if err == nil && pc.opts.AutoAck { p.ack = true } else if err != nil { @@ -238,9 +277,9 @@ func (pc *consumer) consume() { } } } - // te := time.Since(ts) - // pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) - // pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) + te := time.Since(ts) + pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) + pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) if p.ack { eventPool.Put(p) pc.c.MarkCommitRecords(record)