diff --git a/subscriber.go b/subscriber.go index 2026834..44560d7 100644 --- a/subscriber.go +++ b/subscriber.go @@ -186,7 +186,9 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str kopts: s.kopts, opts: s.opts, } + s.Lock() s.consumers[tp{topic, partition}] = pc + s.Unlock() go pc.consume() } }