diff --git a/go.mod b/go.mod index da9d848..d84b4a1 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/frankban/quicktest v1.4.1 // indirect github.com/google/uuid v1.1.1 github.com/micro/go-micro/v2 v2.3.0 + github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 github.com/pierrec/lz4 v2.2.6+incompatible // indirect github.com/segmentio/kafka-go v0.3.5 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect diff --git a/go.sum b/go.sum index 07e44d9..375c2af 100644 --- a/go.sum +++ b/go.sum @@ -274,6 +274,8 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM= github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg= github.com/micro/go-micro/v2 v2.3.0 h1:3seJJ7/pbhleZNe6gGHFJjOsAqvYGcy2ivc3P5PYnVQ= github.com/micro/go-micro/v2 v2.3.0/go.mod h1:GR69d1AXMg/WjMNf/7K1VO6hCBJDIpqCqnVYNTV6M5w= +github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 h1:VKWhtEHd1x0PYuU1YoGeBHgAs06aiThleV2v0LruK+g= +github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0/go.mod h1:sblO7/JViOU+cTq4VvqzzWVbwEZvX2hoBgnIZ/cf+HI= github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE= github.com/micro/mdns v0.3.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc= github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -376,6 +378,8 @@ github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sacloud/libsacloud v1.26.1/go.mod h1:79ZwATmHLIFZIMd7sxA3LwzVy/B77uj3LDoToVTxDoQ= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/encoding v0.1.10 h1:0b8dva47cSuNQR5ZcU3d0pfi9EnPpSK6q7y5ZGEW36Q= +github.com/segmentio/encoding v0.1.10/go.mod h1:RWhr02uzMB9gQC1x+MfYxedtmBibb9cZ6Vv9VxRSSbw= github.com/segmentio/kafka-go v0.3.5 h1:2JVT1inno7LxEASWj+HflHh5sWGfM0gkRiLAxkXhGG4= github.com/segmentio/kafka-go v0.3.5/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= diff --git a/kafka.go b/kafka.go index 3cd06c8..f3d915d 100644 --- a/kafka.go +++ b/kafka.go @@ -4,7 +4,6 @@ package segmentio import ( "context" "errors" - "io" "sync" "github.com/google/uuid" @@ -30,19 +29,18 @@ type kBroker struct { } type subscriber struct { - reader *kafka.Reader - t string - opts broker.SubscribeOptions + group *kafka.ConsumerGroup + t string + opts broker.SubscribeOptions } type publication struct { - t string - err error - reader *kafka.Reader - // deprecate broker.Message and use kafka.Message directly? + t string + err error m *broker.Message ctx context.Context - km kafka.Message + gen *kafka.Generation + mp map[string]map[int]int64 // for commit offsets } func init() { @@ -58,7 +56,7 @@ func (p *publication) Message() *broker.Message { } func (p *publication) Ack() error { - return p.reader.CommitMessages(p.ctx, p.km) + return p.gen.CommitOffsets(p.mp) } func (p *publication) Error() error { @@ -74,7 +72,7 @@ func (s *subscriber) Topic() string { } func (s *subscriber) Unsubscribe() error { - return s.reader.Close() + return s.group.Close() } func (k *kBroker) Address() string { @@ -197,73 +195,99 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker o(&opt) } - k.Lock() - reader, ok := k.readers[topic] - if !ok { - cfg := k.readerConfig - cfg.Topic = topic - cfg.GroupID = opt.Queue - if err := cfg.Validate(); err != nil { - k.Unlock() - return nil, err - } - reader = kafka.NewReader(cfg) - k.readers[topic] = reader + gcfg := kafka.ConsumerGroupConfig{ + ID: opt.Queue, + WatchPartitionChanges: true, + Brokers: k.readerConfig.Brokers, + Topics: []string{topic}, } - k.Unlock() + if err := gcfg.Validate(); err != nil { + return nil, err + } + + group, err := kafka.NewConsumerGroup(gcfg) + if err != nil { + return nil, err + } + + sub := &subscriber{group: group, opts: opt, t: topic} + go func() { for { - select { - case <-k.opts.Context.Done(): + gen, err := group.Next(k.opts.Context) + if err == kafka.ErrGroupClosed { return - default: - msg, err := reader.FetchMessage(k.opts.Context) - if err != nil && err != io.EOF { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[kafka] subscribe error: %v", err) - return - } - } else if err == io.EOF { - // reader closed - return + } else if err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[kafka] subscribe error: %v", err) } + return + } + assignments := gen.Assignments[topic] + for _, assignment := range assignments { + partition, offset := assignment.ID, assignment.Offset + p := &publication{t: topic, ctx: k.opts.Context, gen: gen} + p.mp = map[string]map[int]int64{p.t: {partition: offset}} - var m broker.Message - p := &publication{m: &m, t: msg.Topic, km: msg, ctx: k.opts.Context, reader: reader} - eh := k.opts.ErrorHandler - - if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil { - p.err = err - p.m.Body = msg.Value - if eh != nil { - eh(p) - } else { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[kafka]: failed to unmarshal: %v", err) + gen.Start(func(ctx context.Context) { + // create reader for this partition. + reader := kafka.NewReader(kafka.ReaderConfig{ + //GroupID: gen.GroupID, + Brokers: gcfg.Brokers, + Topic: topic, + Partition: partition, + }) + defer reader.Close() + // seek to the last committed offset for this partition. + reader.SetOffset(offset) + for { + msg, err := reader.ReadMessage(ctx) + switch err { + case kafka.ErrGenerationEnded: + // generation has ended + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debug("[kafka] subscription closed") + } + return + case nil: + var m broker.Message + eh := k.opts.ErrorHandler + p.m = &m + if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil { + p.err = err + p.m.Body = msg.Value + if eh != nil { + eh(p) + } else { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[kafka]: failed to unmarshal: %v", err) + } + } + continue + } + err = handler(p) + if err == nil && opt.AutoAck { + if err = p.Ack(); err != nil { + logger.Errorf("[kafka]: unable to commit msg: %v", err) + } + } else if err != nil { + p.err = err + if eh != nil { + eh(p) + } else { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[kafka]: subscriber error: %v", err) + } + } + } } } - continue - } - err = handler(p) - if err == nil && opt.AutoAck { - if err = reader.CommitMessages(k.opts.Context, msg); err != nil { - logger.Errorf("[kafka]: unable to commit msg: %v", err) - } - } else if err != nil { - p.err = err - if eh != nil { - eh(p) - } else { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[kafka]: subscriber error: %v", err) - } - } - } + }) } } }() - return &subscriber{reader: reader, opts: opt, t: topic}, nil + return sub, nil } func (k *kBroker) String() string { diff --git a/kafka_test.go b/kafka_test.go index 645038f..d78dd61 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -2,9 +2,11 @@ package segmentio import ( "os" + "strings" "testing" "github.com/micro/go-micro/v2/broker" + segjson "github.com/micro/go-plugins/codec/segmentio/v2" ) var ( @@ -18,7 +20,15 @@ func TestPublish(t *testing.T) { if tr := os.Getenv("TRAVIS"); len(tr) > 0 { t.Skip() } - b := NewBroker(broker.Addrs("127.0.0.1:9092")) + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + b := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) if err := b.Connect(); err != nil { t.Fatal(err) } @@ -28,7 +38,9 @@ func TestPublish(t *testing.T) { } }() + done := make(chan bool, 1) fn := func(msg broker.Event) error { + done <- true return msg.Ack() } @@ -45,14 +57,22 @@ func TestPublish(t *testing.T) { if err := b.Publish("test_topic", bm); err != nil { t.Fatal(err) } - select {} + <-done } func BenchmarkSegmentioPublish(b *testing.B) { if tr := os.Getenv("TRAVIS"); len(tr) > 0 { b.Skip() } - brk := NewBroker(broker.Addrs("127.0.0.1:9092")) + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) if err := brk.Connect(); err != nil { b.Fatal(err) } @@ -76,7 +96,15 @@ func BenchmarkSegmentioSubscribe(b *testing.B) { if tr := os.Getenv("TRAVIS"); len(tr) > 0 { b.Skip() } - brk := NewBroker(broker.Addrs("127.0.0.1:9092")) + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) if err := brk.Connect(); err != nil { b.Fatal(err) } @@ -93,12 +121,13 @@ func BenchmarkSegmentioSubscribe(b *testing.B) { b.ResetTimer() } cnt++ - if cnt == 10000 { + if cnt == b.N { close(done) } return msg.Ack() } - for i := 0; i < 10000; i++ { + + for i := 0; i < b.N; i++ { if err := brk.Publish("test_topic", bm); err != nil { b.Fatal(err) } @@ -113,7 +142,6 @@ func BenchmarkSegmentioSubscribe(b *testing.B) { b.Fatal(err) } }() - <-done }