diff --git a/tracer.go b/tracer.go index 62887d0..8f629e8 100644 --- a/tracer.go +++ b/tracer.go @@ -6,6 +6,7 @@ import ( "github.com/twmb/franz-go/pkg/kgo" semconv "go.opentelemetry.io/otel/semconv/v1.18.0" + "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/tracer" ) @@ -44,8 +45,21 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { tracer.WithSpanLabels(attrs...), tracer.WithSpanKind(tracer.SpanKindProducer), } + + if r.Context == nil { + r.Context = context.Background() + } + + md, ok := metadata.FromOutgoingContext(r.Context) + if !ok { + md = metadata.New(len(r.Headers)) + } + for _, h := range r.Headers { + md.Set(h.Key, string(h.Value)) + } + // Start the "publish" span. - ctx, _ := m.tracer.Start(r.Context, r.Topic+" publish", opts...) + ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...) // Inject the span context into the record. // t.propagators.Inject(ctx, NewRecordCarrier(r)) // Update the record context. @@ -99,10 +113,18 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { if r.Context == nil { r.Context = context.Background() } + md, ok := metadata.FromIncomingContext(r.Context) + if !ok { + md = metadata.New(len(r.Headers)) + } + for _, h := range r.Headers { + md.Set(h.Key, string(h.Value)) + } + // Extract the span context from the record. // ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r)) // Start the "receive" span. - newCtx, _ := m.tracer.Start(r.Context, r.Topic+" receive", opts...) + newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...) // Update the record context. r.Context = newCtx } @@ -148,6 +170,14 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa if r.Context == nil { r.Context = context.Background() } + md, ok := metadata.FromIncomingContext(r.Context) + if !ok { + md = metadata.New(len(r.Headers)) + } + for _, h := range r.Headers { + md.Set(h.Key, string(h.Value)) + } + // Start a new span using the provided context and options. return m.tracer.Start(r.Context, r.Topic+" process", opts...) }