diff --git a/segmentio.go b/segmentio.go index e129059..577e250 100644 --- a/segmentio.go +++ b/segmentio.go @@ -66,6 +66,9 @@ func (p *publication) Message() *broker.Message { } func (p *publication) Ack() error { + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("commit offset %#+v\n", p.offsets) + } return p.generation.CommitOffsets(p.offsets) } @@ -252,7 +255,7 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker switch err { case kafka.ErrGroupClosed: if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Trace("[segmentio] consumer generation ended %v", k.opts.Context.Err()) + logger.Tracef("[segmentio] consumer generation ended %v", k.opts.Context.Err()) } continue default: @@ -271,8 +274,10 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker cfg.Topic = t cfg.Partition = assignment.ID cfg.GroupID = "" - cfg.StartOffset = assignment.Offset - cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: kafka.NewReader(cfg), handler: handler} + // cfg.StartOffset = assignment.Offset + reader := kafka.NewReader(cfg) + reader.SetOffset(assignment.Offset) + cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler} generation.Start(cgh.run) } }