broker/segmentio: fixup kafka reconnection logic

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-06-02 01:01:53 +03:00
parent 62d02b4297
commit 033d130253

View File

@ -36,9 +36,10 @@ type subscriber struct {
partition int partition int
handler broker.Handler handler broker.Handler
reader *kafka.Reader reader *kafka.Reader
exit bool closed bool
done chan struct{} done chan struct{}
group *kafka.ConsumerGroup group *kafka.ConsumerGroup
cgcfg kafka.ConsumerGroupConfig
sync.RWMutex sync.RWMutex
} }
@ -91,6 +92,7 @@ func (s *subscriber) Unsubscribe() error {
if s.group != nil { if s.group != nil {
err = s.group.Close() err = s.group.Close()
} }
s.closed = true
return err return err
} }
@ -185,21 +187,19 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
return err return err
} }
k.Lock()
writer, ok := k.writers[topic]
if !ok {
cfg := k.writerConfig cfg := k.writerConfig
cfg.Topic = topic cfg.Topic = topic
if err = cfg.Validate(); err != nil { if err = cfg.Validate(); err != nil {
k.Unlock()
return err return err
} }
writer = kafka.NewWriter(cfg) writer := kafka.NewWriter(cfg)
k.writers[topic] = writer
}
k.Unlock()
return writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf}) err = writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf})
if err != nil {
return err
}
return writer.Close()
} }
func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
@ -222,49 +222,71 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
return nil, err return nil, err
} }
group, err := kafka.NewConsumerGroup(cgcfg) cgroup, err := kafka.NewConsumerGroup(cgcfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sub := &subscriber{opts: opt, topic: topic, group: group} sub := &subscriber{opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg}
go func() { go func() {
for { for {
select { select {
case <-k.opts.Context.Done(): case <-k.opts.Context.Done():
if logger.V(logger.TraceLevel, logger.DefaultLogger) { sub.RLock()
logger.Trace("[segmentio] consumer group closed %v", k.opts.Context.Err()) closed := sub.closed
sub.RUnlock()
if closed {
// unsubcribed and closed
return
}
// unexpected context closed
if k.opts.Context.Err() != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Errorf("[segmentio] context closed unexpected %v", k.opts.Context.Err())
}
} }
// consumer group closed
return return
default: default:
/* sub.RLock()
group, err := kafka.NewConsumerGroup(cgcfg) group := sub.group
if err != nil { sub.RUnlock()
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer group error %v", err)
}
continue
}
sub.Lock()
*(sub.group) = *group
sub.Unlock()
*/
generation, err := group.Next(k.opts.Context) generation, err := group.Next(k.opts.Context)
switch err { switch err {
case nil:
// normal execution
case kafka.ErrGroupClosed: case kafka.ErrGroupClosed:
sub.RLock()
closed := sub.closed
sub.RUnlock()
if !closed {
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("[segmentio] consumer generation ended %v", k.opts.Context.Err()) logger.Errorf("[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err())
}
if err = group.Close(); err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Errorf("[segmentio] consumer group close error %v", err)
}
}
sub.createGroup(k.opts.Context)
} }
continue continue
default: default:
sub.RLock()
closed := sub.closed
sub.RUnlock()
if !closed {
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer error %v", k.opts.Context.Err()) logger.Errorf("[segmentio] recreate consumer group, as unexpected consumer error %v", err)
} }
}
if err = group.Close(); err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Errorf("[segmentio] consumer group close error %v", err)
}
}
sub.createGroup(k.opts.Context)
continue continue
case nil:
// continue
} }
for _, t := range cgcfg.Topics { for _, t := range cgcfg.Topics {
@ -274,6 +296,7 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
cfg.Topic = t cfg.Topic = t
cfg.Partition = assignment.ID cfg.Partition = assignment.ID
cfg.GroupID = "" cfg.GroupID = ""
// break reading
// cfg.StartOffset = assignment.Offset // cfg.StartOffset = assignment.Offset
reader := kafka.NewReader(cfg) reader := kafka.NewReader(cfg)
reader.SetOffset(assignment.Offset) reader.SetOffset(assignment.Offset)
@ -281,6 +304,7 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
generation.Start(cgh.run) generation.Start(cgh.run)
} }
} }
} }
} }
}() }()
@ -317,7 +341,7 @@ func (h *cgHandler) run(ctx context.Context) {
case kafka.ErrGenerationEnded: case kafka.ErrGenerationEnded:
// generation has ended // generation has ended
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] generation ended and subscription closed") logger.Trace("[segmentio] generation ended")
} }
return return
case nil: case nil:
@ -341,8 +365,10 @@ func (h *cgHandler) run(ctx context.Context) {
err = h.handler(p) err = h.handler(p)
if err == nil && h.subOpts.AutoAck { if err == nil && h.subOpts.AutoAck {
if err = p.Ack(); err != nil { if err = p.Ack(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[segmentio]: unable to commit msg: %v", err) logger.Errorf("[segmentio]: unable to commit msg: %v", err)
} }
}
} else if err != nil { } else if err != nil {
p.err = err p.err = err
if eh != nil { if eh != nil {
@ -358,6 +384,33 @@ func (h *cgHandler) run(ctx context.Context) {
} }
} }
func (sub *subscriber) createGroup(ctx context.Context) {
sub.RLock()
cgcfg := sub.cgcfg
sub.RUnlock()
for {
select {
case <-ctx.Done():
// closed
return
default:
cgroup, err := kafka.NewConsumerGroup(cgcfg)
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("[segmentio]: consumer group error %v", err)
}
continue
}
sub.Lock()
sub.group = cgroup
sub.Unlock()
// return
return
}
}
}
func (k *kBroker) String() string { func (k *kBroker) String() string {
return "kafka" return "kafka"
} }