minimize memory usage, name all traces with sdk.broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
8e3f2c67d7
commit
c1fa2f639d
18
tracer.go
18
tracer.go
@ -16,6 +16,8 @@ type hookTracer struct {
|
|||||||
tracer tracer.Tracer
|
tracer tracer.Tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var messagingSystem = semconv.MessagingSystemKey.String("kafka")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
_ kgo.HookProduceRecordBuffered = (*hookTracer)(nil)
|
_ kgo.HookProduceRecordBuffered = (*hookTracer)(nil)
|
||||||
_ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil)
|
_ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil)
|
||||||
@ -32,7 +34,7 @@ var (
|
|||||||
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
||||||
// Set up span options.
|
// Set up span options.
|
||||||
attrs := []interface{}{
|
attrs := []interface{}{
|
||||||
semconv.MessagingSystemKey.String("kafka"),
|
messagingSystem,
|
||||||
semconv.MessagingDestinationKindTopic,
|
semconv.MessagingDestinationKindTopic,
|
||||||
semconv.MessagingDestinationName(r.Topic),
|
semconv.MessagingDestinationName(r.Topic),
|
||||||
semconv.MessagingOperationPublish,
|
semconv.MessagingOperationPublish,
|
||||||
@ -59,9 +61,9 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), "sdk.broker", opts...)
|
||||||
} else {
|
} else {
|
||||||
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...)
|
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
md, _ = metadata.FromOutgoingContext(r.Context)
|
md, _ = metadata.FromOutgoingContext(r.Context)
|
||||||
@ -96,7 +98,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
|||||||
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
||||||
// Set up the span options.
|
// Set up the span options.
|
||||||
attrs := []interface{}{
|
attrs := []interface{}{
|
||||||
semconv.MessagingSystemKey.String("kafka"),
|
messagingSystem,
|
||||||
semconv.MessagingSourceKindTopic,
|
semconv.MessagingSourceKindTopic,
|
||||||
semconv.MessagingSourceName(r.Topic),
|
semconv.MessagingSourceName(r.Topic),
|
||||||
semconv.MessagingOperationReceive,
|
semconv.MessagingOperationReceive,
|
||||||
@ -126,9 +128,9 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), "sdk.broker", opts...)
|
||||||
} else {
|
} else {
|
||||||
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...)
|
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
md, _ = metadata.FromIncomingContext(r.Context)
|
md, _ = metadata.FromIncomingContext(r.Context)
|
||||||
@ -155,7 +157,7 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
|
|||||||
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
|
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
|
||||||
// Set up the span options.
|
// Set up the span options.
|
||||||
attrs := []interface{}{
|
attrs := []interface{}{
|
||||||
semconv.MessagingSystemKey.String("kafka"),
|
messagingSystem,
|
||||||
semconv.MessagingSourceKindTopic,
|
semconv.MessagingSourceKindTopic,
|
||||||
semconv.MessagingSourceName(r.Topic),
|
semconv.MessagingSourceName(r.Topic),
|
||||||
semconv.MessagingOperationProcess,
|
semconv.MessagingOperationProcess,
|
||||||
@ -186,7 +188,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start a new span using the provided context and options.
|
// Start a new span using the provided context and options.
|
||||||
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
|
return m.tracer.Start(r.Context, "sdk.broker", opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {
|
func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {
|
||||||
|
Loading…
Reference in New Issue
Block a user