diff --git a/go.mod b/go.mod index 6f44977..22477ef 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,9 @@ go 1.16 require ( github.com/klauspost/compress v1.13.5 // indirect - github.com/twmb/franz-go v0.10.3-0.20210829190149-e4b93a10d4d0 - github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210829190149-e4b93a10d4d0 + github.com/twmb/franz-go v0.11.0 + github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901054312-f2002b3e2313 // indirect github.com/unistack-org/micro-codec-json/v3 v3.2.5 - github.com/unistack-org/micro-proto v0.0.8 // indirect github.com/unistack-org/micro/v3 v3.7.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/protobuf v1.27.1 // indirect diff --git a/go.sum b/go.sum index 63f9b57..dc4c3e7 100644 --- a/go.sum +++ b/go.sum @@ -22,7 +22,6 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -42,7 +41,6 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= @@ -58,22 +56,18 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/twmb/franz-go v0.10.3-0.20210829190149-e4b93a10d4d0 h1:QsuADzDJnU0UmGc3MPQHKHMiZOyC5wkfHPlZMTqwKaU= -github.com/twmb/franz-go v0.10.3-0.20210829190149-e4b93a10d4d0/go.mod h1:a8tUwwic5WYy32hMT7QUrsuwQ9b/AwN3Ub61PoEQagg= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210829174113-fcaaf3f18f2f h1:A4B6hpsLYYrO1cdwUnul93CF2/+VpgyT6D9TC5n5wMg= +github.com/twmb/franz-go v0.11.0 h1:WHGZWV4rZVbkQndjqxYL3dXU1VdF4CZ68KgaHLwEtH8= +github.com/twmb/franz-go v0.11.0/go.mod h1:a8tUwwic5WYy32hMT7QUrsuwQ9b/AwN3Ub61PoEQagg= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210829174113-fcaaf3f18f2f/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210829190149-e4b93a10d4d0 h1:57Hn2rf+MZc3/A5foxUCGpsKhF6Pg4tWZfyTR1HObAU= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210829190149-e4b93a10d4d0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901054312-f2002b3e2313 h1:iS6Upu5PnhziQaIUHg2eXHLvjHGwmk+1eK1WGIl7gOU= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901054312-f2002b3e2313/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= github.com/unistack-org/micro-codec-json/v3 v3.2.5 h1:WOilhbL0YSu58iIQIIxpawRYZyx6CR16tCpbX4ai3Vc= github.com/unistack-org/micro-codec-json/v3 v3.2.5/go.mod h1:LSzfrD9GYWCl6KOyihywx1wlbOgStrpyy3NVHNZAvHA= -github.com/unistack-org/micro-proto v0.0.5/go.mod h1:EuI7UlfGXmT1hy6WacULib9LbNgRnDYQvTCFoLgKM2I= github.com/unistack-org/micro-proto v0.0.8 h1:g4UZGQGeYGI3CFJtjuEm47aouYPviG8SDhSifl0831w= github.com/unistack-org/micro-proto v0.0.8/go.mod h1:GYO53DWmeldRIo90cAdQx8bLr/WJMxW62W4ja74p1Ac= github.com/unistack-org/micro/v3 v3.3.19/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= -github.com/unistack-org/micro/v3 v3.6.3 h1:CvC4B2kOgwzhPx+w9fcbEIJkgzxS7i84PRB+vL6Vz0U= -github.com/unistack-org/micro/v3 v3.6.3/go.mod h1:Hp+DmX1Bt+/z+ihk5coYwNtXhPMwr4SJuCFRy2kyUl8= github.com/unistack-org/micro/v3 v3.7.0 h1:jEEegoVh1VIgT/+4gHw3TmwI3p7ufAdV37RVNxx9kNc= github.com/unistack-org/micro/v3 v3.7.0/go.mod h1:xXGbjNQShqlth0hv+q7ijGvciXGVqxUnVdkFm0t95rk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/kgo.go b/kgo.go index d6543c3..82f1303 100644 --- a/kgo.go +++ b/kgo.go @@ -7,12 +7,9 @@ import ( "math/rand" "strings" "sync" - "sync/atomic" "time" - kerr "github.com/twmb/franz-go/pkg/kerr" kgo "github.com/twmb/franz-go/pkg/kgo" - kmsg "github.com/twmb/franz-go/pkg/kmsg" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" @@ -331,17 +328,25 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] }) + td := DefaultCommitInterval + if k.opts.Context != nil { + if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { + td = v + } + } + kopts := append(k.kopts, kgo.SeedBrokers(kaddrs...), kgo.ConsumerGroup(options.Group), kgo.ConsumeTopics(topic), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - kgo.DisableAutoCommit(), kgo.FetchMaxWait(1*time.Second), // kgo.KeepControlRecords(), kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()), kgo.FetchIsolationLevel(kgo.ReadUncommitted()), kgo.WithHooks(&metrics{meter: k.opts.Meter}), + kgo.AutoCommitMarks(), + kgo.AutoCommitInterval(td), // TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked ) @@ -397,87 +402,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err eh = s.opts.ErrorHandler } - var mu sync.Mutex - - done := int32(0) - doneCh := make(chan struct{}) - g, gctx := errgroup.WithContext(ctx) - - td := DefaultCommitInterval - if s.kopts.Context != nil { - if v, ok := s.kopts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { - td = v - } - } - - // ticker for commit offsets - ticker := time.NewTicker(td) - defer ticker.Stop() - - offsets := make(map[string]map[int32]kgo.EpochOffset) - offsets[s.topic] = make(map[int32]kgo.EpochOffset) - - fillOffsets := func(off map[string]map[int32]kgo.EpochOffset, rec *kgo.Record) { - mu.Lock() - if at, ok := off[s.topic][rec.Partition]; ok { - if at.Epoch > rec.LeaderEpoch || at.Epoch == rec.LeaderEpoch && at.Offset > rec.Offset { - mu.Unlock() - return - } - } - off[s.topic][rec.Partition] = kgo.EpochOffset{Epoch: rec.LeaderEpoch, Offset: rec.Offset + 1} - mu.Unlock() - } - - commitOffsets := func(cl *kgo.Client, ctx context.Context, off map[string]map[int32]kgo.EpochOffset) error { - var rerr error - - mu.Lock() - offsets := off - mu.Unlock() - - cl.CommitOffsetsSync(ctx, offsets, func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - if err != nil { - rerr = err - return - } - - for _, topic := range resp.Topics { - for _, partition := range topic.Partitions { - if err := kerr.ErrorForCode(partition.ErrorCode); err != nil { - rerr = err - return - } - } - } - }) - - return rerr - } - - go func() { - for { - select { - case <-gctx.Done(): - return - case <-s.done: - atomic.StoreInt32(&done, 1) - if err := commitOffsets(s.reader, ctx, offsets); err != nil && s.kopts.Logger.V(logger.ErrorLevel) { - s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err) - } - return - case <-doneCh: - return - case <-ticker.C: - if err := commitOffsets(s.reader, ctx, offsets); err != nil { - if s.kopts.Logger.V(logger.ErrorLevel) { - s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err) - } - return - } - } - } - }() + g := &errgroup.Group{} for _, fetch := range fetches { for _, ftopic := range fetch.Topics { @@ -485,9 +410,6 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err precords := partition.Records g.Go(func() error { for _, record := range precords { - if atomic.LoadInt32(&done) == 1 { - return nil - } p := pPool.Get().(*publication) p.msg.Header = nil p.msg.Body = nil @@ -503,7 +425,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err if eh != nil { _ = eh(p) if p.ack { - fillOffsets(offsets, record) + s.reader.MarkCommitRecords(record) } pPool.Put(p) continue @@ -530,7 +452,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err } } if p.ack { - fillOffsets(offsets, record) + s.reader.MarkCommitRecords(record) } pPool.Put(p) } @@ -543,9 +465,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err } } - close(doneCh) - - return commitOffsets(s.reader, ctx, offsets) + return nil } func (k *kBroker) String() string { @@ -555,7 +475,8 @@ func (k *kBroker) String() string { func NewBroker(opts ...broker.Option) broker.Broker { options := broker.NewOptions(opts...) kopts := []kgo.Opt{ - kgo.BatchCompression(kgo.NoCompression()), + kgo.DisableIdempotentWrite(), + kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}), kgo.RetryBackoffFn( func() func(int) time.Duration {