support key publish option

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-10-17 18:29:05 +03:00
parent 69fb765b09
commit 2c74b3232b

10
kgo.go
View File

@ -247,9 +247,16 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
records := make([]*kgo.Record, 0, len(msgs)) records := make([]*kgo.Record, 0, len(msgs))
var errs []string var errs []string
var err error var err error
var key []byte
if options.Context != nil {
if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil {
key = k
}
}
for _, msg := range msgs { for _, msg := range msgs {
rec := &kgo.Record{} rec := &kgo.Record{Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if options.BodyOnly { if options.BodyOnly {
rec.Value = msg.Body rec.Value = msg.Body
@ -273,6 +280,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
errs = append(errs, result.Err.Error()) errs = append(errs, result.Err.Error())
} }
} }
if len(errs) > 0 { if len(errs) > 0 {
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n")) return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
} }