allow rebalance on unsubscribe

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2023-05-25 15:11:53 +03:00
parent 5c4332ffc4
commit 52318d68b8

View File

@ -52,10 +52,11 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
return nil return nil
} }
select { select {
case <-ctx.Done(): // case <-ctx.Done():
return ctx.Err() // return ctx.Err()
default: default:
s.c.PauseFetchTopics(s.topic) s.c.PauseFetchTopics(s.topic)
s.c.CloseAllowingRebalance()
kc := make(map[string][]int32) kc := make(map[string][]int32)
for ctp := range s.consumers { for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p) kc[ctp.t] = append(kc[ctp.t], ctp.p)
@ -80,7 +81,6 @@ func (s *subscriber) poll(ctx context.Context) {
s.c.CloseAllowingRebalance() s.c.CloseAllowingRebalance()
return return
case <-s.done: case <-s.done:
s.c.CloseAllowingRebalance()
return return
default: default:
fetches := s.c.PollRecords(ctx, maxInflight) fetches := s.c.PollRecords(ctx, maxInflight)