fixup tracing
Some checks failed
build / test (push) Failing after 1m46s
codeql / analyze (go) (push) Failing after 1m45s
build / lint (push) Successful in 9m12s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-05-06 07:30:17 +03:00
parent 25dda1f34c
commit 8fcc23f639
3 changed files with 29 additions and 59 deletions

View File

@@ -24,6 +24,7 @@ type consumer struct {
c *kgo.Client
topic string
partition int32
htracer *hookTracer
opts broker.SubscribeOptions
kopts broker.Options
handler broker.Handler
@@ -35,6 +36,7 @@ type consumer struct {
type subscriber struct {
c *kgo.Client
topic string
htracer *hookTracer
opts broker.SubscribeOptions
kopts broker.Options
handler broker.Handler
@@ -179,13 +181,13 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
c: c,
topic: topic,
partition: partition,
quit: make(chan struct{}),
done: make(chan struct{}),
recs: make(chan kgo.FetchTopicPartition, 100),
handler: s.handler,
kopts: s.kopts,
opts: s.opts,
htracer: s.htracer,
quit: make(chan struct{}),
done: make(chan struct{}),
recs: make(chan kgo.FetchTopicPartition, 100),
handler: s.handler,
kopts: s.kopts,
opts: s.opts,
}
s.Lock()
s.consumers[tp{topic, partition}] = pc
@@ -211,6 +213,7 @@ func (pc *consumer) consume() {
return
case p := <-pc.recs:
for _, record := range p.Records {
ctx, sp := pc.htracer.WithProcessSpan(record)
ts := time.Now()
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
p := eventPool.Get().(*event)
@@ -220,8 +223,7 @@ func (pc *consumer) consume() {
p.err = nil
p.ack = false
p.msg.Header = metadata.New(len(record.Headers))
p.ctx = record.Context
sp, _ := tracer.SpanFromContext(p.ctx)
p.ctx = ctx
for _, hdr := range record.Headers {
p.msg.Header.Set(hdr.Key, string(hdr.Value))
}