diff --git a/kgo.go b/kgo.go index fda3910..35878d1 100644 --- a/kgo.go +++ b/kgo.go @@ -73,7 +73,7 @@ func (k *Broker) Name() string { return k.opts.Name } -func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, error) { +func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { var c *kgo.Client var err error @@ -90,9 +90,10 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err } } + htracer := &hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer} opts = append(opts, kgo.WithHooks(&hookMeter{meter: k.opts.Meter}), - kgo.WithHooks(&hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer}), + kgo.WithHooks(htracer), ) select { @@ -102,7 +103,7 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error()) } } - return nil, ctx.Err() + return nil, nil, ctx.Err() default: c, err = kgo.NewClient(opts...) if err == nil { @@ -112,10 +113,10 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err if sp != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) } - return nil, err + return nil, nil, err } } - return c, nil + return c, htracer, nil } func (k *Broker) Connect(ctx context.Context) error { @@ -131,7 +132,7 @@ func (k *Broker) Connect(ctx context.Context) error { nctx = ctx } - c, err := k.connect(nctx, k.kopts...) + c, _, err := k.connect(nctx, k.kopts...) if err != nil { return err } @@ -236,7 +237,7 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { k.Lock() if !k.connected { - c, err := k.connect(ctx, k.kopts...) + c, _, err := k.connect(ctx, k.kopts...) if err != nil { k.Unlock() return err @@ -370,7 +371,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } } - c, err := k.connect(ctx, kopts...) + c, htracer, err := k.connect(ctx, kopts...) if err != nil { return nil, err } @@ -388,6 +389,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } sub.c = c + sub.htracer = htracer go sub.poll(ctx) diff --git a/subscriber.go b/subscriber.go index e94bbb6..0500753 100644 --- a/subscriber.go +++ b/subscriber.go @@ -24,6 +24,7 @@ type consumer struct { c *kgo.Client topic string partition int32 + htracer *hookTracer opts broker.SubscribeOptions kopts broker.Options handler broker.Handler @@ -35,6 +36,7 @@ type consumer struct { type subscriber struct { c *kgo.Client topic string + htracer *hookTracer opts broker.SubscribeOptions kopts broker.Options handler broker.Handler @@ -179,13 +181,13 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str c: c, topic: topic, partition: partition, - - quit: make(chan struct{}), - done: make(chan struct{}), - recs: make(chan kgo.FetchTopicPartition, 100), - handler: s.handler, - kopts: s.kopts, - opts: s.opts, + htracer: s.htracer, + quit: make(chan struct{}), + done: make(chan struct{}), + recs: make(chan kgo.FetchTopicPartition, 100), + handler: s.handler, + kopts: s.kopts, + opts: s.opts, } s.Lock() s.consumers[tp{topic, partition}] = pc @@ -211,6 +213,7 @@ func (pc *consumer) consume() { return case p := <-pc.recs: for _, record := range p.Records { + ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() p := eventPool.Get().(*event) @@ -220,8 +223,7 @@ func (pc *consumer) consume() { p.err = nil p.ack = false p.msg.Header = metadata.New(len(record.Headers)) - p.ctx = record.Context - sp, _ := tracer.SpanFromContext(p.ctx) + p.ctx = ctx for _, hdr := range record.Headers { p.msg.Header.Set(hdr.Key, string(hdr.Value)) } diff --git a/tracer.go b/tracer.go index 0d82fdd..62887d0 100644 --- a/tracer.go +++ b/tracer.go @@ -2,8 +2,6 @@ package kgo import ( "context" - "net" - "time" "unicode/utf8" "github.com/twmb/franz-go/pkg/kgo" @@ -18,40 +16,12 @@ type hookTracer struct { } var ( - _ kgo.HookBrokerConnect = &hookTracer{} - _ kgo.HookBrokerDisconnect = &hookTracer{} - _ kgo.HookBrokerRead = &hookTracer{} - _ kgo.HookBrokerThrottle = &hookTracer{} - _ kgo.HookBrokerWrite = &hookTracer{} - _ kgo.HookFetchBatchRead = &hookTracer{} - _ kgo.HookProduceBatchWritten = &hookTracer{} - _ kgo.HookGroupManageError = &hookTracer{} + _ kgo.HookProduceRecordBuffered = (*hookTracer)(nil) + _ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil) + _ kgo.HookFetchRecordBuffered = (*hookTracer)(nil) + _ kgo.HookFetchRecordUnbuffered = (*hookTracer)(nil) ) -func (m *hookTracer) OnGroupManageError(err error) { -} - -func (m *hookTracer) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { -} - -func (m *hookTracer) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { -} - -func (m *hookTracer) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { -} - -func (m *hookTracer) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) { -} - -func (m *hookTracer) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) { -} - -func (m *hookTracer) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) { -} - -func (m *hookTracer) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) { -} - // OnProduceRecordBuffered starts a new span for the "publish" operation on a // buffered record. // @@ -88,17 +58,14 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { // It sets attributes with values unset when producing and records any error // that occurred during the publish operation. func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { - span, ok := tracer.SpanFromContext(r.Context) - if !ok { - return - } - defer span.Finish() + span, _ := tracer.SpanFromContext(r.Context) span.AddLabels( semconv.MessagingKafkaDestinationPartition(int(r.Partition)), ) if err != nil { span.SetStatus(tracer.SpanStatusError, err.Error()) } + span.Finish() } // OnFetchRecordBuffered starts a new span for the "receive" operation on a @@ -143,9 +110,8 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { // OnFetchRecordUnbuffered continues and ends the "receive" span for an // unbuffered record. func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) { - if span, ok := tracer.SpanFromContext(r.Context); ok { - defer span.Finish() - } + span, _ := tracer.SpanFromContext(r.Context) + span.Finish() } // WithProcessSpan starts a new span for the "process" operation on a consumer