diff --git a/subscriber.go b/subscriber.go index 4aeb6b1..9fde3f2 100644 --- a/subscriber.go +++ b/subscriber.go @@ -147,9 +147,9 @@ func (s *Subscriber) poll(ctx context.Context) { fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.Lock() + s.mu.Lock() c := s.consumers[tps] - s.Unlock() + s.mu.Unlock() if c != nil { c.recs <- p } @@ -166,15 +166,15 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { tps := tp{topic, partition} - s.Lock() + s.mu.Lock() pc, ok := s.consumers[tps] - s.Unlock() + s.mu.Unlock() if !ok { continue } - s.Lock() + s.mu.Lock() delete(s.consumers, tps) - s.Unlock() + s.mu.Unlock() close(pc.quit) if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))