try to fix ack err cases
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
5934963d81
commit
83b037df20
2
go.mod
2
go.mod
@ -8,7 +8,7 @@ require (
|
|||||||
github.com/klauspost/compress v1.13.1 // indirect
|
github.com/klauspost/compress v1.13.1 // indirect
|
||||||
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
|
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
|
||||||
github.com/segmentio/kafka-go v0.4.17
|
github.com/segmentio/kafka-go v0.4.17
|
||||||
github.com/unistack-org/micro/v3 v3.5.3
|
github.com/unistack-org/micro/v3 v3.5.6
|
||||||
)
|
)
|
||||||
|
|
||||||
//replace github.com/unistack-org/micro/v3 => ../micro
|
//replace github.com/unistack-org/micro/v3 => ../micro
|
||||||
|
6
go.sum
6
go.sum
@ -1,11 +1,11 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||||
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
|
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
|
||||||
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
|
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
|
||||||
|
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
|
||||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||||
@ -35,8 +35,8 @@ github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7
|
|||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/unistack-org/micro/v3 v3.5.3 h1:yb647rdyxKmzs8fwUm/YbyZupLfcYlZseJr/TpToW+4=
|
github.com/unistack-org/micro/v3 v3.5.6 h1:0ZRWRkJVm5uyMZ2kMqsPudyv57pnGSj2VYHVeAt6cNk=
|
||||||
github.com/unistack-org/micro/v3 v3.5.3/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro=
|
github.com/unistack-org/micro/v3 v3.5.6/go.mod h1:P8k8nuM0RYUdX6TNxO4yzq3tjtrvh11trhr79zWYeTM=
|
||||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
|
||||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
||||||
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
|
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
|
||||||
|
209
segmentio.go
209
segmentio.go
@ -59,13 +59,13 @@ func (p *publication) Message() *broker.Message {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *publication) Ack() error {
|
func (p *publication) Ack() error {
|
||||||
|
if cerr := p.ackErr.Load(); cerr != nil {
|
||||||
|
return cerr.(error)
|
||||||
|
}
|
||||||
if atomic.LoadInt32(p.readerDone) == 1 {
|
if atomic.LoadInt32(p.readerDone) == 1 {
|
||||||
return kafka.ErrGroupClosed
|
return kafka.ErrGroupClosed
|
||||||
}
|
}
|
||||||
p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
|
p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
|
||||||
if cerr := p.ackErr.Load(); cerr != nil {
|
|
||||||
return cerr.(error)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -296,7 +296,6 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
|
|||||||
return k.writer.WriteMessages(wCtx, kmsg)
|
return k.writer.WriteMessages(wCtx, kmsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
opt := broker.NewSubscribeOptions(opts...)
|
opt := broker.NewSubscribeOptions(opts...)
|
||||||
|
|
||||||
@ -417,8 +416,8 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
|
|||||||
cntWait := int32(0)
|
cntWait := int32(0)
|
||||||
|
|
||||||
for topic, assignments := range generation.Assignments {
|
for topic, assignments := range generation.Assignments {
|
||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
if k.opts.Logger.V(logger.DebugLevel) {
|
||||||
k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
|
k.opts.Logger.Debugf(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
|
||||||
}
|
}
|
||||||
for _, assignment := range assignments {
|
for _, assignment := range assignments {
|
||||||
cfg := k.readerConfig
|
cfg := k.readerConfig
|
||||||
@ -440,11 +439,11 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
|
|||||||
}
|
}
|
||||||
errCh := make(chan error)
|
errCh := make(chan error)
|
||||||
errChs = append(errChs, errCh)
|
errChs = append(errChs, errCh)
|
||||||
cgh := &cgBatchHandler{
|
cgh := &cgHandler{
|
||||||
brokerOpts: k.opts,
|
brokerOpts: k.opts,
|
||||||
subOpts: opt,
|
subOpts: opt,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
handler: handler,
|
batchhandler: handler,
|
||||||
ackCh: ackCh,
|
ackCh: ackCh,
|
||||||
errCh: errCh,
|
errCh: errCh,
|
||||||
cntWait: &cntWait,
|
cntWait: &cntWait,
|
||||||
@ -456,7 +455,7 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
k.opts.Logger.Trace(k.opts.Context, "start async commit loop")
|
k.opts.Logger.Trace(k.opts.Context, "start commit loop")
|
||||||
}
|
}
|
||||||
// run async commit loop
|
// run async commit loop
|
||||||
go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
|
go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
|
||||||
@ -478,7 +477,6 @@ type cgBatchHandler struct {
|
|||||||
commitDoneCh chan bool
|
commitDoneCh chan bool
|
||||||
cntWait *int32
|
cntWait *int32
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
opt := broker.NewSubscribeOptions(opts...)
|
opt := broker.NewSubscribeOptions(opts...)
|
||||||
@ -588,11 +586,13 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0))
|
||||||
ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
|
ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
|
||||||
errChLen := 0
|
errChLen := 0
|
||||||
for _, assignments := range generation.Assignments {
|
for _, assignments := range generation.Assignments {
|
||||||
errChLen += len(assignments)
|
errChLen += len(assignments)
|
||||||
}
|
}
|
||||||
|
|
||||||
errChs := make([]chan error, 0, errChLen)
|
errChs := make([]chan error, 0, errChLen)
|
||||||
|
|
||||||
commitDoneCh := make(chan bool)
|
commitDoneCh := make(chan bool)
|
||||||
@ -600,6 +600,8 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
|||||||
cntWait := int32(0)
|
cntWait := int32(0)
|
||||||
|
|
||||||
for topic, assignments := range generation.Assignments {
|
for topic, assignments := range generation.Assignments {
|
||||||
|
k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments)))
|
||||||
|
|
||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
|
k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
|
||||||
}
|
}
|
||||||
@ -655,6 +657,7 @@ type cgHandler struct {
|
|||||||
subOpts broker.SubscribeOptions
|
subOpts broker.SubscribeOptions
|
||||||
reader *kafka.Reader
|
reader *kafka.Reader
|
||||||
handler broker.Handler
|
handler broker.Handler
|
||||||
|
batchhandler broker.BatchHandler
|
||||||
ackCh chan map[string]map[int]int64
|
ackCh chan map[string]map[int]int64
|
||||||
errCh chan error
|
errCh chan error
|
||||||
readerDone *int32
|
readerDone *int32
|
||||||
@ -664,10 +667,6 @@ type cgHandler struct {
|
|||||||
|
|
||||||
func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) {
|
func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) {
|
||||||
|
|
||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
|
||||||
k.opts.Logger.Trace(k.opts.Context, "start commit loop")
|
|
||||||
}
|
|
||||||
|
|
||||||
td := DefaultCommitInterval
|
td := DefaultCommitInterval
|
||||||
|
|
||||||
if commitInterval > 0 {
|
if commitInterval > 0 {
|
||||||
@ -710,7 +709,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
|
|||||||
}
|
}
|
||||||
case ack := <-ackCh:
|
case ack := <-ackCh:
|
||||||
if k.opts.Logger.V(logger.TraceLevel) {
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack)
|
// k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack)
|
||||||
}
|
}
|
||||||
switch td {
|
switch td {
|
||||||
case 0: // sync commits as CommitInterval == 0
|
case 0: // sync commits as CommitInterval == 0
|
||||||
@ -802,7 +801,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
|
|||||||
|
|
||||||
func (h *cgHandler) run(ctx context.Context) {
|
func (h *cgHandler) run(ctx context.Context) {
|
||||||
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
||||||
h.brokerOpts.Logger.Trace(ctx, "start partition reader")
|
h.brokerOpts.Logger.Tracef(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
|
||||||
}
|
}
|
||||||
|
|
||||||
td := DefaultStatsInterval
|
td := DefaultStatsInterval
|
||||||
@ -820,20 +819,28 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
|
|
||||||
atomic.CompareAndSwapInt32(h.readerDone, 0, 1)
|
atomic.CompareAndSwapInt32(h.readerDone, 0, 1)
|
||||||
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err)
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader for topic %s partition %d close error: %v", h.reader.Config().Topic, h.reader.Config().Partition, err)
|
||||||
}
|
}
|
||||||
<-h.commitDoneCh
|
<-h.commitDoneCh
|
||||||
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
||||||
h.brokerOpts.Logger.Trace(ctx, "stop partition reader")
|
h.brokerOpts.Logger.Tracef(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
/*
|
||||||
|
tc := time.NewTicker(3 * time.Second)
|
||||||
|
defer tc.Stop()
|
||||||
|
*/
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
// case <-tc.C:
|
||||||
|
// commitErr.Store(errors.New("my err"))
|
||||||
|
// return
|
||||||
case err := <-h.errCh:
|
case err := <-h.errCh:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
commitErr.Store(err)
|
commitErr.Store(err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
@ -842,69 +849,77 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
msg, err := h.reader.ReadMessage(ctx)
|
select {
|
||||||
switch err {
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
default:
|
default:
|
||||||
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
msg, err := h.reader.ReadMessage(ctx)
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
|
switch err {
|
||||||
}
|
default:
|
||||||
return
|
|
||||||
case kafka.ErrGenerationEnded:
|
|
||||||
// generation has ended
|
|
||||||
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
|
||||||
h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case nil:
|
|
||||||
if cerr := commitErr.Load(); cerr != nil {
|
|
||||||
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr)
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
case kafka.ErrGenerationEnded:
|
||||||
|
// generation has ended
|
||||||
|
if h.brokerOpts.Logger.V(logger.TraceLevel) {
|
||||||
|
h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case nil:
|
||||||
|
if cerr := commitErr.Load(); cerr != nil {
|
||||||
|
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
eh := h.brokerOpts.ErrorHandler
|
eh := h.brokerOpts.ErrorHandler
|
||||||
|
|
||||||
if h.subOpts.ErrorHandler != nil {
|
if h.subOpts.ErrorHandler != nil {
|
||||||
eh = h.subOpts.ErrorHandler
|
eh = h.subOpts.ErrorHandler
|
||||||
}
|
}
|
||||||
p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone}
|
p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone}
|
||||||
|
|
||||||
if h.subOpts.BodyOnly {
|
if h.subOpts.BodyOnly {
|
||||||
p.msg.Body = msg.Value
|
|
||||||
} else {
|
|
||||||
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
|
|
||||||
p.SetError(err)
|
|
||||||
p.msg.Body = msg.Value
|
p.msg.Body = msg.Value
|
||||||
|
} else {
|
||||||
|
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
|
||||||
|
p.SetError(err)
|
||||||
|
p.msg.Body = msg.Value
|
||||||
|
if eh != nil {
|
||||||
|
_ = eh(p)
|
||||||
|
} else {
|
||||||
|
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cerr := commitErr.Load(); cerr != nil {
|
||||||
|
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = h.handler(p)
|
||||||
|
if err == nil && h.subOpts.AutoAck {
|
||||||
|
if err = p.Ack(); err != nil {
|
||||||
|
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: message ack error: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
p.err = err
|
||||||
if eh != nil {
|
if eh != nil {
|
||||||
_ = eh(p)
|
_ = eh(p)
|
||||||
} else {
|
} else {
|
||||||
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err)
|
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if cerr := commitErr.Load(); cerr != nil {
|
|
||||||
p.ackErr.Store(cerr.(bool))
|
|
||||||
}
|
|
||||||
err = h.handler(p)
|
|
||||||
if err == nil && h.subOpts.AutoAck {
|
|
||||||
if err = p.Ack(); err != nil {
|
|
||||||
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: message ack error: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if err != nil {
|
|
||||||
p.err = err
|
|
||||||
if eh != nil {
|
|
||||||
_ = eh(p)
|
|
||||||
} else {
|
|
||||||
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
|
|
||||||
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -912,16 +927,14 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sub *subscriber) createGroup(ctx context.Context) {
|
func (sub *subscriber) createGroup(ctx context.Context) {
|
||||||
sub.RLock()
|
|
||||||
cgcfg := sub.cgcfg
|
|
||||||
sub.RUnlock()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// closed
|
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
|
sub.RLock()
|
||||||
|
cgcfg := sub.cgcfg
|
||||||
|
sub.RUnlock()
|
||||||
cgroup, err := kafka.NewConsumerGroup(cgcfg)
|
cgroup, err := kafka.NewConsumerGroup(cgcfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if sub.brokerOpts.Logger.V(logger.ErrorLevel) {
|
if sub.brokerOpts.Logger.V(logger.ErrorLevel) {
|
||||||
@ -932,7 +945,6 @@ func (sub *subscriber) createGroup(ctx context.Context) {
|
|||||||
sub.Lock()
|
sub.Lock()
|
||||||
sub.group = cgroup
|
sub.group = cgroup
|
||||||
sub.Unlock()
|
sub.Unlock()
|
||||||
// return
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -989,30 +1001,43 @@ func (k *kBroker) configure(opts ...broker.Option) error {
|
|||||||
}
|
}
|
||||||
k.addrs = cAddrs
|
k.addrs = cAddrs
|
||||||
k.readerConfig = readerConfig
|
k.readerConfig = readerConfig
|
||||||
k.writer = &kafka.Writer{
|
k.writerConfig = writerConfig
|
||||||
Addr: kafka.TCP(k.addrs...),
|
|
||||||
Balancer: writerConfig.Balancer,
|
if k.readerConfig.Dialer == nil {
|
||||||
MaxAttempts: writerConfig.MaxAttempts,
|
k.readerConfig.Dialer = kafka.DefaultDialer
|
||||||
BatchSize: writerConfig.BatchSize,
|
}
|
||||||
BatchBytes: int64(writerConfig.BatchBytes),
|
if k.writerConfig.Dialer == nil {
|
||||||
BatchTimeout: writerConfig.BatchTimeout,
|
k.writerConfig.Dialer = kafka.DefaultDialer
|
||||||
ReadTimeout: writerConfig.ReadTimeout,
|
|
||||||
WriteTimeout: writerConfig.WriteTimeout,
|
|
||||||
RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks),
|
|
||||||
Async: writerConfig.Async,
|
|
||||||
//Completion: writerConfig.Completion,
|
|
||||||
//Compression: writerConfig.Compression,
|
|
||||||
Logger: writerConfig.Logger,
|
|
||||||
ErrorLogger: writerConfig.ErrorLogger,
|
|
||||||
//Transport: writerConfig.Transport,
|
|
||||||
}
|
}
|
||||||
if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
|
if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
|
||||||
if k.readerConfig.Dialer == nil {
|
k.writerConfig.Dialer.ClientID = id
|
||||||
k.readerConfig.Dialer = kafka.DefaultDialer
|
|
||||||
}
|
|
||||||
k.readerConfig.Dialer.ClientID = id
|
k.readerConfig.Dialer.ClientID = id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
k.writer = &kafka.Writer{
|
||||||
|
Addr: kafka.TCP(k.addrs...),
|
||||||
|
Balancer: k.writerConfig.Balancer,
|
||||||
|
MaxAttempts: k.writerConfig.MaxAttempts,
|
||||||
|
BatchSize: k.writerConfig.BatchSize,
|
||||||
|
BatchBytes: int64(k.writerConfig.BatchBytes),
|
||||||
|
BatchTimeout: k.writerConfig.BatchTimeout,
|
||||||
|
ReadTimeout: k.writerConfig.ReadTimeout,
|
||||||
|
WriteTimeout: k.writerConfig.WriteTimeout,
|
||||||
|
RequiredAcks: kafka.RequiredAcks(k.writerConfig.RequiredAcks),
|
||||||
|
Async: k.writerConfig.Async,
|
||||||
|
//Completion: writerConfig.Completion,
|
||||||
|
//Compression: writerConfig.Compression,
|
||||||
|
Logger: k.writerConfig.Logger,
|
||||||
|
ErrorLogger: k.writerConfig.ErrorLogger,
|
||||||
|
Transport: &kafka.Transport{
|
||||||
|
Dial: k.writerConfig.Dialer.DialFunc,
|
||||||
|
ClientID: k.writerConfig.Dialer.ClientID,
|
||||||
|
IdleTimeout: time.Second * 5,
|
||||||
|
MetadataTTL: time.Second * 9,
|
||||||
|
SASL: k.writerConfig.Dialer.SASLMechanism,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
|
if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
|
||||||
k.writer.Completion = fn
|
k.writer.Completion = fn
|
||||||
}
|
}
|
||||||
|
15
stats.go
15
stats.go
@ -2,6 +2,7 @@ package segmentio
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
kafka "github.com/segmentio/kafka-go"
|
kafka "github.com/segmentio/kafka-go"
|
||||||
@ -10,7 +11,14 @@ import (
|
|||||||
|
|
||||||
func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter.Meter) {
|
func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter.Meter) {
|
||||||
ticker := time.NewTicker(td)
|
ticker := time.NewTicker(td)
|
||||||
defer ticker.Stop()
|
var once sync.Once
|
||||||
|
|
||||||
|
onceLabels := make([]string, 0, 4)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
m.Counter("broker_reader_count", onceLabels...).Add(int(-1))
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -22,7 +30,10 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter
|
|||||||
}
|
}
|
||||||
rstats := r.Stats()
|
rstats := r.Stats()
|
||||||
labels := []string{"topic", rstats.Topic, "partition", rstats.Partition, "client_id", rstats.ClientID}
|
labels := []string{"topic", rstats.Topic, "partition", rstats.Partition, "client_id", rstats.ClientID}
|
||||||
|
once.Do(func() {
|
||||||
|
onceLabels = []string{"topic", rstats.Topic, "client_id", rstats.ClientID}
|
||||||
|
m.Counter("broker_reader_count", onceLabels...).Add(int(1))
|
||||||
|
})
|
||||||
m.Counter("broker_reader_dial_count", labels...).Add(int(rstats.Dials))
|
m.Counter("broker_reader_dial_count", labels...).Add(int(rstats.Dials))
|
||||||
m.Counter("broker_reader_fetch_count", labels...).Add(int(rstats.Fetches))
|
m.Counter("broker_reader_fetch_count", labels...).Add(int(rstats.Fetches))
|
||||||
m.Counter("broker_reader_message_count", labels...).Add(int(rstats.Messages))
|
m.Counter("broker_reader_message_count", labels...).Add(int(rstats.Messages))
|
||||||
|
Loading…
Reference in New Issue
Block a user