Compare commits
	
		
			2 Commits
		
	
	
		
			v4.1.11
			...
			0d2d0fe774
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 0d2d0fe774 | |||
| c71478ee9a | 
							
								
								
									
										27
									
								
								carrier.go
									
									
									
									
									
								
							
							
						
						
									
										27
									
								
								carrier.go
									
									
									
									
									
								
							| @@ -1,6 +1,10 @@ | ||||
| package kgo | ||||
|  | ||||
| import ( | ||||
| 	"net/http" | ||||
| 	"slices" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/twmb/franz-go/pkg/kgo" | ||||
| 	"go.unistack.org/micro/v3/metadata" | ||||
| ) | ||||
| @@ -53,24 +57,37 @@ func (c RecordCarrier) Keys() []string { | ||||
| 	return out | ||||
| } | ||||
|  | ||||
| func setHeaders(r *kgo.Record, md metadata.Metadata) { | ||||
| func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) { | ||||
| 	seen := make(map[string]struct{}) | ||||
|  | ||||
| loop: | ||||
| 	for k, v := range md { | ||||
| 		k = http.CanonicalHeaderKey(k) | ||||
|  | ||||
| 		if _, ok := seen[k]; ok { | ||||
| 			continue loop | ||||
| 		} | ||||
|  | ||||
| 		if slices.ContainsFunc(exclude, func(s string) bool { | ||||
| 			return strings.EqualFold(s, k) | ||||
| 		}) { | ||||
| 			continue loop | ||||
| 		} | ||||
|  | ||||
| 		for i := 0; i < len(r.Headers); i++ { | ||||
| 			if r.Headers[i].Key == k { | ||||
| 			if strings.EqualFold(r.Headers[i].Key, k) { | ||||
| 				// Key exist, update the value. | ||||
| 				r.Headers[i].Value = []byte(v) | ||||
| 				continue loop | ||||
| 			} else if _, ok := seen[k]; ok { | ||||
| 				continue loop | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Key does not exist, append new header. | ||||
| 		r.Headers = append(r.Headers, kgo.RecordHeader{ | ||||
| 			Key:   k, | ||||
| 			Value: []byte(v), | ||||
| 		}) | ||||
|  | ||||
| 		seen[k] = struct{}{} | ||||
| 	} | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -6,7 +6,6 @@ import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math/rand/v2" | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| @@ -279,14 +278,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br | ||||
|  | ||||
| 	for _, msg := range msgs { | ||||
| 		rec := &kgo.Record{Context: ctx, Key: key} | ||||
|  | ||||
| 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) | ||||
| 		msg.Header.Del(metadata.HeaderTopic) | ||||
|  | ||||
| 		k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() | ||||
| 		if options.BodyOnly || k.opts.Codec.String() == "noop" { | ||||
| 			rec.Value = msg.Body | ||||
| 			for k, v := range msg.Header { | ||||
| 				rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)}) | ||||
| 			} | ||||
| 			setHeaders(rec, msg.Header) | ||||
| 		} else { | ||||
| 			rec.Value, err = k.opts.Codec.Marshal(msg) | ||||
| 			if err != nil { | ||||
|   | ||||
| @@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { | ||||
| 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) | ||||
| 	} | ||||
|  | ||||
| 	setHeaders(r, omd) | ||||
| 	setHeaders(r, omd, metadata.HeaderContentType) | ||||
| } | ||||
|  | ||||
| // OnProduceRecordUnbuffered continues and ends the "publish" span for an | ||||
| @@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { | ||||
| 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) | ||||
| 	} | ||||
|  | ||||
| 	setHeaders(r, omd) | ||||
| 	setHeaders(r, omd, metadata.HeaderContentType) | ||||
| } | ||||
|  | ||||
| // OnFetchRecordUnbuffered continues and ends the "receive" span for an | ||||
|   | ||||
		Reference in New Issue
	
	Block a user