From 83b037df206f72fde120a770bb29ffc6fcb122ab Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 29 Jul 2021 23:50:26 +0300 Subject: [PATCH] try to fix ack err cases Signed-off-by: Vasiliy Tolstov --- go.mod | 2 +- go.sum | 6 +- segmentio.go | 209 ++++++++++++++++++++++++++++----------------------- stats.go | 15 +++- 4 files changed, 134 insertions(+), 98 deletions(-) diff --git a/go.mod b/go.mod index 661ffe1..46d8336 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/klauspost/compress v1.13.1 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/segmentio/kafka-go v0.4.17 - github.com/unistack-org/micro/v3 v3.5.3 + github.com/unistack-org/micro/v3 v3.5.6 ) //replace github.com/unistack-org/micro/v3 => ../micro diff --git a/go.sum b/go.sum index 70e25db..a9c3200 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,11 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -35,8 +35,8 @@ github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/unistack-org/micro/v3 v3.5.3 h1:yb647rdyxKmzs8fwUm/YbyZupLfcYlZseJr/TpToW+4= -github.com/unistack-org/micro/v3 v3.5.3/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro= +github.com/unistack-org/micro/v3 v3.5.6 h1:0ZRWRkJVm5uyMZ2kMqsPudyv57pnGSj2VYHVeAt6cNk= +github.com/unistack-org/micro/v3 v3.5.6/go.mod h1:P8k8nuM0RYUdX6TNxO4yzq3tjtrvh11trhr79zWYeTM= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= diff --git a/segmentio.go b/segmentio.go index c8bdfc3..109686e 100644 --- a/segmentio.go +++ b/segmentio.go @@ -59,13 +59,13 @@ func (p *publication) Message() *broker.Message { } func (p *publication) Ack() error { + if cerr := p.ackErr.Load(); cerr != nil { + return cerr.(error) + } if atomic.LoadInt32(p.readerDone) == 1 { return kafka.ErrGroupClosed } p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}} - if cerr := p.ackErr.Load(); cerr != nil { - return cerr.(error) - } return nil } @@ 
-296,7 +296,6 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message return k.writer.WriteMessages(wCtx, kmsg) } -/* func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { opt := broker.NewSubscribeOptions(opts...) @@ -417,8 +416,8 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok cntWait := int32(0) for topic, assignments := range generation.Assignments { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments) + if k.opts.Logger.V(logger.DebugLevel) { + k.opts.Logger.Debugf(k.opts.Context, "topic: %s assignments: %v", topic, assignments) } for _, assignment := range assignments { cfg := k.readerConfig @@ -440,11 +439,11 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok } errCh := make(chan error) errChs = append(errChs, errCh) - cgh := &cgBatchHandler{ + cgh := &cgHandler{ brokerOpts: k.opts, subOpts: opt, reader: reader, - handler: handler, + batchhandler: handler, ackCh: ackCh, errCh: errCh, cntWait: &cntWait, @@ -456,7 +455,7 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok } } if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Trace(k.opts.Context, "start async commit loop") + k.opts.Logger.Trace(k.opts.Context, "start commit loop") } // run async commit loop go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait) @@ -478,7 +477,6 @@ type cgBatchHandler struct { commitDoneCh chan bool cntWait *int32 } -*/ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { opt := broker.NewSubscribeOptions(opts...) 
@@ -588,11 +586,13 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha continue } + k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0)) ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize) errChLen := 0 for _, assignments := range generation.Assignments { errChLen += len(assignments) } + errChs := make([]chan error, 0, errChLen) commitDoneCh := make(chan bool) @@ -600,6 +600,8 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha cntWait := int32(0) for topic, assignments := range generation.Assignments { + k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments))) + if k.opts.Logger.V(logger.TraceLevel) { k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments) } @@ -655,6 +657,7 @@ type cgHandler struct { subOpts broker.SubscribeOptions reader *kafka.Reader handler broker.Handler + batchhandler broker.BatchHandler ackCh chan map[string]map[int]int64 errCh chan error readerDone *int32 @@ -664,10 +667,6 @@ type cgHandler struct { func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Trace(k.opts.Context, "start commit loop") - } - td := DefaultCommitInterval if commitInterval > 0 { @@ -710,7 +709,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D } case ack := <-ackCh: if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack) + // k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack) } switch td { case 0: // sync commits as CommitInterval == 0 @@ -802,7 +801,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D func (h *cgHandler) run(ctx context.Context) { if h.brokerOpts.Logger.V(logger.TraceLevel) { - h.brokerOpts.Logger.Trace(ctx, "start partition reader") + h.brokerOpts.Logger.Tracef(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition) } td := DefaultStatsInterval @@ -820,20 +819,28 @@ func (h *cgHandler) run(ctx context.Context) { atomic.CompareAndSwapInt32(h.readerDone, 0, 1) if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader for topic %s partition %d close error: %v", h.reader.Config().Topic, h.reader.Config().Partition, err) } <-h.commitDoneCh if h.brokerOpts.Logger.V(logger.TraceLevel) { - h.brokerOpts.Logger.Trace(ctx, "stop partition reader") + h.brokerOpts.Logger.Tracef(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition) } }() + /* + tc := time.NewTicker(3 * time.Second) + defer tc.Stop() + */ go func() { for { select { + // case <-tc.C: + // commitErr.Store(errors.New("my err")) + // return case err := <-h.errCh: if err != nil { commitErr.Store(err) + return } case <-ctx.Done(): return @@ -842,69 +849,77 @@ func (h *cgHandler) run(ctx context.Context) { }() for { - msg, err := h.reader.ReadMessage(ctx) - switch err { + select { + case <-ctx.Done(): + return default: - if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, 
"[segmentio] unexpected error type: %T err: %v", err, err) - } - return - case kafka.ErrGenerationEnded: - // generation has ended - if h.brokerOpts.Logger.V(logger.TraceLevel) { - h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close") - } - return - case nil: - if cerr := commitErr.Load(); cerr != nil { + msg, err := h.reader.ReadMessage(ctx) + switch err { + default: if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr) + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err) } return - } + case kafka.ErrGenerationEnded: + // generation has ended + if h.brokerOpts.Logger.V(logger.TraceLevel) { + h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close") + } + return + case nil: + if cerr := commitErr.Load(); cerr != nil { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr) + } + return + } - eh := h.brokerOpts.ErrorHandler + eh := h.brokerOpts.ErrorHandler - if h.subOpts.ErrorHandler != nil { - eh = h.subOpts.ErrorHandler - } - p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone} + if h.subOpts.ErrorHandler != nil { + eh = h.subOpts.ErrorHandler + } + p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone} - if h.subOpts.BodyOnly { - p.msg.Body = msg.Value - } else { - if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil { - p.SetError(err) + if h.subOpts.BodyOnly { p.msg.Body = msg.Value + } else { + if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil { + p.SetError(err) + p.msg.Body = msg.Value + if eh != nil { + _ = eh(p) + } else { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err) + } + } + continue + } + } + if cerr := commitErr.Load(); cerr != nil { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr) + } + return + } + err = h.handler(p) + if err == nil && h.subOpts.AutoAck { + if err = p.Ack(); err != nil { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: message ack error: %v", err) + } + return + } + } else if err != nil { + p.err = err if eh != nil { _ = eh(p) } else { if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err) + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err) } } - continue - } - } - if cerr := commitErr.Load(); cerr != nil { - p.ackErr.Store(cerr.(bool)) - } - err = h.handler(p) - if err == nil && h.subOpts.AutoAck { - if err = p.Ack(); err != nil { - if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: message ack error: %v", err) - return - } - } - } else if err != nil { - p.err = err - if eh != nil { - _ = eh(p) - } else { - if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err) - } } } } @@ -912,16 +927,14 @@ func (h *cgHandler) 
run(ctx context.Context) { } func (sub *subscriber) createGroup(ctx context.Context) { - sub.RLock() - cgcfg := sub.cgcfg - sub.RUnlock() - for { select { case <-ctx.Done(): - // closed return default: + sub.RLock() + cgcfg := sub.cgcfg + sub.RUnlock() cgroup, err := kafka.NewConsumerGroup(cgcfg) if err != nil { if sub.brokerOpts.Logger.V(logger.ErrorLevel) { @@ -932,7 +945,6 @@ func (sub *subscriber) createGroup(ctx context.Context) { sub.Lock() sub.group = cgroup sub.Unlock() - // return return } } @@ -989,30 +1001,43 @@ func (k *kBroker) configure(opts ...broker.Option) error { } k.addrs = cAddrs k.readerConfig = readerConfig - k.writer = &kafka.Writer{ - Addr: kafka.TCP(k.addrs...), - Balancer: writerConfig.Balancer, - MaxAttempts: writerConfig.MaxAttempts, - BatchSize: writerConfig.BatchSize, - BatchBytes: int64(writerConfig.BatchBytes), - BatchTimeout: writerConfig.BatchTimeout, - ReadTimeout: writerConfig.ReadTimeout, - WriteTimeout: writerConfig.WriteTimeout, - RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks), - Async: writerConfig.Async, - //Completion: writerConfig.Completion, - //Compression: writerConfig.Compression, - Logger: writerConfig.Logger, - ErrorLogger: writerConfig.ErrorLogger, - //Transport: writerConfig.Transport, + k.writerConfig = writerConfig + + if k.readerConfig.Dialer == nil { + k.readerConfig.Dialer = kafka.DefaultDialer + } + if k.writerConfig.Dialer == nil { + k.writerConfig.Dialer = kafka.DefaultDialer } if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok { - if k.readerConfig.Dialer == nil { - k.readerConfig.Dialer = kafka.DefaultDialer - } + k.writerConfig.Dialer.ClientID = id k.readerConfig.Dialer.ClientID = id } + k.writer = &kafka.Writer{ + Addr: kafka.TCP(k.addrs...), + Balancer: k.writerConfig.Balancer, + MaxAttempts: k.writerConfig.MaxAttempts, + BatchSize: k.writerConfig.BatchSize, + BatchBytes: int64(k.writerConfig.BatchBytes), + BatchTimeout: k.writerConfig.BatchTimeout, + ReadTimeout: k.writerConfig.ReadTimeout, + WriteTimeout: k.writerConfig.WriteTimeout, + RequiredAcks: kafka.RequiredAcks(k.writerConfig.RequiredAcks), + Async: k.writerConfig.Async, + //Completion: writerConfig.Completion, + //Compression: writerConfig.Compression, + Logger: k.writerConfig.Logger, + ErrorLogger: k.writerConfig.ErrorLogger, + Transport: &kafka.Transport{ + Dial: k.writerConfig.Dialer.DialFunc, + ClientID: k.writerConfig.Dialer.ClientID, + IdleTimeout: time.Second * 5, + MetadataTTL: time.Second * 9, + SASL: k.writerConfig.Dialer.SASLMechanism, + }, + } + if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok { k.writer.Completion = fn } diff --git a/stats.go b/stats.go index de84350..83094e3 100644 --- a/stats.go +++ b/stats.go @@ -2,6 +2,7 @@ package segmentio import ( "context" + "sync" "time" kafka "github.com/segmentio/kafka-go" @@ -10,7 +11,14 @@ import ( func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter.Meter) { ticker := time.NewTicker(td) - defer ticker.Stop() + var once sync.Once + + onceLabels := make([]string, 0, 4) + + defer func() { + ticker.Stop() + m.Counter("broker_reader_count", onceLabels...).Add(int(-1)) + }() for { select { @@ -22,7 +30,10 @@ func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter } rstats := r.Stats() labels := []string{"topic", rstats.Topic, "partition", rstats.Partition, "client_id", rstats.ClientID} - + once.Do(func() { + onceLabels = []string{"topic", rstats.Topic, "client_id", rstats.ClientID} + 
m.Counter("broker_reader_count", onceLabels...).Add(int(1)) + }) m.Counter("broker_reader_dial_count", labels...).Add(int(rstats.Dials)) m.Counter("broker_reader_fetch_count", labels...).Add(int(rstats.Fetches)) m.Counter("broker_reader_message_count", labels...).Add(int(rstats.Messages))
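
The core of the fix is the reordering inside publication.Ack(): a commit error recorded by the async commit loop is now returned before the offset is pushed onto the ack channel, so a failed commit loop can no longer keep accepting acknowledgements. Below is a minimal, self-contained sketch of that ordering; the Ack body mirrors the patched code, while the surrounding type, the errGroupClosed sentinel and the main function are simplified stand-ins for illustration, not the broker's real definitions.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errGroupClosed stands in for kafka.ErrGroupClosed from segmentio/kafka-go.
var errGroupClosed = errors.New("consumer group is closed")

// publication is a simplified stand-in for the broker's publication type.
type publication struct {
	topic      string
	partition  int
	offset     int64
	ackErr     atomic.Value // error stored by the commit loop on failure
	readerDone *int32       // set to 1 when the partition reader has stopped
	ackCh      chan map[string]map[int]int64
}

// Ack follows the ordering introduced by the patch: surface a stored commit
// error first, then refuse to ack if the reader is gone, and only then
// enqueue the offset for the async commit loop.
func (p *publication) Ack() error {
	if cerr := p.ackErr.Load(); cerr != nil {
		return cerr.(error)
	}
	if atomic.LoadInt32(p.readerDone) == 1 {
		return errGroupClosed
	}
	p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
	return nil
}

func main() {
	done := int32(0)
	p := &publication{
		topic:      "orders",
		partition:  0,
		offset:     42,
		readerDone: &done,
		ackCh:      make(chan map[string]map[int]int64, 1),
	}

	// Normal path: the offset lands on the ack channel.
	fmt.Println(p.Ack(), <-p.ackCh)

	// Once the commit loop has recorded an error, Ack surfaces it immediately
	// instead of queueing another offset.
	p.ackErr.Store(errors.New("commit failed"))
	fmt.Println(p.Ack())
}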