From c89df95fdcb478e94698f16147d8ba46c1269dd3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 15 Sep 2021 18:30:07 +0300 Subject: [PATCH] fixup Signed-off-by: Vasiliy Tolstov --- go.mod | 9 +-- go.sum | 17 +++-- kgo.go | 138 +++++-------------------------------- kgo_test.go | 7 +- options.go | 9 +++ util.go | 193 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 241 insertions(+), 132 deletions(-) create mode 100644 util.go diff --git a/go.mod b/go.mod index ef52282..5fe2d15 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,11 @@ module github.com/unistack-org/micro-broker-kgo/v3 go 1.16 require ( - github.com/twmb/franz-go v0.11.1 - github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210903012929-a6c6e3b9e991 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/twmb/franz-go v1.0.0 + github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914174821-2f676c0a574b // indirect github.com/unistack-org/micro-codec-json/v3 v3.2.5 - github.com/unistack-org/micro/v3 v3.7.0 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + github.com/unistack-org/micro/v3 v3.7.1 + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect google.golang.org/protobuf v1.27.1 // indirect ) diff --git a/go.sum b/go.sum index 20f16b4..5d6af2e 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aW github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -53,20 +55,25 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/twmb/franz-go v0.11.1 h1:9NbczLCWztTtlEhEXQKNvCLZUxsIMoKKJUlRstSNgB0= -github.com/twmb/franz-go v0.11.1/go.mod h1:cdFLk8d/5/ox88y38xgiDKP3Yo338OO0t5QbTEM2K6I= +github.com/twmb/franz-go v1.0.0 h1:JcsqEImDhr7H/eQx/V58fwzjmVi2FRwz6dyylmdJ0NU= +github.com/twmb/franz-go v1.0.0/go.mod h1:cdFLk8d/5/ox88y38xgiDKP3Yo338OO0t5QbTEM2K6I= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901051457-3c197a133ddd h1:o+cb+mqRFpVKrusJy/Ebt4zTWn94nPbI1ooMeNhovqM= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901051457-3c197a133ddd/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210903012929-a6c6e3b9e991 h1:NzPFcZyF5GhGIpxmFf/KL9XtIko6+q8u68uAi7ayoRU= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210903012929-a6c6e3b9e991/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914174821-2f676c0a574b h1:7d6eRt9HEqXVxMzD2fry9qtJ0kRkgeJ5olqW9K+aXv8= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914174821-2f676c0a574b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= github.com/unistack-org/micro-codec-json/v3 v3.2.5 h1:WOilhbL0YSu58iIQIIxpawRYZyx6CR16tCpbX4ai3Vc= github.com/unistack-org/micro-codec-json/v3 v3.2.5/go.mod h1:LSzfrD9GYWCl6KOyihywx1wlbOgStrpyy3NVHNZAvHA= github.com/unistack-org/micro-proto v0.0.8 h1:g4UZGQGeYGI3CFJtjuEm47aouYPviG8SDhSifl0831w= github.com/unistack-org/micro-proto v0.0.8/go.mod h1:GYO53DWmeldRIo90cAdQx8bLr/WJMxW62W4ja74p1Ac= +github.com/unistack-org/micro-proto v0.0.9 h1:KrWLS4FUX7UAWNAilQf70uad6ZPf/0EudeddCXllRVc= +github.com/unistack-org/micro-proto v0.0.9/go.mod h1:Cckwmzd89gvS7ThxzZp9kQR/EOdksFQcsTAtDDyKwrg= github.com/unistack-org/micro/v3 v3.3.19/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= github.com/unistack-org/micro/v3 v3.7.0 h1:jEEegoVh1VIgT/+4gHw3TmwI3p7ufAdV37RVNxx9kNc= github.com/unistack-org/micro/v3 v3.7.0/go.mod h1:xXGbjNQShqlth0hv+q7ijGvciXGVqxUnVdkFm0t95rk= +github.com/unistack-org/micro/v3 v3.7.1 h1:gjCon1U8i9upNgw9+iEgbZh2LCeBizDYotQ+THHV0lo= +github.com/unistack-org/micro/v3 v3.7.1/go.mod h1:gBoY6gvzeFiJTZ4FgDttGNSs4Y1+1PRg2cV1yTRMSlg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -87,8 +94,6 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/kgo.go b/kgo.go index 82f1303..42a538a 100644 --- a/kgo.go +++ b/kgo.go @@ -4,7 +4,6 @@ package kgo import ( "context" "fmt" - "math/rand" "strings" "sync" "time" @@ -15,15 +14,8 @@ import ( "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/util/id" mrand "github.com/unistack-org/micro/v3/util/rand" - "golang.org/x/sync/errgroup" ) -var pPool = sync.Pool{ - New: func() interface{} { - return &publication{msg: &broker.Message{}} - }, -} - type kBroker struct { writer *kgo.Client // used only to push messages kopts []kgo.Opt @@ -43,6 +35,7 @@ type subscriber struct { batchhandler broker.BatchHandler closed bool done chan struct{} + consumers map[string]map[int32]worker sync.RWMutex } @@ -54,10 +47,6 @@ type publication struct { ack bool } -func init() { - rand.Seed(time.Now().UnixNano()) -} - func (p *publication) Topic() string { return p.topic } @@ -125,7 +114,8 @@ func (k *kBroker) Connect(ctx context.Context) error { kaddrs := k.opts.Addrs // shuffle addrs - rand.Shuffle(len(kaddrs), func(i, j int) { + var rng mrand.Rand + rng.Shuffle(len(kaddrs), func(i, j int) { kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] }) @@ -324,7 +314,8 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha kaddrs := k.opts.Addrs // shuffle addrs - rand.Shuffle(len(kaddrs), func(i, j int) { + var rng mrand.Rand + rng.Shuffle(len(kaddrs), func(i, j int) { kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] }) @@ -335,6 +326,15 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha } } + sub := &subscriber{ + topic: topic, + done: make(chan struct{}), + opts: options, + handler: handler, + kopts: k.opts, + consumers: make(map[string]map[int32]worker), + } + kopts := append(k.kopts, kgo.SeedBrokers(kaddrs...), kgo.ConsumerGroup(options.Group), @@ -347,7 +347,9 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha kgo.WithHooks(&metrics{meter: k.opts.Meter}), kgo.AutoCommitMarks(), kgo.AutoCommitInterval(td), - // TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked + kgo.OnPartitionsAssigned(sub.assigned), + kgo.OnPartitionsRevoked(sub.revoked), + kgo.OnPartitionsLost(sub.revoked), ) reader, err := kgo.NewClient(kopts...) @@ -355,7 +357,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha return nil, err } - sub := &subscriber{topic: topic, done: make(chan struct{}), opts: options, reader: reader, handler: handler, kopts: k.opts} + sub.reader = reader go sub.run(ctx) k.Lock() @@ -364,110 +366,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha return sub, nil } -func (s *subscriber) run(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-s.kopts.Context.Done(): - return - default: - fetches := s.reader.PollFetches(ctx) - if fetches.IsClientClosed() { - // TODO: fatal ? - return - } - if len(fetches.Errors()) > 0 { - for _, err := range fetches.Errors() { - s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err) - } - // TODO: fatal ? - return - } - - if err := s.handleFetches(ctx, fetches); err != nil { - s.kopts.Logger.Errorf(ctx, "fetch handler err: %v", err) - // TODO: fatal ? - // return - } - } - } -} - -func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) error { - var err error - - eh := s.kopts.ErrorHandler - if s.opts.ErrorHandler != nil { - eh = s.opts.ErrorHandler - } - - g := &errgroup.Group{} - - for _, fetch := range fetches { - for _, ftopic := range fetch.Topics { - for _, partition := range ftopic.Partitions { - precords := partition.Records - g.Go(func() error { - for _, record := range precords { - p := pPool.Get().(*publication) - p.msg.Header = nil - p.msg.Body = nil - p.topic = s.topic - p.err = nil - p.ack = false - if s.opts.BodyOnly { - p.msg.Body = record.Value - } else { - if err := s.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { - p.err = err - p.msg.Body = record.Value - if eh != nil { - _ = eh(p) - if p.ack { - s.reader.MarkCommitRecords(record) - } - pPool.Put(p) - continue - } else { - if s.kopts.Logger.V(logger.ErrorLevel) { - s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err) - } - } - pPool.Put(p) - return err - } - } - err = s.handler(p) - if err == nil && s.opts.AutoAck { - p.ack = true - } else if err != nil { - p.err = err - if eh != nil { - _ = eh(p) - } else { - if s.kopts.Logger.V(logger.ErrorLevel) { - s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: subscriber error: %v", err) - } - } - } - if p.ack { - s.reader.MarkCommitRecords(record) - } - pPool.Put(p) - } - return nil - }) - } - if err := g.Wait(); err != nil { - return err - } - } - } - - return nil -} - func (k *kBroker) String() string { return "kgo" } diff --git a/kgo_test.go b/kgo_test.go index 94702c8..e03c3f2 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -10,11 +10,12 @@ import ( "time" kg "github.com/twmb/franz-go/pkg/kgo" - kgo "github.com/unistack-org/micro-broker-kgo/v3" jsoncodec "github.com/unistack-org/micro-codec-json/v3" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" + + kgo "github.com/unistack-org/micro-broker-kgo/v3" ) var ( @@ -33,7 +34,9 @@ func TestPubSub(t *testing.T) { t.Skip() } - logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel), logger.WithCallerSkipCount(3)) + if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel), logger.WithCallerSkipCount(3)); err != nil { + t.Fatal(err) + } ctx := context.Background() var addrs []string diff --git a/options.go b/options.go index cdaaefc..154a056 100644 --- a/options.go +++ b/options.go @@ -54,3 +54,12 @@ type commitIntervalKey struct{} func CommitInterval(td time.Duration) broker.Option { return broker.SetOption(commitIntervalKey{}, td) } + +var DefaultSubscribeMaxInflight = 1000 + +type subscribeMaxInflightKey struct{} + +// SubscribeMaxInFlight specifies interval to send commits +func SubscribeMaxInFlight(n int) broker.SubscribeOption { + return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n) +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..8e72d0a --- /dev/null +++ b/util.go @@ -0,0 +1,193 @@ +package kgo + +import ( + "context" + "sync" + + kgo "github.com/twmb/franz-go/pkg/kgo" + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/logger" +) + +var pPool = sync.Pool{ + New: func() interface{} { + return &publication{msg: &broker.Message{}} + }, +} + +type worker struct { + done chan struct{} + recs chan []*kgo.Record + cherr chan error + handler broker.Handler + batchHandler broker.BatchHandler + opts broker.SubscribeOptions + kopts broker.Options + tpmap map[string][]int32 + maxInflight int + reader *kgo.Client + ctx context.Context +} + +func (s *subscriber) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-s.kopts.Context.Done(): + return + default: + fetches := s.reader.PollFetches(ctx) + if fetches.IsClientClosed() { + // TODO: fatal ? + return + } + if len(fetches.Errors()) > 0 { + for _, err := range fetches.Errors() { + s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err) + } + // TODO: fatal ? + return + } + + fetches.EachPartition(func(p kgo.FetchTopicPartition) { + consumers := s.consumers[p.Topic] + if consumers == nil { + return + } + w, ok := consumers[p.Partition] + if !ok { + return + } + select { + case w.recs <- p.Records: + case <-w.done: + } + }) + } + } +} + +func (s *subscriber) assigned(ctx context.Context, _ *kgo.Client, assigned map[string][]int32) { + maxInflight := DefaultSubscribeMaxInflight + + if s.opts.Context != nil { + if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { + maxInflight = n + } + } + + s.Lock() + for topic, partitions := range assigned { + if s.consumers[topic] == nil { + s.consumers[topic] = make(map[int32]worker) + } + for _, partition := range partitions { + w := worker{ + done: make(chan struct{}), + recs: make(chan []*kgo.Record, maxInflight), + cherr: make(chan error), + kopts: s.kopts, + opts: s.opts, + ctx: ctx, + tpmap: map[string][]int32{topic: []int32{partition}}, + reader: s.reader, + handler: s.handler, + batchHandler: s.batchhandler, + maxInflight: maxInflight, + } + s.consumers[topic][partition] = w + go w.handle() + } + } + s.Unlock() +} + +func (s *subscriber) revoked(_ context.Context, _ *kgo.Client, revoked map[string][]int32) { + s.Lock() + for topic, partitions := range revoked { + ptopics := s.consumers[topic] + for _, partition := range partitions { + w := ptopics[partition] + delete(ptopics, partition) + if len(ptopics) == 0 { + delete(s.consumers, topic) + } + close(w.done) + } + } + s.Unlock() +} + +func (w *worker) handle() { + var err error + + eh := w.kopts.ErrorHandler + if w.opts.ErrorHandler != nil { + eh = w.opts.ErrorHandler + } + + for { + select { + case <-w.ctx.Done(): + w.cherr <- w.ctx.Err() + return + case <-w.done: + return + case recs := <-w.recs: + if len(recs) == w.maxInflight { + w.reader.PauseFetchPartitions(w.tpmap) + } + for _, record := range recs { + p := pPool.Get().(*publication) + p.msg.Header = nil + p.msg.Body = nil + p.topic = record.Topic + p.err = nil + p.ack = false + if w.opts.BodyOnly { + p.msg.Body = record.Value + } else { + if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { + p.err = err + p.msg.Body = record.Value + if eh != nil { + _ = eh(p) + if p.ack { + w.reader.MarkCommitRecords(record) + } + pPool.Put(p) + continue + } else { + if w.kopts.Logger.V(logger.ErrorLevel) { + w.kopts.Logger.Errorf(w.kopts.Context, "[kgo]: failed to unmarshal: %v", err) + } + } + pPool.Put(p) + w.cherr <- err + return + } + } + err = w.handler(p) + if err == nil && w.opts.AutoAck { + p.ack = true + } else if err != nil { + p.err = err + if eh != nil { + _ = eh(p) + } else { + if w.kopts.Logger.V(logger.ErrorLevel) { + w.kopts.Logger.Errorf(w.kopts.Context, "[kgo]: subscriber error: %v", err) + } + } + } + if p.ack { + w.reader.MarkCommitRecords(record) + } + pPool.Put(p) + } + + w.reader.ResumeFetchPartitions(w.tpmap) + } + } +}