diff --git a/kgo_test.go b/kgo_test.go index 02997ef..491b7a2 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -239,6 +239,8 @@ func TestKillConsumers_E2E_Rebalance(t *testing.T) { t.Fatal(err) } + bLogger := broker.Logger(logger.DefaultLogger.Clone(logger.WithLevel(logger.DebugLevel))) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -246,6 +248,7 @@ func TestKillConsumers_E2E_Rebalance(t *testing.T) { broker.ContentType("application/octet-stream"), broker.Codec("application/octet-stream", codec.NewCodec()), broker.Addrs(cluster.ListenAddrs()...), + bLogger, kgo.CommitInterval(500*time.Millisecond), kgo.Options( kg.ClientID("test-1"), @@ -266,6 +269,7 @@ func TestKillConsumers_E2E_Rebalance(t *testing.T) { broker.ContentType("application/octet-stream"), broker.Codec("application/octet-stream", codec.NewCodec()), broker.Addrs(cluster.ListenAddrs()...), + bLogger, kgo.CommitInterval(500*time.Millisecond), kgo.Options( kg.ClientID("test-2"), diff --git a/subscriber.go b/subscriber.go index 3eda61f..d181f63 100644 --- a/subscriber.go +++ b/subscriber.go @@ -79,8 +79,10 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error { kc[ctp.t] = append(kc[ctp.t], ctp.p) } s.killConsumers(ctx, kc) + s.mu.Lock() close(s.done) s.closed = true + s.mu.Unlock() s.c.ResumeFetchTopics(s.topic) return nil