diff --git a/segmentio.go b/segmentio.go index 457a8d8..8860d6e 100644 --- a/segmentio.go +++ b/segmentio.go @@ -47,7 +47,6 @@ type publication struct { msg *broker.Message ackCh chan map[string]map[int]int64 readerDone *int32 - ackChMu sync.Mutex } func (p *publication) Topic() string { @@ -62,9 +61,7 @@ func (p *publication) Ack() error { if atomic.LoadInt32(p.readerDone) == 1 { return fmt.Errorf("kafka reader done") } - p.ackChMu.Lock() p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}} - p.ackChMu.Unlock() if cerr := p.ackErr.Load(); cerr != nil { return cerr.(error) } @@ -459,14 +456,32 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D go func() { defer func() { - for _, errCh := range errChs { - close(errCh) - } close(commitDoneCh) }() + checkTicker := time.NewTicker(300 * time.Millisecond) + defer checkTicker.Stop() + for { select { + case <-checkTicker.C: + if atomic.LoadInt32(cntWait) == 0 { + //fmt.Printf("cntWait IS 0\n") + mapMu.Lock() + if len(offsets) > 0 { + if err := generation.CommitOffsets(offsets); err != nil { + for _, errCh := range errChs { + errCh <- err + } + return + } + } + mapMu.Unlock() + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "stop commit loop") + } + return + } case ack := <-ackCh: if k.opts.Logger.V(logger.TraceLevel) { k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack) @@ -510,7 +525,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D if k.opts.Logger.V(logger.TraceLevel) { k.opts.Logger.Trace(k.opts.Context, "stop commit loop") } - return } //fmt.Printf("cntWait NOT 0\n") @@ -569,8 +583,6 @@ func (h *cgHandler) run(ctx context.Context) { h.brokerOpts.Logger.Trace(ctx, "start partition reader") } - var ackChMu sync.Mutex - td := DefaultStatsInterval if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { td = v @@ -584,16 +596,11 @@ func (h *cgHandler) run(ctx context.Context) { defer func() { atomic.AddInt32(h.cntWait, -1) - ackChMu.Lock() - if atomic.CompareAndSwapInt32(h.readerDone, 0, 1) { - close(h.ackCh) - } - ackChMu.Unlock() + atomic.CompareAndSwapInt32(h.readerDone, 0, 1) if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) } <-h.commitDoneCh - //fmt.Printf("<-h.commitDoneCh\n") if h.brokerOpts.Logger.V(logger.TraceLevel) { h.brokerOpts.Logger.Trace(ctx, "stop partition reader") } @@ -642,7 +649,7 @@ func (h *cgHandler) run(ctx context.Context) { if h.subOpts.ErrorHandler != nil { eh = h.subOpts.ErrorHandler } - p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone, ackChMu: ackChMu} + p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone} if h.subOpts.BodyOnly { p.msg.Body = msg.Value diff --git a/stats.go b/stats.go index 06ae7cc..ef7471e 100644 --- a/stats.go +++ b/stats.go @@ -2,7 +2,6 @@ package segmentio import ( "context" - "fmt" "time" kafka "github.com/segmentio/kafka-go" @@ -16,7 +15,7 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter for { select { case <-ctx.Done(): - fmt.Printf("done reader stats\n") + //fmt.Printf("done reader stats\n") return case <-ticker.C: if r == nil {