fixup tracing
Some checks failed
build / test (push) Failing after 1m36s
codeql / analyze (go) (push) Failing after 1m42s
build / lint (push) Successful in 9m13s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-05-06 08:20:27 +03:00
parent 8fcc23f639
commit 23c2903c21

View File

@ -6,6 +6,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/tracer"
)
@ -44,8 +45,21 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindProducer),
}
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromOutgoingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
}
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
// Start the "publish" span.
ctx, _ := m.tracer.Start(r.Context, r.Topic+" publish", opts...)
ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
// Inject the span context into the record.
// t.propagators.Inject(ctx, NewRecordCarrier(r))
// Update the record context.
@ -99,10 +113,18 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromIncomingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
}
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
// Extract the span context from the record.
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
// Start the "receive" span.
newCtx, _ := m.tracer.Start(r.Context, r.Topic+" receive", opts...)
newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
// Update the record context.
r.Context = newCtx
}
@ -148,6 +170,14 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromIncomingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
}
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
// Start a new span using the provided context and options.
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
}