support sync and async commit offsets

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-18 13:45:12 +03:00
parent b15586163e
commit f39888f6d4
2 changed files with 50 additions and 28 deletions

View File

@ -13,7 +13,7 @@ var (
DefaultReaderConfig = kafka.ReaderConfig{} DefaultReaderConfig = kafka.ReaderConfig{}
DefaultWriterConfig = kafka.WriterConfig{} DefaultWriterConfig = kafka.WriterConfig{}
DefaultStatsInterval = time.Second * 10 DefaultStatsInterval = time.Second * 10
DefaultCommitInterval = time.Second * 2 DefaultCommitInterval = time.Second * 0
DefaultCommitQueueSize = 2000 DefaultCommitQueueSize = 2000
) )

View File

@ -384,7 +384,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
k.opts.Logger.Trace(k.opts.Context, "start async commit loop") k.opts.Logger.Trace(k.opts.Context, "start async commit loop")
} }
// run async commit loop // run async commit loop
go k.commitLoop(generation, ackCh, errChs, &wg) go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &wg)
} }
} }
}() }()
@ -402,17 +402,24 @@ type cgHandler struct {
wg *sync.WaitGroup wg *sync.WaitGroup
} }
func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) { func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) {
var mapMu sync.Mutex
td := DefaultCommitInterval td := DefaultCommitInterval
if commitInterval > 0 {
td = commitInterval
}
if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 { if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 {
td = v td = v
} }
// async commit loop
if td > 0 {
ticker := time.NewTicker(td) ticker := time.NewTicker(td)
defer ticker.Stop() defer ticker.Stop()
var mapMu sync.Mutex
offsets := make(map[string]map[int]int64, 4) offsets := make(map[string]map[int]int64, 4)
for { for {
@ -423,17 +430,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string
k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop") k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop")
} }
return return
case ack := <-ackCh:
mapMu.Lock()
for k, v := range ack {
if _, ok := offsets[k]; !ok {
offsets[k] = make(map[int]int64, 4)
}
for p, o := range v {
offsets[k][p] = o + 1
}
}
mapMu.Unlock()
case <-ticker.C: case <-ticker.C:
mapMu.Lock() mapMu.Lock()
if len(offsets) == 0 { if len(offsets) == 0 {
@ -441,7 +437,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string
continue continue
} }
if k.opts.Logger.V(logger.TraceLevel) { if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "commit offsets: %v", offsets) k.opts.Logger.Tracef(k.opts.Context, "async commit offsets: %v", offsets)
} }
err := generation.CommitOffsets(offsets) err := generation.CommitOffsets(offsets)
if err != nil { if err != nil {
@ -458,6 +454,32 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string
} }
} }
// sync commit loop
for {
select {
default:
wg.Wait()
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop")
}
return
case ack := <-ackCh:
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "sync commit offsets: %v", ack)
}
err := generation.CommitOffsets(ack)
if err != nil {
for _, errCh := range errChs {
errCh <- err
close(errCh)
}
return
}
}
}
}
func (h *cgHandler) run(ctx context.Context) { func (h *cgHandler) run(ctx context.Context) {
td := DefaultStatsInterval td := DefaultStatsInterval
if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {