diff --git a/util.go b/util.go index 8e72d0a..26e94f5 100644 --- a/util.go +++ b/util.go @@ -50,8 +50,11 @@ func (s *subscriber) run(ctx context.Context) { return } + s.kopts.Logger.Infof(ctx, "handle fetches") fetches.EachPartition(func(p kgo.FetchTopicPartition) { + s.Lock() consumers := s.consumers[p.Topic] + s.Unlock() if consumers == nil { return }