Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
9c4d88bb69 | |||
56288f46b1 | |||
81dcef8b28 |
29
tracer.go
29
tracer.go
@@ -7,9 +7,8 @@ import (
|
|||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type hookTracer struct {
|
type hookTracer struct {
|
||||||
@@ -61,7 +60,7 @@ func (m *hookTracer) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ i
|
|||||||
// hook.
|
// hook.
|
||||||
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
||||||
// Set up span options.
|
// Set up span options.
|
||||||
attrs := []attribute.KeyValue{
|
attrs := []interface{}{
|
||||||
semconv.MessagingSystemKey.String("kafka"),
|
semconv.MessagingSystemKey.String("kafka"),
|
||||||
semconv.MessagingDestinationKindTopic,
|
semconv.MessagingDestinationKindTopic,
|
||||||
semconv.MessagingDestinationName(r.Topic),
|
semconv.MessagingDestinationName(r.Topic),
|
||||||
@@ -71,12 +70,8 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
|||||||
if m.clientID != "" {
|
if m.clientID != "" {
|
||||||
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
|
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
|
||||||
}
|
}
|
||||||
ifattrs := make([]interface{}, 0, len(attrs))
|
|
||||||
for _, attr := range attrs {
|
|
||||||
ifattrs = append(ifattrs, attr)
|
|
||||||
}
|
|
||||||
opts := []tracer.SpanOption{
|
opts := []tracer.SpanOption{
|
||||||
tracer.WithSpanLabels(ifattrs...),
|
tracer.WithSpanLabels(attrs...),
|
||||||
tracer.WithSpanKind(tracer.SpanKindProducer),
|
tracer.WithSpanKind(tracer.SpanKindProducer),
|
||||||
}
|
}
|
||||||
// Start the "publish" span.
|
// Start the "publish" span.
|
||||||
@@ -115,7 +110,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
|||||||
// processing.
|
// processing.
|
||||||
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
||||||
// Set up the span options.
|
// Set up the span options.
|
||||||
attrs := []attribute.KeyValue{
|
attrs := []interface{}{
|
||||||
semconv.MessagingSystemKey.String("kafka"),
|
semconv.MessagingSystemKey.String("kafka"),
|
||||||
semconv.MessagingSourceKindTopic,
|
semconv.MessagingSourceKindTopic,
|
||||||
semconv.MessagingSourceName(r.Topic),
|
semconv.MessagingSourceName(r.Topic),
|
||||||
@@ -129,12 +124,8 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
|||||||
if m.group != "" {
|
if m.group != "" {
|
||||||
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
|
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
|
||||||
}
|
}
|
||||||
ifattrs := make([]interface{}, 0, len(attrs))
|
|
||||||
for _, attr := range attrs {
|
|
||||||
ifattrs = append(ifattrs, attr)
|
|
||||||
}
|
|
||||||
opts := []tracer.SpanOption{
|
opts := []tracer.SpanOption{
|
||||||
tracer.WithSpanLabels(ifattrs...),
|
tracer.WithSpanLabels(attrs...),
|
||||||
tracer.WithSpanKind(tracer.SpanKindConsumer),
|
tracer.WithSpanKind(tracer.SpanKindConsumer),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -168,7 +159,7 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
|
|||||||
// iteration of your processing for the record.
|
// iteration of your processing for the record.
|
||||||
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
|
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
|
||||||
// Set up the span options.
|
// Set up the span options.
|
||||||
attrs := []attribute.KeyValue{
|
attrs := []interface{}{
|
||||||
semconv.MessagingSystemKey.String("kafka"),
|
semconv.MessagingSystemKey.String("kafka"),
|
||||||
semconv.MessagingSourceKindTopic,
|
semconv.MessagingSourceKindTopic,
|
||||||
semconv.MessagingSourceName(r.Topic),
|
semconv.MessagingSourceName(r.Topic),
|
||||||
@@ -177,10 +168,6 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
|||||||
semconv.MessagingKafkaMessageOffset(int(r.Offset)),
|
semconv.MessagingKafkaMessageOffset(int(r.Offset)),
|
||||||
}
|
}
|
||||||
attrs = maybeKeyAttr(attrs, r)
|
attrs = maybeKeyAttr(attrs, r)
|
||||||
ifattrs := make([]interface{}, 0, len(attrs))
|
|
||||||
for _, attr := range attrs {
|
|
||||||
ifattrs = append(ifattrs, attr)
|
|
||||||
}
|
|
||||||
if m.clientID != "" {
|
if m.clientID != "" {
|
||||||
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
|
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
|
||||||
}
|
}
|
||||||
@@ -188,7 +175,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
|||||||
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
|
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
|
||||||
}
|
}
|
||||||
opts := []tracer.SpanOption{
|
opts := []tracer.SpanOption{
|
||||||
tracer.WithSpanLabels(ifattrs...),
|
tracer.WithSpanLabels(attrs...),
|
||||||
tracer.WithSpanKind(tracer.SpanKindConsumer),
|
tracer.WithSpanKind(tracer.SpanKindConsumer),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,7 +186,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
|||||||
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
|
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func maybeKeyAttr(attrs []attribute.KeyValue, r *kgo.Record) []attribute.KeyValue {
|
func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {
|
||||||
if r.Key == nil {
|
if r.Key == nil {
|
||||||
return attrs
|
return attrs
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user