diff --git a/go.mod b/go.mod index ede9cf1..9a94c69 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,12 @@ module go.unistack.org/micro-broker-kgo/v3 go 1.24.0 -toolchain go1.24.3 - require ( github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.11.1 - github.com/twmb/franz-go v1.20.2 + github.com/twmb/franz-go v1.20.4 github.com/twmb/franz-go/pkg/kadm v1.17.1 + github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1 github.com/twmb/franz-go/pkg/kmsg v1.12.0 go.opentelemetry.io/otel v1.38.0 go.unistack.org/micro/v3 v3.11.48 @@ -23,10 +22,10 @@ require ( github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/crypto v0.43.0 // indirect - golang.org/x/sys v0.37.0 // indirect + golang.org/x/crypto v0.47.0 // indirect + golang.org/x/sys v0.40.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect google.golang.org/grpc v1.76.0 // indirect - google.golang.org/protobuf v1.36.10 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index be36d56..224139d 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,6 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -31,42 +29,34 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= -github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= -github.com/twmb/franz-go v1.20.2 h1:CiwhyKZHW6vqSHJkh+RTxFAJkio0jBjM/JQhx/HZ72A= -github.com/twmb/franz-go v1.20.2/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= -github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw= -github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc= +github.com/twmb/franz-go v1.20.4 h1:1wTvyLTOxS0oJh5ro/DVt2JHVdx7/kGNtmtFhbcr0O0= +github.com/twmb/franz-go v1.20.4/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw= github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg= -github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= -github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1 h1:z21wmCm1zk2DZoi3khCD3OWA05FVj1fsEJOBIxpIJH0= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1/go.mod h1:2W79ILYghTbIIi4y4j0k3PmV2mCxWoj6D7PtQlZmH3E= github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc= github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.45 h1:fjTLZYWgsVf9FIMZBxOg8ios2/tmyimnjZrsrxEUeXU= -go.unistack.org/micro/v3 v3.11.45/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w= go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= -golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= -golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= -golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM= -golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= -golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= -golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= -golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff h1:A90eA31Wq6HOMIQlLfzFwzqGKBTuaVztYu/g8sn+8Zc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk= google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= -google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= -google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/kgo_test.go b/kgo_test.go index 93dcc97..2a208c3 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -3,16 +3,16 @@ package kgo_test import ( "context" "os" - "strings" "sync/atomic" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" kg "github.com/twmb/franz-go/pkg/kgo" kgo "go.unistack.org/micro-broker-kgo/v3" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/logger/slog" "go.unistack.org/micro/v3/metadata" ) @@ -20,61 +20,55 @@ var ( msgcnt = int64(10) group = "38" prefill = true - loglevel = logger.DebugLevel + loglevel = logger.ErrorLevel + cluster *kfake.Cluster ) -var bm = &broker.Message{ - Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, - Body: []byte(`"body"`), +func TestMain(m *testing.M) { + cluster = kfake.MustCluster( + kfake.AllowAutoTopicCreation(), + ) + defer cluster.Close() + os.Exit(m.Run()) } -func TestFail(t *testing.T) { - if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { - t.Skip() - } - - logger.DefaultLogger = slog.NewLogger() - if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil { - t.Fatal(err) - } - ctx := context.Background() - - var addrs []string - if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { - addrs = []string{"127.0.0.1:9092"} - } else { - addrs = strings.Split(addr, ",") - } - +func helperCreateBroker() *kgo.Broker { b := kgo.NewBroker( - broker.Addrs(addrs...), + broker.Addrs(cluster.ListenAddrs()...), kgo.CommitInterval(5*time.Second), kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024), kg.AllowAutoTopicCreation(), ), ) + return b +} + +func TestFail(t *testing.T) { + ctx := context.Background() + err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)) + require.Nil(t, err) + + b := helperCreateBroker() + t.Logf("broker init") - if err := b.Init(); err != nil { - t.Fatal(err) - } + require.Nil(t, b.Init()) t.Logf("broker connect") - if err := b.Connect(ctx); err != nil { - t.Fatal(err) - } + require.Nil(t, b.Connect(ctx)) defer func() { t.Logf("broker disconnect") - if err := b.Disconnect(ctx); err != nil { - t.Fatal(err) - } + require.Nil(t, b.Disconnect(ctx)) }() t.Logf("broker health %v", b.Health()) msgs := make([]*broker.Message, 0, msgcnt) for i := int64(0); i < msgcnt; i++ { - msgs = append(msgs, bm) + msgs = append(msgs, &broker.Message{ + Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, + Body: []byte(`"body"`), + }) } for _, msg := range msgs { @@ -83,7 +77,6 @@ func TestFail(t *testing.T) { break } } - // t.Skip() idx := int64(0) fn := func(msg broker.Event) error { @@ -97,13 +90,11 @@ func TestFail(t *testing.T) { broker.SubscribeAutoAck(true), broker.SubscribeGroup(group), broker.SubscribeBodyOnly(true)) - if err != nil { - t.Fatal(err) - } + + require.Nil(t, err) + defer func() { - if err := sub.Unsubscribe(ctx); err != nil { - t.Fatal(err) - } + require.Nil(t, sub.Unsubscribe(ctx)) }() for { @@ -117,92 +108,54 @@ func TestFail(t *testing.T) { } func TestConnect(t *testing.T) { - if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { - t.Skip() - } - - var addrs []string ctx := context.TODO() - b := kgo.NewBroker( - broker.Addrs(addrs...), - kgo.CommitInterval(5*time.Second), - kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)), - ) - if err := b.Init(); err != nil { - t.Fatal(err) - } + b := helperCreateBroker() - if err := b.Connect(ctx); err != nil { - t.Fatal(err) - } + require.Nil(t, b.Init()) + + require.Nil(t, b.Connect(ctx)) } func TestPubSub(t *testing.T) { - if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { - t.Skip() - } - - if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil { - t.Fatal(err) - } ctx := context.Background() + err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)) + require.Nil(t, err) - var addrs []string - if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { - addrs = []string{"127.0.0.1:29091", "127.0.0.2:29092", "127.0.0.3:29093"} - } else { - addrs = strings.Split(addr, ",") - } - - b := kgo.NewBroker( - broker.Addrs(addrs...), - kgo.CommitInterval(5*time.Second), - kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)), - ) - - if err := b.Init(); err != nil { - t.Fatal(err) - } - - if err := b.Connect(ctx); err != nil { - t.Fatal(err) - } + b := helperCreateBroker() + require.Nil(t, b.Init()) + require.Nil(t, b.Connect(ctx)) defer func() { - if err := b.Disconnect(ctx); err != nil { - t.Fatal(err) - } + require.Nil(t, b.Disconnect(ctx)) }() - if prefill { - msgs := make([]*broker.Message, 0, msgcnt) - for i := int64(0); i < msgcnt; i++ { - msgs = append(msgs, bm) - } - if err := b.BatchPublish(ctx, msgs); err != nil { - t.Fatal(err) + if prefill { + var msgs []*broker.Message + for i := int64(0); i < msgcnt; i++ { + msgs = append(msgs, &broker.Message{ + Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test.pubsub"}, + Body: []byte(`"body"`), + }) } - // t.Skip() + require.Nil(t, b.BatchPublish(ctx, msgs)) } done := make(chan bool, 1) idx := int64(0) fn := func(msg broker.Event) error { atomic.AddInt64(&idx, 1) - // time.Sleep(200 * time.Millisecond) return msg.Ack() } - sub, err := b.Subscribe(ctx, "test", fn, + sub, err := b.Subscribe(ctx, "test.pubsub", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup(group), - broker.SubscribeBodyOnly(true)) - if err != nil { - t.Fatal(err) - } + broker.SubscribeBodyOnly(true), + ) + + require.Nil(t, err) + defer func() { - if err := sub.Unsubscribe(ctx); err != nil { - t.Fatal(err) - } + require.Nil(t, sub.Unsubscribe(ctx)) }() ticker := time.NewTicker(2 * time.Minute) @@ -210,14 +163,16 @@ func TestPubSub(t *testing.T) { pticker := time.NewTicker(1 * time.Second) defer pticker.Stop() + go func() { for { select { case <-pticker.C: if prc := atomic.LoadInt64(&idx); prc == msgcnt { + t.Log("everything is read") close(done) } else { - t.Logf("processed %v\n", prc) + t.Logf("processed %v of %v\n", prc, msgcnt) } case <-ticker.C: close(done) diff --git a/meter.go b/meter.go index 8ee7012..817d248 100644 --- a/meter.go +++ b/meter.go @@ -56,7 +56,7 @@ const ( labelNode = "node_id" labelSuccess = "success" - labelFaulure = "failure" + labelFailure = "failure" labelStatus = "status" labelTopic = "topic" ) @@ -68,7 +68,7 @@ func (m *hookMeter) OnGroupManageError(_ error) { func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { - m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc() + m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFailure).Inc() return } m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc() diff --git a/subscriber.go b/subscriber.go index e221e90..e665cec 100644 --- a/subscriber.go +++ b/subscriber.go @@ -153,17 +153,40 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for topic, partitions := range lost { for _, partition := range partitions { tps := tp{topic, partition} + s.mu.Lock() pc, ok := s.consumers[tps] + if ok { + delete(s.consumers, tps) + } + s.mu.Unlock() if !ok || pc == nil { continue } - delete(s.consumers, tps) - close(pc.quit) + if s.kopts.Logger.V(logger.DebugLevel) { - s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition)) } + + close(pc.quit) + wg.Add(1) - go func() { <-pc.done; wg.Done() }() + go func(c *consumer, t string, p int32) { + defer wg.Done() + + timeout := time.NewTimer(s.kopts.GracefulTimeout) + defer timeout.Stop() + + select { + case <-c.done: + if s.kopts.Logger.V(logger.DebugLevel) { + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p)) + } + case <-timeout.C: + if s.kopts.Logger.V(logger.DebugLevel) { + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p)) + } + } + }(pc, topic, partition) } } }