diff --git a/kgo.go b/kgo.go index 3c2ba7b..50a6035 100644 --- a/kgo.go +++ b/kgo.go @@ -19,6 +19,8 @@ import ( mrand "github.com/unistack-org/micro/v3/util/rand" ) +var _ broker.Broker = &kBroker{} + type kBroker struct { writer *kgo.Client // used only to push messages kopts []kgo.Opt @@ -245,19 +247,23 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b records := make([]*kgo.Record, 0, len(msgs)) var errs []string var err error - var buf []byte for _, msg := range msgs { + rec := &kgo.Record{} + rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) if options.BodyOnly { - buf = msg.Body + rec.Value = msg.Body + } else if k.opts.Codec.String() == "noop" { + rec.Value = msg.Body + for k, v := range msg.Header { + rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)}) + } } else { - buf, err = k.opts.Codec.Marshal(msg) + rec.Value, err = k.opts.Codec.Marshal(msg) if err != nil { return err } } - topic, _ := msg.Header.Get(metadata.HeaderTopic) - rec := &kgo.Record{Value: buf, Topic: topic} records = append(records, rec) } @@ -390,8 +396,11 @@ func (k *kBroker) String() string { return "kgo" } -func NewBroker(opts ...broker.Option) broker.Broker { +func NewBroker(opts ...broker.Option) *kBroker { options := broker.NewOptions(opts...) + if options.Codec.String() != "noop" { + options.Logger.Infof(options.Context, "broker codec not noop, disable plain kafka headers usage") + } kopts := []kgo.Opt{ kgo.DisableIdempotentWrite(), kgo.ProducerBatchCompression(kgo.NoCompression()), diff --git a/util.go b/util.go index 4551a7a..f7b549a 100644 --- a/util.go +++ b/util.go @@ -8,6 +8,7 @@ import ( kgo "github.com/twmb/franz-go/pkg/kgo" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" ) var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") @@ -157,6 +158,12 @@ func (w *worker) handle() { p.ack = false if w.opts.BodyOnly { p.msg.Body = record.Value + } else if w.kopts.Codec.String() == "noop" { + p.msg.Body = record.Value + p.msg.Header = metadata.New(len(record.Headers)) + for _, h := range record.Headers { + p.msg.Header.Set(h.Key, string(h.Value)) + } } else { if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { p.err = err