broker/segmentio: fix offset handle

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-22 13:06:07 +03:00
parent 03a583d9d4
commit 5cad4c7723

View File

@ -271,7 +271,7 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
cfg.Topic = t
cfg.Partition = assignment.ID
cfg.GroupID = ""
// cfg.StartOffset = assignment.Offset
cfg.StartOffset = assignment.Offset
cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: kafka.NewReader(cfg), handler: handler}
generation.Start(cgh.run)
}