improve consumer speed #41
39
segmentio.go
39
segmentio.go
@ -47,7 +47,6 @@ type publication struct {
|
|||||||
msg *broker.Message
|
msg *broker.Message
|
||||||
ackCh chan map[string]map[int]int64
|
ackCh chan map[string]map[int]int64
|
||||||
readerDone *int32
|
readerDone *int32
|
||||||
ackChMu sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *publication) Topic() string {
|
func (p *publication) Topic() string {
|
||||||
@ -62,9 +61,7 @@ func (p *publication) Ack() error {
|
|||||||
if atomic.LoadInt32(p.readerDone) == 1 {
|
if atomic.LoadInt32(p.readerDone) == 1 {
|
||||||
return fmt.Errorf("kafka reader done")
|
return fmt.Errorf("kafka reader done")
|
||||||
}
|
}
|
||||||
p.ackChMu.Lock()
|
|
||||||
p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
|
p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
|
||||||
p.ackChMu.Unlock()
|
|
||||||
if cerr := p.ackErr.Load(); cerr != nil {
|
if cerr := p.ackErr.Load(); cerr != nil {
|
||||||
return cerr.(error)
|
return cerr.(error)
|
||||||
}
|
}
|
||||||
@ -459,14 +456,32 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, errCh := range errChs {
|
|
||||||
close(errCh)
|
|
||||||
}
|
|
||||||
close(commitDoneCh)
|
close(commitDoneCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
checkTicker := time.NewTicker(300 * time.Millisecond)
|
||||||
|
defer checkTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-checkTicker.C:
|
||||||
|
if atomic.LoadInt32(cntWait) == 0 {
|
||||||
|
//fmt.Printf("cntWait IS 0\n")
|
||||||
|
mapMu.Lock()
|
||||||
|
if len(offsets) > 0 {
|
||||||
|
if err := generation.CommitOffsets(offsets); err != nil {
|
||||||
|
for _, errCh := range errChs {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mapMu.Unlock()
|
||||||
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
|
k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
case ack := <-ackCh:
|
case ack := <-ackCh:
|
||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack)
|
k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack)
|
||||||
@ -510,7 +525,6 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
|
|||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
|
k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//fmt.Printf("cntWait NOT 0\n")
|
//fmt.Printf("cntWait NOT 0\n")
|
||||||
@ -569,8 +583,6 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
h.brokerOpts.Logger.Trace(ctx, "start partition reader")
|
h.brokerOpts.Logger.Trace(ctx, "start partition reader")
|
||||||
}
|
}
|
||||||
|
|
||||||
var ackChMu sync.Mutex
|
|
||||||
|
|
||||||
td := DefaultStatsInterval
|
td := DefaultStatsInterval
|
||||||
if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {
|
if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {
|
||||||
td = v
|
td = v
|
||||||
@ -584,16 +596,11 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddInt32(h.cntWait, -1)
|
atomic.AddInt32(h.cntWait, -1)
|
||||||
|
|
||||||
ackChMu.Lock()
|
atomic.CompareAndSwapInt32(h.readerDone, 0, 1)
|
||||||
if atomic.CompareAndSwapInt32(h.readerDone, 0, 1) {
|
|
||||||
close(h.ackCh)
|
|
||||||
}
|
|
||||||
ackChMu.Unlock()
|
|
||||||
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err)
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err)
|
||||||
}
|
}
|
||||||
<-h.commitDoneCh
|
<-h.commitDoneCh
|
||||||
//fmt.Printf("<-h.commitDoneCh\n")
|
|
||||||
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
||||||
h.brokerOpts.Logger.Trace(ctx, "stop partition reader")
|
h.brokerOpts.Logger.Trace(ctx, "stop partition reader")
|
||||||
}
|
}
|
||||||
@ -642,7 +649,7 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
if h.subOpts.ErrorHandler != nil {
|
if h.subOpts.ErrorHandler != nil {
|
||||||
eh = h.subOpts.ErrorHandler
|
eh = h.subOpts.ErrorHandler
|
||||||
}
|
}
|
||||||
p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone, ackChMu: ackChMu}
|
p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone}
|
||||||
|
|
||||||
if h.subOpts.BodyOnly {
|
if h.subOpts.BodyOnly {
|
||||||
p.msg.Body = msg.Value
|
p.msg.Body = msg.Value
|
||||||
|
3
stats.go
3
stats.go
@ -2,7 +2,6 @@ package segmentio
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
kafka "github.com/segmentio/kafka-go"
|
kafka "github.com/segmentio/kafka-go"
|
||||||
@ -16,7 +15,7 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
fmt.Printf("done reader stats\n")
|
//fmt.Printf("done reader stats\n")
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if r == nil {
|
if r == nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user