Merge pull request #75 from unistack-org/codec_body_only
fix noop codec usage and body only option
This commit is contained in:
commit
6ccc996fe6
6
kgo.go
6
kgo.go
@ -264,13 +264,13 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
|
|||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
rec := &kgo.Record{Key: key}
|
rec := &kgo.Record{Key: key}
|
||||||
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
|
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
|
||||||
if options.BodyOnly {
|
if k.opts.Codec.String() == "noop" {
|
||||||
rec.Value = msg.Body
|
|
||||||
} else if k.opts.Codec.String() == "noop" {
|
|
||||||
rec.Value = msg.Body
|
rec.Value = msg.Body
|
||||||
for k, v := range msg.Header {
|
for k, v := range msg.Header {
|
||||||
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
|
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
|
||||||
}
|
}
|
||||||
|
} else if options.BodyOnly {
|
||||||
|
rec.Value = msg.Body
|
||||||
} else {
|
} else {
|
||||||
rec.Value, err = k.opts.Codec.Marshal(msg)
|
rec.Value, err = k.opts.Codec.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user