massive meter usage
Some checks failed
build / test (push) Failing after 1m33s
codeql / analyze (go) (push) Failing after 1m39s
build / lint (push) Successful in 9m15s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-04-13 02:24:16 +03:00
parent d559db4050
commit 39177da1d0
6 changed files with 134 additions and 68 deletions

19
go.mod
View File

@ -1,21 +1,24 @@
module go.unistack.org/micro-broker-kgo/v3 module go.unistack.org/micro-broker-kgo/v3
go 1.19 go 1.21
toolchain go1.22.1
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kadm v1.11.0
github.com/twmb/franz-go/pkg/kmsg v1.7.0 github.com/twmb/franz-go/pkg/kmsg v1.7.0
go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel v1.25.0
go.unistack.org/micro/v3 v3.10.46 go.unistack.org/micro/v3 v3.10.58
) )
require ( require (
github.com/golang/protobuf v1.5.4 // indirect github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect
golang.org/x/sys v0.18.0 // indirect golang.org/x/crypto v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect golang.org/x/sys v0.19.0 // indirect
google.golang.org/grpc v1.62.1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect google.golang.org/protobuf v1.33.0 // indirect
) )

42
go.sum
View File

@ -1,32 +1,40 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go/pkg/kadm v1.11.0 h1:FfeWJ0qadntFpAcQt8JzNXW4dijjytZNLrzJuzzzuxA=
github.com/twmb/franz-go/pkg/kadm v1.11.0/go.mod h1:qrhkdH+SWS3ivmbqOgHbpgVHamhaKcjH0UM+uOp0M1A=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg=
go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU= go.unistack.org/micro/v3 v3.10.58 h1:2lIQUfb3XdVstcKz7LWm7vVs/HGTzDfG9Q902FcU3xM=
go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= go.unistack.org/micro/v3 v3.10.58/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

32
kgo.go
View File

@ -16,6 +16,7 @@ import (
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/tracer"
mrand "go.unistack.org/micro/v3/util/rand" mrand "go.unistack.org/micro/v3/util/rand"
) )
@ -266,7 +267,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
rec := &kgo.Record{Context: ctx, Key: key} rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
msg.Header.Del(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic)
// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
if options.BodyOnly || k.opts.Codec.String() == "noop" { if options.BodyOnly || k.opts.Codec.String() == "noop" {
rec.Value = msg.Body rec.Value = msg.Body
for k, v := range msg.Header { for k, v := range msg.Header {
@ -282,35 +283,35 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
} }
if promise != nil { if promise != nil {
// ts := time.Now() ts := time.Now()
for _, rec := range records { for _, rec := range records {
k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { k.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
// te := time.Since(ts) te := time.Since(ts)
// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
// k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds()) k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
// k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
if err != nil { if err != nil {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc()
} else { } else {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
} }
promise(r, err) promise(r, err)
}) })
} }
return nil return nil
} }
// ts := time.Now() ts := time.Now()
results := k.c.ProduceSync(ctx, records...) results := k.c.ProduceSync(ctx, records...)
// te := time.Since(ts) te := time.Since(ts)
for _, result := range results { for _, result := range results {
// k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds()) k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
// k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
/// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec()
if result.Err != nil { if result.Err != nil {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
errs = append(errs, result.Err.Error()) errs = append(errs, result.Err.Error())
} else { } else {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
} }
} }
@ -388,6 +389,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
sub.c = c sub.c = c
go sub.poll(ctx) go sub.poll(ctx)
k.Lock() k.Lock()

View File

@ -14,14 +14,21 @@ type hookMeter struct {
} }
var ( var (
_ kgo.HookBrokerConnect = &hookMeter{} _ kgo.HookBrokerConnect = &hookMeter{}
_ kgo.HookBrokerDisconnect = &hookMeter{} _ kgo.HookBrokerDisconnect = &hookMeter{}
_ kgo.HookBrokerRead = &hookMeter{} // HookBrokerE2E
_ kgo.HookBrokerThrottle = &hookMeter{} _ kgo.HookBrokerRead = &hookMeter{}
_ kgo.HookBrokerWrite = &hookMeter{} _ kgo.HookBrokerThrottle = &hookMeter{}
_ kgo.HookFetchBatchRead = &hookMeter{} _ kgo.HookBrokerWrite = &hookMeter{}
_ kgo.HookFetchBatchRead = &hookMeter{}
// HookFetchRecordBuffered
// HookFetchRecordUnbuffered
_ kgo.HookGroupManageError = &hookMeter{}
// HookNewClient
_ kgo.HookProduceBatchWritten = &hookMeter{} _ kgo.HookProduceBatchWritten = &hookMeter{}
_ kgo.HookGroupManageError = &hookMeter{} // HookProduceRecordBuffered
// HookProduceRecordPartitioned
// HookProduceRecordUnbuffered
) )
const ( const (

View File

@ -9,8 +9,17 @@ import (
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/client"
) )
// DefaultCommitInterval specifies how fast send commit offsets to kafka var (
var DefaultCommitInterval = 5 * time.Second
// DefaultCommitInterval specifies how fast send commit offsets to kafka
DefaultCommitInterval = 5 * time.Second
// DefaultStatsInterval specifies how fast check consumer lag
DefaultStatsInterval = 5 * time.Second
// DefaultSubscribeMaxInflight specifies how much messages keep inflight
DefaultSubscribeMaxInflight = 100
)
type subscribeContextKey struct{} type subscribeContextKey struct{}
@ -82,8 +91,6 @@ func CommitInterval(td time.Duration) broker.Option {
return broker.SetOption(commitIntervalKey{}, td) return broker.SetOption(commitIntervalKey{}, td)
} }
var DefaultSubscribeMaxInflight = 10
type subscribeMaxInflightKey struct{} type subscribeMaxInflightKey struct{}
// SubscribeMaxInFlight max queued messages // SubscribeMaxInFlight max queued messages

View File

@ -2,12 +2,16 @@ package kgo
import ( import (
"context" "context"
"strconv"
"sync" "sync"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv"
) )
type tp struct { type tp struct {
@ -76,6 +80,41 @@ func (s *subscriber) poll(ctx context.Context) {
maxInflight = n maxInflight = n
} }
} }
go func() {
ac := kadm.NewClient(s.c)
ticker := time.NewTicker(DefaultStatsInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
dgls, err := ac.Lag(ctx, s.topic)
if err != nil || !dgls.Ok() {
continue
}
dgl, ok := dgls[s.opts.Group]
if !ok {
continue
}
lmap, ok := dgl.Lag[s.topic]
if !ok {
continue
}
for tp := range s.consumers {
if v, ok := lmap[tp.p]; ok {
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(tp.p)), "lag", strconv.Itoa(int(v.Lag)))
}
}
}
}
}()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -169,8 +208,8 @@ func (pc *consumer) consume() {
return return
case p := <-pc.recs: case p := <-pc.recs:
for _, record := range p.Records { for _, record := range p.Records {
// ts := time.Now() ts := time.Now()
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
p := eventPool.Get().(*event) p := eventPool.Get().(*event)
p.msg.Header = nil p.msg.Header = nil
p.msg.Body = nil p.msg.Body = nil
@ -187,12 +226,12 @@ func (pc *consumer) consume() {
p.msg.Body = record.Value p.msg.Body = record.Value
} else { } else {
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
p.err = err p.err = err
p.msg.Body = record.Value p.msg.Body = record.Value
if eh != nil { if eh != nil {
_ = eh(p) _ = eh(p)
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
if p.ack { if p.ack {
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
} else { } else {
@ -201,19 +240,19 @@ func (pc *consumer) consume() {
return return
} }
eventPool.Put(p) eventPool.Put(p)
// te := time.Since(ts) te := time.Since(ts)
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
continue continue
} else { } else {
if pc.kopts.Logger.V(logger.ErrorLevel) { if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err) pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
} }
} }
// te := time.Since(ts) te := time.Since(ts)
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
eventPool.Put(p) eventPool.Put(p)
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
return return
@ -221,11 +260,11 @@ func (pc *consumer) consume() {
} }
err := pc.handler(p) err := pc.handler(p)
if err == nil { if err == nil {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
} else { } else {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
} }
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
if err == nil && pc.opts.AutoAck { if err == nil && pc.opts.AutoAck {
p.ack = true p.ack = true
} else if err != nil { } else if err != nil {
@ -238,9 +277,9 @@ func (pc *consumer) consume() {
} }
} }
} }
// te := time.Since(ts) te := time.Since(ts)
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
if p.ack { if p.ack {
eventPool.Put(p) eventPool.Put(p)
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)