allow rebalance on unsubscribe #125
@ -52,10 +52,11 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
// case <-ctx.Done():
|
||||||
return ctx.Err()
|
// return ctx.Err()
|
||||||
default:
|
default:
|
||||||
s.c.PauseFetchTopics(s.topic)
|
s.c.PauseFetchTopics(s.topic)
|
||||||
|
s.c.CloseAllowingRebalance()
|
||||||
kc := make(map[string][]int32)
|
kc := make(map[string][]int32)
|
||||||
for ctp := range s.consumers {
|
for ctp := range s.consumers {
|
||||||
kc[ctp.t] = append(kc[ctp.t], ctp.p)
|
kc[ctp.t] = append(kc[ctp.t], ctp.p)
|
||||||
@ -80,7 +81,6 @@ func (s *subscriber) poll(ctx context.Context) {
|
|||||||
s.c.CloseAllowingRebalance()
|
s.c.CloseAllowingRebalance()
|
||||||
return
|
return
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
s.c.CloseAllowingRebalance()
|
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
fetches := s.c.PollRecords(ctx, maxInflight)
|
fetches := s.c.PollRecords(ctx, maxInflight)
|
||||||
|
Loading…
Reference in New Issue
Block a user