multiple fixes

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-15 18:49:50 +03:00
parent b445a7eeb7
commit 09f4a15ec4
2 changed files with 13 additions and 9 deletions

View File

@ -19,9 +19,10 @@ import (
) )
var ( var (
msgcnt = int64(12000000) msgcnt = int64(12000000)
group = "32" group = "38"
prefill = false prefill = false
loglevel = logger.InfoLevel
) )
var bm = &broker.Message{ var bm = &broker.Message{
@ -34,7 +35,7 @@ func TestPubSub(t *testing.T) {
t.Skip() t.Skip()
} }
if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel), logger.WithCallerSkipCount(3)); err != nil { if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel), logger.WithCallerSkipCount(3)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
ctx := context.Background() ctx := context.Background()

13
util.go
View File

@ -50,7 +50,6 @@ func (s *subscriber) run(ctx context.Context) {
return return
} }
s.kopts.Logger.Infof(ctx, "handle fetches")
fetches.EachPartition(func(p kgo.FetchTopicPartition) { fetches.EachPartition(func(p kgo.FetchTopicPartition) {
s.Lock() s.Lock()
consumers := s.consumers[p.Topic] consumers := s.consumers[p.Topic]
@ -88,7 +87,7 @@ func (s *subscriber) assigned(ctx context.Context, _ *kgo.Client, assigned map[s
for _, partition := range partitions { for _, partition := range partitions {
w := worker{ w := worker{
done: make(chan struct{}), done: make(chan struct{}),
recs: make(chan []*kgo.Record, maxInflight), recs: make(chan []*kgo.Record),
cherr: make(chan error), cherr: make(chan error),
kopts: s.kopts, kopts: s.kopts,
opts: s.opts, opts: s.opts,
@ -130,6 +129,7 @@ func (w *worker) handle() {
eh = w.opts.ErrorHandler eh = w.opts.ErrorHandler
} }
paused := false
for { for {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
@ -138,7 +138,8 @@ func (w *worker) handle() {
case <-w.done: case <-w.done:
return return
case recs := <-w.recs: case recs := <-w.recs:
if len(recs) == w.maxInflight { if len(recs) >= w.maxInflight {
paused = true
w.reader.PauseFetchPartitions(w.tpmap) w.reader.PauseFetchPartitions(w.tpmap)
} }
for _, record := range recs { for _, record := range recs {
@ -189,8 +190,10 @@ func (w *worker) handle() {
} }
pPool.Put(p) pPool.Put(p)
} }
if paused {
w.reader.ResumeFetchPartitions(w.tpmap) paused = false
w.reader.ResumeFetchPartitions(w.tpmap)
}
} }
} }
} }