diff --git a/carrier.go b/carrier.go index b0a540d..5d4d672 100644 --- a/carrier.go +++ b/carrier.go @@ -55,22 +55,27 @@ func (c RecordCarrier) Keys() []string { func setHeaders(r *kgo.Record, md metadata.Metadata) { seen := make(map[string]struct{}) + loop: for k, v := range md { + if _, ok := seen[k]; ok { + continue loop + } + for i := 0; i < len(r.Headers); i++ { if r.Headers[i].Key == k { // Key exist, update the value. r.Headers[i].Value = []byte(v) continue loop - } else if _, ok := seen[k]; ok { - continue loop } - // Key does not exist, append new header. - r.Headers = append(r.Headers, kgo.RecordHeader{ - Key: k, - Value: []byte(v), - }) - seen[k] = struct{}{} } + + // Key does not exist, append new header. + r.Headers = append(r.Headers, kgo.RecordHeader{ + Key: k, + Value: []byte(v), + }) + + seen[k] = struct{}{} } } diff --git a/kgo.go b/kgo.go index 6a96ea7..d8aa989 100644 --- a/kgo.go +++ b/kgo.go @@ -279,8 +279,13 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br for _, msg := range msgs { rec := &kgo.Record{Context: ctx, Key: key} + rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic) + + ct, _ := msg.Header.Get(metadata.HeaderContentType) + rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: metadata.HeaderContentType, Value: []byte(ct)}) + k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() if options.BodyOnly || k.opts.Codec.String() == "noop" { rec.Value = msg.Body