diff --git a/event.go b/event.go index 6dbf963..c701616 100644 --- a/event.go +++ b/event.go @@ -1,12 +1,14 @@ package kgo import ( + "context" "sync" "go.unistack.org/micro/v3/broker" ) type event struct { + ctx context.Context topic string err error sync.RWMutex @@ -14,6 +16,10 @@ type event struct { ack bool } +func (p *event) Context() context.Context { + return p.ctx +} + func (p *event) Topic() string { return p.topic } diff --git a/kgo.go b/kgo.go index 315d28e..fda3910 100644 --- a/kgo.go +++ b/kgo.go @@ -234,10 +234,6 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, } func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - var span tracer.Span - ctx, span = k.opts.Tracer.Start(ctx, "Publish") - defer span.Finish() - k.Lock() if !k.connected { c, err := k.connect(ctx, k.kopts...) diff --git a/subscriber.go b/subscriber.go index 472264b..b9eaa0a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -12,6 +12,7 @@ import ( "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/semconv" + "go.unistack.org/micro/v3/tracer" ) type tp struct { @@ -219,6 +220,9 @@ func (pc *consumer) consume() { p.err = nil p.ack = false p.msg.Header = metadata.New(len(record.Headers)) + p.ctx = record.Context + sp, _ := tracer.SpanFromContext(p.ctx) + defer sp.Finish() for _, hdr := range record.Headers { p.msg.Header.Set(hdr.Key, string(hdr.Value)) } @@ -227,7 +231,11 @@ func (pc *consumer) consume() { } else if pc.opts.BodyOnly { p.msg.Body = record.Value } else { - if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { + sp.AddEvent("codec unmarshal start") + err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) + sp.AddEvent("codec unmarshal stop") + if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() p.err = err p.msg.Body = record.Value @@ -260,10 +268,13 @@ func (pc *consumer) consume() { return } } + sp.AddEvent("handler start") err := pc.handler(p) + sp.AddEvent("handler stop") if err == nil { pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() } else { + sp.SetStatus(tracer.SpanStatusError, err.Error()) pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() } pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()