From f98811b93e3a4759b67c8f0e6486052b44e676cb Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Wed, 10 Dec 2025 16:34:44 +0300 Subject: [PATCH] add test rebalance for killconsumers --- kgo_test.go | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++ subscriber.go | 2 +- 2 files changed, 156 insertions(+), 1 deletion(-) diff --git a/kgo_test.go b/kgo_test.go index 47ee974..02997ef 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -48,6 +48,7 @@ func TestFail(t *testing.T) { kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024), kg.AllowAutoTopicCreation(), + kg.MaxBufferedRecords(10), ), ) @@ -231,3 +232,157 @@ func TestPubSub(t *testing.T) { }() <-done } + +func TestKillConsumers_E2E_Rebalance(t *testing.T) { + logger.DefaultLogger = slog.NewLogger() + if err := logger.DefaultLogger.Init(logger.WithLevel(logger.InfoLevel)); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + b1 := kgo.NewBroker( + broker.ContentType("application/octet-stream"), + broker.Codec("application/octet-stream", codec.NewCodec()), + broker.Addrs(cluster.ListenAddrs()...), + kgo.CommitInterval(500*time.Millisecond), + kgo.Options( + kg.ClientID("test-1"), + kg.FetchMaxBytes(10*1024*1024), + kg.AllowAutoTopicCreation(), + kg.MaxBufferedRecords(10), + ), + ) + if err := b1.Init(); err != nil { + t.Fatal(err) + } + if err := b1.Connect(ctx); err != nil { + t.Fatal(err) + } + defer func() { _ = b1.Disconnect(context.Background()) }() + + b2 := kgo.NewBroker( + broker.ContentType("application/octet-stream"), + broker.Codec("application/octet-stream", codec.NewCodec()), + broker.Addrs(cluster.ListenAddrs()...), + kgo.CommitInterval(500*time.Millisecond), + kgo.Options( + kg.ClientID("test-2"), + kg.FetchMaxBytes(10*1024*1024), + kg.AllowAutoTopicCreation(), + kg.MaxBufferedRecords(10), + ), + ) + if err := b2.Init(); err != nil { + t.Fatal(err) + } + if err := b2.Connect(ctx); err != nil { + t.Fatal(err) + } + defer func() { _ = b2.Disconnect(context.Background()) }() + + const topic = "test.kill" + const total = int64(1000) + + var ( + processed int64 + c1Count int64 + c2Count int64 + ) + + done := make(chan struct{}) + + h1 := func(msg broker.Message) error { + time.Sleep(2 * time.Millisecond) + + atomic.AddInt64(&processed, 1) + atomic.AddInt64(&c1Count, 1) + + if atomic.LoadInt64(&processed) >= total { + select { + case <-done: + default: + close(done) + } + } + return msg.Ack() + } + + h2 := func(msg broker.Message) error { + time.Sleep(2 * time.Millisecond) + + atomic.AddInt64(&processed, 1) + atomic.AddInt64(&c2Count, 1) + + if atomic.LoadInt64(&processed) >= total { + select { + case <-done: + default: + close(done) + } + } + return msg.Ack() + } + + sub1, err := b1.Subscribe(ctx, topic, h1, + broker.SubscribeAutoAck(true), + broker.SubscribeGroup(group), + broker.SubscribeBodyOnly(true), + ) + if err != nil { + t.Fatal(err) + } + defer func() { _ = sub1.Unsubscribe(context.Background()) }() + + go func() { + for atomic.LoadInt64(&processed) < total { + batchSize := int64(10) + msgs := make([]broker.Message, 0, batchSize) + for i := int64(0); i < batchSize; i++ { + m, _ := b1.NewMessage(ctx, metadata.New(0), []byte("msg")) + msgs = append(msgs, m) + } + _ = b1.Publish(ctx, topic, msgs...) + + time.Sleep(5 * time.Millisecond) + } + }() + + time.Sleep(200 * time.Millisecond) + + // второй consumer подключается -> KAFKA REBALANCE + sub2, err := b2.Subscribe(ctx, topic, h2, + broker.SubscribeAutoAck(true), + broker.SubscribeGroup(group), + broker.SubscribeBodyOnly(true), + ) + if err != nil { + t.Fatal(err) + } + defer func() { _ = sub2.Unsubscribe(context.Background()) }() + + // ждём окончания + select { + case <-done: + t.Log("DONE") + case <-ctx.Done(): + t.Fatalf("timeout: processed=%d of %d (c1=%d, c2=%d)", + atomic.LoadInt64(&processed), + total, + atomic.LoadInt64(&c1Count), + atomic.LoadInt64(&c2Count), + ) + } + + if got := atomic.LoadInt64(&processed); got != total { + t.Fatalf("processed %d, want %d", got, total) + } + + if atomic.LoadInt64(&c1Count) == 0 { + t.Fatalf("consumer1 did not process any messages") + } + if atomic.LoadInt64(&c2Count) == 0 { + t.Fatalf("consumer2 did not process any messages (rebalance/killConsumers likely broken)") + } +} diff --git a/subscriber.go b/subscriber.go index e5b06a3..3eda61f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) { } fetches.EachError(func(t string, p int32, err error) { tps := tp{t, p} - s.mu.Lock() + s.mu.RLock() c := s.consumers[tps] s.mu.RUnlock() if c != nil {