broker/segmentio: fix kafka topic in publication

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-05-08 17:57:16 +03:00
parent dd94415716
commit 62d02b4297

View File

@ -324,7 +324,7 @@ func (h *cgHandler) run(ctx context.Context) {
var m broker.Message
eh := h.brokerOpts.ErrorHandler
offsets[msg.Topic][msg.Partition] = msg.Offset
p := &publication{generation: h.generation, m: &m, offsets: offsets}
p := &publication{topic: msg.Topic, generation: h.generation, m: &m, offsets: offsets}
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, &m); err != nil {
p.err = err