diff --git a/kgo.go b/kgo.go index cef7877..a4795dd 100644 --- a/kgo.go +++ b/kgo.go @@ -247,9 +247,16 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b records := make([]*kgo.Record, 0, len(msgs)) var errs []string var err error + var key []byte + + if options.Context != nil { + if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil { + key = k + } + } for _, msg := range msgs { - rec := &kgo.Record{} + rec := &kgo.Record{Key: key} rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) if options.BodyOnly { rec.Value = msg.Body @@ -273,6 +280,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b errs = append(errs, result.Err.Error()) } } + if len(errs) > 0 { return fmt.Errorf("publish error: %s", strings.Join(errs, "\n")) }