From 969e459e3d57d1e4ae0566a2e6283051ad5679f9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 19 May 2025 09:34:19 +0300 Subject: [PATCH] add tracer enabled status Signed-off-by: Vasiliy Tolstov --- go.mod | 3 +-- go.sum | 8 ++------ subscriber.go | 13 +++++++++---- tracer.go | 19 +++++++++++++++++++ 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 8d56889..ad1b299 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,7 @@ require ( github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3 github.com/twmb/franz-go/pkg/kmsg v1.11.2 go.opentelemetry.io/otel v1.35.0 - go.unistack.org/micro-codec-json/v4 v4.1.0 - go.unistack.org/micro/v4 v4.1.13 + go.unistack.org/micro/v4 v4.1.14 ) require ( diff --git a/go.sum b/go.sum index 697ddab..9f7b1b3 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/spf13/cast v1.8.0 h1:gEN9K4b8Xws4EX0+a0reLmhq8moKn7ntRlQYgjPeCDk= github.com/spf13/cast v1.8.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twmb/franz-go v1.19.0 h1:FzBAPUeaip68X9cbLDesgQesa5zxKVaZMk+du98vj3c= -github.com/twmb/franz-go v1.19.0/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= github.com/twmb/franz-go v1.19.1 h1:cOhDFUkGvUFHSQ7UYW6bO77BJa2fYEk5mA2AX+1NIdE= github.com/twmb/franz-go v1.19.1/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= @@ -36,12 +34,10 @@ github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQ github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= -go.unistack.org/micro-codec-json/v4 v4.1.0 h1:iydeSkt3ee7IPU0dHHKlGN97lw+YFQasBk9rdv0woYA= -go.unistack.org/micro-codec-json/v4 v4.1.0/go.mod h1:aUg86elSlURSynTAetDAAXj/VzFDwwcg92QNrRzcvrM= go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= -go.unistack.org/micro/v4 v4.1.13 h1:1IEQwiIwHdypZN4dWmsWN83Plq9bdkQ4U8aYZsT17s8= -go.unistack.org/micro/v4 v4.1.13/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs= +go.unistack.org/micro/v4 v4.1.14 h1:6EotPq9kz/gaFb5YulHdKuuUwmj/7Hk44DpOlzh/A6k= +go.unistack.org/micro/v4 v4.1.14/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs= golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= diff --git a/subscriber.go b/subscriber.go index e26a65e..e57177e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -282,7 +282,9 @@ func (pc *consumer) consume() { pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) + if sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() } else if pc.opts.AutoAck { pm.ack = true @@ -299,13 +301,16 @@ func (pc *consumer) consume() { if ack { pc.c.MarkCommitRecords(record) } else { - sp.Finish() + if sp != nil { + sp.Finish() + } // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited") return } - - sp.Finish() + if sp != nil { + sp.Finish() + } } } } diff --git a/tracer.go b/tracer.go index 291a8ca..77fa85d 100644 --- a/tracer.go +++ b/tracer.go @@ -32,6 +32,9 @@ var ( // the record's context, so it can be ended in the OnProduceRecordUnbuffered // hook. func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { + if !m.tracer.Enabled() { + return + } // Set up span options. attrs := []interface{}{ messagingSystem, @@ -77,6 +80,9 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { // It sets attributes with values unset when producing and records any error // that occurred during the publish operation. func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { + if !m.tracer.Enabled() { + return + } if span, ok := tracer.SpanFromContext(r.Context); ok { span.AddLabels( semconv.MessagingKafkaDestinationPartition(int(r.Partition)), @@ -96,6 +102,9 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { // OnFetchRecordUnbuffered hook and can be used in downstream consumer // processing. func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { + if !m.tracer.Enabled() { + return + } // Set up the span options. attrs := []interface{}{ messagingSystem, @@ -141,6 +150,9 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { // OnFetchRecordUnbuffered continues and ends the "receive" span for an // unbuffered record. func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) { + if !m.tracer.Enabled() { + return + } span, _ := tracer.SpanFromContext(r.Context) span.Finish() } @@ -155,6 +167,13 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) { // not a record which has been created for producing, so call this at the start of each // iteration of your processing for the record. func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) { + if r.Context == nil { + r.Context = context.Background() + } + + if !m.tracer.Enabled() { + return r.Context, nil + } // Set up the span options. attrs := []interface{}{ messagingSystem,