diff --git a/broker_test.go b/broker_test.go index e9d55c7..63ec32b 100644 --- a/broker_test.go +++ b/broker_test.go @@ -23,7 +23,10 @@ func TestPubSub(t *testing.T) { t.Skip() } - logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) + if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)); err != nil { + t.Fatal(err) + } + ctx := context.Background() var addrs []string @@ -33,7 +36,7 @@ func TestPubSub(t *testing.T) { addrs = strings.Split(addr, ",") } - b := segmentio.NewBroker(broker.Addrs(addrs...)) + b := segmentio.NewBroker(broker.Addrs(addrs...), segmentio.ClientID("test")) if err := b.Init(); err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index cdf3918..0c9b901 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,10 @@ module github.com/unistack-org/micro-broker-segmentio/v3 go 1.16 require ( - github.com/google/uuid v1.2.0 - github.com/segmentio/kafka-go v0.4.16 - github.com/unistack-org/micro/v3 v3.4.7 + github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.3.0 + github.com/klauspost/compress v1.13.1 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/segmentio/kafka-go v0.4.17 + github.com/unistack-org/micro/v3 v3.4.9 ) diff --git a/go.sum b/go.sum index 018b23a..fa52e7e 100644 --- a/go.sum +++ b/go.sum @@ -8,13 +8,20 @@ github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebP github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ= +github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -23,16 +30,22 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U= github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= +github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY= +github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0= github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= +github.com/unistack-org/micro/v3 v3.4.9 h1:IBCW/yxQijO/X+2zNzTdSsvzlgE0+y49bvjWemtY2zA= +github.com/unistack-org/micro/v3 v3.4.9/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= diff --git a/options.go b/options.go index cc03ccf..efdffda 100644 --- a/options.go +++ b/options.go @@ -10,9 +10,11 @@ import ( ) var ( - DefaultReaderConfig = kafka.ReaderConfig{} - DefaultWriterConfig = kafka.WriterConfig{} - DefaultStatsInterval = time.Second * 10 + DefaultReaderConfig = kafka.ReaderConfig{} + DefaultWriterConfig = kafka.WriterConfig{} + DefaultStatsInterval = time.Second * 10 + DefaultCommitInterval = time.Second * 2 + DefaultCommitQueueSize = 2000 ) type readerConfigKey struct{} @@ -66,3 +68,15 @@ type writerCompletionFunc struct{} func WriterCompletionFunc(fn func([]kafka.Message, error)) broker.Option { return broker.SetOption(writerCompletionFunc{}, fn) } + +type clientIDKey struct{} + +func ClientID(id string) broker.Option { + return broker.SetOption(clientIDKey{}, id) +} + +type commitIntervalKey struct{} + +func CommitInterval(td time.Duration) broker.Option { + return broker.SetOption(commitIntervalKey{}, td) +} diff --git a/segmentio.go b/segmentio.go index 5f17325..6e340b3 100644 --- a/segmentio.go +++ b/segmentio.go @@ -28,14 +28,8 @@ type kBroker struct { } type subscriber struct { - k *kBroker topic string opts broker.SubscribeOptions - offset int64 - gen *kafka.Generation - partition int - handler broker.Handler - reader *kafka.Reader closed bool done chan struct{} group *kafka.ConsumerGroup @@ -45,15 +39,14 @@ type subscriber struct { } type publication struct { - topic string - err error - m *broker.Message - opts broker.Options - ctx context.Context - generation *kafka.Generation - reader *kafka.Reader - km kafka.Message - offsets map[string]map[int]int64 // for commit offsets + topic string + partition int + offset int64 + err error + ackErr *error + msg *broker.Message + ackCh chan map[string]map[int]int64 + sync.Mutex } func (p *publication) Topic() string { @@ -61,14 +54,12 @@ func (p *publication) Topic() string { } func (p *publication) Message() *broker.Message { - return p.m + return p.msg } func (p *publication) Ack() error { - if p.opts.Logger.V(logger.TraceLevel) { - p.opts.Logger.Tracef(p.opts.Context, "commit offset %#+v\n", p.offsets) - } - return p.generation.CommitOffsets(p.offsets) + p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}} + return *p.ackErr } func (p *publication) Error() error { @@ -276,7 +267,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha Logger: k.readerConfig.Logger, ErrorLogger: k.readerConfig.ErrorLogger, } - cgcfg.StartOffset = kafka.LastOffset if err := cgcfg.Validate(); err != nil { return nil, err } @@ -333,7 +323,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha sub.RUnlock() if !closed { if k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err()) + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err()) } if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) @@ -349,35 +339,52 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha sub.RUnlock() if !closed { if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %v", err) + k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err) } } if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) } - sub.createGroup(k.opts.Context) + sub.createGroup(gCtx) continue } - for _, t := range cgcfg.Topics { - assignments := generation.Assignments[t] + var wg sync.WaitGroup + + ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize) + errChLen := 0 + for _, assignments := range generation.Assignments { + errChLen += len(assignments) + } + errChs := make([]chan error, errChLen) + + for topic, assignments := range generation.Assignments { + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments) + } for _, assignment := range assignments { + errCh := make(chan error) cfg := k.readerConfig - cfg.Topic = t + cfg.Topic = topic cfg.Partition = assignment.ID cfg.GroupID = "" - // break reading reader := kafka.NewReader(cfg) - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] reader current offset: %v new offset: %v", reader.Offset(), assignment.Offset) - } - reader.SetOffset(assignment.Offset) - cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler} + if err := reader.SetOffset(assignment.Offset); err != nil { + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err) + } + continue + } + cgh := &cgHandler{brokerOpts: k.opts, subOpts: opt, reader: reader, handler: handler, ackCh: ackCh, errCh: errCh, wg: &wg} generation.Start(cgh.run) } } - + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "start async commit loop") + } + // run async commit loop + go k.commitLoop(generation, ackCh, errChs, &wg) } } }() @@ -386,18 +393,72 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha } type cgHandler struct { - topic string - generation *kafka.Generation brokerOpts broker.Options subOpts broker.SubscribeOptions reader *kafka.Reader handler broker.Handler + ackCh chan map[string]map[int]int64 + errCh chan error + wg *sync.WaitGroup +} + +func (k *kBroker) commitLoop(generation *kafka.Generation, ackCh chan map[string]map[int]int64, errChs []chan error, wg *sync.WaitGroup) { + var mapMu sync.Mutex + + td := DefaultCommitInterval + if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 { + td = v + } + + ticker := time.NewTicker(td) + defer ticker.Stop() + + offsets := make(map[string]map[int]int64, 4) + + for { + select { + default: + wg.Wait() + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "all readers are done, return from commit loop") + } + return + case ack := <-ackCh: + mapMu.Lock() + for k, v := range ack { + if _, ok := offsets[k]; !ok { + offsets[k] = make(map[int]int64, 4) + } + for p, o := range v { + offsets[k][p] = o + 1 + } + } + mapMu.Unlock() + case <-ticker.C: + mapMu.Lock() + if len(offsets) == 0 { + mapMu.Unlock() + continue + } + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Tracef(k.opts.Context, "commit offsets: %v", offsets) + } + err := generation.CommitOffsets(offsets) + if err != nil { + for _, errCh := range errChs { + errCh <- err + close(errCh) + } + mapMu.Unlock() + return + } + mapMu.Unlock() + offsets = make(map[string]map[int]int64, 4) + } + } } func (h *cgHandler) run(ctx context.Context) { - offsets := make(map[string]map[int]int64) - offsets[h.reader.Config().Topic] = make(map[int]int64) - td := DefaultStatsInterval if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { td = v @@ -405,72 +466,91 @@ func (h *cgHandler) run(ctx context.Context) { go readerStats(ctx, h.reader, td, h.brokerOpts.Meter) + commitDuration := DefaultCommitInterval + if v, ok := h.brokerOpts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 { + commitDuration = v + } + + var commitErr error + + h.wg.Add(1) + defer func() { + h.wg.Done() if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) } }() + + go func() { + for { + select { + case err := <-h.errCh: + commitErr = err + case <-ctx.Done(): + time.Sleep(commitDuration) + return + } + } + }() + for { - select { - case <-ctx.Done(): - return + msg, err := h.reader.ReadMessage(ctx) + switch err { default: - msg, err := h.reader.ReadMessage(ctx) - switch err { - default: - if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error: %v", err) - } - return - case kafka.ErrGenerationEnded: - // generation has ended - if h.brokerOpts.Logger.V(logger.TraceLevel) { - h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended") - } - return - case nil: - eh := h.brokerOpts.ErrorHandler + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err) + } + return + case kafka.ErrGenerationEnded: + // generation has ended + if h.brokerOpts.Logger.V(logger.TraceLevel) { + h.brokerOpts.Logger.Trace(h.brokerOpts.Context, "[segmentio] generation ended, rebalance") + } + return + case nil: + eh := h.brokerOpts.ErrorHandler - if h.subOpts.ErrorHandler != nil { - eh = h.subOpts.ErrorHandler - } - offsets[msg.Topic][msg.Partition] = msg.Offset + 1 - // github.com/segmentio/kafka-go/commit.go makeCommit builds commit message with offset + 1 - // zookeeper store offset which needs to be sent on new consumer, so current + 1 - p := &publication{topic: msg.Topic, opts: h.brokerOpts, generation: h.generation, m: &broker.Message{}, offsets: offsets} + if h.subOpts.ErrorHandler != nil { + eh = h.subOpts.ErrorHandler + } + p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset, topic: msg.Topic, msg: &broker.Message{}} - if h.subOpts.BodyOnly { - p.m.Body = msg.Value - } else { - if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.m); err != nil { - p.err = err - p.m.Body = msg.Value - if eh != nil { - eh(p) - } else { - if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err) - } - } - continue - } - } - err = h.handler(p) - if err == nil && h.subOpts.AutoAck { - if err = p.Ack(); err != nil { - if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: unable to commit msg: %v", err) - } - } - } else if err != nil { + if h.subOpts.BodyOnly { + p.msg.Body = msg.Value + } else { + if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil { p.err = err + p.msg.Body = msg.Value if eh != nil { - eh(p) + _ = eh(p) } else { if h.brokerOpts.Logger.V(logger.ErrorLevel) { - h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err) + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err) } } + continue + } + } + p.Lock() + p.ackErr = &commitErr + p.Unlock() + err = h.handler(p) + if err == nil && h.subOpts.AutoAck { + if err = p.Ack(); err != nil { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: unable to commit msg: %v", err) + return + } + } + } else if err != nil { + p.err = err + if eh != nil { + _ = eh(p) + } else { + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err) + } } } } @@ -572,6 +652,12 @@ func (k *kBroker) configure(opts ...broker.Option) error { ErrorLogger: writerConfig.ErrorLogger, //Transport: writerConfig.Transport, } + if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok { + if k.readerConfig.Dialer == nil { + k.readerConfig.Dialer = kafka.DefaultDialer + } + k.readerConfig.Dialer.ClientID = id + } if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok { k.writer.Completion = fn diff --git a/segmentio_test.go b/segmentio_test.go index e8cf849..6865839 100644 --- a/segmentio_test.go +++ b/segmentio_test.go @@ -14,7 +14,10 @@ import ( func TestSegmentioSubscribe(t *testing.T) { ctx := context.Background() - logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) + if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)); err != nil { + t.Fatal(err) + } + if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { t.Skip() } @@ -43,7 +46,6 @@ func TestSegmentioSubscribe(t *testing.T) { done := make(chan struct{}, 100) fn := func(msg broker.Event) error { if err := msg.Ack(); err != nil { - panic(err) return err } done <- struct{}{} @@ -179,7 +181,7 @@ func BenchmarkSegmentioCodecJsonSubscribe(b *testing.B) { return } if err := brk.Publish(ctx, "test_topic", bm); err != nil { - b.Fatal(err) + panic(err) } } }()