From aaf8c43e0409571b7a5d6bbde4b4b7f10a9e3231 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 26 Feb 2025 11:06:11 +0300 Subject: [PATCH] add ability to fail probes and fatal on broker errors Signed-off-by: Vasiliy Tolstov --- broker.go | 15 +++++++- go.mod | 22 ++++++------ go.sum | 20 +++++++++++ kadmtest.go | 65 ---------------------------------- kgo.go | 41 +++++++++++++++++----- kgo_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++-- options.go | 11 ++++++ subscriber.go | 6 +++- 8 files changed, 186 insertions(+), 90 deletions(-) delete mode 100644 kadmtest.go diff --git a/broker.go b/broker.go index b85eb54..fbd6772 100644 --- a/broker.go +++ b/broker.go @@ -1,15 +1,19 @@ package kgo import ( + "context" "net" "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kgo" + "go.unistack.org/micro/v3/logger" ) type hookEvent struct { - connected *atomic.Uint32 + log logger.Logger + fatalOnError bool + connected *atomic.Uint32 } var ( @@ -24,12 +28,18 @@ var ( func (m *hookEvent) OnGroupManageError(err error) { if err != nil { m.connected.Store(0) + if m.fatalOnError { + m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) + } } } func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { if err != nil { m.connected.Store(0) + if m.fatalOnError { + m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) + } } } @@ -40,6 +50,9 @@ func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { if err != nil { m.connected.Store(0) + if m.fatalOnError { + m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) + } } } diff --git a/go.mod b/go.mod index 8e3ee5c..d77a9f0 100644 --- a/go.mod +++ b/go.mod @@ -1,27 +1,27 @@ module go.unistack.org/micro-broker-kgo/v3 -go 1.22.0 +go 1.23.0 require ( github.com/google/uuid v1.6.0 - github.com/twmb/franz-go v1.18.0 - github.com/twmb/franz-go/pkg/kadm v1.14.0 + github.com/twmb/franz-go v1.18.1 + github.com/twmb/franz-go/pkg/kadm v1.15.0 github.com/twmb/franz-go/pkg/kmsg v1.9.0 - go.opentelemetry.io/otel v1.33.0 - go.unistack.org/micro/v3 v3.11.37 + go.opentelemetry.io/otel v1.34.0 + go.unistack.org/micro/v3 v3.11.41 ) require ( github.com/ash3in/uuidv8 v1.2.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/sys v0.28.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect - google.golang.org/grpc v1.69.2 // indirect - google.golang.org/protobuf v1.36.1 // indirect + golang.org/x/crypto v0.35.0 // indirect + golang.org/x/sys v0.30.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 // indirect + google.golang.org/grpc v1.70.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 561e443..a116105 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -31,30 +33,48 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= +github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4= +github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= +github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= go.unistack.org/micro/v3 v3.11.37 h1:ZcpnXAYEMcAwmnVb5b7o8/PylGnILxXMHaUlRrPmRI0= go.unistack.org/micro/v3 v3.11.37/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= +go.unistack.org/micro/v3 v3.11.41 h1:dP4sBLIZpMo+MWGe5bbESewK8wBzYm4Yik/67x4dEtQ= +go.unistack.org/micro/v3 v3.11.41/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= +golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/kadmtest.go b/kadmtest.go deleted file mode 100644 index 474ef68..0000000 --- a/kadmtest.go +++ /dev/null @@ -1,65 +0,0 @@ -//go:build ignore - -package main - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/twmb/franz-go/pkg/kadm" - "github.com/twmb/franz-go/pkg/kgo" - "github.com/twmb/franz-go/pkg/kversion" - - //"github.com/twmb/franz-go/pkg/sasl/scram" - "github.com/twmb/franz-go/pkg/sasl/plain" -) - -func die(msg string, args ...any) { - fmt.Fprintf(os.Stderr, msg, args...) - os.Exit(1) -} - -func main() { - seeds := []string{"vm-kafka-ump01tn.mbrd.ru:9092", "vm-kafka-ump02tn.mbrd.ru:9092", "vm-kafka-ump03tn.mbrd.ru:9092"} - - pass := "XXXXX" - user := "XXXXX" - - var adminClient *kadm.Client - { - client, err := kgo.NewClient( - kgo.SeedBrokers(seeds...), - // kgo.SASL((scram.Auth{User: user, Pass: pass}).AsSha512Mechanism()), - kgo.SASL((plain.Auth{User: user, Pass: pass}).AsMechanism()), - - // Do not try to send requests newer than 2.4.0 to avoid breaking changes in the request struct. - // Sometimes there are breaking changes for newer versions where more properties are required to set. - kgo.MaxVersions(kversion.V2_4_0()), - ) - if err != nil { - panic(err) - } - defer client.Close() - - adminClient = kadm.NewClient(client) - } - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - dg, err := adminClient.DescribeGroups(ctx, "interestrate_loader") - if err != nil { - die("failed to describe group: %v", err) - } - - for _, m := range dg["interestrate_loader"].Members { - mc, _ := m.Assigned.AsConsumer() - for _, mt := range mc.Topics { - for _, p := range mt.Partitions { - fmt.Printf("client:%s\tpartitions: %d\n", m.ClientID, p) - } - } - } -} diff --git a/kgo.go b/kgo.go index ae62c35..90c87b6 100644 --- a/kgo.go +++ b/kgo.go @@ -109,11 +109,18 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho } } + var fatalOnError bool + if k.opts.Context != nil { + if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { + fatalOnError = v + } + } + htracer := &hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer} opts = append(opts, kgo.WithHooks(&hookMeter{meter: k.opts.Meter}), kgo.WithHooks(htracer), - kgo.WithHooks(&hookEvent{connected: k.connected}), + kgo.WithHooks(&hookEvent{log: k.opts.Logger, fatalOnError: fatalOnError, connected: k.connected}), ) select { @@ -260,7 +267,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.connected.Store(1) } k.Unlock() - + fmt.Printf("EEE\n") options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) var errs []string @@ -315,7 +322,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br return nil } ts := time.Now() + fmt.Printf("SSSSSSEEE\n") results := k.c.ProduceSync(ctx, records...) + fmt.Printf("SSSSSS\n") te := time.Since(ts) for _, result := range results { k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) @@ -374,13 +383,27 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } } + var fatalOnError bool + if k.opts.Context != nil { + if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { + fatalOnError = v + } + } + + if options.Context != nil { + if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v { + fatalOnError = v + } + } + sub := &Subscriber{ - topic: topic, - opts: options, - handler: handler, - kopts: k.opts, - consumers: make(map[tp]*consumer), - done: make(chan struct{}), + topic: topic, + opts: options, + handler: handler, + kopts: k.opts, + consumers: make(map[tp]*consumer), + done: make(chan struct{}), + fatalOnError: fatalOnError, } kopts := append(k.kopts, @@ -454,7 +477,7 @@ func NewBroker(opts ...broker.Option) *Broker { kgo.BlockRebalanceOnPoll(), kgo.Balancers(kgo.CooperativeStickyBalancer()), kgo.FetchIsolationLevel(kgo.ReadUncommitted()), - kgo.UnknownTopicRetries(0), + kgo.UnknownTopicRetries(1), } if options.Context != nil { diff --git a/kgo_test.go b/kgo_test.go index cd072b6..7f09b05 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -13,14 +13,15 @@ import ( kgo "go.unistack.org/micro-broker-kgo/v3" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/logger/slog" "go.unistack.org/micro/v3/metadata" ) var ( - msgcnt = int64(12000000) + msgcnt = int64(1200) group = "38" - prefill = false - loglevel = logger.InfoLevel + prefill = true + loglevel = logger.DebugLevel ) var bm = &broker.Message{ @@ -28,6 +29,94 @@ var bm = &broker.Message{ Body: []byte(`"body"`), } +func TestFail(t *testing.T) { + if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { + t.Skip() + } + + logger.DefaultLogger = slog.NewLogger() + if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil { + t.Fatal(err) + } + ctx := context.Background() + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + b := kgo.NewBroker( + broker.Addrs(addrs...), + kgo.CommitInterval(5*time.Second), + kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024), + kg.AllowAutoTopicCreation(), + ), + ) + + t.Logf("broker init") + if err := b.Init(); err != nil { + t.Fatal(err) + } + + t.Logf("broker connect") + if err := b.Connect(ctx); err != nil { + t.Fatal(err) + } + + defer func() { + t.Logf("broker disconnect") + if err := b.Disconnect(ctx); err != nil { + t.Fatal(err) + } + }() + + t.Logf("broker health %v", b.Health()) + msgs := make([]*broker.Message, 0, msgcnt) + for i := int64(0); i < msgcnt; i++ { + msgs = append(msgs, bm) + } + + for _, msg := range msgs { + t.Logf("broker publish") + if err := b.Publish(ctx, "test", msg); err != nil { + t.Fatal(err) + } + } + // t.Skip() + + idx := int64(0) + fn := func(msg broker.Event) error { + atomic.AddInt64(&idx, 1) + time.Sleep(500 * time.Millisecond) + t.Logf("ack") + return msg.Ack() + } + + sub, err := b.Subscribe(ctx, "test", fn, + broker.SubscribeAutoAck(true), + broker.SubscribeGroup(group), + broker.SubscribeBodyOnly(true)) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := sub.Unsubscribe(ctx); err != nil { + t.Fatal(err) + } + }() + + for { + t.Logf("health check") + if !b.Health() { + t.Logf("health works") + break + } + time.Sleep(1 * time.Second) + } +} + func TestConnect(t *testing.T) { var addrs []string ctx := context.TODO() @@ -67,6 +156,7 @@ func TestPubSub(t *testing.T) { kgo.CommitInterval(5*time.Second), kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)), ) + if err := b.Init(); err != nil { t.Fatal(err) } diff --git a/options.go b/options.go index 938bd1b..8f69673 100644 --- a/options.go +++ b/options.go @@ -72,6 +72,12 @@ func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption { } } +type fatalOnErrorKey struct{} + +func FatalOnError(b bool) broker.Option { + return broker.SetOption(fatalOnErrorKey{}, b) +} + type clientIDKey struct{} func ClientID(id string) broker.Option { @@ -98,6 +104,11 @@ func SubscribeMaxInFlight(n int) broker.SubscribeOption { return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n) } +// SubscribeMaxInFlight max queued messages +func SubscribeFatalOnError(b bool) broker.SubscribeOption { + return broker.SetSubscribeOption(fatalOnErrorKey{}, b) +} + type publishPromiseKey struct{} // PublishPromise set the kafka promise func for Produce diff --git a/subscriber.go b/subscriber.go index 64f5d11..a7b5fa8 100644 --- a/subscriber.go +++ b/subscriber.go @@ -50,7 +50,8 @@ type Subscriber struct { connected *atomic.Uint32 sync.RWMutex - closed bool + closed bool + fatalOnError bool } func (s *Subscriber) Client() *kgo.Client { @@ -174,6 +175,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { if err != nil { s.connected.Store(0) + if s.fatalOnError { + s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) + } } }