Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
4b0b70e7d0 | |||
|
da88718e03 |
@@ -1,2 +1,2 @@
|
|||||||
# micro-broker-kgo
|
# micro-broker-kgo
|
||||||

|

|
||||||
|
17
kgo.go
17
kgo.go
@@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
options.ContentType = b.opts.ContentType
|
options.ContentType = b.opts.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options}
|
m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options}
|
||||||
c, err := b.newCodec(m.opts.ContentType)
|
c, err := b.newCodec(m.opts.ContentType)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.body, err = c.Marshal(body)
|
m.body, err = c.Marshal(body)
|
||||||
@@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType)
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -364,6 +366,19 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
kmsg, ok := msg.(*kgoMessage)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if kmsg.opts.Context != nil {
|
||||||
|
if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setHeaders(rec, msg.Header())
|
setHeaders(rec, msg.Header())
|
||||||
|
|
||||||
if promise != nil {
|
if promise != nil {
|
||||||
|
Reference in New Issue
Block a user