micro-broker-kgo/tracer.go
Vasiliy Tolstov 0a395235d6
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
backport
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-21 15:08:58 +03:00

213 lines
6.6 KiB
Go

package kgo
import (
"context"
"net"
"time"
"unicode/utf8"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/tracer"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
)
// hookTracer implements a set of franz-go client hooks and uses them to
// create tracing spans around produced and fetched records.
type hookTracer struct {
	// clientID, when non-empty, is attached to spans as the Kafka client ID
	// attribute.
	clientID string
	// group, when non-empty, is attached to "receive" spans as the Kafka
	// consumer group attribute.
	group string
	// tracer is the micro tracer used to start every span.
	tracer tracer.Tracer
}
// Compile-time assertions that *hookTracer satisfies each of the franz-go
// hook interfaces listed below. A typed nil pointer is enough for the check
// and avoids constructing a value.
var (
	_ kgo.HookBrokerConnect       = (*hookTracer)(nil)
	_ kgo.HookBrokerDisconnect    = (*hookTracer)(nil)
	_ kgo.HookBrokerRead          = (*hookTracer)(nil)
	_ kgo.HookBrokerThrottle      = (*hookTracer)(nil)
	_ kgo.HookBrokerWrite         = (*hookTracer)(nil)
	_ kgo.HookFetchBatchRead      = (*hookTracer)(nil)
	_ kgo.HookProduceBatchWritten = (*hookTracer)(nil)
	_ kgo.HookGroupManageError    = (*hookTracer)(nil)
)
// OnGroupManageError satisfies kgo.HookGroupManageError.
//
// The body is empty: the error is not recorded and no span is touched.
func (m *hookTracer) OnGroupManageError(err error) {
}
// OnBrokerConnect satisfies kgo.HookBrokerConnect.
//
// The body is empty: neither the broker metadata nor the error is traced.
func (m *hookTracer) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
}
// OnBrokerDisconnect satisfies kgo.HookBrokerDisconnect.
//
// The body is empty: disconnects are not traced.
func (m *hookTracer) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
}
// OnBrokerWrite satisfies kgo.HookBrokerWrite.
//
// The body is empty: byte counts, timings and the error are all ignored.
func (m *hookTracer) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
}
// OnBrokerRead satisfies kgo.HookBrokerRead.
//
// The body is empty: byte counts, timings and the error are all ignored.
func (m *hookTracer) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
}
// OnBrokerThrottle satisfies kgo.HookBrokerThrottle.
//
// The body is empty: the throttle interval is not traced.
func (m *hookTracer) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
}
// OnProduceBatchWritten satisfies kgo.HookProduceBatchWritten.
//
// The body is empty: the batch metrics are not traced.
func (m *hookTracer) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
}
// OnFetchBatchRead satisfies kgo.HookFetchBatchRead.
//
// The body is empty: the batch metrics are not traced.
func (m *hookTracer) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
}
// OnProduceRecordBuffered opens the "publish" span for a buffered record.
//
// The span is named "<topic> publish", has producer kind, and is labelled
// with the messaging system, destination kind/name and operation, plus the
// record key (when maybeKeyAttr adds one) and the client ID (when set). The
// context returned by the tracer replaces r.Context so that
// OnProduceRecordUnbuffered can look the span up and finish it.
//
// NOTE: injecting the span context into the record itself is currently
// disabled; only the commented-out call below remains.
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
	// Collect the span attributes; the record key is added only when usable.
	kvs := maybeKeyAttr([]attribute.KeyValue{
		semconv.MessagingSystemKey.String("kafka"),
		semconv.MessagingDestinationKindTopic,
		semconv.MessagingDestinationName(r.Topic),
		semconv.MessagingOperationPublish,
	}, r)
	if id := m.clientID; id != "" {
		kvs = append(kvs, semconv.MessagingKafkaClientIDKey.String(id))
	}

	// The tracer accepts labels as untyped values, so box each attribute.
	labels := make([]interface{}, len(kvs))
	for i := range kvs {
		labels[i] = kvs[i]
	}

	// Start the "publish" span. The span handle is discarded here because it
	// is recovered from the record context when the record is unbuffered.
	spanCtx, _ := m.tracer.Start(
		r.Context,
		r.Topic+" publish",
		tracer.WithSpanLabels(labels...),
		tracer.WithSpanKind(tracer.SpanKindProducer),
	)

	// Inject the span context into the record.
	// t.propagators.Inject(ctx, NewRecordCarrier(r))

	// Hand the span-carrying context back to the record.
	r.Context = spanCtx
}
// OnProduceRecordUnbuffered finishes the "publish" span carried by the
// record's context.
//
// If the context holds no span the call does nothing. Otherwise the record's
// partition is added as a label, a non-nil err marks the span with error
// status and the error text, and the span is finished on return.
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
	sp, found := tracer.SpanFromContext(r.Context)
	if !found {
		return
	}
	defer sp.Finish()

	// Attach the partition currently recorded on r.
	partition := semconv.MessagingKafkaDestinationPartition(int(r.Partition))
	sp.AddLabels(partition)

	if err == nil {
		return
	}
	sp.SetStatus(tracer.SpanStatusError, err.Error())
}
// OnFetchRecordBuffered opens the "receive" span for a buffered record.
//
// The span is named "<topic> receive", has consumer kind, and is labelled
// with the messaging system, source kind/name, operation and partition, plus
// the record key (when maybeKeyAttr adds one), the client ID and the consumer
// group (each only when set). A nil r.Context is replaced with
// context.Background() first; the context returned by the tracer is then
// stored on the record so OnFetchRecordUnbuffered can finish the span and
// downstream processing can use it.
//
// NOTE: extracting a span context from the record itself is currently
// disabled; only the commented-out call below remains.
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
	// Guarantee a usable parent context before starting the span.
	if r.Context == nil {
		r.Context = context.Background()
	}

	// Collect the span attributes; the record key is added only when usable.
	kvs := maybeKeyAttr([]attribute.KeyValue{
		semconv.MessagingSystemKey.String("kafka"),
		semconv.MessagingSourceKindTopic,
		semconv.MessagingSourceName(r.Topic),
		semconv.MessagingOperationReceive,
		semconv.MessagingKafkaSourcePartition(int(r.Partition)),
	}, r)
	if id := m.clientID; id != "" {
		kvs = append(kvs, semconv.MessagingKafkaClientIDKey.String(id))
	}
	if grp := m.group; grp != "" {
		kvs = append(kvs, semconv.MessagingKafkaConsumerGroupKey.String(grp))
	}

	// The tracer accepts labels as untyped values, so box each attribute.
	labels := make([]interface{}, len(kvs))
	for i := range kvs {
		labels[i] = kvs[i]
	}

	// Extract the span context from the record.
	// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))

	// Start the "receive" span and hand its context back to the record.
	spanCtx, _ := m.tracer.Start(
		r.Context,
		r.Topic+" receive",
		tracer.WithSpanLabels(labels...),
		tracer.WithSpanKind(tracer.SpanKindConsumer),
	)
	r.Context = spanCtx
}
// OnFetchRecordUnbuffered finishes the "receive" span carried by the record's
// context, if there is one. The boolean argument is ignored.
func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
	sp, found := tracer.SpanFromContext(r.Context)
	if !found {
		return
	}
	sp.Finish()
}
// WithProcessSpan starts a new span for the "process" operation on a consumer
// record.
//
// The span is named "<topic> process", has consumer kind, and is labelled
// with the messaging system, source kind/name, operation, partition and
// offset, plus the record key (when maybeKeyAttr adds one), the client ID and
// the consumer group (each only when set). A nil r.Context is replaced with
// context.Background() before the span is started.
//
// It returns the context and span produced by the tracer. The span is not
// finished here; the caller's application code is responsible for ending it.
//
// This should only ever be called within a polling loop of a consumed record
// and not on a record which has been created for producing, so call this at
// the start of each iteration of your processing for the record.
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
	// Set up the span attributes.
	attrs := []attribute.KeyValue{
		semconv.MessagingSystemKey.String("kafka"),
		semconv.MessagingSourceKindTopic,
		semconv.MessagingSourceName(r.Topic),
		semconv.MessagingOperationProcess,
		semconv.MessagingKafkaSourcePartition(int(r.Partition)),
		semconv.MessagingKafkaMessageOffset(int(r.Offset)),
	}
	attrs = maybeKeyAttr(attrs, r)
	if m.clientID != "" {
		attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
	}
	if m.group != "" {
		attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
	}

	// Convert to span labels only after every attribute has been appended.
	// Doing the conversion earlier would silently drop the client ID and
	// consumer group from the span.
	ifattrs := make([]interface{}, 0, len(attrs))
	for _, attr := range attrs {
		ifattrs = append(ifattrs, attr)
	}

	opts := []tracer.SpanOption{
		tracer.WithSpanLabels(ifattrs...),
		tracer.WithSpanKind(tracer.SpanKindConsumer),
	}

	// Guarantee a usable parent context before starting the span.
	if r.Context == nil {
		r.Context = context.Background()
	}

	// Start a new span using the record context and options.
	return m.tracer.Start(r.Context, r.Topic+" process", opts...)
}
// maybeKeyAttr appends the record key to attrs as the semconv Kafka
// message-key attribute and returns the resulting slice.
//
// attrs is returned unchanged when the key is nil or is not valid UTF-8. A
// non-nil empty key is valid UTF-8 and is appended as an empty string.
func maybeKeyAttr(attrs []attribute.KeyValue, r *kgo.Record) []attribute.KeyValue {
	if r.Key == nil || !utf8.Valid(r.Key) {
		return attrs
	}
	return append(attrs, semconv.MessagingKafkaMessageKeyKey.String(string(r.Key)))
}