diff --git a/segmentio.go b/segmentio.go index 261fac6..b5aabfd 100644 --- a/segmentio.go +++ b/segmentio.go @@ -397,7 +397,7 @@ func (h *cgHandler) run(ctx context.Context) { if h.subOpts.ErrorHandler != nil { eh = h.subOpts.ErrorHandler } - offsets[msg.Topic][msg.Partition] = msg.Offset + offsets[msg.Topic][msg.Partition] = msg.Offset + 1 p := &publication{topic: msg.Topic, generation: h.generation, m: &m, offsets: offsets} if err := h.brokerOpts.Codec.Unmarshal(msg.Value, &m); err != nil {