diff --git a/subscriber.go b/subscriber.go index 8099fc0..1cf97fa 100644 --- a/subscriber.go +++ b/subscriber.go @@ -52,10 +52,11 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { return nil } select { - case <-ctx.Done(): - return ctx.Err() + // case <-ctx.Done(): + // return ctx.Err() default: s.c.PauseFetchTopics(s.topic) + s.c.CloseAllowingRebalance() kc := make(map[string][]int32) for ctp := range s.consumers { kc[ctp.t] = append(kc[ctp.t], ctp.p) @@ -80,7 +81,6 @@ func (s *subscriber) poll(ctx context.Context) { s.c.CloseAllowingRebalance() return case <-s.done: - s.c.CloseAllowingRebalance() return default: fetches := s.c.PollRecords(ctx, maxInflight)