update deps && structs && hooks #147

Merged
vtolstov merged 10 commits from devstigneev/micro-broker-kgo:v3 into v3 2024-12-20 00:02:25 +03:00
Showing only changes of commit d5bd105cc6 - Show all commits

10
kgo.go
View File

@ -304,11 +304,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
// if err != nil {
if err != nil {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc()
// } else {
} else {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
// }
}
promise(r, err)
})
}
@ -324,9 +324,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
if result.Err != nil {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
errs = append(errs, result.Err.Error())
} // else {
} else {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
// }
}
}
if len(errs) > 0 {