propogate context to kgo record #79

Merged
vtolstov merged 1 commits from kgo-context into v3 2022-10-11 11:57:31 +03:00

2
kgo.go
View File

@ -262,7 +262,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
}
for _, msg := range msgs {
rec := &kgo.Record{Key: key}
rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if k.opts.Codec.String() == "noop" {
rec.Value = msg.Body