intermediate fixes for subscriber

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Author: Василий Толстов
Date:   2021-08-04 16:40:48 +03:00
parent 83b037df20
commit 7916dafb4d


@@ -4,6 +4,7 @@ package segmentio
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -88,21 +89,21 @@ func (s *subscriber) Topic() string {
 func (s *subscriber) Unsubscribe(ctx context.Context) error {
 	var err error
 	s.Lock()
-	s.closed = true
 	group := s.group
 	s.Unlock()
 	if group != nil {
 		err = group.Close()
 	}
+	if err == nil {
+		s.Lock()
+		s.closed = true
+		s.Unlock()
+	}
 	return err
 }

 func (k *kBroker) Address() string {
-	if len(k.addrs) > 0 {
-		return k.addrs[0]
-	}
-	return "127.0.0.1:9092"
+	return strings.Join(k.addrs, ",")
 }

 func (k *kBroker) Name() string {
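With this change Address reports the whole seed list instead of only the first address, and the hard-coded "127.0.0.1:9092" fallback is gone, so an unconfigured broker now returns an empty string. A minimal usage sketch, assuming the broker.Addrs option this package's options plumbing already uses; the addresses are illustrative:

	// segmentio is this package's import name
	b := segmentio.NewBroker(broker.Addrs("b1:9092", "b2:9092"))
	fmt.Println(b.Address()) // "b1:9092,b2:9092"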
@@ -293,6 +294,21 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
 	if ctx != nil {
 		wCtx = ctx
 	}
+	if err = k.writer.WriteMessages(wCtx, kmsg); err == nil {
+		return nil
+	}
+
+	logger.Debugf(wCtx, "recreate writer because of err: %v", err)
+
+	k.Lock()
+	if err = k.writer.Close(); err != nil {
+		logger.Errorf(wCtx, "failed to close writer: %v", err)
+		k.Unlock()
+		return err
+	}
+	k.writer = newWriter(k.writerConfig)
+	k.Unlock()
+
 	return k.writer.WriteMessages(wCtx, kmsg)
 }
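The added block gives Publish a single transparent retry: if the first WriteMessages fails, the broken writer is closed and rebuilt from the saved config under the broker lock (newWriter is introduced at the end of this diff), and the write is attempted once more. Callers are unchanged; only a second consecutive failure, or a failed Close, surfaces. A caller-side sketch; the topic and payload are illustrative:

	// b is a broker created via segmentio.NewBroker(...)
	msg := &broker.Message{
		Header: map[string]string{"id": "1"}, // illustrative header
		Body:   []byte(`{"amount":10}`),      // illustrative body
	}
	// a transient writer failure is absorbed by the close-and-recreate
	// path inside Publish; err is non-nil only if the retry also fails
	if err := b.Publish(ctx, "orders", msg); err != nil {
		log.Printf("publish failed even after writer recreate: %v", err)
	}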
@@ -377,7 +393,7 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 		case nil:
 			// normal execution
 		case kafka.ErrGroupClosed:
-			k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err)
+			k.opts.Logger.Debugf(k.opts.Context, "group closed %v", err)
 			sub.RLock()
 			closed := sub.closed
 			sub.RUnlock()
@@ -390,15 +406,15 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 			sub.createGroup(gCtx)
 			continue
 		default:
-			k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err)
+			k.opts.Logger.Debugf(k.opts.Context, "some error: %v", err)
 			sub.RLock()
 			closed := sub.closed
 			sub.RUnlock()
 			if closed {
 				return
 			}
-			if k.opts.Logger.V(logger.TraceLevel) {
-				k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
+			if k.opts.Logger.V(logger.DebugLevel) {
+				k.opts.Logger.Debugf(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
 			}
 			sub.createGroup(gCtx)
 			continue
@@ -454,8 +470,8 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 				generation.Start(cgh.run)
 			}
 		}
-		if k.opts.Logger.V(logger.TraceLevel) {
-			k.opts.Logger.Trace(k.opts.Context, "start commit loop")
+		if k.opts.Logger.V(logger.DebugLevel) {
+			k.opts.Logger.Debug(k.opts.Context, "start commit loop")
 		}
 		// run async commit loop
 		go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
@@ -466,18 +482,6 @@ func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler brok
 	return sub, nil
 }

-type cgBatchHandler struct {
-	brokerOpts   broker.Options
-	subOpts      broker.SubscribeOptions
-	reader       *kafka.Reader
-	handler      broker.BatchHandler
-	ackCh        chan map[string]map[int]int64
-	errCh        chan error
-	readerDone   *int32
-	commitDoneCh chan bool
-	cntWait      *int32
-}
-
 func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 	opt := broker.NewSubscribeOptions(opts...)
@@ -559,7 +563,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 		case nil:
 			// normal execution
 		case kafka.ErrGroupClosed:
-			k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err)
+			k.opts.Logger.Debugf(k.opts.Context, "group closed %v", err)
 			sub.RLock()
 			closed := sub.closed
 			sub.RUnlock()
@@ -572,21 +576,21 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 			sub.createGroup(gCtx)
 			continue
 		default:
-			k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err)
+			k.opts.Logger.Debugf(k.opts.Context, "some error: %v", err)
 			sub.RLock()
 			closed := sub.closed
 			sub.RUnlock()
 			if closed {
 				return
 			}
-			if k.opts.Logger.V(logger.TraceLevel) {
-				k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
+			if k.opts.Logger.V(logger.DebugLevel) {
+				k.opts.Logger.Debugf(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
 			}
 			sub.createGroup(gCtx)
 			continue
 		}
-		k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0))
+		//k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0))
 		ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
 		errChLen := 0
 		for _, assignments := range generation.Assignments {
@@ -600,10 +604,10 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 		cntWait := int32(0)

 		for topic, assignments := range generation.Assignments {
-			k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments)))
-			if k.opts.Logger.V(logger.TraceLevel) {
-				k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
+			//k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments)))
+			if k.opts.Logger.V(logger.DebugLevel) {
+				k.opts.Logger.Debugf(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
 			}
 			for _, assignment := range assignments {
 				cfg := k.readerConfig
@@ -612,17 +616,9 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 				cfg.GroupID = ""
 				reader := kafka.NewReader(cfg)
-				if err := reader.SetOffset(assignment.Offset); err != nil {
-					if k.opts.Logger.V(logger.ErrorLevel) {
-						k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err)
-					}
-					if err = reader.Close(); err != nil {
-						if k.opts.Logger.V(logger.ErrorLevel) {
-							k.opts.Logger.Errorf(k.opts.Context, "reader close err: %v", err)
-						}
-					}
-					continue
-				}
+				// as we don't use a consumer group in the reader, the reader is not started before the actual fetch, so we can ignore all errors here
+				_ = reader.SetOffset(assignment.Offset)
 				errCh := make(chan error)
 				errChs = append(errChs, errCh)
 				cgh := &cgHandler{
@@ -640,9 +636,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
 				generation.Start(cgh.run)
 			}
 		}
-		if k.opts.Logger.V(logger.TraceLevel) {
-			k.opts.Logger.Trace(k.opts.Context, "start async commit loop")
-		}
 		// run async commit loop
 		go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
 	}
@@ -667,6 +660,10 @@ type cgHandler struct {

 func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) {
+	if k.opts.Logger.V(logger.DebugLevel) {
+		k.opts.Logger.Debug(k.opts.Context, "start async commit loop")
+	}
+
 	td := DefaultCommitInterval
 	if commitInterval > 0 {
@@ -682,6 +679,7 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
 	go func() {
 		defer func() {
+			k.opts.Logger.Debug(k.opts.Context, "return from commitLoop and close commitDoneCh")
 			close(commitDoneCh)
 		}()
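The deferred close of commitDoneCh is what the partition readers block on in run() (the <-h.commitDoneCh receive further down): close-to-broadcast, the standard Go idiom for waking every waiter at once. In isolation:

	done := make(chan bool)
	go func() {
		defer close(done) // broadcast: every receiver unblocks
		// ... commit work ...
	}()
	<-done // each partition reader waits here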
@@ -691,26 +689,23 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
 	for {
 		select {
 		case <-checkTicker.C:
-			if atomic.LoadInt32(cntWait) == 0 {
-				mapMu.Lock()
-				if len(offsets) > 0 {
-					if err := generation.CommitOffsets(offsets); err != nil {
-						for _, errCh := range errChs {
-							errCh <- err
-						}
-						return
-					}
-				}
-				mapMu.Unlock()
-				if k.opts.Logger.V(logger.TraceLevel) {
-					k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
-				}
-				return
+			if atomic.LoadInt32(cntWait) != 0 {
+				continue
 			}
+			mapMu.Lock()
+			if err := generation.CommitOffsets(offsets); err != nil {
+				for _, errCh := range errChs {
+					errCh <- err
+				}
+				mapMu.Unlock()
+				return
+			}
+			mapMu.Unlock()
+			if k.opts.Logger.V(logger.DebugLevel) {
+				k.opts.Logger.Debug(k.opts.Context, "stop commit filling loop")
+			}
+			return
 		case ack := <-ackCh:
-			if k.opts.Logger.V(logger.TraceLevel) {
-				// k.opts.Logger.Tracef(k.opts.Context, "new commit offsets: %v", ack)
-			}
 			switch td {
 			case 0: // sync commits as CommitInterval == 0
 				if len(ack) > 0 {
@@ -734,27 +729,20 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
 				}
 				mapMu.Unlock()
 			}
-			// check for readers done and commit offsets
-			if atomic.LoadInt32(cntWait) == 0 {
-				mapMu.Lock()
-				if len(offsets) > 0 {
-					if err := generation.CommitOffsets(offsets); err != nil {
-						for _, errCh := range errChs {
-							errCh <- err
-						}
-						return
-					}
-				}
-				mapMu.Unlock()
-				if k.opts.Logger.V(logger.TraceLevel) {
-					k.opts.Logger.Trace(k.opts.Context, "stop commit loop")
-				}
-				return
-			}
 		}
 	}
 	}()

+	if td == 0 {
+		// sync commit loop
+		for {
+			if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
+	}
 	// async commit loop
 	if td > 0 {
 		ticker := time.NewTicker(td)
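Removing the per-ack shutdown commit moves final-offset responsibility entirely into the checkTicker branch above, and the new td == 0 branch makes the synchronous case block commitLoop itself until the partition readers are done. The wait is a plain poll over two atomics; as a standalone sketch with illustrative names, assuming sync/atomic and time are imported:

	// waitReaders returns once the readers have signalled completion:
	// readerDone flipped to 1 and the in-flight counter drained to 0.
	func waitReaders(readerDone, cntWait *int32) {
		for atomic.LoadInt32(readerDone) != 1 || atomic.LoadInt32(cntWait) != 0 {
			time.Sleep(time.Second) // coarse 1s poll, matching the loop above
		}
	}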
@@ -764,22 +752,14 @@ func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.D
 	for {
 		select {
 		case <-doneTicker.C:
-			if atomic.LoadInt32(readerDone) == 1 {
-				mapMu.Lock()
-				if len(offsets) == 0 {
-					defer ticker.Stop()
-					return
-				}
+			if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
+				// fire immediate commit offsets
 				ticker.Stop()
 			}
 		case <-ticker.C:
 			mapMu.Lock()
-			if len(offsets) == 0 {
-				mapMu.Unlock()
-				continue
-			}
-			if k.opts.Logger.V(logger.TraceLevel) {
-				k.opts.Logger.Tracef(k.opts.Context, "async commit offsets: %v", offsets)
+			if k.opts.Logger.V(logger.DebugLevel) && len(offsets) > 0 {
+				k.opts.Logger.Debugf(k.opts.Context, "async commit offsets: %v", offsets)
 			}
 			err := generation.CommitOffsets(offsets)
 			if err != nil {
} }
func (h *cgHandler) run(ctx context.Context) { func (h *cgHandler) run(ctx context.Context) {
if h.brokerOpts.Logger.V(logger.TraceLevel) { if h.brokerOpts.Logger.V(logger.DebugLevel) {
h.brokerOpts.Logger.Tracef(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition) h.brokerOpts.Logger.Debugf(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
} }
td := DefaultStatsInterval td := DefaultStatsInterval
@@ -821,9 +801,11 @@ func (h *cgHandler) run(ctx context.Context) {
 		if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
 			h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader for topic %s partition %d close error: %v", h.reader.Config().Topic, h.reader.Config().Partition, err)
 		}
+		h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "wait start for commitDoneCh channel closing")
 		<-h.commitDoneCh
-		if h.brokerOpts.Logger.V(logger.TraceLevel) {
-			h.brokerOpts.Logger.Tracef(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
+		h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "wait stop for commitDoneCh channel closing")
+		if h.brokerOpts.Logger.V(logger.DebugLevel) {
+			h.brokerOpts.Logger.Debugf(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
 		}
 	}()
@@ -855,15 +837,23 @@ func (h *cgHandler) run(ctx context.Context) {
 			default:
 				msg, err := h.reader.ReadMessage(ctx)
 				switch err {
 				default:
+					switch kerr := err.(type) {
+					case kafka.Error:
+						if h.brokerOpts.Logger.V(logger.DebugLevel) {
+							h.brokerOpts.Logger.Debugf(h.brokerOpts.Context, "[segmentio] kafka error %T err: %v", kerr, kerr)
+						}
+						return
+					default:
 						if h.brokerOpts.Logger.V(logger.ErrorLevel) {
 							h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
 						}
 						return
+					}
 				case kafka.ErrGenerationEnded:
 					// generation has ended
-					if h.brokerOpts.Logger.V(logger.TraceLevel) {
-						h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
+					if h.brokerOpts.Logger.V(logger.DebugLevel) {
+						h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
 					}
 					return
 				case nil:
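The new inner type switch separates kafka-go protocol errors from everything else: kafka.Error is the library's numeric broker-error type, so conditions the broker itself reported (rebalances, leader changes and the like) are logged at debug before the reader returns, while unknown transport or context errors keep their error-level log. The shape of the split, as an isolated sketch using this package's logger:

	// err comes from reader.ReadMessage
	switch kerr := err.(type) {
	case kafka.Error:
		// broker-reported condition, e.g. a rebalance in progress
		logger.Debugf(ctx, "kafka protocol error: %v", kerr)
	default:
		// network, context or codec failure: genuinely unexpected
		logger.Errorf(ctx, "unexpected error %T: %v", err, err)
	}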
@@ -885,7 +875,7 @@ func (h *cgHandler) run(ctx context.Context) {
 					p.msg.Body = msg.Value
 				} else {
 					if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
-						p.SetError(err)
+						p.err = err
 						p.msg.Body = msg.Value
 						if eh != nil {
 							_ = eh(p)
@@ -927,6 +917,7 @@ func (h *cgHandler) run(ctx context.Context) {
 }

 func (sub *subscriber) createGroup(ctx context.Context) {
+	var err error
 	for {
 		select {
 		case <-ctx.Done():
@@ -934,8 +925,20 @@ func (sub *subscriber) createGroup(ctx context.Context) {
 		default:
 			sub.RLock()
 			cgcfg := sub.cgcfg
+			closed := sub.closed
+			cgroup := sub.group
 			sub.RUnlock()
-			cgroup, err := kafka.NewConsumerGroup(cgcfg)
+			if closed {
+				return
+			}
+			if cgroup != nil {
+				if err = cgroup.Close(); err != nil {
+					if sub.brokerOpts.Logger.V(logger.ErrorLevel) {
+						sub.brokerOpts.Logger.Errorf(sub.brokerOpts.Context, "[segmentio]: consumer group close error %v", err)
+					}
+				}
+			}
+			cgroup, err = kafka.NewConsumerGroup(cgcfg)
 			if err != nil {
 				if sub.brokerOpts.Logger.V(logger.ErrorLevel) {
 					sub.brokerOpts.Logger.Errorf(sub.brokerOpts.Context, "[segmentio]: consumer group error %v", err)
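createGroup now snapshots closed and the previous group under the read lock and acts on the copies afterwards, so the blocking Close and NewConsumerGroup calls never run while the lock is held, and a concurrent Unsubscribe can no longer race against a freshly created group. Reduced to its essentials (a sketch of the pattern, not the full retry loop):

	sub.RLock()
	closed, old := sub.closed, sub.group
	sub.RUnlock()
	if closed {
		return // Unsubscribe already won; don't resurrect the group
	}
	if old != nil {
		_ = old.Close() // leave the previous generation before rejoining
	}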
@@ -1002,6 +1005,7 @@ func (k *kBroker) configure(opts ...broker.Option) error {
 	k.addrs = cAddrs
 	k.readerConfig = readerConfig
 	k.writerConfig = writerConfig
+	k.writerConfig.Brokers = k.addrs

 	if k.readerConfig.Dialer == nil {
 		k.readerConfig.Dialer = kafka.DefaultDialer
@@ -1014,30 +1018,7 @@ func (k *kBroker) configure(opts ...broker.Option) error {
 		k.readerConfig.Dialer.ClientID = id
 	}

-	k.writer = &kafka.Writer{
-		Addr:         kafka.TCP(k.addrs...),
-		Balancer:     k.writerConfig.Balancer,
-		MaxAttempts:  k.writerConfig.MaxAttempts,
-		BatchSize:    k.writerConfig.BatchSize,
-		BatchBytes:   int64(k.writerConfig.BatchBytes),
-		BatchTimeout: k.writerConfig.BatchTimeout,
-		ReadTimeout:  k.writerConfig.ReadTimeout,
-		WriteTimeout: k.writerConfig.WriteTimeout,
-		RequiredAcks: kafka.RequiredAcks(k.writerConfig.RequiredAcks),
-		Async:        k.writerConfig.Async,
-		//Completion: writerConfig.Completion,
-		//Compression: writerConfig.Compression,
-		Logger:      k.writerConfig.Logger,
-		ErrorLogger: k.writerConfig.ErrorLogger,
-		Transport: &kafka.Transport{
-			Dial:        k.writerConfig.Dialer.DialFunc,
-			ClientID:    k.writerConfig.Dialer.ClientID,
-			IdleTimeout: time.Second * 5,
-			MetadataTTL: time.Second * 9,
-			SASL:        k.writerConfig.Dialer.SASLMechanism,
-		},
-	}
+	k.writer = newWriter(k.writerConfig)
 	if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
 		k.writer.Completion = fn
 	}
@@ -1046,6 +1027,30 @@ func (k *kBroker) configure(opts ...broker.Option) error {
 	return nil
 }

+func newWriter(writerConfig kafka.WriterConfig) *kafka.Writer {
+	return &kafka.Writer{
+		Addr:         kafka.TCP(writerConfig.Brokers...),
+		Balancer:     writerConfig.Balancer,
+		MaxAttempts:  writerConfig.MaxAttempts,
+		BatchSize:    writerConfig.BatchSize,
+		BatchBytes:   int64(writerConfig.BatchBytes),
+		BatchTimeout: writerConfig.BatchTimeout,
+		ReadTimeout:  writerConfig.ReadTimeout,
+		WriteTimeout: writerConfig.WriteTimeout,
+		RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks),
+		Async:        writerConfig.Async,
+		Logger:       writerConfig.Logger,
+		ErrorLogger:  writerConfig.ErrorLogger,
+		Transport: &kafka.Transport{
+			Dial:        writerConfig.Dialer.DialFunc,
+			ClientID:    writerConfig.Dialer.ClientID,
+			IdleTimeout: time.Second * 5,
+			MetadataTTL: time.Second * 9,
+			SASL:        writerConfig.Dialer.SASLMechanism,
+		},
+	}
+}
+
 func NewBroker(opts ...broker.Option) broker.Broker {
 	return &kBroker{
 		opts: broker.NewOptions(opts...),
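Factoring the writer literal into newWriter is what enables the recreate path in Publish: configure and the retry now build identical writers from the same kafka.WriterConfig, which is also why configure copies k.addrs into writerConfig.Brokers, the field newWriter feeds to kafka.TCP. The transport's 5s IdleTimeout and 9s MetadataTTL carry over unchanged from the old inline literal. Both call sites reduce to:

	// in configure(): initial construction
	k.writerConfig.Brokers = k.addrs
	k.writer = newWriter(k.writerConfig)

	// in Publish(): rebuilt under k.Lock after a failed write
	k.writer = newWriter(k.writerConfig)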