broker/segmentio: fix error
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
202e3d8c4e
commit
be82d76d81
@ -233,6 +233,7 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
|
|||||||
cfg := k.readerConfig
|
cfg := k.readerConfig
|
||||||
cfg.Topic = topic
|
cfg.Topic = topic
|
||||||
cfg.Partition = partition
|
cfg.Partition = partition
|
||||||
|
cfg.GroupID = ""
|
||||||
reader := kafka.NewReader(cfg)
|
reader := kafka.NewReader(cfg)
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
// seek to the last committed offset for this partition.
|
// seek to the last committed offset for this partition.
|
||||||
|
Loading…
Reference in New Issue
Block a user