From 5080a528346e6deaa8f2f62a42f73fa469033fbd Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 2 Dec 2025 21:50:53 +0300 Subject: [PATCH] fixup revoked error Signed-off-by: Vasiliy Tolstov --- subscriber.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/subscriber.go b/subscriber.go index df88d35..331290e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -199,7 +199,15 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str } s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { - s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } } }