From 99ea444f0fbbdeab39c2e63406da541d58d8fa4a Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 20 Jul 2021 13:04:19 +0300 Subject: [PATCH 1/3] improve consumer speed Signed-off-by: Vasiliy Tolstov --- segmentio.go | 332 +++++++++++++++++++++++++++++++++------------------ stats.go | 2 + 2 files changed, 221 insertions(+), 113 deletions(-) diff --git a/segmentio.go b/segmentio.go index f736020..457a8d8 100644 --- a/segmentio.go +++ b/segmentio.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -31,7 +32,6 @@ type subscriber struct { topic string opts broker.SubscribeOptions closed bool - done chan struct{} group *kafka.ConsumerGroup cgcfg kafka.ConsumerGroupConfig brokerOpts broker.Options @@ -39,14 +39,15 @@ type subscriber struct { } type publication struct { - topic string - partition int - offset int64 - err error - ackErr *error - msg *broker.Message - ackCh chan map[string]map[int]int64 - sync.Mutex + topic string + partition int + offset int64 + err error + ackErr atomic.Value + msg *broker.Message + ackCh chan map[string]map[int]int64 + readerDone *int32 + ackChMu sync.Mutex } func (p *publication) Topic() string { @@ -58,8 +59,16 @@ func (p *publication) Message() *broker.Message { } func (p *publication) Ack() error { + if atomic.LoadInt32(p.readerDone) == 1 { + return fmt.Errorf("kafka reader done") + } + p.ackChMu.Lock() p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}} - return *p.ackErr + p.ackChMu.Unlock() + if cerr := p.ackErr.Load(); cerr != nil { + return cerr.(error) + } + return nil } func (p *publication) Error() error { @@ -79,12 +88,13 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { s.Lock() s.closed = true group := s.group - close(s.done) s.Unlock() + //fmt.Printf("unsub start\n") if group != nil { err = group.Close() } + //fmt.Printf("unsub end\n") return err } @@ -266,26 +276,42 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha StartOffset: k.readerConfig.StartOffset, Logger: k.readerConfig.Logger, ErrorLogger: k.readerConfig.ErrorLogger, + Dialer: k.readerConfig.Dialer, } if err := cgcfg.Validate(); err != nil { return nil, err } - - cgroup, err := kafka.NewConsumerGroup(cgcfg) - if err != nil { - return nil, err + gCtx := k.opts.Context + if ctx != nil { + gCtx = ctx } - sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg, done: make(chan struct{})} + sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, cgcfg: cgcfg} + sub.createGroup(gCtx) + go func() { + defer func() { + sub.RLock() + closed := sub.closed + sub.RUnlock() + if !closed { + if err := sub.group.Close(); err != nil { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) + } + } + }() + for { select { - case <-sub.done: - return case <-ctx.Done(): - // unexpected context closed + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + return + } if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Errorf(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err()) + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] subscribe context closed %v", k.opts.Context.Err()) } return case <-k.opts.Context.Done(): @@ -293,77 +319,67 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha closed := sub.closed sub.RUnlock() if closed { - // unsubcribed and closed return } - // unexpected context closed if 
k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Errorf(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err()) + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] broker context closed error %v", k.opts.Context.Err()) } return default: sub.RLock() - group := sub.group closed := sub.closed sub.RUnlock() if closed { return } - gCtx := k.opts.Context - if ctx != nil { - gCtx = ctx - } - generation, err := group.Next(gCtx) + generation, err := sub.group.Next(gCtx) switch err { case nil: // normal execution case kafka.ErrGroupClosed: + k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err) sub.RLock() closed := sub.closed sub.RUnlock() - if !closed { - if k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err()) - } - if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) - continue - } - sub.createGroup(gCtx) - continue + if closed { + return } - return + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err()) + } + sub.createGroup(gCtx) + continue default: + k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err) sub.RLock() closed := sub.closed sub.RUnlock() - if !closed { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err) - } + if closed { + return } - if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err) } sub.createGroup(gCtx) continue } - var wg sync.WaitGroup - ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize) errChLen := 0 for _, assignments := range generation.Assignments { errChLen += len(assignments) } - errChs := make([]chan error, errChLen) + errChs := make([]chan error, 0, errChLen) + + commitDoneCh := make(chan bool) + readerDone := int32(0) + cntWait := int32(0) for topic, assignments := range generation.Assignments { if k.opts.Logger.V(logger.TraceLevel) { k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments) } for _, assignment := range assignments { - errCh := make(chan error) cfg := k.readerConfig cfg.Topic = topic cfg.Partition = assignment.ID @@ -374,9 +390,27 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha if k.opts.Logger.V(logger.ErrorLevel) { k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err) } + if err = reader.Close(); err != nil { + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "reader close err: %v", err) + } + } continue } - cgh := &cgHandler{brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler, ackCh: ackCh, errCh: errCh, wg: &wg} + errCh := make(chan error) + errChs = append(errChs, errCh) + cgh := &cgHandler{ + brokerOpts: k.opts, + subOpts: opt, + reader: reader, + handler: handler, + ackCh: ackCh, + errCh: errCh, + cntWait: &cntWait, + readerDone: &readerDone, + commitDoneCh: commitDoneCh, + } + 
atomic.AddInt32(cgh.cntWait, 1) generation.Start(cgh.run) } } @@ -384,7 +418,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha k.opts.Logger.Trace(k.opts.Context, "start async commit loop") } // run async commit loop - go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &wg) + go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait) } } }() @@ -393,16 +427,22 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha } type cgHandler struct { - brokerOpts broker.Options - subOpts broker.SubscribeOptions - reader *kafka.Reader - handler broker.Handler - ackCh chan map[string]map[int]int64 - errCh chan error - wg *sync.WaitGroup + brokerOpts broker.Options + subOpts broker.SubscribeOptions + reader *kafka.Reader + handler broker.Handler + ackCh chan map[string]map[int]int64 + errCh chan error + readerDone *int32 + commitDoneCh chan bool + cntWait *int32 } -func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) { +func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) { + + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "start commit loop") + } td := DefaultCommitInterval @@ -414,22 +454,88 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D td = v } - // async commit loop - if td > 0 { - ticker := time.NewTicker(td) - defer ticker.Stop() + var mapMu sync.Mutex + offsets := make(map[string]map[int]int64, 4) - var mapMu sync.Mutex - offsets := make(map[string]map[int]int64, 4) + go func() { + defer func() { + for _, errCh := range errChs { + close(errCh) + } + close(commitDoneCh) + }() for { select { - default: - wg.Wait() + case ack := <-ackCh: if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop") + k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack) + } + switch td { + case 0: // sync commits as CommitInterval == 0 + if len(ack) > 0 { + err := generation.CommitOffsets(ack) + if err != nil { + for _, errCh := range errChs { + errCh <- err + } + return + } + } + default: // async commits as CommitInterval > 0 + mapMu.Lock() + for t, p := range ack { + if _, ok := offsets[t]; !ok { + offsets[t] = make(map[int]int64, 4) + } + for k, v := range p { + offsets[t][k] = v + } + } + mapMu.Unlock() + } + // check for readers done and commit offsets + if atomic.LoadInt32(cntWait) == 0 { + //fmt.Printf("cntWait IS 0\n") + mapMu.Lock() + if len(offsets) > 0 { + if err := generation.CommitOffsets(offsets); err != nil { + for _, errCh := range errChs { + errCh <- err + } + return + } + } + mapMu.Unlock() + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "stop commit loop") + } + + return + } + //fmt.Printf("cntWait NOT 0\n") + } + } + }() + + // async commit loop + if td > 0 { + ticker := time.NewTicker(td) + doneTicker := time.NewTicker(300 * time.Millisecond) + defer doneTicker.Stop() + + for { + select { + case <-doneTicker.C: + if atomic.LoadInt32(readerDone) == 1 { + mapMu.Lock() + if len(offsets) == 0 { + //fmt.Printf("close all on <-readerDoneCh\n") + defer ticker.Stop() + return + } + ticker.Stop() } - return case <-ticker.C: 
mapMu.Lock() if len(offsets) == 0 { @@ -443,74 +549,64 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D if err != nil { for _, errCh := range errChs { errCh <- err - close(errCh) } mapMu.Unlock() return } - mapMu.Unlock() offsets = make(map[string]map[int]int64, 4) - } - } - } - - // sync commit loop - for { - select { - default: - wg.Wait() - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop") - } - return - case ack := <-ackCh: - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "sync commit offsets: %v", ack) - } - err := generation.CommitOffsets(ack) - if err != nil { - for _, errCh := range errChs { - errCh <- err - close(errCh) + mapMu.Unlock() + if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 { + //fmt.Printf("close all on <-ticker.C\n") + return } - return } } } - } func (h *cgHandler) run(ctx context.Context) { + if h.brokerOpts.Logger.V(logger.TraceLevel) { + h.brokerOpts.Logger.Trace(ctx, "start partition reader") + } + + var ackChMu sync.Mutex + td := DefaultStatsInterval if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { td = v } + // start stats loop go readerStats(ctx, h.reader, td, h.brokerOpts.Meter) - commitDuration := DefaultCommitInterval - if v, ok := h.brokerOpts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 { - commitDuration = v - } - - var commitErr error - - h.wg.Add(1) + var commitErr atomic.Value defer func() { - h.wg.Done() + atomic.AddInt32(h.cntWait, -1) + + ackChMu.Lock() + if atomic.CompareAndSwapInt32(h.readerDone, 0, 1) { + close(h.ackCh) + } + ackChMu.Unlock() if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) } + <-h.commitDoneCh + //fmt.Printf("<-h.commitDoneCh\n") + if h.brokerOpts.Logger.V(logger.TraceLevel) { + h.brokerOpts.Logger.Trace(ctx, "stop partition reader") + } }() go func() { for { select { case err := <-h.errCh: - commitErr = err + if err != nil { + commitErr.Store(err) + } case <-ctx.Done(): - time.Sleep(commitDuration) return } } @@ -523,20 +619,30 @@ func (h *cgHandler) run(ctx context.Context) { if h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err) } + //fmt.Printf("exit from readMessage loop\n") return case kafka.ErrGenerationEnded: // generation has ended if h.brokerOpts.Logger.V(logger.TraceLevel) { - h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance") + h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close") } + //fmt.Printf("exit from readMessage loop\n") return case nil: + if cerr := commitErr.Load(); cerr != nil { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr) + } + //fmt.Printf("exit from readMessage loop\n") + return + } + eh := h.brokerOpts.ErrorHandler if h.subOpts.ErrorHandler != nil { eh = h.subOpts.ErrorHandler } - p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset, topic: msg.Topic, msg: &broker.Message{}} + p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone, ackChMu: ackChMu} if 
h.subOpts.BodyOnly { p.msg.Body = msg.Value @@ -554,14 +660,14 @@ func (h *cgHandler) run(ctx context.Context) { continue } } - p.Lock() - p.ackErr = &commitErr - p.Unlock() + if cerr := commitErr.Load(); cerr != nil { + p.ackErr.Store(cerr.(bool)) + } err = h.handler(p) if err == nil && h.subOpts.AutoAck { if err = p.Ack(); err != nil { if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: unable to commit msg: %v", err) + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: message ack error: %v", err) return } } diff --git a/stats.go b/stats.go index de84350..06ae7cc 100644 --- a/stats.go +++ b/stats.go @@ -2,6 +2,7 @@ package segmentio import ( "context" + "fmt" "time" kafka "github.com/segmentio/kafka-go" @@ -15,6 +16,7 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter for { select { case <-ctx.Done(): + fmt.Printf("done reader stats\n") return case <-ticker.C: if r == nil { -- 2.45.2 From fe7831a56979ade5068f341ae5a5928a3e096a19 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 20 Jul 2021 14:57:38 +0300 Subject: [PATCH 2/3] uglify code Signed-off-by: Vasiliy Tolstov --- segmentio.go | 39 +++++++++++++++++++++++---------------- stats.go | 3 +-- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/segmentio.go b/segmentio.go index 457a8d8..8860d6e 100644 --- a/segmentio.go +++ b/segmentio.go @@ -47,7 +47,6 @@ type publication struct { msg *broker.Message ackCh chan map[string]map[int]int64 readerDone *int32 - ackChMu sync.Mutex } func (p *publication) Topic() string { @@ -62,9 +61,7 @@ func (p *publication) Ack() error { if atomic.LoadInt32(p.readerDone) == 1 { return fmt.Errorf("kafka reader done") } - p.ackChMu.Lock() p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}} - p.ackChMu.Unlock() if cerr := p.ackErr.Load(); cerr != nil { return cerr.(error) } @@ -459,14 +456,32 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D go func() { defer func() { - for _, errCh := range errChs { - close(errCh) - } close(commitDoneCh) }() + checkTicker := time.NewTicker(300 * time.Millisecond) + defer checkTicker.Stop() + for { select { + case <-checkTicker.C: + if atomic.LoadInt32(cntWait) == 0 { + //fmt.Printf("cntWait IS 0\n") + mapMu.Lock() + if len(offsets) > 0 { + if err := generation.CommitOffsets(offsets); err != nil { + for _, errCh := range errChs { + errCh <- err + } + return + } + } + mapMu.Unlock() + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "stop commit loop") + } + return + } case ack := <-ackCh: if k.opts.Logger.V(logger.TraceLevel) { k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack) @@ -510,7 +525,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D if k.opts.Logger.V(logger.TraceLevel) { k.opts.Logger.Trace(k.opts.Context, "stop commit loop") } - return } //fmt.Printf("cntWait NOT 0\n") @@ -569,8 +583,6 @@ func (h *cgHandler) run(ctx context.Context) { h.brokerOpts.Logger.Trace(ctx, "start partition reader") } - var ackChMu sync.Mutex - td := DefaultStatsInterval if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { td = v @@ -584,16 +596,11 @@ func (h *cgHandler) run(ctx context.Context) { defer func() { atomic.AddInt32(h.cntWait, -1) - ackChMu.Lock() - if atomic.CompareAndSwapInt32(h.readerDone, 0, 1) { - close(h.ackCh) - } - ackChMu.Unlock() + atomic.CompareAndSwapInt32(h.readerDone, 0, 1) 
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) } <-h.commitDoneCh - //fmt.Printf("<-h.commitDoneCh\n") if h.brokerOpts.Logger.V(logger.TraceLevel) { h.brokerOpts.Logger.Trace(ctx, "stop partition reader") } @@ -642,7 +649,7 @@ func (h *cgHandler) run(ctx context.Context) { if h.subOpts.ErrorHandler != nil { eh = h.subOpts.ErrorHandler } - p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone, ackChMu: ackChMu} + p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone} if h.subOpts.BodyOnly { p.msg.Body = msg.Value diff --git a/stats.go b/stats.go index 06ae7cc..ef7471e 100644 --- a/stats.go +++ b/stats.go @@ -2,7 +2,6 @@ package segmentio import ( "context" - "fmt" "time" kafka "github.com/segmentio/kafka-go" @@ -16,7 +15,7 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter for { select { case <-ctx.Done(): - fmt.Printf("done reader stats\n") + //fmt.Printf("done reader stats\n") return case <-ticker.C: if r == nil { -- 2.45.2 From a9541f7c4f61ed32d6ceae75f35e47ff158f6c6b Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 20 Jul 2021 15:03:37 +0300 Subject: [PATCH 3/3] remove debug Signed-off-by: Vasiliy Tolstov --- segmentio.go | 12 +----------- stats.go | 1 - 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/segmentio.go b/segmentio.go index 8860d6e..2add790 100644 --- a/segmentio.go +++ b/segmentio.go @@ -59,7 +59,7 @@ func (p *publication) Message() *broker.Message { func (p *publication) Ack() error { if atomic.LoadInt32(p.readerDone) == 1 { - return fmt.Errorf("kafka reader done") + return kafka.ErrGroupClosed } p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}} if cerr := p.ackErr.Load(); cerr != nil { @@ -87,11 +87,9 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { group := s.group s.Unlock() - //fmt.Printf("unsub start\n") if group != nil { err = group.Close() } - //fmt.Printf("unsub end\n") return err } @@ -466,7 +464,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D select { case <-checkTicker.C: if atomic.LoadInt32(cntWait) == 0 { - //fmt.Printf("cntWait IS 0\n") mapMu.Lock() if len(offsets) > 0 { if err := generation.CommitOffsets(offsets); err != nil { @@ -511,7 +508,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D } // check for readers done and commit offsets if atomic.LoadInt32(cntWait) == 0 { - //fmt.Printf("cntWait IS 0\n") mapMu.Lock() if len(offsets) > 0 { if err := generation.CommitOffsets(offsets); err != nil { @@ -527,7 +523,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D } return } - //fmt.Printf("cntWait NOT 0\n") } } }() @@ -544,7 +539,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D if atomic.LoadInt32(readerDone) == 1 { mapMu.Lock() if len(offsets) == 0 { - //fmt.Printf("close all on <-readerDoneCh\n") defer ticker.Stop() return } @@ -570,7 +564,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D offsets = make(map[string]map[int]int64, 4) mapMu.Unlock() if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 { - //fmt.Printf("close all on <-ticker.C\n") return } } 
@@ -626,21 +619,18 @@ func (h *cgHandler) run(ctx context.Context) { if h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err) } - //fmt.Printf("exit from readMessage loop\n") return case kafka.ErrGenerationEnded: // generation has ended if h.brokerOpts.Logger.V(logger.TraceLevel) { h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close") } - //fmt.Printf("exit from readMessage loop\n") return case nil: if cerr := commitErr.Load(); cerr != nil { if h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr) } - //fmt.Printf("exit from readMessage loop\n") return } diff --git a/stats.go b/stats.go index ef7471e..de84350 100644 --- a/stats.go +++ b/stats.go @@ -15,7 +15,6 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter for { select { case <-ctx.Done(): - //fmt.Printf("done reader stats\n") return case <-ticker.C: if r == nil { -- 2.45.2
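
The three patches above converge on one coordination pattern: each partition reader pushes acknowledged offsets into a buffered ack channel, a commit loop merges and commits them (synchronously when CommitInterval is zero, otherwise on a ticker), and an atomic reader counter plus a readerDone flag let the loop flush and exit once every reader has closed. The standalone Go sketch below models that pattern with only the standard library; commitOffsets, the topic name, the queue size and the intervals are illustrative stand-ins, not the broker's real API.

// Minimal model of the ack/commit coordination introduced by the patches.
// commitOffsets stands in for kafka.Generation.CommitOffsets; all other
// names are illustrative.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// commitOffsets is a stub for the real Kafka commit call.
func commitOffsets(offsets map[string]map[int]int64) error {
	fmt.Printf("commit: %v\n", offsets)
	return nil
}

// commitLoop drains ackCh, merges offsets per topic/partition, commits them
// on a ticker, and flushes once more before exiting when cntWait reaches 0.
func commitLoop(ackCh chan map[string]map[int]int64, cntWait *int32, interval time.Duration) {
	offsets := make(map[string]map[int]int64, 4)

	merge := func(ack map[string]map[int]int64) {
		for topic, parts := range ack {
			if _, ok := offsets[topic]; !ok {
				offsets[topic] = make(map[int]int64, 4)
			}
			for p, off := range parts {
				offsets[topic][p] = off // keep only the newest offset per partition
			}
		}
	}
	flush := func() {
		if len(offsets) > 0 {
			_ = commitOffsets(offsets)
			offsets = make(map[string]map[int]int64, 4)
		}
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	check := time.NewTicker(300 * time.Millisecond) // poll for reader shutdown
	defer check.Stop()

	for {
		select {
		case ack := <-ackCh:
			merge(ack)
		case <-ticker.C:
			flush()
		case <-check.C:
			if atomic.LoadInt32(cntWait) != 0 {
				continue
			}
			for { // readers are done: drain pending acks, commit, exit
				select {
				case ack := <-ackCh:
					merge(ack)
				default:
					flush()
					return
				}
			}
		}
	}
}

// reader models one partition reader: it acks a few offsets, then marks
// itself done the way cgHandler.run does in its deferred cleanup.
func reader(topic string, partition int, ackCh chan map[string]map[int]int64, cntWait, readerDone *int32) {
	defer func() {
		atomic.AddInt32(cntWait, -1)
		atomic.CompareAndSwapInt32(readerDone, 0, 1)
	}()
	for off := int64(1); off <= 3; off++ {
		if atomic.LoadInt32(readerDone) == 1 {
			return // another reader already started the shutdown
		}
		ackCh <- map[string]map[int]int64{topic: {partition: off}}
	}
}

func main() {
	ackCh := make(chan map[string]map[int]int64, 128) // DefaultCommitQueueSize stand-in
	var readerDone int32
	cntWait := int32(2)
	go reader("example-topic", 0, ackCh, &cntWait, &readerDone)
	go reader("example-topic", 1, ackCh, &cntWait, &readerDone)
	commitLoop(ackCh, &cntWait, 500*time.Millisecond) // CommitInterval stand-in
}

Run as-is, the sketch prints a couple of merged commits and exits once both model readers have finished; in the broker itself the same roles are played by publication.Ack, kBroker.commitLoop and cgHandler.run.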