diff --git a/kgo.go b/kgo.go index 2192206..bed4ca4 100644 --- a/kgo.go +++ b/kgo.go @@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int options.ContentType = b.opts.ContentType } - m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options} + m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options} c, err := b.newCodec(m.opts.ContentType) if err == nil { m.body, err = c.Marshal(body) @@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int return nil, err } + m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType) + return m, nil } @@ -364,6 +366,19 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M } } + kmsg, ok := msg.(*kgoMessage) + if !ok { + continue + } + if kmsg.opts.Context != nil { + if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil { + rec.Key = k + } + if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { + promise = p + } + } + setHeaders(rec, msg.Header()) if promise != nil {