diff --git a/.golangci.yml b/.golangci.yml index 2bb1c30..20f3144 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,3 +3,19 @@ run: deadline: 5m issues-exit-code: 1 tests: true + +linters: + enable: + - staticcheck + - unused + - gosimple + - govet + - prealloc + - unconvert + - nakedret + +linters-settings: + govet: + check-all: true + enable: + - fieldalignment \ No newline at end of file diff --git a/event.go b/event.go index c701616..9ac6ba9 100644 --- a/event.go +++ b/event.go @@ -8,11 +8,13 @@ import ( ) type event struct { - ctx context.Context - topic string - err error - sync.RWMutex msg *broker.Message + err error + ctx context.Context + + topic string + + sync.RWMutex ack bool } diff --git a/go.mod b/go.mod index f8d8e81..c617500 100644 --- a/go.mod +++ b/go.mod @@ -14,13 +14,10 @@ require ( ) require ( - github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect - github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect golang.org/x/crypto v0.31.0 // indirect - golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect google.golang.org/grpc v1.69.2 // indirect diff --git a/go.sum b/go.sum index 8daf5df..832010a 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,19 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= @@ -23,11 +24,8 @@ go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.0 h1:usQ+8wQuOWpQd4+DGhFXSgZ+e+wOBjuT3W5GJZ02bSs= -go.unistack.org/micro/v3 v3.11.0/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= go.unistack.org/micro/v3 v3.11.22 h1:VPtp/+rp/baKlNb6WVlx4ZzufYuwHrfABoftnDi1uek= go.unistack.org/micro/v3 v3.11.22/go.mod h1:TjF2+KJ2RG+IB4d0wnXtaF5KgqwAqy/AMh+w9gDpRHg= -golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= @@ -35,14 +33,12 @@ golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 h1:8ZmaLZE4XWrtU3MyClkYqqtl6Oegr3235h7jxsDyqCY= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= -google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo.go b/kgo.go index 860dfa0..6a96ea7 100644 --- a/kgo.go +++ b/kgo.go @@ -5,7 +5,7 @@ import ( "context" "errors" "fmt" - "math/rand" + "math/rand/v2" "net/http" "strings" "sync" @@ -29,7 +29,6 @@ var ErrLostMessage = errors.New("message not marked for offsets commit and will var DefaultRetryBackoffFn = func() func(int) time.Duration { var rngMu sync.Mutex - rng := rand.New(rand.NewSource(time.Now().UnixNano())) return func(fails int) time.Duration { const ( min = 100 * time.Millisecond @@ -45,7 +44,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { backoff := min * time.Duration(1<<(fails-1)) rngMu.Lock() - jitter := 0.8 + 0.4*rng.Float64() + jitter := 0.8 + 0.4*rand.Float64() rngMu.Unlock() backoff = time.Duration(float64(backoff) * jitter) @@ -58,13 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - init bool c *kgo.Client - kopts []kgo.Opt connected *atomic.Uint32 - sync.RWMutex + + kopts []kgo.Opt + subs []*Subscriber + opts broker.Options - subs []*Subscriber + + sync.RWMutex + init bool } func (r *Broker) Live() bool { @@ -350,7 +352,7 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error { return nil } -func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { return nil, nil } diff --git a/meter.go b/meter.go index a6fc7af..54a6831 100644 --- a/meter.go +++ b/meter.go @@ -61,7 +61,7 @@ const ( labelTopic = "topic" ) -func (m *hookMeter) OnGroupManageError(err error) { +func (m *hookMeter) OnGroupManageError(_ error) { m.meter.Counter(metricBrokerGroupErrors).Inc() } diff --git a/subscriber.go b/subscriber.go index da210a4..36b4929 100644 --- a/subscriber.go +++ b/subscriber.go @@ -22,29 +22,35 @@ type tp struct { } type consumer struct { - c *kgo.Client - topic string - partition int32 + topic string + + c *kgo.Client htracer *hookTracer - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - quit chan struct{} - done chan struct{} - recs chan kgo.FetchTopicPartition + + handler broker.Handler + quit chan struct{} + done chan struct{} + recs chan kgo.FetchTopicPartition + + kopts broker.Options + opts broker.SubscribeOptions + + partition int32 } type Subscriber struct { - c *kgo.Client - topic string - htracer *hookTracer - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - closed bool - done chan struct{} consumers map[tp]*consumer + c *kgo.Client + htracer *hookTracer + topic string + + handler broker.Handler + done chan struct{} + kopts broker.Options + opts broker.SubscribeOptions + sync.RWMutex + closed bool } func (s *Subscriber) Client() *kgo.Client { @@ -138,8 +144,8 @@ func (s *Subscriber) poll(ctx context.Context) { }) fetches.EachPartition(func(p kgo.FetchTopicPartition) { - tp := tp{p.Topic, p.Partition} - s.consumers[tp].recs <- p + nTp := tp{p.Topic, p.Partition} + s.consumers[nTp].recs <- p }) s.c.AllowRebalance() } @@ -152,9 +158,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { - tp := tp{topic, partition} - pc := s.consumers[tp] - delete(s.consumers, tp) + nTp := tp{topic, partition} + pc := s.consumers[nTp] + delete(s.consumers, nTp) close(pc.quit) if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) diff --git a/tracer.go b/tracer.go index 4011477..4fa1212 100644 --- a/tracer.go +++ b/tracer.go @@ -11,9 +11,9 @@ import ( ) type hookTracer struct { + tracer tracer.Tracer clientID string group string - tracer tracer.Tracer } var messagingSystem = semconv.MessagingSystemKey.String("kafka")