From ff6a27259484c0bac327b5cba857427a515c289a Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 19 Dec 2024 13:21:32 +0300 Subject: [PATCH] update deps && structs && hooks --- .golangci.yml | 17 ++++++++++++ event.go | 11 +++++--- kgo.go | 32 +++++++++++----------- meter.go | 2 +- subscriber.go | 74 ++++++++++++++++++++++++++++----------------------- tracer.go | 2 +- 6 files changed, 83 insertions(+), 55 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2bb1c30..a5d4482 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,3 +3,20 @@ run: deadline: 5m issues-exit-code: 1 tests: true + +linters: + enable: + - staticcheck + - unused + - gosimple + - govet + - goimports + - prealloc + - unconvert + - nakedret + +linters-settings: + govet: + check-all: true + enable: + - fieldalignment \ No newline at end of file diff --git a/event.go b/event.go index c701616..0601001 100644 --- a/event.go +++ b/event.go @@ -8,11 +8,14 @@ import ( ) type event struct { - ctx context.Context - topic string - err error - sync.RWMutex msg *broker.Message + err error + + topic string + + ctx context.Context + + sync.RWMutex ack bool } diff --git a/kgo.go b/kgo.go index 860dfa0..80c0449 100644 --- a/kgo.go +++ b/kgo.go @@ -5,7 +5,7 @@ import ( "context" "errors" "fmt" - "math/rand" + "math/rand/v2" "net/http" "strings" "sync" @@ -29,7 +29,6 @@ var ErrLostMessage = errors.New("message not marked for offsets commit and will var DefaultRetryBackoffFn = func() func(int) time.Duration { var rngMu sync.Mutex - rng := rand.New(rand.NewSource(time.Now().UnixNano())) return func(fails int) time.Duration { const ( min = 100 * time.Millisecond @@ -45,7 +44,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { backoff := min * time.Duration(1<<(fails-1)) rngMu.Lock() - jitter := 0.8 + 0.4*rng.Float64() + jitter := 0.8 + 0.4*rand.Float64() rngMu.Unlock() backoff = time.Duration(float64(backoff) * jitter) @@ -58,13 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - init bool - c *kgo.Client - kopts []kgo.Opt + c *kgo.Client connected *atomic.Uint32 - sync.RWMutex + + kopts []kgo.Opt + subs []*subscriber + opts broker.Options - subs []*Subscriber + + sync.RWMutex + init bool } func (r *Broker) Live() bool { @@ -302,11 +304,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) - if err != nil { + // if err != nil { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() - } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() - } + // } else { + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() + // } promise(r, err) }) } @@ -322,9 +324,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br if result.Err != nil { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) - } else { + } // else { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() - } + // } } if len(errs) > 0 { @@ -350,7 +352,7 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error { return nil } -func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { return nil, nil } diff --git a/meter.go b/meter.go index a6fc7af..54a6831 100644 --- a/meter.go +++ b/meter.go @@ -61,7 +61,7 @@ const ( labelTopic = "topic" ) -func (m *hookMeter) OnGroupManageError(err error) { +func (m *hookMeter) OnGroupManageError(_ error) { m.meter.Counter(metricBrokerGroupErrors).Inc() } diff --git a/subscriber.go b/subscriber.go index 56926c5..01e8ffe 100644 --- a/subscriber.go +++ b/subscriber.go @@ -22,29 +22,35 @@ type tp struct { } type consumer struct { - c *kgo.Client - topic string - partition int32 + topic string + + c *kgo.Client htracer *hookTracer - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - quit chan struct{} - done chan struct{} - recs chan kgo.FetchTopicPartition + + handler broker.Handler + quit chan struct{} + done chan struct{} + recs chan kgo.FetchTopicPartition + + kopts broker.Options + opts broker.SubscribeOptions + + partition int32 } type Subscriber struct { - c *kgo.Client - topic string - htracer *hookTracer - opts broker.SubscribeOptions - kopts broker.Options - handler broker.Handler - closed bool - done chan struct{} consumers map[tp]*consumer + c *kgo.Client + htracer *hookTracer + topic string + + handler broker.Handler + done chan struct{} + kopts broker.Options + opts broker.SubscribeOptions + sync.RWMutex + closed bool } func (s *Subscriber) Client() *kgo.Client { @@ -63,21 +69,21 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error { if s.closed { return nil } - select { + // select { // case <-ctx.Done(): // return ctx.Err() - default: - s.c.PauseFetchTopics(s.topic) - s.c.CloseAllowingRebalance() - kc := make(map[string][]int32) - for ctp := range s.consumers { - kc[ctp.t] = append(kc[ctp.t], ctp.p) - } - s.killConsumers(ctx, kc) - close(s.done) - s.closed = true - s.c.ResumeFetchTopics(s.topic) + // default: + s.c.PauseFetchTopics(s.topic) + s.c.CloseAllowingRebalance() + kc := make(map[string][]int32) + for ctp := range s.consumers { + kc[ctp.t] = append(kc[ctp.t], ctp.p) } + s.killConsumers(ctx, kc) + close(s.done) + s.closed = true + s.c.ResumeFetchTopics(s.topic) + // } return nil } @@ -141,8 +147,8 @@ func (s *Subscriber) poll(ctx context.Context) { }) fetches.EachPartition(func(p kgo.FetchTopicPartition) { - tp := tp{p.Topic, p.Partition} - s.consumers[tp].recs <- p + nTp := tp{p.Topic, p.Partition} + s.consumers[nTp].recs <- p }) s.c.AllowRebalance() } @@ -155,9 +161,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { - tp := tp{topic, partition} - pc := s.consumers[tp] - delete(s.consumers, tp) + nTp := tp{topic, partition} + pc := s.consumers[nTp] + delete(s.consumers, nTp) close(pc.quit) if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) diff --git a/tracer.go b/tracer.go index 4011477..4fa1212 100644 --- a/tracer.go +++ b/tracer.go @@ -11,9 +11,9 @@ import ( ) type hookTracer struct { + tracer tracer.Tracer clientID string group string - tracer tracer.Tracer } var messagingSystem = semconv.MessagingSystemKey.String("kafka")