fixup message options and content-type header
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
17
kgo.go
17
kgo.go
@@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
||||
options.ContentType = b.opts.ContentType
|
||||
}
|
||||
|
||||
m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options}
|
||||
m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options}
|
||||
c, err := b.newCodec(m.opts.ContentType)
|
||||
if err == nil {
|
||||
m.body, err = c.Marshal(body)
|
||||
@@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType)
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
@@ -364,6 +366,19 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
||||
}
|
||||
}
|
||||
|
||||
kmsg, ok := msg.(*kgoMessage)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if kmsg.opts.Context != nil {
|
||||
if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||
rec.Key = k
|
||||
}
|
||||
if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||
promise = p
|
||||
}
|
||||
}
|
||||
|
||||
setHeaders(rec, msg.Header())
|
||||
|
||||
if promise != nil {
|
||||
|
Reference in New Issue
Block a user