fix double ack

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-12-11 12:43:02 +03:00
parent 553fc8b998
commit 799ae3283f

View File

@ -397,7 +397,7 @@ func (h *cgHandler) run(ctx context.Context) {
if h.subOpts.ErrorHandler != nil { if h.subOpts.ErrorHandler != nil {
eh = h.subOpts.ErrorHandler eh = h.subOpts.ErrorHandler
} }
offsets[msg.Topic][msg.Partition] = msg.Offset offsets[msg.Topic][msg.Partition] = msg.Offset + 1
p := &publication{topic: msg.Topic, generation: h.generation, m: &m, offsets: offsets} p := &publication{topic: msg.Topic, generation: h.generation, m: &m, offsets: offsets}
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, &m); err != nil { if err := h.brokerOpts.Codec.Unmarshal(msg.Value, &m); err != nil {