diff --git a/kgo_test.go b/kgo_test.go index e03c3f2..f796c87 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -19,9 +19,10 @@ import ( ) var ( - msgcnt = int64(12000000) - group = "32" - prefill = false + msgcnt = int64(12000000) + group = "38" + prefill = false + loglevel = logger.InfoLevel ) var bm = &broker.Message{ @@ -34,7 +35,7 @@ func TestPubSub(t *testing.T) { t.Skip() } - if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel), logger.WithCallerSkipCount(3)); err != nil { + if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel), logger.WithCallerSkipCount(3)); err != nil { t.Fatal(err) } ctx := context.Background() diff --git a/util.go b/util.go index 26e94f5..5f8008f 100644 --- a/util.go +++ b/util.go @@ -50,7 +50,6 @@ func (s *subscriber) run(ctx context.Context) { return } - s.kopts.Logger.Infof(ctx, "handle fetches") fetches.EachPartition(func(p kgo.FetchTopicPartition) { s.Lock() consumers := s.consumers[p.Topic] @@ -88,7 +87,7 @@ func (s *subscriber) assigned(ctx context.Context, _ *kgo.Client, assigned map[s for _, partition := range partitions { w := worker{ done: make(chan struct{}), - recs: make(chan []*kgo.Record, maxInflight), + recs: make(chan []*kgo.Record), cherr: make(chan error), kopts: s.kopts, opts: s.opts, @@ -130,6 +129,7 @@ func (w *worker) handle() { eh = w.opts.ErrorHandler } + paused := false for { select { case <-w.ctx.Done(): @@ -138,7 +138,8 @@ func (w *worker) handle() { case <-w.done: return case recs := <-w.recs: - if len(recs) == w.maxInflight { + if len(recs) >= w.maxInflight { + paused = true w.reader.PauseFetchPartitions(w.tpmap) } for _, record := range recs { @@ -189,8 +190,10 @@ func (w *worker) handle() { } pPool.Put(p) } - - w.reader.ResumeFetchPartitions(w.tpmap) + if paused { + paused = false + w.reader.ResumeFetchPartitions(w.tpmap) + } } } }