skip marshal/unmarshal on noop codec

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-23 07:06:51 +03:00
parent 549a7b5a6d
commit 4b676bea64
2 changed files with 22 additions and 6 deletions

21
kgo.go
View File

@ -19,6 +19,8 @@ import (
mrand "github.com/unistack-org/micro/v3/util/rand" mrand "github.com/unistack-org/micro/v3/util/rand"
) )
var _ broker.Broker = &kBroker{}
type kBroker struct { type kBroker struct {
writer *kgo.Client // used only to push messages writer *kgo.Client // used only to push messages
kopts []kgo.Opt kopts []kgo.Opt
@ -245,19 +247,23 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
records := make([]*kgo.Record, 0, len(msgs)) records := make([]*kgo.Record, 0, len(msgs))
var errs []string var errs []string
var err error var err error
var buf []byte
for _, msg := range msgs { for _, msg := range msgs {
rec := &kgo.Record{}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if options.BodyOnly { if options.BodyOnly {
buf = msg.Body rec.Value = msg.Body
} else if k.opts.Codec.String() == "noop" {
rec.Value = msg.Body
for k, v := range msg.Header {
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
}
} else { } else {
buf, err = k.opts.Codec.Marshal(msg) rec.Value, err = k.opts.Codec.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
} }
topic, _ := msg.Header.Get(metadata.HeaderTopic)
rec := &kgo.Record{Value: buf, Topic: topic}
records = append(records, rec) records = append(records, rec)
} }
@ -390,8 +396,11 @@ func (k *kBroker) String() string {
return "kgo" return "kgo"
} }
func NewBroker(opts ...broker.Option) broker.Broker { func NewBroker(opts ...broker.Option) *kBroker {
options := broker.NewOptions(opts...) options := broker.NewOptions(opts...)
if options.Codec.String() != "noop" {
options.Logger.Infof(options.Context, "broker codec not noop, disable plain kafka headers usage")
}
kopts := []kgo.Opt{ kopts := []kgo.Opt{
kgo.DisableIdempotentWrite(), kgo.DisableIdempotentWrite(),
kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.ProducerBatchCompression(kgo.NoCompression()),

View File

@ -8,6 +8,7 @@ import (
kgo "github.com/twmb/franz-go/pkg/kgo" kgo "github.com/twmb/franz-go/pkg/kgo"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
) )
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
@ -157,6 +158,12 @@ func (w *worker) handle() {
p.ack = false p.ack = false
if w.opts.BodyOnly { if w.opts.BodyOnly {
p.msg.Body = record.Value p.msg.Body = record.Value
} else if w.kopts.Codec.String() == "noop" {
p.msg.Body = record.Value
p.msg.Header = metadata.New(len(record.Headers))
for _, h := range record.Headers {
p.msg.Header.Set(h.Key, string(h.Value))
}
} else { } else {
if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
p.err = err p.err = err