From 8475922d4288ee7edbb103df00ba30eaf2715ddf Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Tue, 20 May 2025 14:22:58 +0300 Subject: [PATCH] move from v4 enabled tracer && added fieldalignment struct && upd deps --- broker.go | 2 +- go.mod | 2 +- go.sum | 28 ++++------------------------ kgo.go | 6 +++--- subscriber.go | 38 ++++++++++++++++++++++++-------------- tracer.go | 19 +++++++++++++++++++ 6 files changed, 52 insertions(+), 43 deletions(-) diff --git a/broker.go b/broker.go index a7b2ece..bd045c4 100644 --- a/broker.go +++ b/broker.go @@ -12,8 +12,8 @@ import ( type hookEvent struct { log logger.Logger - fatalOnError bool connected *atomic.Uint32 + fatalOnError bool } var ( diff --git a/go.mod b/go.mod index d77a9f0..1da867e 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/twmb/franz-go/pkg/kadm v1.15.0 github.com/twmb/franz-go/pkg/kmsg v1.9.0 go.opentelemetry.io/otel v1.34.0 - go.unistack.org/micro/v3 v3.11.41 + go.unistack.org/micro/v3 v3.11.44 ) require ( diff --git a/go.sum b/go.sum index a116105..4af0b07 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,6 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -31,48 +29,30 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= -github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= -github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= -github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4= github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= -go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= -go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.37 h1:ZcpnXAYEMcAwmnVb5b7o8/PylGnILxXMHaUlRrPmRI0= -go.unistack.org/micro/v3 v3.11.37/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= -go.unistack.org/micro/v3 v3.11.41 h1:dP4sBLIZpMo+MWGe5bbESewK8wBzYm4Yik/67x4dEtQ= -go.unistack.org/micro/v3 v3.11.41/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +go.unistack.org/micro/v3 v3.11.44 h1:A+T8zVcL2vlL66kn/Y4rqhtBybLO829wFEYZJYorDOU= +go.unistack.org/micro/v3 v3.11.44/go.mod h1:13EFW2ps3BN9mpYbp9K0oQu/VDjEN6LJ4wwdom7hcXQ= golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms= google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/kgo.go b/kgo.go index c99cfdc..b295b95 100644 --- a/kgo.go +++ b/kgo.go @@ -60,15 +60,15 @@ type Broker struct { c *kgo.Client connected *atomic.Uint32 + done chan struct{} + kopts []kgo.Opt subs []*Subscriber opts broker.Options - sync.RWMutex - init bool - done chan struct{} + init bool } func (r *Broker) Live() bool { diff --git a/subscriber.go b/subscriber.go index d891f8f..0ff859b 100644 --- a/subscriber.go +++ b/subscriber.go @@ -24,34 +24,44 @@ type tp struct { } type consumer struct { - topic string + topic string + c *kgo.Client htracer *hookTracer - quit chan struct{} - done chan struct{} - recs chan kgo.FetchTopicPartition - kopts broker.Options - partition int32 - opts broker.SubscribeOptions - handler broker.Handler connected *atomic.Uint32 + + quit chan struct{} + done chan struct{} + recs chan kgo.FetchTopicPartition + + handler broker.Handler + + kopts broker.Options + opts broker.SubscribeOptions + + partition int32 } type Subscriber struct { + topic string + consumers map[tp]*consumer + c *kgo.Client htracer *hookTracer - topic string + connected *atomic.Uint32 handler broker.Handler - done chan struct{} - kopts broker.Options - opts broker.SubscribeOptions - connected *atomic.Uint32 - sync.RWMutex + done chan struct{} + + kopts broker.Options + opts broker.SubscribeOptions + closed bool fatalOnError bool + + sync.RWMutex } func (s *Subscriber) Client() *kgo.Client { diff --git a/tracer.go b/tracer.go index feca43c..8a654c7 100644 --- a/tracer.go +++ b/tracer.go @@ -32,6 +32,9 @@ var ( // the record's context, so it can be ended in the OnProduceRecordUnbuffered // hook. func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { + if !m.tracer.Enabled() { + return + } // Set up span options. attrs := []interface{}{ messagingSystem, @@ -77,6 +80,9 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { // It sets attributes with values unset when producing and records any error // that occurred during the publish operation. func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { + if !m.tracer.Enabled() { + return + } if span, ok := tracer.SpanFromContext(r.Context); ok { span.AddLabels( semconv.MessagingKafkaDestinationPartition(int(r.Partition)), @@ -96,6 +102,9 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { // OnFetchRecordUnbuffered hook and can be used in downstream consumer // processing. func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { + if !m.tracer.Enabled() { + return + } // Set up the span options. attrs := []interface{}{ messagingSystem, @@ -141,6 +150,9 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { // OnFetchRecordUnbuffered continues and ends the "receive" span for an // unbuffered record. func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) { + if !m.tracer.Enabled() { + return + } span, _ := tracer.SpanFromContext(r.Context) span.Finish() } @@ -155,6 +167,13 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) { // not a record which has been created for producing, so call this at the start of each // iteration of your processing for the record. func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) { + if r.Context == nil { + r.Context = context.Background() + } + + if !m.tracer.Enabled() { + return r.Context, nil + } // Set up the span options. attrs := []interface{}{ messagingSystem,