Compare commits
	
		
			2 Commits
		
	
	
		
			v4.1.11
			...
			0d2d0fe774
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 0d2d0fe774 | |||
| c71478ee9a | 
							
								
								
									
										37
									
								
								carrier.go
									
									
									
									
									
								
							
							
						
						
									
										37
									
								
								carrier.go
									
									
									
									
									
								
							| @@ -1,6 +1,10 @@ | |||||||
| package kgo | package kgo | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"net/http" | ||||||
|  | 	"slices" | ||||||
|  | 	"strings" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v3/metadata" | ||||||
| ) | ) | ||||||
| @@ -53,24 +57,37 @@ func (c RecordCarrier) Keys() []string { | |||||||
| 	return out | 	return out | ||||||
| } | } | ||||||
|  |  | ||||||
| func setHeaders(r *kgo.Record, md metadata.Metadata) { | func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) { | ||||||
| 	seen := make(map[string]struct{}) | 	seen := make(map[string]struct{}) | ||||||
|  |  | ||||||
| loop: | loop: | ||||||
| 	for k, v := range md { | 	for k, v := range md { | ||||||
|  | 		k = http.CanonicalHeaderKey(k) | ||||||
|  |  | ||||||
|  | 		if _, ok := seen[k]; ok { | ||||||
|  | 			continue loop | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if slices.ContainsFunc(exclude, func(s string) bool { | ||||||
|  | 			return strings.EqualFold(s, k) | ||||||
|  | 		}) { | ||||||
|  | 			continue loop | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		for i := 0; i < len(r.Headers); i++ { | 		for i := 0; i < len(r.Headers); i++ { | ||||||
| 			if r.Headers[i].Key == k { | 			if strings.EqualFold(r.Headers[i].Key, k) { | ||||||
| 				// Key exist, update the value. | 				// Key exist, update the value. | ||||||
| 				r.Headers[i].Value = []byte(v) | 				r.Headers[i].Value = []byte(v) | ||||||
| 				continue loop | 				continue loop | ||||||
| 			} else if _, ok := seen[k]; ok { |  | ||||||
| 				continue loop |  | ||||||
| 			} | 			} | ||||||
| 			// Key does not exist, append new header. |  | ||||||
| 			r.Headers = append(r.Headers, kgo.RecordHeader{ |  | ||||||
| 				Key:   k, |  | ||||||
| 				Value: []byte(v), |  | ||||||
| 			}) |  | ||||||
| 			seen[k] = struct{}{} |  | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// Key does not exist, append new header. | ||||||
|  | 		r.Headers = append(r.Headers, kgo.RecordHeader{ | ||||||
|  | 			Key:   k, | ||||||
|  | 			Value: []byte(v), | ||||||
|  | 		}) | ||||||
|  |  | ||||||
|  | 		seen[k] = struct{}{} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -6,7 +6,6 @@ import ( | |||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math/rand/v2" | 	"math/rand/v2" | ||||||
| 	"net/http" |  | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
| @@ -279,14 +278,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br | |||||||
|  |  | ||||||
| 	for _, msg := range msgs { | 	for _, msg := range msgs { | ||||||
| 		rec := &kgo.Record{Context: ctx, Key: key} | 		rec := &kgo.Record{Context: ctx, Key: key} | ||||||
|  |  | ||||||
| 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) | 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) | ||||||
| 		msg.Header.Del(metadata.HeaderTopic) | 		msg.Header.Del(metadata.HeaderTopic) | ||||||
|  |  | ||||||
| 		k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() | 		k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() | ||||||
| 		if options.BodyOnly || k.opts.Codec.String() == "noop" { | 		if options.BodyOnly || k.opts.Codec.String() == "noop" { | ||||||
| 			rec.Value = msg.Body | 			rec.Value = msg.Body | ||||||
| 			for k, v := range msg.Header { | 			setHeaders(rec, msg.Header) | ||||||
| 				rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)}) |  | ||||||
| 			} |  | ||||||
| 		} else { | 		} else { | ||||||
| 			rec.Value, err = k.opts.Codec.Marshal(msg) | 			rec.Value, err = k.opts.Codec.Marshal(msg) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
|   | |||||||
| @@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { | |||||||
| 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) | 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	setHeaders(r, omd) | 	setHeaders(r, omd, metadata.HeaderContentType) | ||||||
| } | } | ||||||
|  |  | ||||||
| // OnProduceRecordUnbuffered continues and ends the "publish" span for an | // OnProduceRecordUnbuffered continues and ends the "publish" span for an | ||||||
| @@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) { | |||||||
| 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) | 		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	setHeaders(r, omd) | 	setHeaders(r, omd, metadata.HeaderContentType) | ||||||
| } | } | ||||||
|  |  | ||||||
| // OnFetchRecordUnbuffered continues and ends the "receive" span for an | // OnFetchRecordUnbuffered continues and ends the "receive" span for an | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user