25
kgo.go
25
kgo.go
@@ -19,6 +19,12 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var pPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &broker.Message{}
|
||||
},
|
||||
}
|
||||
|
||||
type kBroker struct {
|
||||
writer *kgo.Client // used only to push messages
|
||||
kopts []kgo.Opt
|
||||
@@ -397,7 +403,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
}
|
||||
|
||||
// preallocate optimistic
|
||||
crecords := make([]*kgo.Record, 0, 1000)
|
||||
crecords := make([]*kgo.Record, 0, 10000)
|
||||
|
||||
eh := s.kopts.ErrorHandler
|
||||
if s.opts.ErrorHandler != nil {
|
||||
@@ -430,7 +436,6 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
return err
|
||||
case <-ticker.C:
|
||||
v := atomic.LoadInt64(&cnt)
|
||||
s.kopts.Logger.Debugf(ctx, "records to commit: %v", v)
|
||||
if v == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -439,9 +444,8 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
mu.Unlock()
|
||||
return err
|
||||
}
|
||||
s.kopts.Logger.Debugf(ctx, "records to need process after commit: %v", v-int64(len(crecords)))
|
||||
atomic.AddInt64(&cnt, -int64(len(crecords)))
|
||||
crecords = nil
|
||||
crecords = crecords[:0]
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
@@ -457,7 +461,10 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
case <-gctx.Done():
|
||||
return gctx.Err()
|
||||
default:
|
||||
p := &publication{topic: record.Topic, msg: &broker.Message{}}
|
||||
msg := pPool.Get().(*broker.Message)
|
||||
msg.Header = nil
|
||||
msg.Body = nil
|
||||
p := &publication{topic: record.Topic, msg: msg}
|
||||
if s.opts.BodyOnly {
|
||||
p.msg.Body = record.Value
|
||||
} else {
|
||||
@@ -471,12 +478,14 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
crecords = append(crecords, record)
|
||||
mu.Unlock()
|
||||
}
|
||||
pPool.Put(msg)
|
||||
return nil
|
||||
} else {
|
||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
|
||||
}
|
||||
}
|
||||
pPool.Put(msg)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -501,6 +510,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
pPool.Put(msg)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -511,10 +521,6 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
|
||||
return err
|
||||
}
|
||||
|
||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||
logger.Debugf(ctx, "commit %d records", len(crecords))
|
||||
}
|
||||
|
||||
return s.reader.CommitRecords(ctx, crecords...)
|
||||
}
|
||||
|
||||
@@ -525,6 +531,7 @@ func (k *kBroker) String() string {
|
||||
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||
options := broker.NewOptions(opts...)
|
||||
kopts := []kgo.Opt{
|
||||
kgo.BatchCompression(kgo.NoCompression()),
|
||||
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
|
||||
kgo.RequiredAcks(kgo.AllISRAcks()),
|
||||
kgo.RetryBackoffFn(
|
||||
|
Reference in New Issue
Block a user