tracing fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
d404fa31ab
commit
894d6f4f20
17
tracer.go
17
tracer.go
@ -58,12 +58,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
||||
md.Set(h.Key, string(h.Value))
|
||||
}
|
||||
|
||||
// Start the "publish" span.
|
||||
ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
||||
// Inject the span context into the record.
|
||||
// t.propagators.Inject(ctx, NewRecordCarrier(r))
|
||||
// Update the record context.
|
||||
r.Context = ctx
|
||||
|
||||
if !ok {
|
||||
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
||||
} else {
|
||||
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
|
||||
@ -124,9 +127,13 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
||||
// Extract the span context from the record.
|
||||
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
|
||||
// Start the "receive" span.
|
||||
newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
||||
if !ok {
|
||||
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
||||
} else {
|
||||
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...)
|
||||
}
|
||||
|
||||
// Update the record context.
|
||||
r.Context = newCtx
|
||||
}
|
||||
|
||||
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
|
||||
|
Loading…
Reference in New Issue
Block a user