From 1ba02ed8adbbff201d65b5b56a5a534528df976d Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 19 Dec 2024 13:36:55 +0300 Subject: [PATCH] resolve conflict --- .golangci.yml | 1 - event.go | 3 +-- kgo.go | 10 +++++----- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index a5d4482..20f3144 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -10,7 +10,6 @@ linters: - unused - gosimple - govet - - goimports - prealloc - unconvert - nakedret diff --git a/event.go b/event.go index 0601001..9ac6ba9 100644 --- a/event.go +++ b/event.go @@ -10,11 +10,10 @@ import ( type event struct { msg *broker.Message err error + ctx context.Context topic string - ctx context.Context - sync.RWMutex ack bool } diff --git a/kgo.go b/kgo.go index 80c0449..86abf30 100644 --- a/kgo.go +++ b/kgo.go @@ -57,16 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - c *kgo.Client + c *kgo.Client connected *atomic.Uint32 kopts []kgo.Opt - subs []*subscriber + subs []*Subscriber opts broker.Options sync.RWMutex - init bool + init bool } func (r *Broker) Live() bool { @@ -305,7 +305,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) // if err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() // } else { k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() // } @@ -325,7 +325,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) } // else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() // } }