Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
e66194695e | |||
894d6f4f20 | |||
d404fa31ab |
23
carrier.go
23
carrier.go
@@ -2,6 +2,7 @@ package kgo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
|
"go.unistack.org/micro/v3/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RecordCarrier injects and extracts traces from a kgo.Record.
|
// RecordCarrier injects and extracts traces from a kgo.Record.
|
||||||
@@ -51,3 +52,25 @@ func (c RecordCarrier) Keys() []string {
|
|||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setHeaders(r *kgo.Record, md metadata.Metadata) {
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
loop:
|
||||||
|
for k, v := range md {
|
||||||
|
for i := 0; i < len(r.Headers); i++ {
|
||||||
|
if r.Headers[i].Key == k {
|
||||||
|
// Key exist, update the value.
|
||||||
|
r.Headers[i].Value = []byte(v)
|
||||||
|
continue loop
|
||||||
|
} else if _, ok := seen[k]; ok {
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
// Key does not exist, append new header.
|
||||||
|
r.Headers = append(r.Headers, kgo.RecordHeader{
|
||||||
|
Key: k,
|
||||||
|
Value: []byte(v),
|
||||||
|
})
|
||||||
|
seen[k] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
7
kgo.go
7
kgo.go
@@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -62,7 +63,7 @@ type Broker struct {
|
|||||||
connected bool
|
connected bool
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
opts broker.Options
|
opts broker.Options
|
||||||
subs []*subscriber
|
subs []*Subscriber
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Broker) Address() string {
|
func (k *Broker) Address() string {
|
||||||
@@ -275,7 +276,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
|||||||
if options.BodyOnly || k.opts.Codec.String() == "noop" {
|
if options.BodyOnly || k.opts.Codec.String() == "noop" {
|
||||||
rec.Value = msg.Body
|
rec.Value = msg.Body
|
||||||
for k, v := range msg.Header {
|
for k, v := range msg.Header {
|
||||||
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
|
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
rec.Value, err = k.opts.Codec.Marshal(msg)
|
rec.Value, err = k.opts.Codec.Marshal(msg)
|
||||||
@@ -364,7 +365,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub := &subscriber{
|
sub := &Subscriber{
|
||||||
topic: topic,
|
topic: topic,
|
||||||
opts: options,
|
opts: options,
|
||||||
handler: handler,
|
handler: handler,
|
||||||
|
@@ -33,7 +33,7 @@ type consumer struct {
|
|||||||
recs chan kgo.FetchTopicPartition
|
recs chan kgo.FetchTopicPartition
|
||||||
}
|
}
|
||||||
|
|
||||||
type subscriber struct {
|
type Subscriber struct {
|
||||||
c *kgo.Client
|
c *kgo.Client
|
||||||
topic string
|
topic string
|
||||||
htracer *hookTracer
|
htracer *hookTracer
|
||||||
@@ -46,19 +46,19 @@ type subscriber struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) Client() *kgo.Client {
|
func (s *Subscriber) Client() *kgo.Client {
|
||||||
return s.c
|
return s.c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) Options() broker.SubscribeOptions {
|
func (s *Subscriber) Options() broker.SubscribeOptions {
|
||||||
return s.opts
|
return s.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) Topic() string {
|
func (s *Subscriber) Topic() string {
|
||||||
return s.topic
|
return s.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||||
if s.closed {
|
if s.closed {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -80,7 +80,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) poll(ctx context.Context) {
|
func (s *Subscriber) poll(ctx context.Context) {
|
||||||
maxInflight := DefaultSubscribeMaxInflight
|
maxInflight := DefaultSubscribeMaxInflight
|
||||||
if s.opts.Context != nil {
|
if s.opts.Context != nil {
|
||||||
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
|
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
|
||||||
@@ -148,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
|
func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer wg.Wait()
|
defer wg.Wait()
|
||||||
|
|
||||||
@@ -165,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
|
func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
|
||||||
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
|
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
|
||||||
s.killConsumers(ctx, lost)
|
s.killConsumers(ctx, lost)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
|
func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
|
||||||
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
|
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
|
||||||
s.killConsumers(ctx, revoked)
|
s.killConsumers(ctx, revoked)
|
||||||
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
||||||
@@ -178,7 +178,7 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
|
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
|
||||||
for topic, partitions := range assigned {
|
for topic, partitions := range assigned {
|
||||||
for _, partition := range partitions {
|
for _, partition := range partitions {
|
||||||
pc := &consumer{
|
pc := &consumer{
|
||||||
|
33
tracer.go
33
tracer.go
@@ -58,12 +58,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
|||||||
md.Set(h.Key, string(h.Value))
|
md.Set(h.Key, string(h.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the "publish" span.
|
if !ok {
|
||||||
ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
||||||
// Inject the span context into the record.
|
} else {
|
||||||
// t.propagators.Inject(ctx, NewRecordCarrier(r))
|
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...)
|
||||||
// Update the record context.
|
}
|
||||||
r.Context = ctx
|
|
||||||
|
md, _ = metadata.FromOutgoingContext(r.Context)
|
||||||
|
|
||||||
|
setHeaders(r, md)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
|
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
|
||||||
@@ -72,7 +75,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
|||||||
// It sets attributes with values unset when producing and records any error
|
// It sets attributes with values unset when producing and records any error
|
||||||
// that occurred during the publish operation.
|
// that occurred during the publish operation.
|
||||||
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
||||||
span, _ := tracer.SpanFromContext(r.Context)
|
if span, ok := tracer.SpanFromContext(r.Context); ok {
|
||||||
span.AddLabels(
|
span.AddLabels(
|
||||||
semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
|
semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
|
||||||
)
|
)
|
||||||
@@ -81,6 +84,7 @@ func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
|
|||||||
}
|
}
|
||||||
span.Finish()
|
span.Finish()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// OnFetchRecordBuffered starts a new span for the "receive" operation on a
|
// OnFetchRecordBuffered starts a new span for the "receive" operation on a
|
||||||
// buffered record.
|
// buffered record.
|
||||||
@@ -121,12 +125,15 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
|||||||
md.Set(h.Key, string(h.Value))
|
md.Set(h.Key, string(h.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract the span context from the record.
|
if !ok {
|
||||||
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
|
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
||||||
// Start the "receive" span.
|
} else {
|
||||||
newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...)
|
||||||
// Update the record context.
|
}
|
||||||
r.Context = newCtx
|
|
||||||
|
md, _ = metadata.FromIncomingContext(r.Context)
|
||||||
|
|
||||||
|
setHeaders(r, md)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
|
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
|
||||||
|
Reference in New Issue
Block a user