diff --git a/segmentio.go b/segmentio.go index 0599e5f..efb842a 100644 --- a/segmentio.go +++ b/segmentio.go @@ -36,9 +36,10 @@ type subscriber struct { partition int handler broker.Handler reader *kafka.Reader - exit bool + closed bool done chan struct{} group *kafka.ConsumerGroup + cgcfg kafka.ConsumerGroupConfig sync.RWMutex } @@ -91,6 +92,7 @@ func (s *subscriber) Unsubscribe() error { if s.group != nil { err = s.group.Close() } + s.closed = true return err } @@ -185,21 +187,19 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ return err } - k.Lock() - writer, ok := k.writers[topic] - if !ok { - cfg := k.writerConfig - cfg.Topic = topic - if err = cfg.Validate(); err != nil { - k.Unlock() - return err - } - writer = kafka.NewWriter(cfg) - k.writers[topic] = writer + cfg := k.writerConfig + cfg.Topic = topic + if err = cfg.Validate(); err != nil { + return err } - k.Unlock() + writer := kafka.NewWriter(cfg) - return writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf}) + err = writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf}) + if err != nil { + return err + } + + return writer.Close() } func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { @@ -222,49 +222,71 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker return nil, err } - group, err := kafka.NewConsumerGroup(cgcfg) + cgroup, err := kafka.NewConsumerGroup(cgcfg) if err != nil { return nil, err } - sub := &subscriber{opts: opt, topic: topic, group: group} + sub := &subscriber{opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg} go func() { for { select { case <-k.opts.Context.Done(): - if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Trace("[segmentio] consumer group closed %v", k.opts.Context.Err()) + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + // unsubcribed and closed + return + } + // unexpected context closed + if k.opts.Context.Err() != nil { + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Errorf("[segmentio] context closed unexpected %v", k.opts.Context.Err()) + } } - // consumer group closed return default: - /* - group, err := kafka.NewConsumerGroup(cgcfg) - if err != nil { - if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Trace("[segmentio] consumer group error %v", err) - } - continue - } - sub.Lock() - *(sub.group) = *group - sub.Unlock() - */ + sub.RLock() + group := sub.group + sub.RUnlock() generation, err := group.Next(k.opts.Context) switch err { + case nil: + // normal execution case kafka.ErrGroupClosed: - if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Tracef("[segmentio] consumer generation ended %v", k.opts.Context.Err()) + sub.RLock() + closed := sub.closed + sub.RUnlock() + if !closed { + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Errorf("[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err()) + } + if err = group.Close(); err != nil { + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Errorf("[segmentio] consumer group close error %v", err) + } + } + sub.createGroup(k.opts.Context) } continue default: - if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Trace("[segmentio] consumer error %v", k.opts.Context.Err()) + sub.RLock() + closed := sub.closed + sub.RUnlock() + if !closed { + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Errorf("[segmentio] recreate consumer group, as unexpected consumer error %v", err) + } } + if err = group.Close(); err != nil { + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Errorf("[segmentio] consumer group close error %v", err) + } + } + sub.createGroup(k.opts.Context) continue - case nil: - // continue } for _, t := range cgcfg.Topics { @@ -274,13 +296,15 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker cfg.Topic = t cfg.Partition = assignment.ID cfg.GroupID = "" - // cfg.StartOffset = assignment.Offset + // break reading + // cfg.StartOffset = assignment.Offset reader := kafka.NewReader(cfg) reader.SetOffset(assignment.Offset) cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler} generation.Start(cgh.run) } } + } } }() @@ -317,7 +341,7 @@ func (h *cgHandler) run(ctx context.Context) { case kafka.ErrGenerationEnded: // generation has ended if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Trace("[segmentio] generation ended and subscription closed") + logger.Trace("[segmentio] generation ended") } return case nil: @@ -341,7 +365,9 @@ func (h *cgHandler) run(ctx context.Context) { err = h.handler(p) if err == nil && h.subOpts.AutoAck { if err = p.Ack(); err != nil { - logger.Errorf("[segmentio]: unable to commit msg: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[segmentio]: unable to commit msg: %v", err) + } } } else if err != nil { p.err = err @@ -358,6 +384,33 @@ func (h *cgHandler) run(ctx context.Context) { } } +func (sub *subscriber) createGroup(ctx context.Context) { + sub.RLock() + cgcfg := sub.cgcfg + sub.RUnlock() + + for { + select { + case <-ctx.Done(): + // closed + return + default: + cgroup, err := kafka.NewConsumerGroup(cgcfg) + if err != nil { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("[segmentio]: consumer group error %v", err) + } + continue + } + sub.Lock() + sub.group = cgroup + sub.Unlock() + // return + return + } + } +} + func (k *kBroker) String() string { return "kafka" }