fixup tracing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
fe66086c40
commit
25dda1f34c
@ -222,7 +222,6 @@ func (pc *consumer) consume() {
|
||||
p.msg.Header = metadata.New(len(record.Headers))
|
||||
p.ctx = record.Context
|
||||
sp, _ := tracer.SpanFromContext(p.ctx)
|
||||
defer sp.Finish()
|
||||
for _, hdr := range record.Headers {
|
||||
p.msg.Header.Set(hdr.Key, string(hdr.Value))
|
||||
}
|
||||
@ -265,6 +264,7 @@ func (pc *consumer) consume() {
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
eventPool.Put(p)
|
||||
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
||||
sp.Finish()
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -283,7 +283,9 @@ func (pc *consumer) consume() {
|
||||
} else if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
sp.AddEvent("error handler start")
|
||||
_ = eh(p)
|
||||
sp.AddEvent("error handler stop")
|
||||
} else {
|
||||
if pc.kopts.Logger.V(logger.ErrorLevel) {
|
||||
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err)
|
||||
@ -299,8 +301,11 @@ func (pc *consumer) consume() {
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
|
||||
sp.Finish()
|
||||
return
|
||||
}
|
||||
sp.Finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user