From 894d6f4f20baf4c4c122a471296a592bb0158aa1 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 22 Jul 2024 01:11:33 +0300 Subject: [PATCH] tracing fixes Signed-off-by: Vasiliy Tolstov --- tracer.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tracer.go b/tracer.go index 8f629e8..8734a99 100644 --- a/tracer.go +++ b/tracer.go @@ -58,12 +58,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { md.Set(h.Key, string(h.Value)) } - // Start the "publish" span. - ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...) // Inject the span context into the record. // t.propagators.Inject(ctx, NewRecordCarrier(r)) // Update the record context. - r.Context = ctx + + if !ok { + r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...) + } else { + r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...) + } } // OnProduceRecordUnbuffered continues and ends the "publish" span for an @@ -124,9 +127,13 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { // Extract the span context from the record. // ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r)) // Start the "receive" span. - newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...) + if !ok { + r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...) + } else { + r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...) + } + // Update the record context. - r.Context = newCtx } // OnFetchRecordUnbuffered continues and ends the "receive" span for an