Compare commits

...

6 Commits

Author SHA1 Message Date
d559db4050 fixup logger caller skip count
Some checks failed
build / test (push) Failing after 1m17s
codeql / analyze (go) (push) Failing after 8m52s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 00:53:09 +03:00
aa946c469a fixup logger caller skip count
Some checks failed
codeql / analyze (go) (push) Failing after 1m51s
build / test (push) Failing after 1m55s
build / lint (push) Successful in 9m14s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 00:40:12 +03:00
9c4d88bb69 fixup for attrs
Some checks failed
build / test (push) Failing after 1m31s
codeql / analyze (go) (push) Failing after 1m53s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-15 09:38:31 +03:00
56288f46b1 cleanup tracing
Some checks failed
build / test (push) Failing after 1m30s
codeql / analyze (go) (push) Failing after 1m57s
build / lint (push) Successful in 9m19s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-14 23:25:19 +03:00
81dcef8b28 fixup tracer span labels
Some checks failed
build / test (push) Failing after 1m29s
codeql / analyze (go) (push) Failing after 1m53s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-14 16:47:44 +03:00
ec7a22b2dc fix double init
Some checks failed
build / test (push) Failing after 1m34s
codeql / analyze (go) (push) Failing after 2m5s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-07 09:06:33 +03:00
2 changed files with 17 additions and 22 deletions

10
kgo.go
View File

@@ -14,6 +14,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/tracer"
mrand "go.unistack.org/micro/v3/util/rand" mrand "go.unistack.org/micro/v3/util/rand"
@@ -54,6 +55,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
}() }()
type Broker struct { type Broker struct {
init bool
c *kgo.Client c *kgo.Client
kopts []kgo.Opt kopts []kgo.Opt
connected bool connected bool
@@ -182,6 +184,10 @@ func (k *Broker) Init(opts ...broker.Option) error {
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
if len(opts) == 0 && k.init {
return nil
}
for _, o := range opts { for _, o := range opts {
o(&k.opts) o(&k.opts)
} }
@@ -205,6 +211,8 @@ func (k *Broker) Init(opts ...broker.Option) error {
} }
} }
k.init = true
return nil return nil
} }
@@ -405,7 +413,7 @@ func NewBroker(opts ...broker.Option) *Broker {
kgo.DialTimeout(3 * time.Second), kgo.DialTimeout(3 * time.Second),
kgo.DisableIdempotentWrite(), kgo.DisableIdempotentWrite(),
kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.ProducerBatchCompression(kgo.NoCompression()),
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}), kgo.WithLogger(&mlogger{l: options.Logger.Clone(logger.WithCallerSkipCount(options.Logger.Options().CallerSkipCount + 2)), ctx: options.Context}),
kgo.SeedBrokers(kaddrs...), kgo.SeedBrokers(kaddrs...),
kgo.RetryBackoffFn(DefaultRetryBackoffFn), kgo.RetryBackoffFn(DefaultRetryBackoffFn),
kgo.BlockRebalanceOnPoll(), kgo.BlockRebalanceOnPoll(),

View File

@@ -7,9 +7,8 @@ import (
"unicode/utf8" "unicode/utf8"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/tracer"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0" semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.unistack.org/micro/v3/tracer"
) )
type hookTracer struct { type hookTracer struct {
@@ -61,7 +60,7 @@ func (m *hookTracer) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ i
// hook. // hook.
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
// Set up span options. // Set up span options.
attrs := []attribute.KeyValue{ attrs := []interface{}{
semconv.MessagingSystemKey.String("kafka"), semconv.MessagingSystemKey.String("kafka"),
semconv.MessagingDestinationKindTopic, semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(r.Topic), semconv.MessagingDestinationName(r.Topic),
@@ -71,12 +70,8 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
if m.clientID != "" { if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID)) attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
} }
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
opts := []tracer.SpanOption{ opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...), tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindProducer), tracer.WithSpanKind(tracer.SpanKindProducer),
} }
// Start the "publish" span. // Start the "publish" span.
@@ -115,7 +110,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
// processing. // processing.
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
// Set up the span options. // Set up the span options.
attrs := []attribute.KeyValue{ attrs := []interface{}{
semconv.MessagingSystemKey.String("kafka"), semconv.MessagingSystemKey.String("kafka"),
semconv.MessagingSourceKindTopic, semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic), semconv.MessagingSourceName(r.Topic),
@@ -129,12 +124,8 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
if m.group != "" { if m.group != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group)) attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
} }
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
opts := []tracer.SpanOption{ opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...), tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer), tracer.WithSpanKind(tracer.SpanKindConsumer),
} }
@@ -168,7 +159,7 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
// iteration of your processing for the record. // iteration of your processing for the record.
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) { func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
// Set up the span options. // Set up the span options.
attrs := []attribute.KeyValue{ attrs := []interface{}{
semconv.MessagingSystemKey.String("kafka"), semconv.MessagingSystemKey.String("kafka"),
semconv.MessagingSourceKindTopic, semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic), semconv.MessagingSourceName(r.Topic),
@@ -177,10 +168,6 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
semconv.MessagingKafkaMessageOffset(int(r.Offset)), semconv.MessagingKafkaMessageOffset(int(r.Offset)),
} }
attrs = maybeKeyAttr(attrs, r) attrs = maybeKeyAttr(attrs, r)
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
if m.clientID != "" { if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID)) attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
} }
@@ -188,7 +175,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group)) attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
} }
opts := []tracer.SpanOption{ opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...), tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer), tracer.WithSpanKind(tracer.SpanKindConsumer),
} }
@@ -199,7 +186,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
return m.tracer.Start(r.Context, r.Topic+" process", opts...) return m.tracer.Start(r.Context, r.Topic+" process", opts...)
} }
func maybeKeyAttr(attrs []attribute.KeyValue, r *kgo.Record) []attribute.KeyValue { func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {
if r.Key == nil { if r.Key == nil {
return attrs return attrs
} }