Compare commits
	
		
			2 Commits
		
	
	
		
			v3.9.68
			...
			7fa1fbeb44
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 7fa1fbeb44 | |||
| c71478ee9a | 
							
								
								
									
										36
									
								
								carrier.go
									
									
									
									
									
								
							
							
						
						
									
										36
									
								
								carrier.go
									
									
									
									
									
								
							@@ -3,6 +3,9 @@ package kgo
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/twmb/franz-go/pkg/kgo"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"slices"
 | 
			
		||||
	"strings"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// RecordCarrier injects and extracts traces from a kgo.Record.
 | 
			
		||||
@@ -53,24 +56,37 @@ func (c RecordCarrier) Keys() []string {
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func setHeaders(r *kgo.Record, md metadata.Metadata) {
 | 
			
		||||
func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) {
 | 
			
		||||
	seen := make(map[string]struct{})
 | 
			
		||||
 | 
			
		||||
loop:
 | 
			
		||||
	for k, v := range md {
 | 
			
		||||
		k = http.CanonicalHeaderKey(k)
 | 
			
		||||
 | 
			
		||||
		if _, ok := seen[k]; ok {
 | 
			
		||||
			continue loop
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if slices.ContainsFunc(exclude, func(s string) bool {
 | 
			
		||||
			return strings.EqualFold(s, k)
 | 
			
		||||
		}) {
 | 
			
		||||
			continue loop
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for i := 0; i < len(r.Headers); i++ {
 | 
			
		||||
			if r.Headers[i].Key == k {
 | 
			
		||||
			if strings.EqualFold(r.Headers[i].Key, k) {
 | 
			
		||||
				// Key exist, update the value.
 | 
			
		||||
				r.Headers[i].Value = []byte(v)
 | 
			
		||||
				continue loop
 | 
			
		||||
			} else if _, ok := seen[k]; ok {
 | 
			
		||||
				continue loop
 | 
			
		||||
			}
 | 
			
		||||
			// Key does not exist, append new header.
 | 
			
		||||
			r.Headers = append(r.Headers, kgo.RecordHeader{
 | 
			
		||||
				Key:   k,
 | 
			
		||||
				Value: []byte(v),
 | 
			
		||||
			})
 | 
			
		||||
			seen[k] = struct{}{}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Key does not exist, append new header.
 | 
			
		||||
		r.Headers = append(r.Headers, kgo.RecordHeader{
 | 
			
		||||
			Key:   k,
 | 
			
		||||
			Value: []byte(v),
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		seen[k] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							@@ -6,7 +6,6 @@ import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math/rand/v2"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
@@ -279,14 +278,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
 | 
			
		||||
 | 
			
		||||
	for _, msg := range msgs {
 | 
			
		||||
		rec := &kgo.Record{Context: ctx, Key: key}
 | 
			
		||||
 | 
			
		||||
		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
 | 
			
		||||
		msg.Header.Del(metadata.HeaderTopic)
 | 
			
		||||
 | 
			
		||||
		k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
 | 
			
		||||
		if options.BodyOnly || k.opts.Codec.String() == "noop" {
 | 
			
		||||
			rec.Value = msg.Body
 | 
			
		||||
			for k, v := range msg.Header {
 | 
			
		||||
				rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
 | 
			
		||||
			}
 | 
			
		||||
			setHeaders(rec, msg.Header)
 | 
			
		||||
		} else {
 | 
			
		||||
			rec.Value, err = k.opts.Codec.Marshal(msg)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
 | 
			
		||||
		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	setHeaders(r, omd)
 | 
			
		||||
	setHeaders(r, omd, metadata.HeaderContentType)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
 | 
			
		||||
@@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
 | 
			
		||||
		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	setHeaders(r, omd)
 | 
			
		||||
	setHeaders(r, omd, metadata.HeaderContentType)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user