speed up consumer commit offsets

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-18 13:28:13 +03:00
parent 17404ee935
commit b15586163e
6 changed files with 222 additions and 101 deletions

View File

@ -23,7 +23,10 @@ func TestPubSub(t *testing.T) {
t.Skip() t.Skip()
} }
logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)); err != nil {
t.Fatal(err)
}
ctx := context.Background() ctx := context.Background()
var addrs []string var addrs []string
@ -33,7 +36,7 @@ func TestPubSub(t *testing.T) {
addrs = strings.Split(addr, ",") addrs = strings.Split(addr, ",")
} }
b := segmentio.NewBroker(broker.Addrs(addrs...)) b := segmentio.NewBroker(broker.Addrs(addrs...), segmentio.ClientID("test"))
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Fatal(err) t.Fatal(err)
} }

9
go.mod
View File

@ -3,7 +3,10 @@ module github.com/unistack-org/micro-broker-segmentio/v3
go 1.16 go 1.16
require ( require (
github.com/google/uuid v1.2.0 github.com/golang/snappy v0.0.4 // indirect
github.com/segmentio/kafka-go v0.4.16 github.com/google/uuid v1.3.0
github.com/unistack-org/micro/v3 v3.4.7 github.com/klauspost/compress v1.13.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/segmentio/kafka-go v0.4.17
github.com/unistack-org/micro/v3 v3.4.9
) )

13
go.sum
View File

@ -8,13 +8,20 @@ github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebP
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -23,16 +30,22 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U= github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U=
github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0= github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0=
github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
github.com/unistack-org/micro/v3 v3.4.9 h1:IBCW/yxQijO/X+2zNzTdSsvzlgE0+y49bvjWemtY2zA=
github.com/unistack-org/micro/v3 v3.4.9/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=

View File

@ -13,6 +13,8 @@ var (
DefaultReaderConfig = kafka.ReaderConfig{} DefaultReaderConfig = kafka.ReaderConfig{}
DefaultWriterConfig = kafka.WriterConfig{} DefaultWriterConfig = kafka.WriterConfig{}
DefaultStatsInterval = time.Second * 10 DefaultStatsInterval = time.Second * 10
DefaultCommitInterval = time.Second * 2
DefaultCommitQueueSize = 2000
) )
type readerConfigKey struct{} type readerConfigKey struct{}
@ -66,3 +68,15 @@ type writerCompletionFunc struct{}
func WriterCompletionFunc(fn func([]kafka.Message, error)) broker.Option { func WriterCompletionFunc(fn func([]kafka.Message, error)) broker.Option {
return broker.SetOption(writerCompletionFunc{}, fn) return broker.SetOption(writerCompletionFunc{}, fn)
} }
type clientIDKey struct{}
func ClientID(id string) broker.Option {
return broker.SetOption(clientIDKey{}, id)
}
type commitIntervalKey struct{}
func CommitInterval(td time.Duration) broker.Option {
return broker.SetOption(commitIntervalKey{}, td)
}

View File

@ -28,14 +28,8 @@ type kBroker struct {
} }
type subscriber struct { type subscriber struct {
k *kBroker
topic string topic string
opts broker.SubscribeOptions opts broker.SubscribeOptions
offset int64
gen *kafka.Generation
partition int
handler broker.Handler
reader *kafka.Reader
closed bool closed bool
done chan struct{} done chan struct{}
group *kafka.ConsumerGroup group *kafka.ConsumerGroup
@ -46,14 +40,13 @@ type subscriber struct {
type publication struct { type publication struct {
topic string topic string
partition int
offset int64
err error err error
m *broker.Message ackErr *error
opts broker.Options msg *broker.Message
ctx context.Context ackCh chan map[string]map[int]int64
generation *kafka.Generation sync.Mutex
reader *kafka.Reader
km kafka.Message
offsets map[string]map[int]int64 // for commit offsets
} }
func (p *publication) Topic() string { func (p *publication) Topic() string {
@ -61,14 +54,12 @@ func (p *publication) Topic() string {
} }
func (p *publication) Message() *broker.Message { func (p *publication) Message() *broker.Message {
return p.m return p.msg
} }
func (p *publication) Ack() error { func (p *publication) Ack() error {
if p.opts.Logger.V(logger.TraceLevel) { p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
p.opts.Logger.Tracef(p.opts.Context, "commit offset %#+v\n", p.offsets) return *p.ackErr
}
return p.generation.CommitOffsets(p.offsets)
} }
func (p *publication) Error() error { func (p *publication) Error() error {
@ -276,7 +267,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
Logger: k.readerConfig.Logger, Logger: k.readerConfig.Logger,
ErrorLogger: k.readerConfig.ErrorLogger, ErrorLogger: k.readerConfig.ErrorLogger,
} }
cgcfg.StartOffset = kafka.LastOffset
if err := cgcfg.Validate(); err != nil { if err := cgcfg.Validate(); err != nil {
return nil, err return nil, err
} }
@ -333,7 +323,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
sub.RUnlock() sub.RUnlock()
if !closed { if !closed {
if k.opts.Logger.V(logger.ErrorLevel) { if k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err()) k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err())
} }
if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
@ -349,35 +339,52 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
sub.RUnlock() sub.RUnlock()
if !closed { if !closed {
if k.opts.Logger.V(logger.TraceLevel) { if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %v", err) k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
} }
} }
if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
} }
sub.createGroup(k.opts.Context) sub.createGroup(gCtx)
continue continue
} }
for _, t := range cgcfg.Topics { var wg sync.WaitGroup
assignments := generation.Assignments[t]
ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
errChLen := 0
for _, assignments := range generation.Assignments {
errChLen += len(assignments)
}
errChs := make([]chan error, errChLen)
for topic, assignments := range generation.Assignments {
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
}
for _, assignment := range assignments { for _, assignment := range assignments {
errCh := make(chan error)
cfg := k.readerConfig cfg := k.readerConfig
cfg.Topic = t cfg.Topic = topic
cfg.Partition = assignment.ID cfg.Partition = assignment.ID
cfg.GroupID = "" cfg.GroupID = ""
// break reading
reader := kafka.NewReader(cfg) reader := kafka.NewReader(cfg)
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] reader current offset: %v new offset: %v", reader.Offset(), assignment.Offset)
}
reader.SetOffset(assignment.Offset)
cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler} if err := reader.SetOffset(assignment.Offset); err != nil {
if k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err)
}
continue
}
cgh := &cgHandler{brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler, ackCh: ackCh, errCh: errCh, wg: &wg}
generation.Start(cgh.run) generation.Start(cgh.run)
} }
} }
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Trace(k.opts.Context, "start async commit loop")
}
// run async commit loop
go k.commitLoop(generation, ackCh, errChs, &wg)
} }
} }
}() }()
@ -386,18 +393,72 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
} }
type cgHandler struct { type cgHandler struct {
topic string
generation *kafka.Generation
brokerOpts broker.Options brokerOpts broker.Options
subOpts broker.SubscribeOptions subOpts broker.SubscribeOptions
reader *kafka.Reader reader *kafka.Reader
handler broker.Handler handler broker.Handler
ackCh chan map[string]map[int]int64
errCh chan error
wg *sync.WaitGroup
}
func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) {
var mapMu sync.Mutex
td := DefaultCommitInterval
if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 {
td = v
}
ticker := time.NewTicker(td)
defer ticker.Stop()
offsets := make(map[string]map[int]int64, 4)
for {
select {
default:
wg.Wait()
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop")
}
return
case ack := <-ackCh:
mapMu.Lock()
for k, v := range ack {
if _, ok := offsets[k]; !ok {
offsets[k] = make(map[int]int64, 4)
}
for p, o := range v {
offsets[k][p] = o + 1
}
}
mapMu.Unlock()
case <-ticker.C:
mapMu.Lock()
if len(offsets) == 0 {
mapMu.Unlock()
continue
}
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "commit offsets: %v", offsets)
}
err := generation.CommitOffsets(offsets)
if err != nil {
for _, errCh := range errChs {
errCh <- err
close(errCh)
}
mapMu.Unlock()
return
}
mapMu.Unlock()
offsets = make(map[string]map[int]int64, 4)
}
}
} }
func (h *cgHandler) run(ctx context.Context) { func (h *cgHandler) run(ctx context.Context) {
offsets := make(map[string]map[int]int64)
offsets[h.reader.Config().Topic] = make(map[int]int64)
td := DefaultStatsInterval td := DefaultStatsInterval
if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {
td = v td = v
@ -405,27 +466,46 @@ func (h *cgHandler) run(ctx context.Context) {
go readerStats(ctx, h.reader, td, h.brokerOpts.Meter) go readerStats(ctx, h.reader, td, h.brokerOpts.Meter)
commitDuration := DefaultCommitInterval
if v, ok := h.brokerOpts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 {
commitDuration = v
}
var commitErr error
h.wg.Add(1)
defer func() { defer func() {
h.wg.Done()
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err)
} }
}() }()
go func() {
for { for {
select { select {
case err := <-h.errCh:
commitErr = err
case <-ctx.Done(): case <-ctx.Done():
time.Sleep(commitDuration)
return return
default: }
}
}()
for {
msg, err := h.reader.ReadMessage(ctx) msg, err := h.reader.ReadMessage(ctx)
switch err { switch err {
default: default:
if h.brokerOpts.Logger.V(logger.ErrorLevel) { if h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error: %v", err) h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
} }
return return
case kafka.ErrGenerationEnded: case kafka.ErrGenerationEnded:
// generation has ended // generation has ended
if h.brokerOpts.Logger.V(logger.TraceLevel) { if h.brokerOpts.Logger.V(logger.TraceLevel) {
h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended") h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance")
} }
return return
case nil: case nil:
@ -434,19 +514,16 @@ func (h *cgHandler) run(ctx context.Context) {
if h.subOpts.ErrorHandler != nil { if h.subOpts.ErrorHandler != nil {
eh = h.subOpts.ErrorHandler eh = h.subOpts.ErrorHandler
} }
offsets[msg.Topic][msg.Partition] = msg.Offset + 1 p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset, topic: msg.Topic, msg: &broker.Message{}}
// github.com/segmentio/kafka-go/commit.go makeCommit builds commit message with offset + 1
// zookeeper store offset which needs to be sent on new consumer, so current + 1
p := &publication{topic: msg.Topic, opts: h.brokerOpts, generation: h.generation, m: &broker.Message{}, offsets: offsets}
if h.subOpts.BodyOnly { if h.subOpts.BodyOnly {
p.m.Body = msg.Value p.msg.Body = msg.Value
} else { } else {
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.m); err != nil { if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
p.err = err p.err = err
p.m.Body = msg.Value p.msg.Body = msg.Value
if eh != nil { if eh != nil {
eh(p) _ = eh(p)
} else { } else {
if h.brokerOpts.Logger.V(logger.ErrorLevel) { if h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err) h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err)
@ -455,17 +532,21 @@ func (h *cgHandler) run(ctx context.Context) {
continue continue
} }
} }
p.Lock()
p.ackErr = &commitErr
p.Unlock()
err = h.handler(p) err = h.handler(p)
if err == nil && h.subOpts.AutoAck { if err == nil && h.subOpts.AutoAck {
if err = p.Ack(); err != nil { if err = p.Ack(); err != nil {
if h.brokerOpts.Logger.V(logger.ErrorLevel) { if h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: unable to commit msg: %v", err) h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: unable to commit msg: %v", err)
return
} }
} }
} else if err != nil { } else if err != nil {
p.err = err p.err = err
if eh != nil { if eh != nil {
eh(p) _ = eh(p)
} else { } else {
if h.brokerOpts.Logger.V(logger.ErrorLevel) { if h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err) h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err)
@ -475,7 +556,6 @@ func (h *cgHandler) run(ctx context.Context) {
} }
} }
} }
}
func (sub *subscriber) createGroup(ctx context.Context) { func (sub *subscriber) createGroup(ctx context.Context) {
sub.RLock() sub.RLock()
@ -572,6 +652,12 @@ func (k *kBroker) configure(opts ...broker.Option) error {
ErrorLogger: writerConfig.ErrorLogger, ErrorLogger: writerConfig.ErrorLogger,
//Transport: writerConfig.Transport, //Transport: writerConfig.Transport,
} }
if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
if k.readerConfig.Dialer == nil {
k.readerConfig.Dialer = kafka.DefaultDialer
}
k.readerConfig.Dialer.ClientID = id
}
if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok { if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
k.writer.Completion = fn k.writer.Completion = fn

View File

@ -14,7 +14,10 @@ import (
func TestSegmentioSubscribe(t *testing.T) { func TestSegmentioSubscribe(t *testing.T) {
ctx := context.Background() ctx := context.Background()
logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)); err != nil {
t.Fatal(err)
}
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
t.Skip() t.Skip()
} }
@ -43,7 +46,6 @@ func TestSegmentioSubscribe(t *testing.T) {
done := make(chan struct{}, 100) done := make(chan struct{}, 100)
fn := func(msg broker.Event) error { fn := func(msg broker.Event) error {
if err := msg.Ack(); err != nil { if err := msg.Ack(); err != nil {
panic(err)
return err return err
} }
done <- struct{}{} done <- struct{}{}
@ -179,7 +181,7 @@ func BenchmarkSegmentioCodecJsonSubscribe(b *testing.B) {
return return
} }
if err := brk.Publish(ctx, "test_topic", bm); err != nil { if err := brk.Publish(ctx, "test_topic", bm); err != nil {
b.Fatal(err) panic(err)
} }
} }
}() }()