From 52318d68b893e5a89e5f7b58f0eef8e3605190a4 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 25 May 2023 15:11:53 +0300 Subject: [PATCH] allow rebalance on unsubscribe Signed-off-by: Vasiliy Tolstov --- subscriber.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/subscriber.go b/subscriber.go index 8099fc0..1cf97fa 100644 --- a/subscriber.go +++ b/subscriber.go @@ -52,10 +52,11 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { return nil } select { - case <-ctx.Done(): - return ctx.Err() + // case <-ctx.Done(): + // return ctx.Err() default: s.c.PauseFetchTopics(s.topic) + s.c.CloseAllowingRebalance() kc := make(map[string][]int32) for ctp := range s.consumers { kc[ctp.t] = append(kc[ctp.t], ctp.p) @@ -80,7 +81,6 @@ func (s *subscriber) poll(ctx context.Context) { s.c.CloseAllowingRebalance() return case <-s.done: - s.c.CloseAllowingRebalance() return default: fetches := s.c.PollRecords(ctx, maxInflight)