Compare commits

..

3 Commits

Author SHA1 Message Date
56288f46b1 cleanup tracing
Some checks failed
build / test (push) Failing after 1m30s
codeql / analyze (go) (push) Failing after 1m57s
build / lint (push) Successful in 9m19s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-14 23:25:19 +03:00
81dcef8b28 fixup tracer span labels
Some checks failed
build / test (push) Failing after 1m29s
codeql / analyze (go) (push) Failing after 1m53s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-14 16:47:44 +03:00
ec7a22b2dc fix double init
Some checks failed
build / test (push) Failing after 1m34s
codeql / analyze (go) (push) Failing after 2m5s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-07 09:06:33 +03:00
2 changed files with 11 additions and 16 deletions

7
kgo.go
View File

@@ -54,6 +54,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
}()
type Broker struct {
init bool
c *kgo.Client
kopts []kgo.Opt
connected bool
@@ -182,6 +183,10 @@ func (k *Broker) Init(opts ...broker.Option) error {
k.Lock()
defer k.Unlock()
if len(opts) == 0 && k.init {
return nil
}
for _, o := range opts {
o(&k.opts)
}
@@ -205,6 +210,8 @@ func (k *Broker) Init(opts ...broker.Option) error {
}
}
k.init = true
return nil
}

View File

@@ -7,9 +7,9 @@ import (
"unicode/utf8"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/tracer"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.unistack.org/micro/v3/tracer"
)
type hookTracer struct {
@@ -71,12 +71,8 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...),
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindProducer),
}
// Start the "publish" span.
@@ -129,12 +125,8 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
if m.group != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
}
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...),
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer),
}
@@ -177,10 +169,6 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
semconv.MessagingKafkaMessageOffset(int(r.Offset)),
}
attrs = maybeKeyAttr(attrs, r)
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
@@ -188,7 +176,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...),
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer),
}