From 03a583d9d4db956876af6c390de1b47a072cede4 Mon Sep 17 00:00:00 2001
From: Vasiliy Tolstov
Date: Wed, 22 Apr 2020 00:43:14 +0300
Subject: [PATCH] broker/segmentio: intermediate rewrite

Signed-off-by: Vasiliy Tolstov
---
 segmentio.go | 265 ++++++++++++++++++++-------------------------
 1 file changed, 104 insertions(+), 161 deletions(-)

diff --git a/segmentio.go b/segmentio.go
index 4b5248a..f069931 100644
--- a/segmentio.go
+++ b/segmentio.go
@@ -4,7 +4,6 @@ package segmentio
 import (
 	"context"
 	"errors"
-	"io"
 	"sync"
 
 	"github.com/google/uuid"
@@ -30,25 +29,28 @@ type kBroker struct {
 
 type subscriber struct {
 	k         *kBroker
-	group     *kafka.ConsumerGroup
-	t         string
+	topic     string
 	opts      broker.SubscribeOptions
 	offset    int64
 	gen       *kafka.Generation
 	partition int
 	handler   broker.Handler
 	reader    *kafka.Reader
+	exit      bool
+	done      chan struct{}
+	group     *kafka.ConsumerGroup
+	sync.RWMutex
 }
 
 type publication struct {
-	t      string
-	err    error
-	m      *broker.Message
-	ctx    context.Context
-	gen    *kafka.Generation
-	reader *kafka.Reader
-	km     kafka.Message
-	mp     map[string]map[int]int64 // for commit offsets
+	topic      string
+	err        error
+	m          *broker.Message
+	ctx        context.Context
+	generation *kafka.Generation
+	reader     *kafka.Reader
+	km         kafka.Message
+	offsets    map[string]map[int]int64 // for commit offsets
 }
 
 func init() {
@@ -56,7 +58,7 @@ func init() {
 }
 
 func (p *publication) Topic() string {
-	return p.t
+	return p.topic
 }
 
 func (p *publication) Message() *broker.Message {
@@ -64,8 +66,7 @@ func (p *publication) Message() *broker.Message {
 }
 
 func (p *publication) Ack() error {
-	//return p.gen.CommitOffsets(p.mp)
-	return p.reader.CommitMessages(p.ctx, p.km)
+	return p.generation.CommitOffsets(p.offsets)
 }
 
 func (p *publication) Error() error {
@@ -77,12 +78,17 @@ func (s *subscriber) Options() broker.SubscribeOptions {
 }
 
 func (s *subscriber) Topic() string {
-	return s.t
+	return s.topic
 }
 
 func (s *subscriber) Unsubscribe() error {
-	//return s.group.Close()
-	return s.reader.Close()
+	var err error
+	s.Lock()
+	defer s.Unlock()
+	if s.group != nil {
+		err = s.group.Close()
+	}
+	return err
 }
 
 func (k *kBroker) Address() string {
@@ -202,142 +208,73 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
 		o(&opt)
 	}
 
-	cfg := k.readerConfig
-	cfg.Topic = topic
-	cfg.GroupID = opt.Queue
-	cfg.WatchPartitionChanges = true
-	cfg.MaxAttempts = 1
-	if err := cfg.Validate(); err != nil {
-		return nil, err
-	}
-
-	reader := kafka.NewReader(cfg)
-	sub := &subscriber{opts: opt, t: topic, reader: reader}
-
-	go func() {
-		for {
-			select {
-			case <-k.opts.Context.Done():
-				return
-			default:
-				msg, err := reader.FetchMessage(k.opts.Context)
-				if err != nil && err == io.EOF {
-					return
-				} else if err != nil {
-					logger.Errorf("[kafka] subscribe error: %v", err)
-				}
-				p := &publication{t: topic, ctx: k.opts.Context, reader: reader, km: msg}
-
-				var m broker.Message
-				eh := k.opts.ErrorHandler
-				p.m = &m
-				if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
-					p.err = err
-					p.m.Body = msg.Value
-					if eh != nil {
-						eh(p)
-					} else {
-						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
-							logger.Errorf("[kafka]: failed to unmarshal: %v", err)
-						}
-					}
-					continue
-				}
-				err = handler(p)
-				if err == nil && opt.AutoAck {
-					if err = p.Ack(); err != nil {
-						logger.Errorf("[kafka]: unable to commit msg: %v", err)
-					}
-				} else if err != nil {
-					p.err = err
-					if eh != nil {
-						eh(p)
-					} else {
-						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
-							logger.Errorf("[kafka]: subscriber error: %v", err)
-						}
-					}
-				}
-
-			}
-		}
-	}()
-
-	return sub, nil
-}
-
-/*
-func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
-	opt := broker.SubscribeOptions{
-		AutoAck: true,
-		Queue:   uuid.New().String(),
-	}
-	for _, o := range opts {
-		o(&opt)
-	}
-
-	gcfg := kafka.ConsumerGroupConfig{
+	cgcfg := kafka.ConsumerGroupConfig{
 		ID:                    opt.Queue,
 		WatchPartitionChanges: true,
 		Brokers:               k.readerConfig.Brokers,
 		Topics:                []string{topic},
 		GroupBalancers:        []kafka.GroupBalancer{kafka.RangeGroupBalancer{}},
 	}
-	if err := gcfg.Validate(); err != nil {
+	if err := cgcfg.Validate(); err != nil {
 		return nil, err
 	}
-
-	sub := &subscriber{k: k, opts: opt, t: topic, handler: handler}
-
-	chErr := make(chan error)
-
-	go func() {
-		for {
-			logger.Info("new consumer group")
-			group, err := kafka.NewConsumerGroup(gcfg)
-			if err != nil {
-				chErr <- err
-				time.Sleep(1 * time.Second)
-				continue
-			}
-			logger.Info("group next")
-			gen, err := group.Next(k.opts.Context)
-			if err == kafka.ErrGroupClosed {
-				chErr <- nil
-				time.Sleep(1 * time.Second)
-				continue
-			} else if err != nil {
-				chErr <- err
-				time.Sleep(1 * time.Second)
-				continue
-			}
-			chErr <- nil
-			logger.Info("gen assign")
-			assignments := gen.Assignments[topic]
-			for _, assignment := range assignments {
-				partition, offset := assignment.ID, assignment.Offset
-				sub.offset = offset
-				sub.partition = partition
-				sub.gen = gen
-				logger.Infof("gen start part %v off %v", partition, offset)
-				gen.Start(sub.run)
-			}
-		}
-	}()
-
-	err := <-chErr
+
+	group, err := kafka.NewConsumerGroup(cgcfg)
 	if err != nil {
 		return nil, err
 	}
 
+	sub := &subscriber{opts: opt, topic: topic, group: group}
+
 	go func() {
 		for {
 			select {
 			case <-k.opts.Context.Done():
+				if logger.V(logger.TraceLevel, logger.DefaultLogger) {
+					logger.Tracef("[segmentio] subscription context done: %v", k.opts.Context.Err())
+				}
+				// the subscription context was canceled, stop consuming
 				return
-			case err := <-chErr:
-				if err != nil {
-					logger.Error(err)
+			default:
+				/*
+					group, err := kafka.NewConsumerGroup(cgcfg)
+					if err != nil {
+						if logger.V(logger.TraceLevel, logger.DefaultLogger) {
+							logger.Tracef("[segmentio] consumer group error: %v", err)
+						}
+						continue
+					}
+					sub.Lock()
+					*(sub.group) = *group
+					sub.Unlock()
+				*/
+				generation, err := group.Next(k.opts.Context)
+				switch err {
+				case nil:
+					// a new generation was assigned, start its partition readers below
+				case kafka.ErrGroupClosed:
+					// the group was closed, e.g. by Unsubscribe, so stop the loop
+					if logger.V(logger.TraceLevel, logger.DefaultLogger) {
+						logger.Tracef("[segmentio] consumer group closed: %v", err)
+					}
+					return
+				default:
+					if logger.V(logger.TraceLevel, logger.DefaultLogger) {
+						logger.Tracef("[segmentio] consumer group next error: %v", err)
+					}
+					continue
+				}
+
+				for _, t := range cgcfg.Topics {
+					assignments := generation.Assignments[t]
+					for _, assignment := range assignments {
+						cfg := k.readerConfig
+						cfg.Topic = t
+						cfg.Partition = assignment.ID
+						cfg.GroupID = ""
+						cgh := &cgHandler{topic: t, generation: generation, brokerOpts: k.opts, subOpts: opt, reader: kafka.NewReader(cfg), handler: handler}
+						// seek the partition reader to the last committed offset for this generation
+						cgh.reader.SetOffset(assignment.Offset)
+						generation.Start(cgh.run)
+					}
+				}
 			}
 		}
 	}()
@@ -346,54 +283,60 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
 
 	return sub, nil
 }
 
-func (s *subscriber) run(ctx context.Context) {
-	// create reader for this partition.
-	cfg := s.k.readerConfig
-	cfg.Topic = s.t
-	cfg.Partition = s.partition
-	cfg.GroupID = ""
-	p := &publication{t: s.t, ctx: s.k.opts.Context, gen: s.gen}
-	p.mp = map[string]map[int]int64{p.t: {s.partition: s.offset}}
+type cgHandler struct {
+	topic      string
+	generation *kafka.Generation
+	brokerOpts broker.Options
+	subOpts    broker.SubscribeOptions
+	reader     *kafka.Reader
+	handler    broker.Handler
+}
 
-	reader := kafka.NewReader(cfg)
-	defer reader.Close()
-	logger.Info("set offset")
-	// seek to the last committed offset for this partition.
-	reader.SetOffset(s.offset)
+func (h *cgHandler) run(ctx context.Context) {
+	offsets := make(map[string]map[int]int64)
+	offsets[h.reader.Config().Topic] = make(map[int]int64)
+
+	defer h.reader.Close()
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		default:
-			msg, err := reader.ReadMessage(ctx)
+			msg, err := h.reader.ReadMessage(ctx)
 			switch err {
+			default:
+				if logger.V(logger.TraceLevel, logger.DefaultLogger) {
+					logger.Tracef("[segmentio] unexpected error: %v", err)
+				}
+				return
 			case kafka.ErrGenerationEnded:
-				logger.Infof("reader err: %v", err)
 				// generation has ended
-				if logger.V(logger.DebugLevel, logger.DefaultLogger) {
-					logger.Debug("[kafka] subscription closed")
+				if logger.V(logger.TraceLevel, logger.DefaultLogger) {
+					logger.Trace("[segmentio] generation ended and subscription closed")
 				}
 				return
 			case nil:
 				var m broker.Message
-				eh := s.k.opts.ErrorHandler
-				p.m = &m
-				if err := s.k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
+				eh := h.brokerOpts.ErrorHandler
+				offsets[msg.Topic][msg.Partition] = msg.Offset + 1 // commit the offset of the next message to read
+				p := &publication{topic: msg.Topic, generation: h.generation, m: &m, offsets: offsets}
+
+				if err := h.brokerOpts.Codec.Unmarshal(msg.Value, &m); err != nil {
 					p.err = err
 					p.m.Body = msg.Value
 					if eh != nil {
 						eh(p)
 					} else {
 						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
-							logger.Errorf("[kafka]: failed to unmarshal: %v", err)
+							logger.Errorf("[segmentio]: failed to unmarshal: %v", err)
 						}
 					}
 					continue
 				}
-				err = s.handler(p)
-				if err == nil && s.opts.AutoAck {
+				err = h.handler(p)
+				if err == nil && h.subOpts.AutoAck {
 					if err = p.Ack(); err != nil {
-						logger.Errorf("[kafka]: unable to commit msg: %v", err)
+						logger.Errorf("[segmentio]: unable to commit msg: %v", err)
 					}
 				} else if err != nil {
 					p.err = err
@@ -401,7 +344,7 @@ func (s *subscriber) run(ctx context.Context) {
 					if eh != nil {
 						eh(p)
 					} else {
 						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
-							logger.Errorf("[kafka]: subscriber error: %v", err)
+							logger.Errorf("[segmentio]: subscriber error: %v", err)
 						}
 					}
@@ -409,7 +352,7 @@
 			}
 		}
 	}
-*/
+
 func (k *kBroker) String() string {
 	return "kafka"
 }
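
Note (for review, not part of the commit): the consumer-group flow this rewrite moves to mirrors the Generation example in the segmentio/kafka-go documentation. Next blocks until a rebalance produces a new generation, Start runs one raw partition reader per assignment, and commits record the offset of the next message to read (hence the msg.Offset + 1 above). A minimal, self-contained sketch of that pattern follows; the broker address, topic, and group ID are placeholders for illustration, not values taken from this patch:

	package main

	import (
		"context"
		"log"

		kafka "github.com/segmentio/kafka-go"
	)

	func main() {
		// Placeholder broker address, topic and group ID.
		brokers := []string{"127.0.0.1:9092"}
		topic := "example-topic"

		group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
			ID:      "example-group",
			Brokers: brokers,
			Topics:  []string{topic},
		})
		if err != nil {
			log.Fatal(err)
		}
		defer group.Close()

		for {
			// Next blocks until the coordinator assigns a new generation
			// (initial join or rebalance); it returns ErrGroupClosed after Close.
			gen, err := group.Next(context.Background())
			if err != nil {
				return
			}
			for _, assignment := range gen.Assignments[topic] {
				partition, offset := assignment.ID, assignment.Offset
				// Start runs the callback until the generation ends; the context
				// it receives reports kafka.ErrGenerationEnded once a rebalance begins.
				gen.Start(func(ctx context.Context) {
					// A reader without a GroupID consumes one partition directly.
					reader := kafka.NewReader(kafka.ReaderConfig{
						Brokers:   brokers,
						Topic:     topic,
						Partition: partition,
					})
					defer reader.Close()
					// Seek to the last committed offset for this partition.
					reader.SetOffset(offset)
					for {
						msg, err := reader.ReadMessage(ctx)
						if err != nil { // generation ended, context canceled, ...
							return
						}
						// Commit the offset of the next message to read so the
						// message just handled is not redelivered after a restart.
						if err := gen.CommitOffsets(map[string]map[int]int64{
							topic: {partition: msg.Offset + 1},
						}); err != nil {
							return
						}
					}
				})
			}
		}
	}

Unsubscribe maps onto group.Close(), which makes group.Next return kafka.ErrGroupClosed and ends the readers started for the current generation with kafka.ErrGenerationEnded, which is why cgHandler.run treats that error as a clean shutdown.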