update logic in setHeaders
All checks were successful
lint / lint (pull_request) Successful in 1m35s
test / test (pull_request) Successful in 3m14s

This commit is contained in:
Денис Евстигнеев 2025-01-20 14:24:04 +03:00
parent c71478ee9a
commit 0d2d0fe774
3 changed files with 17 additions and 11 deletions

View File

@ -1,6 +1,10 @@
package kgo
import (
"net/http"
"slices"
"strings"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/metadata"
)
@ -53,17 +57,25 @@ func (c RecordCarrier) Keys() []string {
return out
}
func setHeaders(r *kgo.Record, md metadata.Metadata) {
func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) {
seen := make(map[string]struct{})
loop:
for k, v := range md {
k = http.CanonicalHeaderKey(k)
if _, ok := seen[k]; ok {
continue loop
}
if slices.ContainsFunc(exclude, func(s string) bool {
return strings.EqualFold(s, k)
}) {
continue loop
}
for i := 0; i < len(r.Headers); i++ {
if r.Headers[i].Key == k {
if strings.EqualFold(r.Headers[i].Key, k) {
// Key exist, update the value.
r.Headers[i].Value = []byte(v)
continue loop

8
kgo.go
View File

@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"math/rand/v2"
"net/http"
"strings"
"sync"
"sync/atomic"
@ -283,15 +282,10 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
msg.Header.Del(metadata.HeaderTopic)
ct, _ := msg.Header.Get(metadata.HeaderContentType)
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: metadata.HeaderContentType, Value: []byte(ct)})
k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
if options.BodyOnly || k.opts.Codec.String() == "noop" {
rec.Value = msg.Body
for k, v := range msg.Header {
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
}
setHeaders(rec, msg.Header)
} else {
rec.Value, err = k.opts.Codec.Marshal(msg)
if err != nil {

View File

@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
}
setHeaders(r, omd)
setHeaders(r, omd, metadata.HeaderContentType)
}
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
}
setHeaders(r, omd)
setHeaders(r, omd, metadata.HeaderContentType)
}
// OnFetchRecordUnbuffered continues and ends the "receive" span for an