Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
81dcef8b28 | |||
ec7a22b2dc |
7
kgo.go
7
kgo.go
@@ -54,6 +54,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
|
||||
}()
|
||||
|
||||
type Broker struct {
|
||||
init bool
|
||||
c *kgo.Client
|
||||
kopts []kgo.Opt
|
||||
connected bool
|
||||
@@ -182,6 +183,10 @@ func (k *Broker) Init(opts ...broker.Option) error {
|
||||
k.Lock()
|
||||
defer k.Unlock()
|
||||
|
||||
if len(opts) == 0 && k.init {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&k.opts)
|
||||
}
|
||||
@@ -205,6 +210,8 @@ func (k *Broker) Init(opts ...broker.Option) error {
|
||||
}
|
||||
}
|
||||
|
||||
k.init = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
28
tracer.go
28
tracer.go
@@ -7,9 +7,9 @@ import (
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
type hookTracer struct {
|
||||
@@ -71,12 +71,8 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
||||
if m.clientID != "" {
|
||||
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
|
||||
}
|
||||
ifattrs := make([]interface{}, 0, len(attrs))
|
||||
for _, attr := range attrs {
|
||||
ifattrs = append(ifattrs, attr)
|
||||
}
|
||||
opts := []tracer.SpanOption{
|
||||
tracer.WithSpanLabels(ifattrs...),
|
||||
tracer.WithSpanLabels(otel2Micro(attrs...)),
|
||||
tracer.WithSpanKind(tracer.SpanKindProducer),
|
||||
}
|
||||
// Start the "publish" span.
|
||||
@@ -129,12 +125,8 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
||||
if m.group != "" {
|
||||
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
|
||||
}
|
||||
ifattrs := make([]interface{}, 0, len(attrs))
|
||||
for _, attr := range attrs {
|
||||
ifattrs = append(ifattrs, attr)
|
||||
}
|
||||
opts := []tracer.SpanOption{
|
||||
tracer.WithSpanLabels(ifattrs...),
|
||||
tracer.WithSpanLabels(otel2Micro(attrs...)),
|
||||
tracer.WithSpanKind(tracer.SpanKindConsumer),
|
||||
}
|
||||
|
||||
@@ -177,10 +169,6 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
||||
semconv.MessagingKafkaMessageOffset(int(r.Offset)),
|
||||
}
|
||||
attrs = maybeKeyAttr(attrs, r)
|
||||
ifattrs := make([]interface{}, 0, len(attrs))
|
||||
for _, attr := range attrs {
|
||||
ifattrs = append(ifattrs, attr)
|
||||
}
|
||||
if m.clientID != "" {
|
||||
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
|
||||
}
|
||||
@@ -188,7 +176,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
||||
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
|
||||
}
|
||||
opts := []tracer.SpanOption{
|
||||
tracer.WithSpanLabels(ifattrs...),
|
||||
tracer.WithSpanLabels(otel2Micro(attrs...)),
|
||||
tracer.WithSpanKind(tracer.SpanKindConsumer),
|
||||
}
|
||||
|
||||
@@ -210,3 +198,11 @@ func maybeKeyAttr(attrs []attribute.KeyValue, r *kgo.Record) []attribute.KeyValu
|
||||
keykey = string(r.Key)
|
||||
return append(attrs, semconv.MessagingKafkaMessageKeyKey.String(keykey))
|
||||
}
|
||||
|
||||
func otel2Micro(attrs ...attribute.KeyValue) []interface{} {
|
||||
ret := make([]interface{}, 0, len(attrs))
|
||||
for _, a := range attrs {
|
||||
ret = append(ret, a.Key, a.Value)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
Reference in New Issue
Block a user