diff --git a/subscriber.go b/subscriber.go index b9eaa0a..e94bbb6 100644 --- a/subscriber.go +++ b/subscriber.go @@ -222,7 +222,6 @@ func (pc *consumer) consume() { p.msg.Header = metadata.New(len(record.Headers)) p.ctx = record.Context sp, _ := tracer.SpanFromContext(p.ctx) - defer sp.Finish() for _, hdr := range record.Headers { p.msg.Header.Set(hdr.Key, string(hdr.Value)) } @@ -265,6 +264,7 @@ func (pc *consumer) consume() { pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) eventPool.Put(p) pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") + sp.Finish() return } } @@ -283,7 +283,9 @@ func (pc *consumer) consume() { } else if err != nil { p.err = err if eh != nil { + sp.AddEvent("error handler start") _ = eh(p) + sp.AddEvent("error handler stop") } else { if pc.kopts.Logger.V(logger.ErrorLevel) { pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err) @@ -299,8 +301,11 @@ func (pc *consumer) consume() { } else { eventPool.Put(p) pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") + sp.Finish() return } + sp.Finish() } } }