From f39888f6d4fe462044157d7c3697078401809f5a Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 18 Jul 2021 13:45:12 +0300 Subject: [PATCH] support sync and async commit offsets Signed-off-by: Vasiliy Tolstov --- options.go | 2 +- segmentio.go | 76 +++++++++++++++++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/options.go b/options.go index efdffda..72b63b7 100644 --- a/options.go +++ b/options.go @@ -13,7 +13,7 @@ var ( DefaultReaderConfig = kafka.ReaderConfig{} DefaultWriterConfig = kafka.WriterConfig{} DefaultStatsInterval = time.Second * 10 - DefaultCommitInterval = time.Second * 2 + DefaultCommitInterval = time.Second * 0 DefaultCommitQueueSize = 2000 ) diff --git a/segmentio.go b/segmentio.go index 6e340b3..f736020 100644 --- a/segmentio.go +++ b/segmentio.go @@ -384,7 +384,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha k.opts.Logger.Trace(k.opts.Context, "start async commit loop") } // run async commit loop - go k.commitLoop(generation, ackCh, errChs, &wg) + go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &wg) } } }() @@ -402,19 +402,59 @@ type cgHandler struct { wg *sync.WaitGroup } -func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) { - var mapMu sync.Mutex +func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) { td := DefaultCommitInterval + + if commitInterval > 0 { + td = commitInterval + } + if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 { td = v } - ticker := time.NewTicker(td) - defer ticker.Stop() + // async commit loop + if td > 0 { + ticker := time.NewTicker(td) + defer ticker.Stop() - offsets := make(map[string]map[int]int64, 4) + var mapMu sync.Mutex + offsets := make(map[string]map[int]int64, 4) + for { + select { + default: + wg.Wait() + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop") + } + return + case <-ticker.C: + mapMu.Lock() + if len(offsets) == 0 { + mapMu.Unlock() + continue + } + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Tracef(k.opts.Context, "async commit offsets: %v", offsets) + } + err := generation.CommitOffsets(offsets) + if err != nil { + for _, errCh := range errChs { + errCh <- err + close(errCh) + } + mapMu.Unlock() + return + } + mapMu.Unlock() + offsets = make(map[string]map[int]int64, 4) + } + } + } + + // sync commit loop for { select { default: @@ -424,38 +464,20 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string } return case ack := <-ackCh: - mapMu.Lock() - for k, v := range ack { - if _, ok := offsets[k]; !ok { - offsets[k] = make(map[int]int64, 4) - } - for p, o := range v { - offsets[k][p] = o + 1 - } - } - mapMu.Unlock() - case <-ticker.C: - mapMu.Lock() - if len(offsets) == 0 { - mapMu.Unlock() - continue - } if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "commit offsets: %v", offsets) + k.opts.Logger.Tracef(k.opts.Context, "sync commit offsets: %v", ack) } - err := generation.CommitOffsets(offsets) + err := generation.CommitOffsets(ack) if err != nil { for _, errCh := range errChs { errCh <- err close(errCh) } - mapMu.Unlock() return } - mapMu.Unlock() - offsets = make(map[string]map[int]int64, 4) } } + } func (h *cgHandler) run(ctx context.Context) {