Merge pull request 'added check span!=nil && conn close' (#154) from devstigneev/micro-broker-kgo:v3 into v3

Reviewed-on: #154
This commit is contained in:
2025-05-21 11:56:47 +03:00
2 changed files with 34 additions and 24 deletions

12
kgo.go
View File

@@ -280,18 +280,6 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
} }
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.Lock()
if k.connected.Load() == 0 {
c, _, err := k.connect(ctx, k.kopts...)
if err != nil {
k.Unlock()
return err
}
k.c = c
k.connected.Store(1)
}
k.Unlock()
options := broker.NewPublishOptions(opts...) options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs)) records := make([]*kgo.Record, 0, len(msgs))
var errs []string var errs []string

View File

@@ -274,11 +274,17 @@ func (pc *consumer) consume() {
} else if pc.opts.BodyOnly { } else if pc.opts.BodyOnly {
p.msg.Body = record.Value p.msg.Body = record.Value
} else { } else {
sp.AddEvent("codec unmarshal start") if sp != nil {
sp.AddEvent("codec unmarshal start")
}
err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) err := pc.kopts.Codec.Unmarshal(record.Value, p.msg)
sp.AddEvent("codec unmarshal stop") if sp != nil {
sp.AddEvent("codec unmarshal stop")
}
if err != nil { if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) if sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
p.err = err p.err = err
p.msg.Body = record.Value p.msg.Body = record.Value
@@ -308,17 +314,25 @@ func (pc *consumer) consume() {
eventPool.Put(p) eventPool.Put(p)
// pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
sp.Finish() if sp != nil {
sp.Finish()
}
return return
} }
} }
sp.AddEvent("handler start") if sp != nil {
sp.AddEvent("handler start")
}
err := pc.handler(p) err := pc.handler(p)
sp.AddEvent("handler stop") if sp != nil {
sp.AddEvent("handler stop")
}
if err == nil { if err == nil {
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
} else { } else {
sp.SetStatus(tracer.SpanStatusError, err.Error()) if sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
} }
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
@@ -327,9 +341,13 @@ func (pc *consumer) consume() {
} else if err != nil { } else if err != nil {
p.err = err p.err = err
if eh != nil { if eh != nil {
sp.AddEvent("error handler start") if sp != nil {
sp.AddEvent("error handler start")
}
_ = eh(p) _ = eh(p)
sp.AddEvent("error handler stop") if sp != nil {
sp.AddEvent("error handler stop")
}
} else { } else {
if pc.kopts.Logger.V(logger.ErrorLevel) { if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err)
@@ -346,11 +364,15 @@ func (pc *consumer) consume() {
eventPool.Put(p) eventPool.Put(p)
// pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") if sp != nil {
sp.Finish() sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
sp.Finish()
}
return return
} }
sp.Finish() if sp != nil {
sp.Finish()
}
} }
} }
} }