From ceae272ca820ed3a64ed71882434394f13baa8f0 Mon Sep 17 00:00:00 2001 From: Gorbunov Kirill Andreevich Date: Fri, 19 Apr 2024 13:16:23 +0300 Subject: [PATCH 1/2] #133 fix race. --- subscriber.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/subscriber.go b/subscriber.go index 2026834..44560d7 100644 --- a/subscriber.go +++ b/subscriber.go @@ -186,7 +186,9 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str kopts: s.kopts, opts: s.opts, } + s.Lock() s.consumers[tp{topic, partition}] = pc + s.Unlock() go pc.consume() } } -- 2.45.2 From 26daa968b8cb562aba8e4430af83239adcc9a67b Mon Sep 17 00:00:00 2001 From: Gorbunov Kirill Andreevich Date: Fri, 19 Apr 2024 13:26:49 +0300 Subject: [PATCH 2/2] #133 fix race. --- subscriber.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/subscriber.go b/subscriber.go index 44560d7..80d3587 100644 --- a/subscriber.go +++ b/subscriber.go @@ -105,11 +105,13 @@ func (s *subscriber) poll(ctx context.Context) { continue } + s.Lock() for tp := range s.consumers { if v, ok := lmap[tp.p]; ok { s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(tp.p)), "lag", strconv.Itoa(int(v.Lag))) } } + s.Unlock() } } -- 2.45.2