From 7916dafb4dfef70d9476733439881337ef00ec7e Mon Sep 17 00:00:00 2001
From: Vasiliy Tolstov
Date: Wed, 4 Aug 2021 16:40:48 +0300
Subject: [PATCH] intermediate fixes for subscriber

Signed-off-by: Vasiliy Tolstov
---
 segmentio.go | 255 ++++++++++++++++++++++++++-------------------------
 1 file changed, 130 insertions(+), 125 deletions(-)

diff --git a/segmentio.go b/segmentio.go
index 109686e..91829a0 100644
--- a/segmentio.go
+++ b/segmentio.go
@@ -4,6 +4,7 @@ package segmentio
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -88,21 +89,21 @@ func (s *subscriber) Topic() string {
 func (s *subscriber) Unsubscribe(ctx context.Context) error {
 	var err error
 	s.Lock()
-	s.closed = true
 	group := s.group
 	s.Unlock()
-
 	if group != nil {
 		err = group.Close()
 	}
+	if err == nil {
+		s.Lock()
+		s.closed = true
+		s.Unlock()
+	}
 	return err
 }
 
 func (k *kBroker) Address() string {
-	if len(k.addrs) > 0 {
-		return k.addrs[0]
-	}
-	return "127.0.0.1:9092"
+	return strings.Join(k.addrs, ",")
 }
 
 func (k *kBroker) Name() string {
@@ -293,6 +294,21 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
 	if ctx != nil {
 		wCtx = ctx
 	}
+
+	if err = k.writer.WriteMessages(wCtx, kmsg); err == nil {
+		return nil
+	}
+
+	logger.Debugf(wCtx, "recreate writer because of err: %v", err)
+	k.Lock()
+	if err = k.writer.Close(); err != nil {
+		logger.Errorf(wCtx, "failed to close writer: %v", err)
+		k.Unlock()
+		return err
+	}
+	k.writer = newWriter(k.writerConfig)
+	k.Unlock()
+
 	return k.writer.WriteMessages(wCtx, kmsg)
 }
@@ -377,7 +393,7 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 			case nil:
 				// normal execution
 			case kafka.ErrGroupClosed:
-				k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err)
+				k.opts.Logger.Debugf(k.opts.Context, "group closed %v", err)
 				sub.RLock()
 				closed := sub.closed
 				sub.RUnlock()
@@ -390,15 +406,15 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 				sub.createGroup(gCtx)
 				continue
 			default:
-				k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err)
+				k.opts.Logger.Debugf(k.opts.Context, "some error: %v", err)
 				sub.RLock()
 				closed := sub.closed
 				sub.RUnlock()
 				if closed {
 					return
 				}
-				if k.opts.Logger.V(logger.TraceLevel) {
-					k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
+				if k.opts.Logger.V(logger.DebugLevel) {
+					k.opts.Logger.Debugf(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
 				}
 				sub.createGroup(gCtx)
 				continue
@@ -454,8 +470,8 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 					generation.Start(cgh.run)
 				}
 			}
-			if k.opts.Logger.V(logger.TraceLevel) {
-				k.opts.Logger.Trace(k.opts.Context, "start commit loop")
+			if k.opts.Logger.V(logger.DebugLevel) {
+				k.opts.Logger.Debug(k.opts.Context, "start commit loop")
 			}
 			// run async commit loop
 			go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
@@ -466,18 +482,6 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 	return sub, nil
 }
 
-type cgBatchHandler struct {
-	brokerOpts   broker.Options
-	subOpts      broker.SubscribeOptions
-	reader       *kafka.Reader
-	handler      broker.BatchHandler
-	ackCh        chan map[string]map[int]int64
-	errCh        chan error
-	readerDone   *int32
-	commitDoneCh chan bool
-	cntWait      *int32
-}
-
 func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 	opt := broker.NewSubscribeOptions(opts...)
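The publish path above now reads as "try once, close and rebuild the writer, try once more". A standalone sketch of that pattern against github.com/segmentio/kafka-go; the broker address and topic are placeholders, and buildWriter merely stands in for the patch's newWriter helper:

package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// buildWriter stands in for the patch's newWriter helper: it returns a
// fresh *kafka.Writer so the publish path can replace a broken one.
func buildWriter() *kafka.Writer {
	return &kafka.Writer{
		Addr:  kafka.TCP("127.0.0.1:9092"), // placeholder broker
		Topic: "example",                   // placeholder topic
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	w := buildWriter()
	msg := kafka.Message{Value: []byte("hello")}

	// First attempt: on success the happy path returns immediately.
	if err := w.WriteMessages(ctx, msg); err == nil {
		_ = w.Close()
		return
	}

	// Failure path, as in the hunk above: close the stale writer,
	// build a new one, and retry the write exactly once.
	_ = w.Close()
	w = buildWriter()
	defer w.Close()
	if err := w.WriteMessages(ctx, msg); err != nil {
		log.Fatalf("publish failed even after writer recreation: %v", err)
	}
}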
@@ -559,7 +563,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 			case nil:
 				// normal execution
 			case kafka.ErrGroupClosed:
-				k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err)
+				k.opts.Logger.Debugf(k.opts.Context, "group closed %v", err)
 				sub.RLock()
 				closed := sub.closed
 				sub.RUnlock()
@@ -572,21 +576,21 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 				sub.createGroup(gCtx)
 				continue
 			default:
-				k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err)
+				k.opts.Logger.Debugf(k.opts.Context, "some error: %v", err)
 				sub.RLock()
 				closed := sub.closed
 				sub.RUnlock()
 				if closed {
 					return
 				}
-				if k.opts.Logger.V(logger.TraceLevel) {
-					k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
+				if k.opts.Logger.V(logger.DebugLevel) {
+					k.opts.Logger.Debugf(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
 				}
 				sub.createGroup(gCtx)
 				continue
 			}
 
-			k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0))
+			//k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0))
 			ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
 			errChLen := 0
 			for _, assignments := range generation.Assignments {
@@ -600,10 +604,10 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 			cntWait := int32(0)
 
 			for topic, assignments := range generation.Assignments {
-				k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments)))
+				//k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments)))
 
-				if k.opts.Logger.V(logger.TraceLevel) {
-					k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
+				if k.opts.Logger.V(logger.DebugLevel) {
+					k.opts.Logger.Debugf(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
 				}
 				for _, assignment := range assignments {
 					cfg := k.readerConfig
@@ -612,17 +616,9 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 					cfg.GroupID = ""
 					reader := kafka.NewReader(cfg)
 
-					if err := reader.SetOffset(assignment.Offset); err != nil {
-						if k.opts.Logger.V(logger.ErrorLevel) {
-							k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err)
-						}
-						if err = reader.Close(); err != nil {
-							if k.opts.Logger.V(logger.ErrorLevel) {
-								k.opts.Logger.Errorf(k.opts.Context, "reader close err: %v", err)
-							}
-						}
-						continue
-					}
+					// as we don't use a consumer group in the reader, the reader is not started before the actual fetch, so we can ignore all errors here
+					_ = reader.SetOffset(assignment.Offset)
+
 					errCh := make(chan error)
 					errChs = append(errChs, errCh)
 					cgh := &cgHandler{
@@ -640,9 +636,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 					generation.Start(cgh.run)
 				}
 			}
-			if k.opts.Logger.V(logger.TraceLevel) {
-				k.opts.Logger.Trace(k.opts.Context, "start async commit loop")
-			}
 			// run async commit loop
 			go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
 		}
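The Subscribe loop above relies on kafka-go separating group membership (kafka.ConsumerGroup and its Generation) from fetching (one plain kafka.Reader per assigned partition). A condensed sketch of that shape; brokers, group ID and topic are placeholders, and the real code routes messages through cgHandler instead of logging them:

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	brokers := []string{"127.0.0.1:9092"} // placeholder
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "example-group",      // placeholder
		Brokers: brokers,
		Topics:  []string{"example"}, // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		gen, err := group.Next(context.Background())
		if err != nil {
			return // e.g. kafka.ErrGroupClosed; the patch recreates the group instead
		}
		for topic, assignments := range gen.Assignments {
			topic := topic
			for _, a := range assignments {
				partition, offset := a.ID, a.Offset
				gen.Start(func(ctx context.Context) {
					// One plain, group-less reader per assigned partition.
					r := kafka.NewReader(kafka.ReaderConfig{
						Brokers:   brokers,
						Topic:     topic,
						Partition: partition,
					})
					defer r.Close()
					// The reader dials lazily, so errors surface on the first
					// fetch; that is why the hunk above drops the SetOffset
					// error handling.
					_ = r.SetOffset(offset)
					for {
						msg, err := r.ReadMessage(ctx)
						if err != nil {
							return // kafka.ErrGenerationEnded on rebalance
						}
						log.Printf("%s[%d]@%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
					}
				})
			}
		}
	}
}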
@@ -667,6 +660,10 @@ type cgHandler struct {
 
 func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) {
+	if k.opts.Logger.V(logger.DebugLevel) {
+		k.opts.Logger.Debug(k.opts.Context, "start async commit loop")
+	}
+
 	td := DefaultCommitInterval
 
 	if commitInterval > 0 {
@@ -682,6 +679,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
 
 	go func() {
 		defer func() {
+			k.opts.Logger.Debug(k.opts.Context, "return from commitLoop and close commitDoneCh")
 			close(commitDoneCh)
 		}()
 
@@ -691,26 +689,23 @@
 		for {
 			select {
 			case <-checkTicker.C:
-				if atomic.LoadInt32(cntWait) == 0 {
-					mapMu.Lock()
-					if len(offsets) > 0 {
-						if err := generation.CommitOffsets(offsets); err != nil {
-							for _, errCh := range errChs {
-								errCh <- err
-							}
-							return
-						}
+				if atomic.LoadInt32(cntWait) != 0 {
+					continue
+				}
+				mapMu.Lock()
+				if err := generation.CommitOffsets(offsets); err != nil {
+					for _, errCh := range errChs {
+						errCh <- err
 					}
 					mapMu.Unlock()
-					if k.opts.Logger.V(logger.TraceLevel) {
-						k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
-					}
 					return
 				}
-			case ack := <-ackCh:
-				if k.opts.Logger.V(logger.TraceLevel) {
-					// k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack)
+				mapMu.Unlock()
+				if k.opts.Logger.V(logger.DebugLevel) {
+					k.opts.Logger.Debug(k.opts.Context, "stop commit filling loop")
 				}
+				return
+			case ack := <-ackCh:
 				switch td {
 				case 0: // sync commits as CommitInterval == 0
 					if len(ack) > 0 {
@@ -734,27 +729,20 @@
 					}
 					mapMu.Unlock()
 				}
-				// check for readers done and commit offsets
-				if atomic.LoadInt32(cntWait) == 0 {
-					mapMu.Lock()
-					if len(offsets) > 0 {
-						if err := generation.CommitOffsets(offsets); err != nil {
-							for _, errCh := range errChs {
-								errCh <- err
-							}
-							return
-						}
-					}
-					mapMu.Unlock()
-					if k.opts.Logger.V(logger.TraceLevel) {
-						k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
-					}
-					return
-				}
 			}
 		}
 	}()
 
+	if td == 0 {
+		// sync commit loop
+		for {
+			if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
+	}
+
 	// async commit loop
 	if td > 0 {
 		ticker := time.NewTicker(td)
@@ -764,22 +752,14 @@
 		for {
 			select {
 			case <-doneTicker.C:
-				if atomic.LoadInt32(readerDone) == 1 {
-					mapMu.Lock()
-					if len(offsets) == 0 {
-						defer ticker.Stop()
-						return
-					}
+				if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
+					// fire immediate commit offsets
 					ticker.Stop()
 				}
 			case <-ticker.C:
 				mapMu.Lock()
-				if len(offsets) == 0 {
-					mapMu.Unlock()
-					continue
-				}
-				if k.opts.Logger.V(logger.TraceLevel) {
-					k.opts.Logger.Tracef(k.opts.Context, "async commit offsets: %v", offsets)
+				if k.opts.Logger.V(logger.DebugLevel) && len(offsets) > 0 {
+					k.opts.Logger.Debugf(k.opts.Context, "async commit offsets: %v", offsets)
 				}
 				err := generation.CommitOffsets(offsets)
 				if err != nil {
@@ -800,8 +780,8 @@
 }
 
 func (h *cgHandler) run(ctx context.Context) {
-	if h.brokerOpts.Logger.V(logger.TraceLevel) {
-		h.brokerOpts.Logger.Tracef(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
+	if h.brokerOpts.Logger.V(logger.DebugLevel) {
+		h.brokerOpts.Logger.Debugf(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
 	}
 	td := DefaultStatsInterval
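The async half of the reworked commitLoop is easiest to read as "accumulate acks, flush on a ticker". A trimmed sketch under that reading; ackCh mirrors the patch's channel, while done is a simplification of the readerDone/cntWait bookkeeping, not a name from the patch:

package sketch

import (
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// asyncCommitLoop is a simplified stand-in for commitLoop above: acks
// accumulate in a topic -> partition -> offset map and are flushed on
// each tick; done replaces the patch's readerDone/cntWait counters.
func asyncCommitLoop(gen *kafka.Generation, interval time.Duration, ackCh <-chan map[string]map[int]int64, done <-chan struct{}) {
	offsets := make(map[string]map[int]int64)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case ack := <-ackCh:
			// Fold freshly acked offsets into the pending batch.
			for topic, parts := range ack {
				if offsets[topic] == nil {
					offsets[topic] = make(map[int]int64)
				}
				for p, off := range parts {
					offsets[topic][p] = off
				}
			}
		case <-ticker.C:
			if len(offsets) == 0 {
				continue
			}
			if err := gen.CommitOffsets(offsets); err != nil {
				return // the patch fans this error out to every reader's errCh
			}
			offsets = make(map[string]map[int]int64)
		case <-done:
			// Final flush so nothing already acked is lost on shutdown.
			_ = gen.CommitOffsets(offsets)
			return
		}
	}
}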
@@ -821,9 +801,11 @@ func (h *cgHandler) run(ctx context.Context) {
 		if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
 			h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader for topic %s partition %d close error: %v", h.reader.Config().Topic, h.reader.Config().Partition, err)
 		}
+		h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "wait start for commitDoneCh channel closing")
 		<-h.commitDoneCh
-		if h.brokerOpts.Logger.V(logger.TraceLevel) {
-			h.brokerOpts.Logger.Tracef(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
+		h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "wait stop for commitDoneCh channel closing")
+		if h.brokerOpts.Logger.V(logger.DebugLevel) {
+			h.brokerOpts.Logger.Debugf(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
 		}
 	}()
 
@@ -856,14 +838,22 @@ func (h *cgHandler) run(ctx context.Context) {
 		msg, err := h.reader.ReadMessage(ctx)
 		switch err {
 		default:
-			if h.brokerOpts.Logger.V(logger.ErrorLevel) {
-				h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
+			switch kerr := err.(type) {
+			case kafka.Error:
+				if h.brokerOpts.Logger.V(logger.DebugLevel) {
+					h.brokerOpts.Logger.Debugf(h.brokerOpts.Context, "[segmentio] kafka error %T err: %v", kerr, kerr)
+				}
+				return
+			default:
+				if h.brokerOpts.Logger.V(logger.ErrorLevel) {
+					h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
+				}
+				return
 			}
-			return
 		case kafka.ErrGenerationEnded:
 			// generation has ended
-			if h.brokerOpts.Logger.V(logger.TraceLevel) {
-				h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
+			if h.brokerOpts.Logger.V(logger.DebugLevel) {
+				h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
 			}
 			return
 		case nil:
@@ -885,7 +875,7 @@ func (h *cgHandler) run(ctx context.Context) {
 				p.msg.Body = msg.Value
 			} else {
 				if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
-					p.SetError(err)
+					p.err = err
 					p.msg.Body = msg.Value
 					if eh != nil {
 						_ = eh(p)
@@ -927,6 +917,7 @@
 }
 
 func (sub *subscriber) createGroup(ctx context.Context) {
+	var err error
 	for {
 		select {
 		case <-ctx.Done():
@@ -934,8 +925,20 @@ func (sub *subscriber) createGroup(ctx context.Context) {
 		default:
 			sub.RLock()
 			cgcfg := sub.cgcfg
+			closed := sub.closed
+			cgroup := sub.group
 			sub.RUnlock()
-			cgroup, err := kafka.NewConsumerGroup(cgcfg)
+			if closed {
+				return
+			}
+			if cgroup != nil {
+				if err = cgroup.Close(); err != nil {
+					if sub.brokerOpts.Logger.V(logger.ErrorLevel) {
+						sub.brokerOpts.Logger.Errorf(sub.brokerOpts.Context, "[segmentio]: consumer group close error %v", err)
+					}
+				}
+			}
+			cgroup, err = kafka.NewConsumerGroup(cgcfg)
 			if err != nil {
 				if sub.brokerOpts.Logger.V(logger.ErrorLevel) {
 					sub.brokerOpts.Logger.Errorf(sub.brokerOpts.Context, "[segmentio]: consumer group error %v", err)
@@ -1002,6 +1005,7 @@ func (k *kBroker) configure(opts ...broker.Option) error {
 	k.addrs = cAddrs
 	k.readerConfig = readerConfig
 	k.writerConfig = writerConfig
+	k.writerConfig.Brokers = k.addrs
 
 	if k.readerConfig.Dialer == nil {
 		k.readerConfig.Dialer = kafka.DefaultDialer
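createGroup now bails out once the subscriber is closed and closes any leftover group before building a new one. A minimal sketch of that close-then-retry shape; the config is a placeholder and the one-second backoff is an assumption, not taken from the patch:

package sketch

import (
	"context"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// recreateGroup condenses the createGroup change above: close whatever
// group is left over, then retry kafka.NewConsumerGroup until it
// succeeds or the context ends.
func recreateGroup(ctx context.Context, cfg kafka.ConsumerGroupConfig, old *kafka.ConsumerGroup) *kafka.ConsumerGroup {
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			if old != nil {
				_ = old.Close() // drop the stale group before building a new one
				old = nil
			}
			group, err := kafka.NewConsumerGroup(cfg)
			if err != nil {
				time.Sleep(time.Second) // assumed backoff for transient broker errors
				continue
			}
			return group
		}
	}
}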
@@ -1014,30 +1018,7 @@
 		k.readerConfig.Dialer.ClientID = id
 	}
 
-	k.writer = &kafka.Writer{
-		Addr:         kafka.TCP(k.addrs...),
-		Balancer:     k.writerConfig.Balancer,
-		MaxAttempts:  k.writerConfig.MaxAttempts,
-		BatchSize:    k.writerConfig.BatchSize,
-		BatchBytes:   int64(k.writerConfig.BatchBytes),
-		BatchTimeout: k.writerConfig.BatchTimeout,
-		ReadTimeout:  k.writerConfig.ReadTimeout,
-		WriteTimeout: k.writerConfig.WriteTimeout,
-		RequiredAcks: kafka.RequiredAcks(k.writerConfig.RequiredAcks),
-		Async:        k.writerConfig.Async,
-		//Completion: writerConfig.Completion,
-		//Compression: writerConfig.Compression,
-		Logger:       k.writerConfig.Logger,
-		ErrorLogger:  k.writerConfig.ErrorLogger,
-		Transport: &kafka.Transport{
-			Dial:        k.writerConfig.Dialer.DialFunc,
-			ClientID:    k.writerConfig.Dialer.ClientID,
-			IdleTimeout: time.Second * 5,
-			MetadataTTL: time.Second * 9,
-			SASL:        k.writerConfig.Dialer.SASLMechanism,
-		},
-	}
-
+	k.writer = newWriter(k.writerConfig)
 	if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
 		k.writer.Completion = fn
 	}
@@ -1046,6 +1027,30 @@ func (k *kBroker) configure(opts ...broker.Option) error {
 	return nil
 }
 
+func newWriter(writerConfig kafka.WriterConfig) *kafka.Writer {
+	return &kafka.Writer{
+		Addr:         kafka.TCP(writerConfig.Brokers...),
+		Balancer:     writerConfig.Balancer,
+		MaxAttempts:  writerConfig.MaxAttempts,
+		BatchSize:    writerConfig.BatchSize,
+		BatchBytes:   int64(writerConfig.BatchBytes),
+		BatchTimeout: writerConfig.BatchTimeout,
+		ReadTimeout:  writerConfig.ReadTimeout,
+		WriteTimeout: writerConfig.WriteTimeout,
+		RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks),
+		Async:        writerConfig.Async,
+		Logger:       writerConfig.Logger,
+		ErrorLogger:  writerConfig.ErrorLogger,
+		Transport: &kafka.Transport{
+			Dial:        writerConfig.Dialer.DialFunc,
+			ClientID:    writerConfig.Dialer.ClientID,
+			IdleTimeout: time.Second * 5,
+			MetadataTTL: time.Second * 9,
+			SASL:        writerConfig.Dialer.SASLMechanism,
+		},
+	}
+}
+
 func NewBroker(opts ...broker.Option) broker.Broker {
 	return &kBroker{
 		opts: broker.NewOptions(opts...),
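With construction centralized in newWriter, the writer carries no fixed topic, so each kafka.Message must name its own. A usage sketch, assuming it sits inside this package so the helper and its imports are in scope; publishExample is a hypothetical caller, and brokers and topic are placeholders:

// a usage sketch of the newWriter helper above
func publishExample() error {
	cfg := kafka.WriterConfig{
		Brokers:  []string{"127.0.0.1:9092"}, // placeholder, normally k.addrs
		Dialer:   kafka.DefaultDialer,
		Balancer: &kafka.LeastBytes{},
	}
	w := newWriter(cfg) // helper introduced by this patch
	defer w.Close()

	// The writer has no Topic set, so the message carries its own.
	return w.WriteMessages(context.Background(), kafka.Message{
		Topic: "example", // placeholder
		Value: []byte("ping"),
	})
}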