update deps && structs && hooks #147

Merged
vtolstov merged 10 commits from devstigneev/micro-broker-kgo:v3 into v3 2024-12-20 00:02:25 +03:00
3 changed files with 6 additions and 8 deletions
Showing only changes of commit 1ba02ed8ad - Show all commits

View File

@ -10,7 +10,6 @@ linters:
- unused - unused
- gosimple - gosimple
- govet - govet
- goimports
- prealloc - prealloc
- unconvert - unconvert
- nakedret - nakedret

View File

@ -10,11 +10,10 @@ import (
type event struct { type event struct {
msg *broker.Message msg *broker.Message
err error err error
ctx context.Context
topic string topic string
ctx context.Context
sync.RWMutex sync.RWMutex
ack bool ack bool
} }

10
kgo.go
View File

@ -57,16 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
}() }()
type Broker struct { type Broker struct {
c *kgo.Client c *kgo.Client
connected *atomic.Uint32 connected *atomic.Uint32
kopts []kgo.Opt kopts []kgo.Opt
subs []*subscriber subs []*Subscriber
opts broker.Options opts broker.Options
sync.RWMutex sync.RWMutex
init bool init bool
} }
func (r *Broker) Live() bool { func (r *Broker) Live() bool {
@ -305,7 +305,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
// if err != nil { // if err != nil {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc()
// } else { // } else {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
// } // }
@ -325,7 +325,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
errs = append(errs, result.Err.Error()) errs = append(errs, result.Err.Error())
} // else { } // else {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
// } // }
} }