propogate context to kgo record #79

Merged
vtolstov merged 1 commits from kgo-context into v3 2022-10-11 11:57:31 +03:00

2
kgo.go
View File

@ -262,7 +262,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
} }
for _, msg := range msgs { for _, msg := range msgs {
rec := &kgo.Record{Key: key} rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if k.opts.Codec.String() == "noop" { if k.opts.Codec.String() == "noop" {
rec.Value = msg.Body rec.Value = msg.Body