added check span!=nil && conn close
Some checks failed
lint / lint (pull_request) Failing after 1m3s
test / test (pull_request) Successful in 3m27s

This commit is contained in:
2025-05-21 11:26:54 +03:00
parent 1e587b348a
commit 9dd6efddf9
2 changed files with 35 additions and 12 deletions

1
kgo.go
View File

@@ -287,6 +287,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.Unlock() k.Unlock()
return err return err
} }
k.c.Close()
k.c = c k.c = c
k.connected.Store(1) k.connected.Store(1)
} }

View File

@@ -274,11 +274,17 @@ func (pc *consumer) consume() {
} else if pc.opts.BodyOnly { } else if pc.opts.BodyOnly {
p.msg.Body = record.Value p.msg.Body = record.Value
} else { } else {
if sp != nil {
sp.AddEvent("codec unmarshal start") sp.AddEvent("codec unmarshal start")
}
err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) err := pc.kopts.Codec.Unmarshal(record.Value, p.msg)
if sp != nil {
sp.AddEvent("codec unmarshal stop") sp.AddEvent("codec unmarshal stop")
}
if err != nil { if err != nil {
if sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
}
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
p.err = err p.err = err
p.msg.Body = record.Value p.msg.Body = record.Value
@@ -308,17 +314,25 @@ func (pc *consumer) consume() {
eventPool.Put(p) eventPool.Put(p)
// pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
if sp != nil {
sp.Finish() sp.Finish()
}
return return
} }
} }
if sp != nil {
sp.AddEvent("handler start") sp.AddEvent("handler start")
}
err := pc.handler(p) err := pc.handler(p)
if sp != nil {
sp.AddEvent("handler stop") sp.AddEvent("handler stop")
}
if err == nil { if err == nil {
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
} else { } else {
if sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
}
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
} }
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
@@ -327,9 +341,13 @@ func (pc *consumer) consume() {
} else if err != nil { } else if err != nil {
p.err = err p.err = err
if eh != nil { if eh != nil {
if sp != nil {
sp.AddEvent("error handler start") sp.AddEvent("error handler start")
}
_ = eh(p) _ = eh(p)
if sp != nil {
sp.AddEvent("error handler stop") sp.AddEvent("error handler stop")
}
} else { } else {
if pc.kopts.Logger.V(logger.ErrorLevel) { if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err)
@@ -346,12 +364,16 @@ func (pc *consumer) consume() {
eventPool.Put(p) eventPool.Put(p)
// pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
if sp != nil {
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
sp.Finish() sp.Finish()
}
return return
} }
if sp != nil {
sp.Finish() sp.Finish()
} }
} }
} }
} }
}