From c71478ee9a88d78f308adbf8b397a12a5524301e Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Mon, 20 Jan 2025 13:46:11 +0300 Subject: [PATCH 1/2] add set ct for records && update setHeaders --- carrier.go | 21 +++++++++++++-------- kgo.go | 5 +++++ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/carrier.go b/carrier.go index b0a540d..5d4d672 100644 --- a/carrier.go +++ b/carrier.go @@ -55,22 +55,27 @@ func (c RecordCarrier) Keys() []string { func setHeaders(r *kgo.Record, md metadata.Metadata) { seen := make(map[string]struct{}) + loop: for k, v := range md { + if _, ok := seen[k]; ok { + continue loop + } + for i := 0; i < len(r.Headers); i++ { if r.Headers[i].Key == k { // Key exist, update the value. r.Headers[i].Value = []byte(v) continue loop - } else if _, ok := seen[k]; ok { - continue loop } - // Key does not exist, append new header. - r.Headers = append(r.Headers, kgo.RecordHeader{ - Key: k, - Value: []byte(v), - }) - seen[k] = struct{}{} } + + // Key does not exist, append new header. + r.Headers = append(r.Headers, kgo.RecordHeader{ + Key: k, + Value: []byte(v), + }) + + seen[k] = struct{}{} } } diff --git a/kgo.go b/kgo.go index 6a96ea7..d8aa989 100644 --- a/kgo.go +++ b/kgo.go @@ -279,8 +279,13 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br for _, msg := range msgs { rec := &kgo.Record{Context: ctx, Key: key} + rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic) + + ct, _ := msg.Header.Get(metadata.HeaderContentType) + rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: metadata.HeaderContentType, Value: []byte(ct)}) + k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() if options.BodyOnly || k.opts.Codec.String() == "noop" { rec.Value = msg.Body -- 2.47.1 From 0d2d0fe77478e92f0dc7c42e47e4c92b0989b6e3 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Mon, 20 Jan 2025 14:24:04 +0300 Subject: [PATCH 2/2] update logic in setHeaders --- carrier.go | 16 ++++++++++++++-- kgo.go | 8 +------- tracer.go | 4 ++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/carrier.go b/carrier.go index 5d4d672..2d3c33c 100644 --- a/carrier.go +++ b/carrier.go @@ -1,6 +1,10 @@ package kgo import ( + "net/http" + "slices" + "strings" + "github.com/twmb/franz-go/pkg/kgo" "go.unistack.org/micro/v3/metadata" ) @@ -53,17 +57,25 @@ func (c RecordCarrier) Keys() []string { return out } -func setHeaders(r *kgo.Record, md metadata.Metadata) { +func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) { seen := make(map[string]struct{}) loop: for k, v := range md { + k = http.CanonicalHeaderKey(k) + if _, ok := seen[k]; ok { continue loop } + if slices.ContainsFunc(exclude, func(s string) bool { + return strings.EqualFold(s, k) + }) { + continue loop + } + for i := 0; i < len(r.Headers); i++ { - if r.Headers[i].Key == k { + if strings.EqualFold(r.Headers[i].Key, k) { // Key exist, update the value. r.Headers[i].Value = []byte(v) continue loop diff --git a/kgo.go b/kgo.go index d8aa989..af0549e 100644 --- a/kgo.go +++ b/kgo.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "math/rand/v2" - "net/http" "strings" "sync" "sync/atomic" @@ -283,15 +282,10 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic) - ct, _ := msg.Header.Get(metadata.HeaderContentType) - rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: metadata.HeaderContentType, Value: []byte(ct)}) - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() if options.BodyOnly || k.opts.Codec.String() == "noop" { rec.Value = msg.Body - for k, v := range msg.Header { - rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)}) - } + setHeaders(rec, msg.Header) } else { rec.Value, err = k.opts.Codec.Marshal(msg) if err != nil { diff --git a/tracer.go b/tracer.go index 4fa1212..feca43c 100644 --- a/tracer.go +++ b/tracer.go @@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) } - setHeaders(r, omd) + setHeaders(r, omd, metadata.HeaderContentType) } // OnProduceRecordUnbuffered continues and ends the "publish" span for an @@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) } - setHeaders(r, omd) + setHeaders(r, omd, metadata.HeaderContentType) } // OnFetchRecordUnbuffered continues and ends the "receive" span for an -- 2.47.1