From 3b6475b1aeabcca77b0459a2c45c57249b7b7630 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Mon, 9 Jun 2025 21:35:24 +0300 Subject: [PATCH] try refactoring && decomposition subsribe && update deps --- broker.go | 8 +- event.go | 7 + go.mod | 20 +-- go.sum | 48 +++---- kgo.go | 192 +++++++++++++------------- subscriber.go | 362 ++++++++++++++++++++++++++++---------------------- 6 files changed, 342 insertions(+), 295 deletions(-) diff --git a/broker.go b/broker.go index bd045c4..67fb148 100644 --- a/broker.go +++ b/broker.go @@ -34,11 +34,11 @@ func (m *hookEvent) OnGroupManageError(err error) { } } -func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { +func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, t time.Duration, _ net.Conn, err error) { if err != nil { // m.connected.Store(0) // if m.fatalOnError { - m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) + m.log.Error(context.TODO(), "kgo.OnBrokerConnect: "+ t.String(), err) //} } } @@ -56,10 +56,10 @@ func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.D } } -func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { +func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, t time.Duration, _ time.Duration, err error) { if err != nil { // m.connected.Store(0) - m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) + m.log.Error(context.TODO(), "kgo.OnBrokerRead: "+ t.String(), err) } } diff --git a/event.go b/event.go index 9ac6ba9..60f57e2 100644 --- a/event.go +++ b/event.go @@ -48,3 +48,10 @@ var eventPool = sync.Pool{ return &event{msg: &broker.Message{}} }, } + +func (p *event) reset() { + clear(p.msg.Header) + p.msg.Body = p.msg.Body[:0] + p.err = nil + p.ack = false +} diff --git a/go.mod b/go.mod index 1da867e..3a19e87 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,13 @@ module go.unistack.org/micro-broker-kgo/v3 -go 1.23.0 +go 1.23.8 require ( github.com/google/uuid v1.6.0 - github.com/twmb/franz-go v1.18.1 - github.com/twmb/franz-go/pkg/kadm v1.15.0 - github.com/twmb/franz-go/pkg/kmsg v1.9.0 - go.opentelemetry.io/otel v1.34.0 + github.com/twmb/franz-go v1.19.5 + github.com/twmb/franz-go/pkg/kadm v1.16.0 + github.com/twmb/franz-go/pkg/kmsg v1.11.2 + go.opentelemetry.io/otel v1.36.0 go.unistack.org/micro/v3 v3.11.44 ) @@ -18,10 +18,10 @@ require ( github.com/matoous/go-nanoid v1.5.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/crypto v0.35.0 // indirect - golang.org/x/sys v0.30.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 // indirect - google.golang.org/grpc v1.70.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/sys v0.33.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect + google.golang.org/grpc v1.73.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4af0b07..d85b07a 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= @@ -29,32 +29,32 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= -github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= -github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= -github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= -github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= -github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= -go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= -go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= +github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= +github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= +github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= +github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= go.unistack.org/micro/v3 v3.11.44 h1:A+T8zVcL2vlL66kn/Y4rqhtBybLO829wFEYZJYorDOU= go.unistack.org/micro/v3 v3.11.44/go.mod h1:13EFW2ps3BN9mpYbp9K0oQu/VDjEN6LJ4wwdom7hcXQ= -golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= -golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= -google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= -google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/kgo.go b/kgo.go index 6ad858f..051d2ce 100644 --- a/kgo.go +++ b/kgo.go @@ -71,31 +71,31 @@ type Broker struct { init bool } -func (r *Broker) Live() bool { - return r.connected.Load() == 1 +func (b *Broker) Live() bool { + return b.connected.Load() == 1 } -func (r *Broker) Ready() bool { - return r.connected.Load() == 1 +func (b *Broker) Ready() bool { + return b.connected.Load() == 1 } -func (r *Broker) Health() bool { - return r.connected.Load() == 1 +func (b *Broker) Health() bool { + return b.connected.Load() == 1 } -func (k *Broker) Address() string { - return strings.Join(k.opts.Addrs, ",") +func (b *Broker) Address() string { + return strings.Join(b.opts.Addrs, ",") } -func (k *Broker) Name() string { - return k.opts.Name +func (b *Broker) Name() string { + return b.opts.Name } -func (k *Broker) Client() *kgo.Client { - return k.c +func (b *Broker) Client() *kgo.Client { + return b.c } -func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { +func (b *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { var c *kgo.Client var err error @@ -103,27 +103,27 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho clientID := "kgo" group := "" - if k.opts.Context != nil { - if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok { + if b.opts.Context != nil { + if id, ok := b.opts.Context.Value(clientIDKey{}).(string); ok { clientID = id } - if id, ok := k.opts.Context.Value(groupKey{}).(string); ok { + if id, ok := b.opts.Context.Value(groupKey{}).(string); ok { group = id } } var fatalOnError bool - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { + if b.opts.Context != nil { + if v, ok := b.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { fatalOnError = v } } - htracer := &hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer} + htracer := &hookTracer{group: group, clientID: clientID, tracer: b.opts.Tracer} opts = append(opts, - kgo.WithHooks(&hookMeter{meter: k.opts.Meter}), + kgo.WithHooks(&hookMeter{meter: b.opts.Meter}), kgo.WithHooks(htracer), - kgo.WithHooks(&hookEvent{log: k.opts.Logger, fatalOnError: fatalOnError, connected: k.connected}), + kgo.WithHooks(&hookEvent{log: b.opts.Logger, fatalOnError: fatalOnError, connected: b.connected}), ) select { @@ -145,7 +145,7 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho } return nil, nil, err } - k.connected.Store(1) + b.connected.Store(1) if fatalOnError { go func() { @@ -154,9 +154,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second) defer tc.Stop() for range tc.C { - if k.connected.Load() == 0 { + if b.connected.Load() == 0 { if n > c { - k.opts.Logger.Fatal(context.Background(), "broker fatal error") + b.opts.Logger.Fatal(context.Background(), "broker fatal error") } n++ } else { @@ -169,117 +169,117 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho } } -func (k *Broker) Connect(ctx context.Context) error { - if k.connected.Load() == 1 { +func (b *Broker) Connect(ctx context.Context) error { + if b.connected.Load() == 1 { return nil } - nctx := k.opts.Context + nctx := b.opts.Context if ctx != nil { nctx = ctx } - c, _, err := k.connect(nctx, k.kopts...) + c, _, err := b.connect(nctx, b.kopts...) if err != nil { return err } - k.Lock() - k.c = c - k.connected.Store(1) - k.Unlock() + b.Lock() + b.c = c + b.connected.Store(1) + b.Unlock() return nil } -func (k *Broker) Disconnect(ctx context.Context) error { - if k.connected.Load() == 0 { +func (b *Broker) Disconnect(ctx context.Context) error { + if b.connected.Load() == 0 { return nil } - nctx := k.opts.Context + nctx := b.opts.Context if ctx != nil { nctx = ctx } var span tracer.Span - ctx, span = k.opts.Tracer.Start(ctx, "Disconnect") + ctx, span = b.opts.Tracer.Start(ctx, "Disconnect") defer span.Finish() - k.Lock() - defer k.Unlock() + b.Lock() + defer b.Unlock() select { case <-nctx.Done(): return nctx.Err() default: - for _, sub := range k.subs { - if sub.closed { + for _, sub := range b.subs { + if sub.closed.Load() { continue } if err := sub.Unsubscribe(ctx); err != nil { return err } } - if k.c != nil { - k.c.CloseAllowingRebalance() - // k.c.Close() + if b.c != nil { + b.c.CloseAllowingRebalance() + // b.c.Close() } } - k.connected.Store(0) - close(k.done) + b.connected.Store(0) + close(b.done) return nil } -func (k *Broker) Init(opts ...broker.Option) error { - k.Lock() - defer k.Unlock() +func (b *Broker) Init(opts ...broker.Option) error { + b.Lock() + defer b.Unlock() - if len(opts) == 0 && k.init { + if len(opts) == 0 && b.init { return nil } for _, o := range opts { - o(&k.opts) + o(&b.opts) } - if err := k.opts.Register.Init(); err != nil { + if err := b.opts.Register.Init(); err != nil { return err } - if err := k.opts.Tracer.Init(); err != nil { + if err := b.opts.Tracer.Init(); err != nil { return err } - if err := k.opts.Logger.Init(); err != nil { + if err := b.opts.Logger.Init(); err != nil { return err } - if err := k.opts.Meter.Init(); err != nil { + if err := b.opts.Meter.Init(); err != nil { return err } - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 { - k.kopts = append(k.kopts, v...) + if b.opts.Context != nil { + if v, ok := b.opts.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 { + b.kopts = append(b.kopts, v...) } } - k.init = true + b.init = true return nil } -func (k *Broker) Options() broker.Options { - return k.opts +func (b *Broker) Options() broker.Options { + return b.opts } -func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - return k.publish(ctx, msgs, opts...) +func (b *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + return b.publish(ctx, msgs, opts...) } -func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { +func (b *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { msg.Header.Set(metadata.HeaderTopic, topic) - return k.publish(ctx, []*broker.Message{msg}, opts...) + return b.publish(ctx, []*broker.Message{msg}, opts...) } -func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { +func (b *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) var errs []string @@ -302,12 +302,12 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) msg.Header.Del(metadata.HeaderTopic) - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() - if options.BodyOnly || k.opts.Codec.String() == "noop" { + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() + if options.BodyOnly || b.opts.Codec.String() == "noop" { rec.Value = msg.Body setHeaders(rec, msg.Header) } else { - rec.Value, err = k.opts.Codec.Marshal(msg) + rec.Value, err = b.opts.Codec.Marshal(msg) if err != nil { return err } @@ -318,15 +318,15 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br if promise != nil { ts := time.Now() for _, rec := range records { - k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { + b.c.Produce(ctx, rec, func(r *kgo.Record, err error) { te := time.Since(ts) - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() - k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) - k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() + b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) + b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) if err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() } promise(r, err) }) @@ -335,18 +335,18 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br } ts := time.Now() - results := k.c.ProduceSync(ctx, records...) + results := b.c.ProduceSync(ctx, records...) te := time.Since(ts) for _, result := range results { - k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) - k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() + b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() if result.Err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() } } @@ -357,13 +357,13 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br return nil } -func (k *Broker) TopicExists(ctx context.Context, topic string) error { +func (b *Broker) TopicExists(ctx context.Context, topic string) error { mdreq := kmsg.NewMetadataRequest() mdreq.Topics = []kmsg.MetadataRequestTopic{ {Topic: &topic}, } - mdrsp, err := mdreq.RequestWith(ctx, k.c) + mdrsp, err := mdreq.RequestWith(ctx, b.c) if err != nil { return err } else if mdrsp.Topics[0].ErrorCode != 0 { @@ -373,11 +373,11 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error { return nil } -func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { +func (b *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { return nil, nil } -func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { options := broker.NewSubscribeOptions(opts...) if options.Group == "" { @@ -389,15 +389,15 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } commitInterval := DefaultCommitInterval - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { + if b.opts.Context != nil { + if v, ok := b.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { commitInterval = v } } var fatalOnError bool - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { + if b.opts.Context != nil { + if v, ok := b.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { fatalOnError = v } } @@ -406,14 +406,14 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han topic: topic, opts: options, handler: handler, - kopts: k.opts, + kopts: b.opts, consumers: make(map[tp]*consumer), done: make(chan struct{}), fatalOnError: fatalOnError, - connected: k.connected, + connected: b.connected, } - kopts := append(k.kopts, + kopts := append(b.kopts, kgo.ConsumerGroup(options.Group), kgo.ConsumeTopics(topic), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), @@ -433,7 +433,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } } - c, htracer, err := k.connect(ctx, kopts...) + c, htracer, err := b.connect(ctx, kopts...) if err != nil { return nil, err } @@ -455,13 +455,13 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han go sub.poll(ctx) - k.Lock() - k.subs = append(k.subs, sub) - k.Unlock() + b.Lock() + b.subs = append(b.subs, sub) + b.Unlock() return sub, nil } -func (k *Broker) String() string { +func (b *Broker) String() string { return "kgo" } diff --git a/subscriber.go b/subscriber.go index e0066cd..d5ac459 100644 --- a/subscriber.go +++ b/subscriber.go @@ -14,6 +14,7 @@ import ( "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/tracer" ) @@ -58,10 +59,11 @@ type Subscriber struct { kopts broker.Options opts broker.SubscribeOptions - closed bool fatalOnError bool + closed atomic.Bool sync.RWMutex + sync.WaitGroup } func (s *Subscriber) Client() *kgo.Client { @@ -77,10 +79,12 @@ func (s *Subscriber) Topic() string { } func (s *Subscriber) Unsubscribe(ctx context.Context) error { - if s.closed { + if s.closed.Load() { return nil } + s.Wait() + s.c.PauseFetchTopics(s.topic) s.c.CloseAllowingRebalance() kc := make(map[string][]int32) @@ -89,8 +93,9 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error { } s.killConsumers(ctx, kc) close(s.done) - s.closed = true + s.closed.Store(true) s.c.ResumeFetchTopics(s.topic) + s.c.Close() return nil } @@ -103,39 +108,8 @@ func (s *Subscriber) poll(ctx context.Context) { } } - go func() { - ac := kadm.NewClient(s.c) - ticker := time.NewTicker(DefaultStatsInterval) - - for { - select { - case <-ctx.Done(): - ticker.Stop() - return - case <-ticker.C: - dgls, err := ac.Lag(ctx, s.opts.Group) - if err != nil || !dgls.Ok() { - continue - } - - dgl, ok := dgls[s.opts.Group] - if !ok { - continue - } - lmap, ok := dgl.Lag[s.topic] - if !ok { - continue - } - - s.Lock() - for p, l := range lmap { - s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag)) - } - s.Unlock() - - } - } - }() + s.Add(1) + go s.pollLag(ctx) for { select { @@ -146,27 +120,93 @@ func (s *Subscriber) poll(ctx context.Context) { return default: fetches := s.c.PollRecords(ctx, maxInflight) - if !s.closed && fetches.IsClientClosed() { - s.closed = true + if !s.closed.Load() && fetches.IsClientClosed() { + s.closed.Store(true) return } + fetches.EachError(func(t string, p int32, err error) { s.kopts.Logger.Fatal(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err) }) fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.consumers[tps].recs <- p + if consumer, ok := s.consumers[tps]; ok { + select { + case consumer.recs <- p: + default: + if s.kopts.Logger.V(logger.WarnLevel) { + s.kopts.Logger.Warn(ctx, fmt.Sprintf("[kgo] consumer channel full topic %s partition %d", p.Topic, p.Partition)) + } + } + } }) s.c.AllowRebalance() } } } +func (s *Subscriber) pollLag(ctx context.Context) { + ac := kadm.NewClient(s.c) + ticker := time.NewTicker(DefaultStatsInterval) + defer func() { + s.Done() + ticker.Stop() + }() + + // кеш ключей метрик lag: map[partition]metricCounter + type lagMetric struct { + counter meter.Counter + lastLag int64 + } + lagCache := make(map[int32]*lagMetric) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + dgls, err := ac.Lag(ctx, s.opts.Group) + if err != nil || !dgls.Ok() { + continue + } + + dgl, ok := dgls[s.opts.Group] + if !ok { + continue + } + + lmap, ok := dgl.Lag[s.topic] + if !ok { + continue + } + + s.Lock() + for p, l := range lmap { + lagVal := l.Lag + if metric, exists := lagCache[p]; exists { + if metric.lastLag != lagVal { + metric.counter.Set(uint64(lagVal)) + metric.lastLag = lagVal + } + } else { + counter := s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))) + counter.Set(uint64(lagVal)) + lagCache[p] = &lagMetric{ + counter: counter, + lastLag: lagVal, + } + } + } + s.Unlock() + } + } +} + func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) { var wg sync.WaitGroup - defer wg.Wait() + s.Lock() for topic, partitions := range lost { for _, partition := range partitions { tps := tp{topic, partition} @@ -180,9 +220,16 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) } wg.Add(1) - go func() { <-pc.done; wg.Done() }() + + go func(pc *consumer) { + defer wg.Done() + <-pc.done + }(pc) } } + s.Unlock() + + wg.Wait() } func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { @@ -254,126 +301,119 @@ func (pc *consumer) consume() { case <-pc.quit: return case p := <-pc.recs: - for _, record := range p.Records { - ctx, sp := pc.htracer.WithProcessSpan(record) - ts := time.Now() - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() - p := eventPool.Get().(*event) - p.msg.Header = nil - p.msg.Body = nil - p.topic = record.Topic - p.err = nil - p.ack = false - p.msg.Header = metadata.New(len(record.Headers)) - p.ctx = ctx - for _, hdr := range record.Headers { - p.msg.Header.Set(hdr.Key, string(hdr.Value)) - } - if pc.kopts.Codec.String() == "noop" { - p.msg.Body = record.Value - } else if pc.opts.BodyOnly { - p.msg.Body = record.Value - } else { - if sp != nil { - sp.AddEvent("codec unmarshal start") - } - err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) - if sp != nil { - sp.AddEvent("codec unmarshal stop") - } - if err != nil { - if sp != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() - p.err = err - p.msg.Body = record.Value - if eh != nil { - _ = eh(p) - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() - if p.ack { - pc.c.MarkCommitRecords(record) - } else { - eventPool.Put(p) - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") - return - } - eventPool.Put(p) - te := time.Since(ts) - pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - continue - } else { - pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err) - } - te := time.Since(ts) - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() - pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - eventPool.Put(p) - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") - if sp != nil { - sp.Finish() - } - return - } - } - if sp != nil { - sp.AddEvent("handler start") - } - err := pc.handler(p) - if sp != nil { - sp.AddEvent("handler stop") - } - if err == nil { - pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() - } else { - if sp != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() - } - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() - if err == nil && pc.opts.AutoAck { - p.ack = true - } else if err != nil { - p.err = err - if eh != nil { - if sp != nil { - sp.AddEvent("error handler start") - } - _ = eh(p) - if sp != nil { - sp.AddEvent("error handler stop") - } - } else { - if pc.kopts.Logger.V(logger.ErrorLevel) { - pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) - } - } - } - te := time.Since(ts) - pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - if p.ack { - eventPool.Put(p) - pc.c.MarkCommitRecords(record) - } else { - eventPool.Put(p) - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") - if sp != nil { - sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") - sp.Finish() - } - return - } - if sp != nil { - sp.Finish() - } - } + pc.processBatch(p, eh) } } + +} + +func (pc *consumer) processBatch(p kgo.FetchTopicPartition, eh broker.Handler) { + var successCount, failureCount int + topic := pc.topic + + for _, record := range p.Records { + ts := time.Now() + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", topic, "topic", topic).Inc() + err := pc.handleRecord(record, eh) + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", topic, "topic", topic).Dec() + te := time.Since(ts) + pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", topic, "topic", topic).Update(te.Seconds()) + pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", topic, "topic", topic).Update(te.Seconds()) + if err == nil { + successCount++ + } else { + failureCount++ + } + } + + if successCount > 0 { + pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "status", "success", "topic", topic).Add(successCount) + } + if failureCount > 0 { + pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "status", "failure", "topic", topic).Add(failureCount) + } + +} + +func (pc *consumer) handleRecord(record *kgo.Record, eh broker.Handler) error { + ctx, sp := pc.htracer.WithProcessSpan(record) + p := eventPool.Get().(*event) + p.reset() + + defer func() { + eventPool.Put(p) + if sp != nil { + sp.Finish() + } + }() + + p.topic = record.Topic + p.ctx = ctx + + p.msg.Header = metadata.New(len(record.Headers)) + for _, hdr := range record.Headers { + p.msg.Header.Set(hdr.Key, string(hdr.Value)) + } + + if pc.kopts.Codec.String() == "noop" || pc.opts.BodyOnly { + p.msg.Body = record.Value + } else { + if sp != nil { + sp.AddEvent("codec unmarshal start") + } + err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) + if sp != nil { + sp.AddEvent("codec unmarshal stop") + } + if err != nil { + if sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } + p.err = err + p.msg.Body = record.Value + if eh != nil { + _ = eh(p) + if p.ack { + pc.c.MarkCommitRecords(record) + } else { + pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + } + return err + } + pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err) + return err + } + } + + if sp != nil { + sp.AddEvent("handler start") + } + err := pc.handler(p) + if sp != nil { + sp.AddEvent("handler stop") + } + + if err == nil { + if pc.opts.AutoAck { + p.ack = true + } + } else { + if sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } + p.err = err + if eh != nil { + _ = eh(p) + } else if pc.kopts.Logger.V(logger.ErrorLevel) { + pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) + } + } + + if p.ack { + pc.c.MarkCommitRecords(record) + } else { + pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + } + + return nil }