From 1018abe7bddf0c629f3e49a7dde44b383066ccc2 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 19 Dec 2024 13:21:32 +0300 Subject: [PATCH 1/4] update deps && structs && hooks --- .gitignore | 1 + .golangci.yml | 22 +++++++++++ event.go | 14 +++++-- go.mod | 25 ++++++------ go.sum | 58 ++++++++++++++++------------ kgo.go | 53 +++++++++++++++++--------- kgo_test.go | 2 +- logger.go | 7 +--- meter.go | 2 +- subscriber.go | 103 +++++++++++++++++++++++++++----------------------- tracer.go | 2 +- 11 files changed, 177 insertions(+), 112 deletions(-) create mode 100644 .golangci.yml diff --git a/.gitignore b/.gitignore index 66fd13c..d1baad6 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ # Dependency directories (remove the comment below to include it) # vendor/ +.idea \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..a5d4482 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,22 @@ +run: + concurrency: 8 + deadline: 5m + issues-exit-code: 1 + tests: true + +linters: + enable: + - staticcheck + - unused + - gosimple + - govet + - goimports + - prealloc + - unconvert + - nakedret + +linters-settings: + govet: + check-all: true + enable: + - fieldalignment \ No newline at end of file diff --git a/event.go b/event.go index 6dbf963..ea18bf3 100644 --- a/event.go +++ b/event.go @@ -1,19 +1,27 @@ package kgo import ( + "context" "sync" "go.unistack.org/micro/v3/broker" ) type event struct { - topic string - err error - sync.RWMutex msg *broker.Message + err error + + topic string + + sync.RWMutex ack bool } +func (p *event) Context() context.Context { + //TODO implement me + return context.Background() +} + func (p *event) Topic() string { return p.topic } diff --git a/go.mod b/go.mod index b65849b..ec84eea 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,24 @@ module go.unistack.org/micro-broker-kgo/v3 -go 1.19 +go 1.22.0 + +toolchain go1.23.3 require ( github.com/google/uuid v1.6.0 - github.com/twmb/franz-go v1.16.1 - github.com/twmb/franz-go/pkg/kmsg v1.7.0 - go.opentelemetry.io/otel v1.24.0 - go.unistack.org/micro/v3 v3.10.46 + github.com/twmb/franz-go v1.18.0 + github.com/twmb/franz-go/pkg/kmsg v1.9.0 + go.opentelemetry.io/otel v1.33.0 + go.unistack.org/micro/v3 v3.11.20 ) require ( - github.com/golang/protobuf v1.5.4 // indirect - github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect - golang.org/x/sys v0.18.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect - google.golang.org/grpc v1.62.1 // indirect - google.golang.org/protobuf v1.33.0 // indirect + go.unistack.org/micro-proto/v3 v3.4.1 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/sys v0.28.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect + google.golang.org/grpc v1.69.2 // indirect + google.golang.org/protobuf v1.36.0 // indirect ) diff --git a/go.sum b/go.sum index 6e7a5f7..8a27de0 100644 --- a/go.sum +++ b/go.sum @@ -1,32 +1,42 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= -github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= -github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= -github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU= -go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= +github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= +go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= +go.unistack.org/micro/v3 v3.11.20 h1:Foymuty1Oi1AqbTD/aKSo/0mAW3Az4SPqgPA5FqZChw= +go.unistack.org/micro/v3 v3.11.20/go.mod h1:TjF2+KJ2RG+IB4d0wnXtaF5KgqwAqy/AMh+w9gDpRHg= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= +google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo.go b/kgo.go index 24b8e56..8ab1549 100644 --- a/kgo.go +++ b/kgo.go @@ -5,7 +5,7 @@ import ( "context" "errors" "fmt" - "math/rand" + "math/rand/v2" "strings" "sync" "time" @@ -25,7 +25,6 @@ var ErrLostMessage = errors.New("message not marked for offsets commit and will var DefaultRetryBackoffFn = func() func(int) time.Duration { var rngMu sync.Mutex - rng := rand.New(rand.NewSource(time.Now().UnixNano())) return func(fails int) time.Duration { const ( min = 100 * time.Millisecond @@ -41,7 +40,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { backoff := min * time.Duration(1<<(fails-1)) rngMu.Lock() - jitter := 0.8 + 0.4*rng.Float64() + jitter := 0.8 + 0.4*rand.Float64() rngMu.Unlock() backoff = time.Duration(float64(backoff) * jitter) @@ -54,13 +53,31 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - init bool - c *kgo.Client - kopts []kgo.Opt - connected bool - sync.RWMutex + c *kgo.Client + + kopts []kgo.Opt + subs []*subscriber + opts broker.Options - subs []*subscriber + + sync.RWMutex + connected bool + init bool +} + +func (k *Broker) Live() bool { + //TODO implement me + return true +} + +func (k *Broker) Ready() bool { + //TODO implement me + return true +} + +func (k *Broker) Health() bool { + //TODO implement me + return true } func (k *Broker) Address() string { @@ -288,11 +305,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br // k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec() // k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds()) // k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds()) - if err != nil { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() - } else { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() - } + // if err != nil { + // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() + // } else { + // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() + // } promise(r, err) }) } @@ -308,9 +325,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br if result.Err != nil { // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) - } else { - // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() - } + } // else { + // k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() + // } } if len(errs) > 0 { @@ -320,7 +337,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br return nil } -func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { return nil, nil } diff --git a/kgo_test.go b/kgo_test.go index 743a848..51a72df 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -50,7 +50,7 @@ func TestPubSub(t *testing.T) { t.Skip() } - if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel), logger.WithCallerSkipCount(3)); err != nil { + if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel), logger.WithAddCallerSkipCount(1)); err != nil { t.Fatal(err) } ctx := context.Background() diff --git a/logger.go b/logger.go index 6c71a12..05dc1b4 100644 --- a/logger.go +++ b/logger.go @@ -28,11 +28,8 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) { default: return } - if len(args) > 0 { - l.l.Log(l.ctx, mlvl, append([]interface{}{msg}, args...)...) - } else { - l.l.Log(l.ctx, mlvl, msg) - } + + l.l.Log(l.ctx, mlvl, msg, args...) } func (l *mlogger) Level() kgo.LogLevel { diff --git a/meter.go b/meter.go index 4b3871d..e5ff98f 100644 --- a/meter.go +++ b/meter.go @@ -54,7 +54,7 @@ const ( labelTopic = "topic" ) -func (m *hookMeter) OnGroupManageError(err error) { +func (m *hookMeter) OnGroupManageError(_ error) { m.meter.Counter(metricBrokerGroupErrors).Inc() } diff --git a/subscriber.go b/subscriber.go index dc85d92..3ef827e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,6 +2,7 @@ package kgo import ( "context" + "fmt" "sync" "github.com/twmb/franz-go/pkg/kgo" @@ -16,27 +17,33 @@ type tp struct { } type consumer struct { - c *kgo.Client - topic string + topic string + + c *kgo.Client + + handler broker.Handler + quit chan struct{} + done chan struct{} + recs chan kgo.FetchTopicPartition + + kopts broker.Options + opts broker.SubscribeOptions + partition int32 - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - quit chan struct{} - done chan struct{} - recs chan kgo.FetchTopicPartition } type subscriber struct { + consumers map[tp]*consumer c *kgo.Client topic string - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - closed bool - done chan struct{} - consumers map[tp]*consumer + + handler broker.Handler + done chan struct{} + kopts broker.Options + opts broker.SubscribeOptions + sync.RWMutex + closed bool } func (s *subscriber) Options() broker.SubscribeOptions { @@ -51,21 +58,21 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { if s.closed { return nil } - select { + // select { // case <-ctx.Done(): // return ctx.Err() - default: - s.c.PauseFetchTopics(s.topic) - s.c.CloseAllowingRebalance() - kc := make(map[string][]int32) - for ctp := range s.consumers { - kc[ctp.t] = append(kc[ctp.t], ctp.p) - } - s.killConsumers(ctx, kc) - close(s.done) - s.closed = true - s.c.ResumeFetchTopics(s.topic) + // default: + s.c.PauseFetchTopics(s.topic) + s.c.CloseAllowingRebalance() + kc := make(map[string][]int32) + for ctp := range s.consumers { + kc[ctp.t] = append(kc[ctp.t], ctp.p) } + s.killConsumers(ctx, kc) + close(s.done) + s.closed = true + s.c.ResumeFetchTopics(s.topic) + // } return nil } @@ -90,12 +97,12 @@ func (s *subscriber) poll(ctx context.Context) { return } fetches.EachError(func(t string, p int32, err error) { - s.kopts.Logger.Fatalf(ctx, "[kgo] fetch topic %s partition %d err: %v", t, p, err) + s.kopts.Logger.Fatal(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d", t, p), err) }) fetches.EachPartition(func(p kgo.FetchTopicPartition) { - tp := tp{p.Topic, p.Partition} - s.consumers[tp].recs <- p + nTp := tp{p.Topic, p.Partition} + s.consumers[nTp].recs <- p }) s.c.AllowRebalance() } @@ -108,11 +115,11 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { - tp := tp{topic, partition} - pc := s.consumers[tp] - delete(s.consumers, tp) + nTp := tp{topic, partition} + pc := s.consumers[nTp] + delete(s.consumers, nTp) close(pc.quit) - s.kopts.Logger.Debugf(ctx, "[kgo] waiting for work to finish topic %s partition %d", topic, partition) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) wg.Add(1) go func() { <-pc.done; wg.Done() }() } @@ -120,15 +127,15 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) } func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { - s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] lost %#+v", lost)) s.killConsumers(ctx, lost) } func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { - s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked)) s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { - s.kopts.Logger.Errorf(ctx, "[kgo] revoked CommitMarkedOffsets err: %v", err) + s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets", err) } } @@ -155,8 +162,8 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str func (pc *consumer) consume() { defer close(pc.done) - pc.kopts.Logger.Debugf(pc.kopts.Context, "starting, topic %s partition %d", pc.topic, pc.partition) - defer pc.kopts.Logger.Debugf(pc.kopts.Context, "killing, topic %s partition %d", pc.topic, pc.partition) + pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("starting, topic %s partition %d", pc.topic, pc.partition)) + defer pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("killing, topic %s partition %d", pc.topic, pc.partition)) eh := pc.kopts.ErrorHandler if pc.opts.ErrorHandler != nil { @@ -197,7 +204,7 @@ func (pc *consumer) consume() { pc.c.MarkCommitRecords(record) } else { eventPool.Put(p) - pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") return } eventPool.Put(p) @@ -207,7 +214,7 @@ func (pc *consumer) consume() { continue } else { if pc.kopts.Logger.V(logger.ErrorLevel) { - pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err) + pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: failed to unmarshal", err) } } // te := time.Since(ts) @@ -215,16 +222,16 @@ func (pc *consumer) consume() { // pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) // pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) eventPool.Put(p) - pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") + pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") return } } err := pc.handler(p) - if err == nil { - // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc() - } else { - // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() - } + // if err == nil { + // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc() + // } else { + // pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() + // } // pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() if err == nil && pc.opts.AutoAck { p.ack = true @@ -234,7 +241,7 @@ func (pc *consumer) consume() { _ = eh(p) } else { if pc.kopts.Logger.V(logger.ErrorLevel) { - pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err) + pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) } } } @@ -246,7 +253,7 @@ func (pc *consumer) consume() { pc.c.MarkCommitRecords(record) } else { eventPool.Put(p) - pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") return } } diff --git a/tracer.go b/tracer.go index 0d82fdd..74933cd 100644 --- a/tracer.go +++ b/tracer.go @@ -12,9 +12,9 @@ import ( ) type hookTracer struct { + tracer tracer.Tracer clientID string group string - tracer tracer.Tracer } var ( From ff6a27259484c0bac327b5cba857427a515c289a Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 19 Dec 2024 13:21:32 +0300 Subject: [PATCH 2/4] update deps && structs && hooks --- .golangci.yml | 17 ++++++++++++ event.go | 11 +++++--- kgo.go | 32 +++++++++++----------- meter.go | 2 +- subscriber.go | 74 ++++++++++++++++++++++++++++----------------------- tracer.go | 2 +- 6 files changed, 83 insertions(+), 55 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2bb1c30..a5d4482 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,3 +3,20 @@ run: deadline: 5m issues-exit-code: 1 tests: true + +linters: + enable: + - staticcheck + - unused + - gosimple + - govet + - goimports + - prealloc + - unconvert + - nakedret + +linters-settings: + govet: + check-all: true + enable: + - fieldalignment \ No newline at end of file diff --git a/event.go b/event.go index c701616..0601001 100644 --- a/event.go +++ b/event.go @@ -8,11 +8,14 @@ import ( ) type event struct { - ctx context.Context - topic string - err error - sync.RWMutex msg *broker.Message + err error + + topic string + + ctx context.Context + + sync.RWMutex ack bool } diff --git a/kgo.go b/kgo.go index 860dfa0..80c0449 100644 --- a/kgo.go +++ b/kgo.go @@ -5,7 +5,7 @@ import ( "context" "errors" "fmt" - "math/rand" + "math/rand/v2" "net/http" "strings" "sync" @@ -29,7 +29,6 @@ var ErrLostMessage = errors.New("message not marked for offsets commit and will var DefaultRetryBackoffFn = func() func(int) time.Duration { var rngMu sync.Mutex - rng := rand.New(rand.NewSource(time.Now().UnixNano())) return func(fails int) time.Duration { const ( min = 100 * time.Millisecond @@ -45,7 +44,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { backoff := min * time.Duration(1<<(fails-1)) rngMu.Lock() - jitter := 0.8 + 0.4*rng.Float64() + jitter := 0.8 + 0.4*rand.Float64() rngMu.Unlock() backoff = time.Duration(float64(backoff) * jitter) @@ -58,13 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - init bool - c *kgo.Client - kopts []kgo.Opt + c *kgo.Client connected *atomic.Uint32 - sync.RWMutex + + kopts []kgo.Opt + subs []*subscriber + opts broker.Options - subs []*Subscriber + + sync.RWMutex + init bool } func (r *Broker) Live() bool { @@ -302,11 +304,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) - if err != nil { + // if err != nil { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() - } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() - } + // } else { + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() + // } promise(r, err) }) } @@ -322,9 +324,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br if result.Err != nil { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) - } else { + } // else { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() - } + // } } if len(errs) > 0 { @@ -350,7 +352,7 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error { return nil } -func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { return nil, nil } diff --git a/meter.go b/meter.go index a6fc7af..54a6831 100644 --- a/meter.go +++ b/meter.go @@ -61,7 +61,7 @@ const ( labelTopic = "topic" ) -func (m *hookMeter) OnGroupManageError(err error) { +func (m *hookMeter) OnGroupManageError(_ error) { m.meter.Counter(metricBrokerGroupErrors).Inc() } diff --git a/subscriber.go b/subscriber.go index 56926c5..01e8ffe 100644 --- a/subscriber.go +++ b/subscriber.go @@ -22,29 +22,35 @@ type tp struct { } type consumer struct { - c *kgo.Client - topic string - partition int32 + topic string + + c *kgo.Client htracer *hookTracer - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - quit chan struct{} - done chan struct{} - recs chan kgo.FetchTopicPartition + + handler broker.Handler + quit chan struct{} + done chan struct{} + recs chan kgo.FetchTopicPartition + + kopts broker.Options + opts broker.SubscribeOptions + + partition int32 } type Subscriber struct { - c *kgo.Client - topic string - htracer *hookTracer - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - closed bool - done chan struct{} consumers map[tp]*consumer + c *kgo.Client + htracer *hookTracer + topic string + + handler broker.Handler + done chan struct{} + kopts broker.Options + opts broker.SubscribeOptions + sync.RWMutex + closed bool } func (s *Subscriber) Client() *kgo.Client { @@ -63,21 +69,21 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error { if s.closed { return nil } - select { + // select { // case <-ctx.Done(): // return ctx.Err() - default: - s.c.PauseFetchTopics(s.topic) - s.c.CloseAllowingRebalance() - kc := make(map[string][]int32) - for ctp := range s.consumers { - kc[ctp.t] = append(kc[ctp.t], ctp.p) - } - s.killConsumers(ctx, kc) - close(s.done) - s.closed = true - s.c.ResumeFetchTopics(s.topic) + // default: + s.c.PauseFetchTopics(s.topic) + s.c.CloseAllowingRebalance() + kc := make(map[string][]int32) + for ctp := range s.consumers { + kc[ctp.t] = append(kc[ctp.t], ctp.p) } + s.killConsumers(ctx, kc) + close(s.done) + s.closed = true + s.c.ResumeFetchTopics(s.topic) + // } return nil } @@ -141,8 +147,8 @@ func (s *Subscriber) poll(ctx context.Context) { }) fetches.EachPartition(func(p kgo.FetchTopicPartition) { - tp := tp{p.Topic, p.Partition} - s.consumers[tp].recs <- p + nTp := tp{p.Topic, p.Partition} + s.consumers[nTp].recs <- p }) s.c.AllowRebalance() } @@ -155,9 +161,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { - tp := tp{topic, partition} - pc := s.consumers[tp] - delete(s.consumers, tp) + nTp := tp{topic, partition} + pc := s.consumers[nTp] + delete(s.consumers, nTp) close(pc.quit) if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) diff --git a/tracer.go b/tracer.go index 4011477..4fa1212 100644 --- a/tracer.go +++ b/tracer.go @@ -11,9 +11,9 @@ import ( ) type hookTracer struct { + tracer tracer.Tracer clientID string group string - tracer tracer.Tracer } var messagingSystem = semconv.MessagingSystemKey.String("kafka") From f461bb6876a3b35a59cd578fcbfdfa8358a6951c Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 19 Dec 2024 13:36:55 +0300 Subject: [PATCH 3/4] resolve conflict --- .golangci.yml | 1 - event.go | 3 +-- go.sum | 16 ++++++++++++++++ kgo.go | 10 +++++----- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index a5d4482..20f3144 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -10,7 +10,6 @@ linters: - unused - gosimple - govet - - goimports - prealloc - unconvert - nakedret diff --git a/event.go b/event.go index 0601001..9ac6ba9 100644 --- a/event.go +++ b/event.go @@ -10,11 +10,10 @@ import ( type event struct { msg *broker.Message err error + ctx context.Context topic string - ctx context.Context - sync.RWMutex ack bool } diff --git a/go.sum b/go.sum index bca9de9..02102a0 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,19 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= @@ -18,11 +28,17 @@ go.unistack.org/micro/v3 v3.11.0 h1:usQ+8wQuOWpQd4+DGhFXSgZ+e+wOBjuT3W5GJZ02bSs= go.unistack.org/micro/v3 v3.11.0/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo.go b/kgo.go index 80c0449..86abf30 100644 --- a/kgo.go +++ b/kgo.go @@ -57,16 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - c *kgo.Client + c *kgo.Client connected *atomic.Uint32 kopts []kgo.Opt - subs []*subscriber + subs []*Subscriber opts broker.Options sync.RWMutex - init bool + init bool } func (r *Broker) Live() bool { @@ -305,7 +305,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) // if err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() // } else { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() // } @@ -325,7 +325,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) } // else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() // } } From 849bbd7a096b16e62e8ee117f7575eef78e7bbab Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 19 Dec 2024 13:42:25 +0300 Subject: [PATCH 4/4] undo comments --- kgo.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/kgo.go b/kgo.go index 86abf30..6a96ea7 100644 --- a/kgo.go +++ b/kgo.go @@ -304,11 +304,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) - // if err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() - // } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() - // } + if err != nil { + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() + } else { + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() + } promise(r, err) }) } @@ -324,9 +324,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br if result.Err != nil { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) - } // else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() - // } + } else { + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + } } if len(errs) > 0 {