From 2c74b3232bd0f1646309394d20e9d29c14c42bc4 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 17 Oct 2021 18:29:05 +0300 Subject: [PATCH] support key publish option Signed-off-by: Vasiliy Tolstov --- kgo.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kgo.go b/kgo.go index cef7877..a4795dd 100644 --- a/kgo.go +++ b/kgo.go @@ -247,9 +247,16 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b records := make([]*kgo.Record, 0, len(msgs)) var errs []string var err error + var key []byte + + if options.Context != nil { + if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil { + key = k + } + } for _, msg := range msgs { - rec := &kgo.Record{} + rec := &kgo.Record{Key: key} rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) if options.BodyOnly { rec.Value = msg.Body @@ -273,6 +280,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b errs = append(errs, result.Err.Error()) } } + if len(errs) > 0 { return fmt.Errorf("publish error: %s", strings.Join(errs, "\n")) }