cpu and memory optimizations
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
209
kgo.go
209
kgo.go
@@ -10,7 +10,9 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
kerr "github.com/twmb/franz-go/pkg/kerr"
|
||||
kgo "github.com/twmb/franz-go/pkg/kgo"
|
||||
kmsg "github.com/twmb/franz-go/pkg/kmsg"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
@@ -21,7 +23,7 @@ import (
|
||||
|
||||
var pPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &broker.Message{}
|
||||
return &publication{msg: &broker.Message{}}
|
||||
},
|
||||
}
|
||||
|
||||
@@ -336,7 +338,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||
kgo.DisableAutoCommit(),
|
||||
kgo.FetchMaxWait(1*time.Second),
|
||||
kgo.KeepControlRecords(),
|
||||
// kgo.KeepControlRecords(),
|
||||
kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
|
||||
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
|
||||
// kgo.WithHooks(&metrics{meter: k.opts.Meter}),
|
||||
@@ -348,7 +350,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub := &subscriber{done: make(chan struct{}), opts: options, reader: reader, handler: handler, kopts: k.opts}
|
||||
sub := &subscriber{topic: topic, done: make(chan struct{}), opts: options, reader: reader, handler: handler, kopts: k.opts}
|
||||
go sub.run(ctx)
|
||||
|
||||
k.Lock()
|
||||
@@ -390,21 +392,6 @@ func (s *subscriber) run(ctx context.Context) {
|
||||
func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) error {
|
||||
var err error
|
||||
|
||||
mprecords := make(map[int32][]*kgo.Record)
|
||||
|
||||
cnt := int64(0)
|
||||
for _, fetch := range fetches {
|
||||
for _, ftopic := range fetch.Topics {
|
||||
for _, partition := range ftopic.Partitions {
|
||||
mprecords[partition.Partition] = append(mprecords[partition.Partition], partition.Records...)
|
||||
cnt += int64(len(partition.Records))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// preallocate optimistic
|
||||
crecords := make([]*kgo.Record, 0, 10000)
|
||||
|
||||
eh := s.kopts.ErrorHandler
|
||||
if s.opts.ErrorHandler != nil {
|
||||
eh = s.opts.ErrorHandler
|
||||
@@ -412,6 +399,8 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
|
||||
var mu sync.Mutex
|
||||
|
||||
done := int32(0)
|
||||
doneCh := make(chan struct{})
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
|
||||
td := DefaultCommitInterval
|
||||
@@ -421,107 +410,142 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
}
|
||||
}
|
||||
|
||||
// ticker for commit offsets
|
||||
ticker := time.NewTicker(td)
|
||||
defer ticker.Stop()
|
||||
|
||||
g.Go(func() error {
|
||||
offsets := make(map[string]map[int32]kgo.EpochOffset)
|
||||
offsets[s.topic] = make(map[int32]kgo.EpochOffset)
|
||||
|
||||
fillOffsets := func(off map[string]map[int32]kgo.EpochOffset, rec *kgo.Record) {
|
||||
mu.Lock()
|
||||
if at, ok := off[s.topic][rec.Partition]; ok {
|
||||
if at.Epoch > rec.LeaderEpoch || at.Epoch == rec.LeaderEpoch && at.Offset > rec.Offset {
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
off[s.topic][rec.Partition] = kgo.EpochOffset{Epoch: rec.LeaderEpoch, Offset: rec.Offset + 1}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
commitOffsets := func(cl *kgo.Client, ctx context.Context, off map[string]map[int32]kgo.EpochOffset) error {
|
||||
var rerr error
|
||||
|
||||
mu.Lock()
|
||||
offsets := off
|
||||
mu.Unlock()
|
||||
|
||||
cl.CommitOffsetsSync(ctx, offsets, func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
|
||||
if err != nil {
|
||||
rerr = err
|
||||
return
|
||||
}
|
||||
|
||||
for _, topic := range resp.Topics {
|
||||
for _, partition := range topic.Partitions {
|
||||
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
|
||||
rerr = err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return rerr
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-gctx.Done():
|
||||
return gctx.Err()
|
||||
return
|
||||
case <-s.done:
|
||||
mu.Lock()
|
||||
err := s.reader.CommitRecords(ctx, crecords...)
|
||||
mu.Unlock()
|
||||
return err
|
||||
atomic.StoreInt32(&done, 1)
|
||||
if err := commitOffsets(s.reader, ctx, offsets); err != nil && s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err)
|
||||
}
|
||||
return
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
v := atomic.LoadInt64(&cnt)
|
||||
if v == 0 {
|
||||
return nil
|
||||
if err := commitOffsets(s.reader, ctx, offsets); err != nil {
|
||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
if err := s.reader.CommitRecords(ctx, crecords...); err != nil {
|
||||
mu.Unlock()
|
||||
return err
|
||||
}
|
||||
atomic.AddInt64(&cnt, -int64(len(crecords)))
|
||||
crecords = crecords[:0]
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
})
|
||||
}()
|
||||
|
||||
for _, records := range mprecords {
|
||||
precords := records
|
||||
g.Go(func() error {
|
||||
for _, record := range precords {
|
||||
select {
|
||||
case <-s.done:
|
||||
return nil
|
||||
case <-gctx.Done():
|
||||
return gctx.Err()
|
||||
default:
|
||||
msg := pPool.Get().(*broker.Message)
|
||||
msg.Header = nil
|
||||
msg.Body = nil
|
||||
p := &publication{topic: record.Topic, msg: msg}
|
||||
if s.opts.BodyOnly {
|
||||
p.msg.Body = record.Value
|
||||
} else {
|
||||
if err := s.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
||||
p.err = err
|
||||
for _, fetch := range fetches {
|
||||
for _, ftopic := range fetch.Topics {
|
||||
for _, partition := range ftopic.Partitions {
|
||||
precords := partition.Records
|
||||
g.Go(func() error {
|
||||
for _, record := range precords {
|
||||
if atomic.LoadInt32(&done) == 1 {
|
||||
return nil
|
||||
}
|
||||
p := pPool.Get().(*publication)
|
||||
p.msg.Header = nil
|
||||
p.msg.Body = nil
|
||||
p.topic = s.topic
|
||||
p.err = nil
|
||||
p.ack = false
|
||||
if s.opts.BodyOnly {
|
||||
p.msg.Body = record.Value
|
||||
} else {
|
||||
if err := s.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
||||
p.err = err
|
||||
p.msg.Body = record.Value
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
if p.ack {
|
||||
fillOffsets(offsets, record)
|
||||
}
|
||||
pPool.Put(p)
|
||||
continue
|
||||
} else {
|
||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
|
||||
}
|
||||
}
|
||||
pPool.Put(p)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = s.handler(p)
|
||||
if err == nil && s.opts.AutoAck {
|
||||
p.ack = true
|
||||
} else if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
if p.ack {
|
||||
mu.Lock()
|
||||
crecords = append(crecords, record)
|
||||
mu.Unlock()
|
||||
}
|
||||
pPool.Put(msg)
|
||||
return nil
|
||||
} else {
|
||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: subscriber error: %v", err)
|
||||
}
|
||||
}
|
||||
pPool.Put(msg)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = s.handler(p)
|
||||
if err == nil && (s.opts.AutoAck || p.ack) {
|
||||
mu.Lock()
|
||||
crecords = append(crecords, record)
|
||||
mu.Unlock()
|
||||
}
|
||||
if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
} else {
|
||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: subscriber error: %v", err)
|
||||
}
|
||||
}
|
||||
if p.ack {
|
||||
mu.Lock()
|
||||
crecords = append(crecords, record)
|
||||
mu.Unlock()
|
||||
fillOffsets(offsets, record)
|
||||
}
|
||||
pPool.Put(p)
|
||||
}
|
||||
pPool.Put(msg)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err := g.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
close(doneCh)
|
||||
|
||||
return s.reader.CommitRecords(ctx, crecords...)
|
||||
return commitOffsets(s.reader, ctx, offsets)
|
||||
}
|
||||
|
||||
func (k *kBroker) String() string {
|
||||
@@ -533,7 +557,6 @@ func NewBroker(opts ...broker.Option) broker.Broker {
|
||||
kopts := []kgo.Opt{
|
||||
kgo.BatchCompression(kgo.NoCompression()),
|
||||
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
|
||||
kgo.RequiredAcks(kgo.AllISRAcks()),
|
||||
kgo.RetryBackoffFn(
|
||||
func() func(int) time.Duration {
|
||||
var rng mrand.Rand
|
||||
|
||||
Reference in New Issue
Block a user