Compare commits

...

2 Commits

Author SHA1 Message Date
ea2ac477be set UnknownTopicRetries with 0
Some checks failed
codeql / analyze (go) (push) Failing after 38s
build / test (push) Failing after 4m52s
build / lint (push) Successful in 9m28s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-10-06 13:53:30 +03:00
c1fa2f639d minimize memory usage, name all traces with sdk.broker
Some checks failed
build / lint (push) Failing after 8s
build / test (push) Failing after 7s
codeql / analyze (go) (push) Failing after 11s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-27 13:28:17 +03:00
2 changed files with 11 additions and 8 deletions

1
kgo.go
View File

@@ -443,6 +443,7 @@ func NewBroker(opts ...broker.Option) *Broker {
kgo.BlockRebalanceOnPoll(), kgo.BlockRebalanceOnPoll(),
kgo.Balancers(kgo.CooperativeStickyBalancer()), kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.FetchIsolationLevel(kgo.ReadUncommitted()), kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
kgo.UnknownTopicRetries(0),
} }
if options.Context != nil { if options.Context != nil {

View File

@@ -16,6 +16,8 @@ type hookTracer struct {
tracer tracer.Tracer tracer tracer.Tracer
} }
var messagingSystem = semconv.MessagingSystemKey.String("kafka")
var ( var (
_ kgo.HookProduceRecordBuffered = (*hookTracer)(nil) _ kgo.HookProduceRecordBuffered = (*hookTracer)(nil)
_ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil) _ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil)
@@ -32,7 +34,7 @@ var (
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
// Set up span options. // Set up span options.
attrs := []interface{}{ attrs := []interface{}{
semconv.MessagingSystemKey.String("kafka"), messagingSystem,
semconv.MessagingDestinationKindTopic, semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(r.Topic), semconv.MessagingDestinationName(r.Topic),
semconv.MessagingOperationPublish, semconv.MessagingOperationPublish,
@@ -59,9 +61,9 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
} }
if !ok { if !ok {
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...) r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), "sdk.broker", opts...)
} else { } else {
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...) r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
} }
md, _ = metadata.FromOutgoingContext(r.Context) md, _ = metadata.FromOutgoingContext(r.Context)
@@ -96,7 +98,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
// Set up the span options. // Set up the span options.
attrs := []interface{}{ attrs := []interface{}{
semconv.MessagingSystemKey.String("kafka"), messagingSystem,
semconv.MessagingSourceKindTopic, semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic), semconv.MessagingSourceName(r.Topic),
semconv.MessagingOperationReceive, semconv.MessagingOperationReceive,
@@ -126,9 +128,9 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
} }
if !ok { if !ok {
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...) r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), "sdk.broker", opts...)
} else { } else {
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...) r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
} }
md, _ = metadata.FromIncomingContext(r.Context) md, _ = metadata.FromIncomingContext(r.Context)
@@ -155,7 +157,7 @@ func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) { func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
// Set up the span options. // Set up the span options.
attrs := []interface{}{ attrs := []interface{}{
semconv.MessagingSystemKey.String("kafka"), messagingSystem,
semconv.MessagingSourceKindTopic, semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic), semconv.MessagingSourceName(r.Topic),
semconv.MessagingOperationProcess, semconv.MessagingOperationProcess,
@@ -186,7 +188,7 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
} }
// Start a new span using the provided context and options. // Start a new span using the provided context and options.
return m.tracer.Start(r.Context, r.Topic+" process", opts...) return m.tracer.Start(r.Context, "sdk.broker", opts...)
} }
func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} { func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {