Compare commits

...

3 Commits

Author SHA1 Message Date
e66194695e improve tracing
Some checks failed
build / test (push) Failing after 25s
build / lint (push) Successful in 22s
codeql / analyze (go) (push) Failing after 46s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-22 23:45:23 +03:00
894d6f4f20 tracing fixes
Some checks failed
build / lint (push) Successful in 27s
build / test (push) Failing after 29s
codeql / analyze (go) (push) Failing after 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-22 01:11:33 +03:00
d404fa31ab export Subscriber
Some checks failed
build / test (push) Failing after 1m38s
codeql / analyze (go) (push) Failing after 1m59s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-22 18:28:51 +03:00
4 changed files with 63 additions and 32 deletions

View File

@@ -2,6 +2,7 @@ package kgo
import ( import (
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/metadata"
) )
// RecordCarrier injects and extracts traces from a kgo.Record. // RecordCarrier injects and extracts traces from a kgo.Record.
@@ -51,3 +52,25 @@ func (c RecordCarrier) Keys() []string {
} }
return out return out
} }
func setHeaders(r *kgo.Record, md metadata.Metadata) {
seen := make(map[string]struct{})
loop:
for k, v := range md {
for i := 0; i < len(r.Headers); i++ {
if r.Headers[i].Key == k {
// Key exist, update the value.
r.Headers[i].Value = []byte(v)
continue loop
} else if _, ok := seen[k]; ok {
continue loop
}
// Key does not exist, append new header.
r.Headers = append(r.Headers, kgo.RecordHeader{
Key: k,
Value: []byte(v),
})
seen[k] = struct{}{}
}
}
}

7
kgo.go
View File

@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -62,7 +63,7 @@ type Broker struct {
connected bool connected bool
sync.RWMutex sync.RWMutex
opts broker.Options opts broker.Options
subs []*subscriber subs []*Subscriber
} }
func (k *Broker) Address() string { func (k *Broker) Address() string {
@@ -275,7 +276,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
if options.BodyOnly || k.opts.Codec.String() == "noop" { if options.BodyOnly || k.opts.Codec.String() == "noop" {
rec.Value = msg.Body rec.Value = msg.Body
for k, v := range msg.Header { for k, v := range msg.Header {
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)}) rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
} }
} else { } else {
rec.Value, err = k.opts.Codec.Marshal(msg) rec.Value, err = k.opts.Codec.Marshal(msg)
@@ -364,7 +365,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
} }
sub := &subscriber{ sub := &Subscriber{
topic: topic, topic: topic,
opts: options, opts: options,
handler: handler, handler: handler,

View File

@@ -33,7 +33,7 @@ type consumer struct {
recs chan kgo.FetchTopicPartition recs chan kgo.FetchTopicPartition
} }
type subscriber struct { type Subscriber struct {
c *kgo.Client c *kgo.Client
topic string topic string
htracer *hookTracer htracer *hookTracer
@@ -46,19 +46,19 @@ type subscriber struct {
sync.RWMutex sync.RWMutex
} }
func (s *subscriber) Client() *kgo.Client { func (s *Subscriber) Client() *kgo.Client {
return s.c return s.c
} }
func (s *subscriber) Options() broker.SubscribeOptions { func (s *Subscriber) Options() broker.SubscribeOptions {
return s.opts return s.opts
} }
func (s *subscriber) Topic() string { func (s *Subscriber) Topic() string {
return s.topic return s.topic
} }
func (s *subscriber) Unsubscribe(ctx context.Context) error { func (s *Subscriber) Unsubscribe(ctx context.Context) error {
if s.closed { if s.closed {
return nil return nil
} }
@@ -80,7 +80,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
return nil return nil
} }
func (s *subscriber) poll(ctx context.Context) { func (s *Subscriber) poll(ctx context.Context) {
maxInflight := DefaultSubscribeMaxInflight maxInflight := DefaultSubscribeMaxInflight
if s.opts.Context != nil { if s.opts.Context != nil {
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
@@ -148,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) {
} }
} }
func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) { func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
@@ -165,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
} }
} }
func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost) s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
s.killConsumers(ctx, lost) s.killConsumers(ctx, lost)
} }
func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked) s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
s.killConsumers(ctx, revoked) s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil { if err := c.CommitMarkedOffsets(ctx); err != nil {
@@ -178,7 +178,7 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
} }
} }
func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
for topic, partitions := range assigned { for topic, partitions := range assigned {
for _, partition := range partitions { for _, partition := range partitions {
pc := &consumer{ pc := &consumer{

View File

@@ -58,12 +58,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
md.Set(h.Key, string(h.Value)) md.Set(h.Key, string(h.Value))
} }
// Start the "publish" span. if !ok {
ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...) r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
// Inject the span context into the record. } else {
// t.propagators.Inject(ctx, NewRecordCarrier(r)) r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...)
// Update the record context. }
r.Context = ctx
md, _ = metadata.FromOutgoingContext(r.Context)
setHeaders(r, md)
} }
// OnProduceRecordUnbuffered continues and ends the "publish" span for an // OnProduceRecordUnbuffered continues and ends the "publish" span for an
@@ -72,14 +75,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
// It sets attributes with values unset when producing and records any error // It sets attributes with values unset when producing and records any error
// that occurred during the publish operation. // that occurred during the publish operation.
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
span, _ := tracer.SpanFromContext(r.Context) if span, ok := tracer.SpanFromContext(r.Context); ok {
span.AddLabels( span.AddLabels(
semconv.MessagingKafkaDestinationPartition(int(r.Partition)), semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
) )
if err != nil { if err != nil {
span.SetStatus(tracer.SpanStatusError, err.Error()) span.SetStatus(tracer.SpanStatusError, err.Error())
}
span.Finish()
} }
span.Finish()
} }
// OnFetchRecordBuffered starts a new span for the "receive" operation on a // OnFetchRecordBuffered starts a new span for the "receive" operation on a
@@ -121,12 +125,15 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
md.Set(h.Key, string(h.Value)) md.Set(h.Key, string(h.Value))
} }
// Extract the span context from the record. if !ok {
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r)) r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
// Start the "receive" span. } else {
newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...) r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...)
// Update the record context. }
r.Context = newCtx
md, _ = metadata.FromIncomingContext(r.Context)
setHeaders(r, md)
} }
// OnFetchRecordUnbuffered continues and ends the "receive" span for an // OnFetchRecordUnbuffered continues and ends the "receive" span for an