fixup tracing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
7329bc23bc
commit
fe66086c40
6
event.go
6
event.go
@ -1,12 +1,14 @@
|
|||||||
package kgo
|
package kgo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
)
|
)
|
||||||
|
|
||||||
type event struct {
|
type event struct {
|
||||||
|
ctx context.Context
|
||||||
topic string
|
topic string
|
||||||
err error
|
err error
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
@ -14,6 +16,10 @@ type event struct {
|
|||||||
ack bool
|
ack bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *event) Context() context.Context {
|
||||||
|
return p.ctx
|
||||||
|
}
|
||||||
|
|
||||||
func (p *event) Topic() string {
|
func (p *event) Topic() string {
|
||||||
return p.topic
|
return p.topic
|
||||||
}
|
}
|
||||||
|
4
kgo.go
4
kgo.go
@ -234,10 +234,6 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||||
var span tracer.Span
|
|
||||||
ctx, span = k.opts.Tracer.Start(ctx, "Publish")
|
|
||||||
defer span.Finish()
|
|
||||||
|
|
||||||
k.Lock()
|
k.Lock()
|
||||||
if !k.connected {
|
if !k.connected {
|
||||||
c, err := k.connect(ctx, k.kopts...)
|
c, err := k.connect(ctx, k.kopts...)
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/semconv"
|
"go.unistack.org/micro/v3/semconv"
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tp struct {
|
type tp struct {
|
||||||
@ -219,6 +220,9 @@ func (pc *consumer) consume() {
|
|||||||
p.err = nil
|
p.err = nil
|
||||||
p.ack = false
|
p.ack = false
|
||||||
p.msg.Header = metadata.New(len(record.Headers))
|
p.msg.Header = metadata.New(len(record.Headers))
|
||||||
|
p.ctx = record.Context
|
||||||
|
sp, _ := tracer.SpanFromContext(p.ctx)
|
||||||
|
defer sp.Finish()
|
||||||
for _, hdr := range record.Headers {
|
for _, hdr := range record.Headers {
|
||||||
p.msg.Header.Set(hdr.Key, string(hdr.Value))
|
p.msg.Header.Set(hdr.Key, string(hdr.Value))
|
||||||
}
|
}
|
||||||
@ -227,7 +231,11 @@ func (pc *consumer) consume() {
|
|||||||
} else if pc.opts.BodyOnly {
|
} else if pc.opts.BodyOnly {
|
||||||
p.msg.Body = record.Value
|
p.msg.Body = record.Value
|
||||||
} else {
|
} else {
|
||||||
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
sp.AddEvent("codec unmarshal start")
|
||||||
|
err := pc.kopts.Codec.Unmarshal(record.Value, p.msg)
|
||||||
|
sp.AddEvent("codec unmarshal stop")
|
||||||
|
if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
|
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
|
||||||
p.err = err
|
p.err = err
|
||||||
p.msg.Body = record.Value
|
p.msg.Body = record.Value
|
||||||
@ -260,10 +268,13 @@ func (pc *consumer) consume() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sp.AddEvent("handler start")
|
||||||
err := pc.handler(p)
|
err := pc.handler(p)
|
||||||
|
sp.AddEvent("handler stop")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
|
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
|
||||||
} else {
|
} else {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
|
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
|
||||||
}
|
}
|
||||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
||||||
|
Loading…
Reference in New Issue
Block a user