propogate context to kgo record

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2022-10-11 11:55:12 +03:00
parent 9a69314c25
commit 6360f5e78f

2
kgo.go
View File

@ -262,7 +262,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
} }
for _, msg := range msgs { for _, msg := range msgs {
rec := &kgo.Record{Key: key} rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if k.opts.Codec.String() == "noop" { if k.opts.Codec.String() == "noop" {
rec.Value = msg.Body rec.Value = msg.Body