update for latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
10
kgo.go
10
kgo.go
@@ -108,7 +108,7 @@ type kgoMessage struct {
|
||||
ctx context.Context
|
||||
body []byte
|
||||
hdr metadata.Metadata
|
||||
opts broker.PublishOptions
|
||||
opts broker.MessageOptions
|
||||
ack bool
|
||||
}
|
||||
|
||||
@@ -150,8 +150,8 @@ func (b *Broker) newCodec(ct string) (codec.Codec, error) {
|
||||
return nil, codec.ErrUnknownContentType
|
||||
}
|
||||
|
||||
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) {
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) {
|
||||
options := broker.NewMessageOptions(opts...)
|
||||
if options.ContentType == "" {
|
||||
options.ContentType = b.opts.ContentType
|
||||
}
|
||||
@@ -351,10 +351,10 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
||||
|
||||
for _, msg := range messages {
|
||||
if mctx := msg.Context(); mctx != nil {
|
||||
if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil {
|
||||
if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil {
|
||||
key = k
|
||||
}
|
||||
if p, ok := mctx.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||
if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||
promise = p
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user