update to latest kgo

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-09-02 00:20:55 +03:00
parent b2975262af
commit 07fd48cd38
3 changed files with 21 additions and 107 deletions

109
kgo.go
View File

@@ -7,12 +7,9 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"
kerr "github.com/twmb/franz-go/pkg/kerr"
kgo "github.com/twmb/franz-go/pkg/kgo"
kmsg "github.com/twmb/franz-go/pkg/kmsg"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
@@ -331,17 +328,25 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
})
td := DefaultCommitInterval
if k.opts.Context != nil {
if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 {
td = v
}
}
kopts := append(k.kopts,
kgo.SeedBrokers(kaddrs...),
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.DisableAutoCommit(),
kgo.FetchMaxWait(1*time.Second),
// kgo.KeepControlRecords(),
kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
kgo.WithHooks(&metrics{meter: k.opts.Meter}),
kgo.AutoCommitMarks(),
kgo.AutoCommitInterval(td),
// TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked
)
@@ -397,87 +402,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
eh = s.opts.ErrorHandler
}
var mu sync.Mutex
done := int32(0)
doneCh := make(chan struct{})
g, gctx := errgroup.WithContext(ctx)
td := DefaultCommitInterval
if s.kopts.Context != nil {
if v, ok := s.kopts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 {
td = v
}
}
// ticker for commit offsets
ticker := time.NewTicker(td)
defer ticker.Stop()
offsets := make(map[string]map[int32]kgo.EpochOffset)
offsets[s.topic] = make(map[int32]kgo.EpochOffset)
fillOffsets := func(off map[string]map[int32]kgo.EpochOffset, rec *kgo.Record) {
mu.Lock()
if at, ok := off[s.topic][rec.Partition]; ok {
if at.Epoch > rec.LeaderEpoch || at.Epoch == rec.LeaderEpoch && at.Offset > rec.Offset {
mu.Unlock()
return
}
}
off[s.topic][rec.Partition] = kgo.EpochOffset{Epoch: rec.LeaderEpoch, Offset: rec.Offset + 1}
mu.Unlock()
}
commitOffsets := func(cl *kgo.Client, ctx context.Context, off map[string]map[int32]kgo.EpochOffset) error {
var rerr error
mu.Lock()
offsets := off
mu.Unlock()
cl.CommitOffsetsSync(ctx, offsets, func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
rerr = err
return
}
for _, topic := range resp.Topics {
for _, partition := range topic.Partitions {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
rerr = err
return
}
}
}
})
return rerr
}
go func() {
for {
select {
case <-gctx.Done():
return
case <-s.done:
atomic.StoreInt32(&done, 1)
if err := commitOffsets(s.reader, ctx, offsets); err != nil && s.kopts.Logger.V(logger.ErrorLevel) {
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err)
}
return
case <-doneCh:
return
case <-ticker.C:
if err := commitOffsets(s.reader, ctx, offsets); err != nil {
if s.kopts.Logger.V(logger.ErrorLevel) {
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err)
}
return
}
}
}
}()
g := &errgroup.Group{}
for _, fetch := range fetches {
for _, ftopic := range fetch.Topics {
@@ -485,9 +410,6 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
precords := partition.Records
g.Go(func() error {
for _, record := range precords {
if atomic.LoadInt32(&done) == 1 {
return nil
}
p := pPool.Get().(*publication)
p.msg.Header = nil
p.msg.Body = nil
@@ -503,7 +425,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
if eh != nil {
_ = eh(p)
if p.ack {
fillOffsets(offsets, record)
s.reader.MarkCommitRecords(record)
}
pPool.Put(p)
continue
@@ -530,7 +452,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
}
}
if p.ack {
fillOffsets(offsets, record)
s.reader.MarkCommitRecords(record)
}
pPool.Put(p)
}
@@ -543,9 +465,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
}
}
close(doneCh)
return commitOffsets(s.reader, ctx, offsets)
return nil
}
func (k *kBroker) String() string {
@@ -555,7 +475,8 @@ func (k *kBroker) String() string {
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.NewOptions(opts...)
kopts := []kgo.Opt{
kgo.BatchCompression(kgo.NoCompression()),
kgo.DisableIdempotentWrite(),
kgo.ProducerBatchCompression(kgo.NoCompression()),
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
kgo.RetryBackoffFn(
func() func(int) time.Duration {