From c1fa2f639d352528415962f1c31159dfce6280ee Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Sep 2024 13:28:17 +0300 Subject: [PATCH] minimize memory usage, name all traces with sdk.broker Signed-off-by: Vasiliy Tolstov --- tracer.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tracer.go b/tracer.go index ff03a4f..a1984c1 100644 --- a/tracer.go +++ b/tracer.go @@ -16,6 +16,8 @@ type hookTracer struct { tracer tracer.Tracer } +var messagingSystem = semconv.MessagingSystemKey.String("kafka") + var ( _ kgo.HookProduceRecordBuffered = (*hookTracer)(nil) _ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil) @@ -32,7 +34,7 @@ var ( func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { // Set up span options. attrs := []interface{}{ - semconv.MessagingSystemKey.String("kafka"), + messagingSystem, semconv.MessagingDestinationKindTopic, semconv.MessagingDestinationName(r.Topic), semconv.MessagingOperationPublish, @@ -59,9 +61,9 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { } if !ok { - r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...) + r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), "sdk.broker", opts...) } else { - r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...) + r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) } md, _ = metadata.FromOutgoingContext(r.Context) @@ -96,7 +98,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { // Set up the span options. attrs := []interface{}{ - semconv.MessagingSystemKey.String("kafka"), + messagingSystem, semconv.MessagingSourceKindTopic, semconv.MessagingSourceName(r.Topic), semconv.MessagingOperationReceive, @@ -126,9 +128,9 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { } if !ok { - r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...) + r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), "sdk.broker", opts...) } else { - r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...) + r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) } md, _ = metadata.FromIncomingContext(r.Context) @@ -155,7 +157,7 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) { func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) { // Set up the span options. attrs := []interface{}{ - semconv.MessagingSystemKey.String("kafka"), + messagingSystem, semconv.MessagingSourceKindTopic, semconv.MessagingSourceName(r.Topic), semconv.MessagingOperationProcess, @@ -186,7 +188,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa } // Start a new span using the provided context and options. - return m.tracer.Start(r.Context, r.Topic+" process", opts...) + return m.tracer.Start(r.Context, "sdk.broker", opts...) } func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {