From 71e8a13c702c9d7e5dfd11bbc2e3e252e0e93744 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 11 May 2025 01:49:38 +0300 Subject: [PATCH] new kgo version update deps fixup race conditions add kfake usage Signed-off-by: Vasiliy Tolstov --- subscriber.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/subscriber.go b/subscriber.go index 4aeb6b1..9fde3f2 100644 --- a/subscriber.go +++ b/subscriber.go @@ -147,9 +147,9 @@ func (s *Subscriber) poll(ctx context.Context) { fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.Lock() + s.mu.Lock() c := s.consumers[tps] - s.Unlock() + s.mu.Unlock() if c != nil { c.recs <- p } @@ -166,15 +166,15 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { tps := tp{topic, partition} - s.Lock() + s.mu.Lock() pc, ok := s.consumers[tps] - s.Unlock() + s.mu.Unlock() if !ok { continue } - s.Lock() + s.mu.Lock() delete(s.consumers, tps) - s.Unlock() + s.mu.Unlock() close(pc.quit) if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))