From 7fa1fbeb446b309681ed55a1cd2e1487b6ef4085 Mon Sep 17 00:00:00 2001
From: Evstigneev Denis <danteevstigneev@yandex.ru>
Date: Mon, 20 Jan 2025 14:24:04 +0300
Subject: [PATCH] update logic in setHeaders

---
 carrier.go | 15 +++++++++++++--
 kgo.go     |  8 +-------
 tracer.go  |  4 ++--
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/carrier.go b/carrier.go
index 5d4d672..5c55e25 100644
--- a/carrier.go
+++ b/carrier.go
@@ -3,6 +3,9 @@ package kgo
 import (
 	"github.com/twmb/franz-go/pkg/kgo"
 	"go.unistack.org/micro/v3/metadata"
+	"net/http"
+	"slices"
+	"strings"
 )
 
 // RecordCarrier injects and extracts traces from a kgo.Record.
@@ -53,17 +56,25 @@ func (c RecordCarrier) Keys() []string {
 	return out
 }
 
-func setHeaders(r *kgo.Record, md metadata.Metadata) {
+func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) {
 	seen := make(map[string]struct{})
 
 loop:
 	for k, v := range md {
+		k = http.CanonicalHeaderKey(k)
+
 		if _, ok := seen[k]; ok {
 			continue loop
 		}
 
+		if slices.ContainsFunc(exclude, func(s string) bool {
+			return strings.EqualFold(s, k)
+		}) {
+			continue loop
+		}
+
 		for i := 0; i < len(r.Headers); i++ {
-			if r.Headers[i].Key == k {
+			if strings.EqualFold(r.Headers[i].Key, k) {
 				// Key exist, update the value.
 				r.Headers[i].Value = []byte(v)
 				continue loop
diff --git a/kgo.go b/kgo.go
index d8aa989..af0549e 100644
--- a/kgo.go
+++ b/kgo.go
@@ -6,7 +6,6 @@ import (
 	"errors"
 	"fmt"
 	"math/rand/v2"
-	"net/http"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -283,15 +282,10 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
 		msg.Header.Del(metadata.HeaderTopic)
 
-		ct, _ := msg.Header.Get(metadata.HeaderContentType)
-		rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: metadata.HeaderContentType, Value: []byte(ct)})
-
 		k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
 		if options.BodyOnly || k.opts.Codec.String() == "noop" {
 			rec.Value = msg.Body
-			for k, v := range msg.Header {
-				rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
-			}
+			setHeaders(rec, msg.Header)
 		} else {
 			rec.Value, err = k.opts.Codec.Marshal(msg)
 			if err != nil {
diff --git a/tracer.go b/tracer.go
index 4fa1212..feca43c 100644
--- a/tracer.go
+++ b/tracer.go
@@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 	}
 
-	setHeaders(r, omd)
+	setHeaders(r, omd, metadata.HeaderContentType)
 }
 
 // OnProduceRecordUnbuffered continues and ends the "publish" span for an
@@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 	}
 
-	setHeaders(r, omd)
+	setHeaders(r, omd, metadata.HeaderContentType)
 }
 
 // OnFetchRecordUnbuffered continues and ends the "receive" span for an