From ae4ae64694ab540abb2dbe3c946c5be871e38ad2 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Tue, 21 Jan 2025 15:29:02 +0300 Subject: [PATCH] added set ct for records && updated setHeaders (#150) Reviewed-on: https://git.unistack.org/unistack-org/micro-broker-kgo/pulls/150 Co-authored-by: Evstigneev Denis Co-committed-by: Evstigneev Denis --- carrier.go | 37 +++++++++++++++++++++++++++---------- kgo.go | 7 +++---- tracer.go | 4 ++-- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/carrier.go b/carrier.go index b0a540d..2d3c33c 100644 --- a/carrier.go +++ b/carrier.go @@ -1,6 +1,10 @@ package kgo import ( + "net/http" + "slices" + "strings" + "github.com/twmb/franz-go/pkg/kgo" "go.unistack.org/micro/v3/metadata" ) @@ -53,24 +57,37 @@ func (c RecordCarrier) Keys() []string { return out } -func setHeaders(r *kgo.Record, md metadata.Metadata) { +func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) { seen := make(map[string]struct{}) + loop: for k, v := range md { + k = http.CanonicalHeaderKey(k) + + if _, ok := seen[k]; ok { + continue loop + } + + if slices.ContainsFunc(exclude, func(s string) bool { + return strings.EqualFold(s, k) + }) { + continue loop + } + for i := 0; i < len(r.Headers); i++ { - if r.Headers[i].Key == k { + if strings.EqualFold(r.Headers[i].Key, k) { // Key exist, update the value. r.Headers[i].Value = []byte(v) continue loop - } else if _, ok := seen[k]; ok { - continue loop } - // Key does not exist, append new header. - r.Headers = append(r.Headers, kgo.RecordHeader{ - Key: k, - Value: []byte(v), - }) - seen[k] = struct{}{} } + + // Key does not exist, append new header. + r.Headers = append(r.Headers, kgo.RecordHeader{ + Key: k, + Value: []byte(v), + }) + + seen[k] = struct{}{} } } diff --git a/kgo.go b/kgo.go index 6a96ea7..af0549e 100644 --- a/kgo.go +++ b/kgo.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "math/rand/v2" - "net/http" "strings" "sync" "sync/atomic" @@ -279,14 +278,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br for _, msg := range msgs { rec := &kgo.Record{Context: ctx, Key: key} + rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic) + k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() if options.BodyOnly || k.opts.Codec.String() == "noop" { rec.Value = msg.Body - for k, v := range msg.Header { - rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)}) - } + setHeaders(rec, msg.Header) } else { rec.Value, err = k.opts.Codec.Marshal(msg) if err != nil { diff --git a/tracer.go b/tracer.go index 4fa1212..feca43c 100644 --- a/tracer.go +++ b/tracer.go @@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) } - setHeaders(r, omd) + setHeaders(r, omd, metadata.HeaderContentType) } // OnProduceRecordUnbuffered continues and ends the "publish" span for an @@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) } - setHeaders(r, omd) + setHeaders(r, omd, metadata.HeaderContentType) } // OnFetchRecordUnbuffered continues and ends the "receive" span for an