From b445a7eeb7e36a25f1c27495b886f7a5bdb39fd9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 15 Sep 2021 18:38:35 +0300 Subject: [PATCH] add locking Signed-off-by: Vasiliy Tolstov --- util.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/util.go b/util.go index 8e72d0a..26e94f5 100644 --- a/util.go +++ b/util.go @@ -50,8 +50,11 @@ func (s *subscriber) run(ctx context.Context) { return } + s.kopts.Logger.Infof(ctx, "handle fetches") fetches.EachPartition(func(p kgo.FetchTopicPartition) { + s.Lock() consumers := s.consumers[p.Topic] + s.Unlock() if consumers == nil { return }