improve tracing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
894d6f4f20
commit
e66194695e
23
carrier.go
23
carrier.go
@ -2,6 +2,7 @@ package kgo
|
||||
|
||||
import (
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
// RecordCarrier injects and extracts traces from a kgo.Record.
|
||||
@ -51,3 +52,25 @@ func (c RecordCarrier) Keys() []string {
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func setHeaders(r *kgo.Record, md metadata.Metadata) {
|
||||
seen := make(map[string]struct{})
|
||||
loop:
|
||||
for k, v := range md {
|
||||
for i := 0; i < len(r.Headers); i++ {
|
||||
if r.Headers[i].Key == k {
|
||||
// Key exist, update the value.
|
||||
r.Headers[i].Value = []byte(v)
|
||||
continue loop
|
||||
} else if _, ok := seen[k]; ok {
|
||||
continue loop
|
||||
}
|
||||
// Key does not exist, append new header.
|
||||
r.Headers = append(r.Headers, kgo.RecordHeader{
|
||||
Key: k,
|
||||
Value: []byte(v),
|
||||
})
|
||||
seen[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
3
kgo.go
3
kgo.go
@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -275,7 +276,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
if options.BodyOnly || k.opts.Codec.String() == "noop" {
|
||||
rec.Value = msg.Body
|
||||
for k, v := range msg.Header {
|
||||
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
|
||||
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
|
||||
}
|
||||
} else {
|
||||
rec.Value, err = k.opts.Codec.Marshal(msg)
|
||||
|
18
tracer.go
18
tracer.go
@ -58,15 +58,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
||||
md.Set(h.Key, string(h.Value))
|
||||
}
|
||||
|
||||
// Inject the span context into the record.
|
||||
// t.propagators.Inject(ctx, NewRecordCarrier(r))
|
||||
// Update the record context.
|
||||
|
||||
if !ok {
|
||||
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
||||
} else {
|
||||
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...)
|
||||
}
|
||||
|
||||
md, _ = metadata.FromOutgoingContext(r.Context)
|
||||
|
||||
setHeaders(r, md)
|
||||
}
|
||||
|
||||
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
|
||||
@ -75,7 +75,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
||||
// It sets attributes with values unset when producing and records any error
|
||||
// that occurred during the publish operation.
|
||||
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
||||
span, _ := tracer.SpanFromContext(r.Context)
|
||||
if span, ok := tracer.SpanFromContext(r.Context); ok {
|
||||
span.AddLabels(
|
||||
semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
|
||||
)
|
||||
@ -83,6 +83,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
||||
span.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
span.Finish()
|
||||
}
|
||||
}
|
||||
|
||||
// OnFetchRecordBuffered starts a new span for the "receive" operation on a
|
||||
@ -124,16 +125,15 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
||||
md.Set(h.Key, string(h.Value))
|
||||
}
|
||||
|
||||
// Extract the span context from the record.
|
||||
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
|
||||
// Start the "receive" span.
|
||||
if !ok {
|
||||
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
||||
} else {
|
||||
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...)
|
||||
}
|
||||
|
||||
// Update the record context.
|
||||
md, _ = metadata.FromIncomingContext(r.Context)
|
||||
|
||||
setHeaders(r, md)
|
||||
}
|
||||
|
||||
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
|
||||
|
Loading…
Reference in New Issue
Block a user