fix graceful shutdown

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2023-05-13 15:12:26 +03:00
parent 8bbcc30d04
commit 3a86d4c0f4
2 changed files with 26 additions and 5 deletions

View File

@@ -55,6 +55,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
default:
s.c.PauseFetchTopics(s.topic)
kc := make(map[string][]int32)
for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p)
}
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
}
@@ -71,15 +77,14 @@ func (s *subscriber) poll(ctx context.Context) {
for {
select {
case <-ctx.Done():
s.c.Close()
s.c.CloseAllowingRebalance()
return
case <-s.done:
s.c.Close()
s.c.CloseAllowingRebalance()
return
default:
fetches := s.c.PollRecords(ctx, maxInflight)
if fetches.IsClientClosed() {
s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
if !s.closed && fetches.IsClientClosed() {
s.closed = true
return
}