broker/segmentio: fix offsets

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-22 13:50:18 +03:00
parent 5cad4c7723
commit dd94415716

View File

@ -66,6 +66,9 @@ func (p *publication) Message() *broker.Message {
} }
func (p *publication) Ack() error { func (p *publication) Ack() error {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("commit offset %#+v\n", p.offsets)
}
return p.generation.CommitOffsets(p.offsets) return p.generation.CommitOffsets(p.offsets)
} }
@ -252,7 +255,7 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
switch err { switch err {
case kafka.ErrGroupClosed: case kafka.ErrGroupClosed:
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer generation ended %v", k.opts.Context.Err()) logger.Tracef("[segmentio] consumer generation ended %v", k.opts.Context.Err())
} }
continue continue
default: default:
@ -271,8 +274,10 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
cfg.Topic = t cfg.Topic = t
cfg.Partition = assignment.ID cfg.Partition = assignment.ID
cfg.GroupID = "" cfg.GroupID = ""
cfg.StartOffset = assignment.Offset // cfg.StartOffset = assignment.Offset
cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: kafka.NewReader(cfg), handler: handler} reader := kafka.NewReader(cfg)
reader.SetOffset(assignment.Offset)
cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler}
generation.Start(cgh.run) generation.Start(cgh.run)
} }
} }