Compare commits
	
		
			2 Commits
		
	
	
		
			v3.9.68
			...
			7fa1fbeb44
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 7fa1fbeb44 | |||
| c71478ee9a | 
							
								
								
									
										36
									
								
								carrier.go
									
									
									
									
									
								
							
							
						
						
									
										36
									
								
								carrier.go
									
									
									
									
									
								
							@@ -3,6 +3,9 @@ package kgo
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"github.com/twmb/franz-go/pkg/kgo"
 | 
						"github.com/twmb/franz-go/pkg/kgo"
 | 
				
			||||||
	"go.unistack.org/micro/v3/metadata"
 | 
						"go.unistack.org/micro/v3/metadata"
 | 
				
			||||||
 | 
						"net/http"
 | 
				
			||||||
 | 
						"slices"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// RecordCarrier injects and extracts traces from a kgo.Record.
 | 
					// RecordCarrier injects and extracts traces from a kgo.Record.
 | 
				
			||||||
@@ -53,24 +56,37 @@ func (c RecordCarrier) Keys() []string {
 | 
				
			|||||||
	return out
 | 
						return out
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func setHeaders(r *kgo.Record, md metadata.Metadata) {
 | 
					func setHeaders(r *kgo.Record, md metadata.Metadata, exclude ...string) {
 | 
				
			||||||
	seen := make(map[string]struct{})
 | 
						seen := make(map[string]struct{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
loop:
 | 
					loop:
 | 
				
			||||||
	for k, v := range md {
 | 
						for k, v := range md {
 | 
				
			||||||
 | 
							k = http.CanonicalHeaderKey(k)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if _, ok := seen[k]; ok {
 | 
				
			||||||
 | 
								continue loop
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if slices.ContainsFunc(exclude, func(s string) bool {
 | 
				
			||||||
 | 
								return strings.EqualFold(s, k)
 | 
				
			||||||
 | 
							}) {
 | 
				
			||||||
 | 
								continue loop
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		for i := 0; i < len(r.Headers); i++ {
 | 
							for i := 0; i < len(r.Headers); i++ {
 | 
				
			||||||
			if r.Headers[i].Key == k {
 | 
								if strings.EqualFold(r.Headers[i].Key, k) {
 | 
				
			||||||
				// Key exist, update the value.
 | 
									// Key exist, update the value.
 | 
				
			||||||
				r.Headers[i].Value = []byte(v)
 | 
									r.Headers[i].Value = []byte(v)
 | 
				
			||||||
				continue loop
 | 
									continue loop
 | 
				
			||||||
			} else if _, ok := seen[k]; ok {
 | 
					 | 
				
			||||||
				continue loop
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// Key does not exist, append new header.
 | 
					 | 
				
			||||||
			r.Headers = append(r.Headers, kgo.RecordHeader{
 | 
					 | 
				
			||||||
				Key:   k,
 | 
					 | 
				
			||||||
				Value: []byte(v),
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
			seen[k] = struct{}{}
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Key does not exist, append new header.
 | 
				
			||||||
 | 
							r.Headers = append(r.Headers, kgo.RecordHeader{
 | 
				
			||||||
 | 
								Key:   k,
 | 
				
			||||||
 | 
								Value: []byte(v),
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							seen[k] = struct{}{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								kgo.go
									
									
									
									
									
								
							@@ -6,7 +6,6 @@ import (
 | 
				
			|||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"math/rand/v2"
 | 
						"math/rand/v2"
 | 
				
			||||||
	"net/http"
 | 
					 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"sync/atomic"
 | 
						"sync/atomic"
 | 
				
			||||||
@@ -279,14 +278,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	for _, msg := range msgs {
 | 
						for _, msg := range msgs {
 | 
				
			||||||
		rec := &kgo.Record{Context: ctx, Key: key}
 | 
							rec := &kgo.Record{Context: ctx, Key: key}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
 | 
							rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
 | 
				
			||||||
		msg.Header.Del(metadata.HeaderTopic)
 | 
							msg.Header.Del(metadata.HeaderTopic)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
 | 
							k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
 | 
				
			||||||
		if options.BodyOnly || k.opts.Codec.String() == "noop" {
 | 
							if options.BodyOnly || k.opts.Codec.String() == "noop" {
 | 
				
			||||||
			rec.Value = msg.Body
 | 
								rec.Value = msg.Body
 | 
				
			||||||
			for k, v := range msg.Header {
 | 
								setHeaders(rec, msg.Header)
 | 
				
			||||||
				rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			rec.Value, err = k.opts.Codec.Marshal(msg)
 | 
								rec.Value, err = k.opts.Codec.Marshal(msg)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -68,7 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
 | 
				
			|||||||
		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 | 
							r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	setHeaders(r, omd)
 | 
						setHeaders(r, omd, metadata.HeaderContentType)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
 | 
					// OnProduceRecordUnbuffered continues and ends the "publish" span for an
 | 
				
			||||||
@@ -135,7 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
 | 
				
			|||||||
		r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 | 
							r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	setHeaders(r, omd)
 | 
						setHeaders(r, omd, metadata.HeaderContentType)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
 | 
					// OnFetchRecordUnbuffered continues and ends the "receive" span for an
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user