added set ct for records && updated setHeaders #150

Merged
vtolstov merged 2 commits from devstigneev/micro-broker-kgo:v3 into v3 2025-01-21 15:29:03 +03:00
3 changed files with 32 additions and 16 deletions

View File

@ -1,6 +1,10 @@
package kgo package kgo
import ( import (
"net/http"
"slices"
"strings"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
) )
@ -53,24 +57,37 @@ func (c RecordCarrier) Keys() []string {
return out return out
} }
func setHeaders(r *kgo.Record, md metadata.Metadata) { func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) {
seen := make(map[string]struct{}) seen := make(map[string]struct{})
loop: loop:
for k, v := range md { for k, v := range md {
k = http.CanonicalHeaderKey(k)
if _, ok := seen[k]; ok {
continue loop
}
if slices.ContainsFunc(exclude, func(s string) bool {
return strings.EqualFold(s, k)
}) {
continue loop
}
for i := 0; i < len(r.Headers); i++ { for i := 0; i < len(r.Headers); i++ {
if r.Headers[i].Key == k { if strings.EqualFold(r.Headers[i].Key, k) {
// Key exist, update the value. // Key exist, update the value.
r.Headers[i].Value = []byte(v) r.Headers[i].Value = []byte(v)
continue loop continue loop
} else if _, ok := seen[k]; ok {
continue loop
} }
}
// Key does not exist, append new header. // Key does not exist, append new header.
r.Headers = append(r.Headers, kgo.RecordHeader{ r.Headers = append(r.Headers, kgo.RecordHeader{
Key: k, Key: k,
Value: []byte(v), Value: []byte(v),
}) })
seen[k] = struct{}{} seen[k] = struct{}{}
} }
} }
}

7
kgo.go
View File

@ -6,7 +6,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand/v2" "math/rand/v2"
"net/http"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -279,14 +278,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
for _, msg := range msgs { for _, msg := range msgs {
rec := &kgo.Record{Context: ctx, Key: key} rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
msg.Header.Del(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic)
k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
if options.BodyOnly || k.opts.Codec.String() == "noop" { if options.BodyOnly || k.opts.Codec.String() == "noop" {
rec.Value = msg.Body rec.Value = msg.Body
for k, v := range msg.Header { setHeaders(rec, msg.Header)
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
}
} else { } else {
rec.Value, err = k.opts.Codec.Marshal(msg) rec.Value, err = k.opts.Codec.Marshal(msg)
if err != nil { if err != nil {

View File

@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
} }
setHeaders(r, omd) setHeaders(r, omd, metadata.HeaderContentType)
} }
// OnProduceRecordUnbuffered continues and ends the "publish" span for an // OnProduceRecordUnbuffered continues and ends the "publish" span for an
@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
} }
setHeaders(r, omd) setHeaders(r, omd, metadata.HeaderContentType)
} }
// OnFetchRecordUnbuffered continues and ends the "receive" span for an // OnFetchRecordUnbuffered continues and ends the "receive" span for an