broker/segmentio: parallel partition processing

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-13 15:51:23 +03:00
parent 0a547f3b11
commit cba22d5cf0
4 changed files with 130 additions and 73 deletions

1
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/frankban/quicktest v1.4.1 // indirect
github.com/google/uuid v1.1.1
github.com/micro/go-micro/v2 v2.3.0
github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0
github.com/pierrec/lz4 v2.2.6+incompatible // indirect
github.com/segmentio/kafka-go v0.3.5
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect

4
go.sum
View File

@ -274,6 +274,8 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM=
github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg=
github.com/micro/go-micro/v2 v2.3.0 h1:3seJJ7/pbhleZNe6gGHFJjOsAqvYGcy2ivc3P5PYnVQ=
github.com/micro/go-micro/v2 v2.3.0/go.mod h1:GR69d1AXMg/WjMNf/7K1VO6hCBJDIpqCqnVYNTV6M5w=
github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 h1:VKWhtEHd1x0PYuU1YoGeBHgAs06aiThleV2v0LruK+g=
github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0/go.mod h1:sblO7/JViOU+cTq4VvqzzWVbwEZvX2hoBgnIZ/cf+HI=
github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE=
github.com/micro/mdns v0.3.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@ -376,6 +378,8 @@ github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sacloud/libsacloud v1.26.1/go.mod h1:79ZwATmHLIFZIMd7sxA3LwzVy/B77uj3LDoToVTxDoQ=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/segmentio/encoding v0.1.10 h1:0b8dva47cSuNQR5ZcU3d0pfi9EnPpSK6q7y5ZGEW36Q=
github.com/segmentio/encoding v0.1.10/go.mod h1:RWhr02uzMB9gQC1x+MfYxedtmBibb9cZ6Vv9VxRSSbw=
github.com/segmentio/kafka-go v0.3.5 h1:2JVT1inno7LxEASWj+HflHh5sWGfM0gkRiLAxkXhGG4=
github.com/segmentio/kafka-go v0.3.5/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=

156
kafka.go
View File

@ -4,7 +4,6 @@ package segmentio
import (
"context"
"errors"
"io"
"sync"
"github.com/google/uuid"
@ -30,19 +29,18 @@ type kBroker struct {
}
// subscriber is the subscription handle that Subscribe returns to the caller.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// pre-change and post-change field lists are interleaved (t and opts appear
// twice). Presumably the reader field belongs to the old version and group to
// the new one — confirm against the repository.
type subscriber struct {
// reader is the per-topic kafka.Reader closed by the reader-based Unsubscribe.
reader *kafka.Reader
t string
opts broker.SubscribeOptions
// group is the consumer group created in Subscribe via kafka.NewConsumerGroup.
group *kafka.ConsumerGroup
// t is the topic name passed to Subscribe.
t string
// opts holds the subscribe options assembled in Subscribe.
opts broker.SubscribeOptions
}
// publication is the event value handed to the subscription handler and to
// the broker's error handler.
// NOTE(review): this listing is a rendered diff without +/- markers, so old
// and new fields are interleaved (t and err appear twice). Presumably reader
// and km are the fields being removed and gen/mp the ones being added —
// confirm against the repository.
type publication struct {
t string
err error
// reader is the kafka.Reader used by the reader-based Ack (CommitMessages).
reader *kafka.Reader
// deprecate broker.Message and use kafka.Message directly?
// t is the topic the publication belongs to.
t string
// err records an unmarshal or handler failure before the error handler runs.
err error
// m is the decoded broker message; on unmarshal failure its Body is set to
// the raw Kafka payload.
m *broker.Message
// ctx is set from the broker's options context when the publication is built.
ctx context.Context
// km is the raw Kafka message passed to CommitMessages in the reader-based Ack.
km kafka.Message
// gen is the consumer-group generation used to commit offsets in Ack.
gen *kafka.Generation
// mp maps topic -> partition -> offset and is the argument to gen.CommitOffsets.
mp map[string]map[int]int64 // for commit offsets
}
func init() {
@ -58,7 +56,7 @@ func (p *publication) Message() *broker.Message {
}
// Ack acknowledges the publication by committing its offset to Kafka and
// returns the error from the commit call.
// NOTE(review): this listing is a rendered diff without +/- markers, so two
// return statements appear back to back. Presumably reader.CommitMessages is
// the removed line and gen.CommitOffsets(p.mp) is its replacement — confirm
// against the repository.
// NOTE(review): in Subscribe, p.mp is built once per partition from the
// assignment's offset and is not updated in the visible read loop, so this
// looks like it commits the starting offset rather than the offset of the
// message being acknowledged — verify.
func (p *publication) Ack() error {
return p.reader.CommitMessages(p.ctx, p.km)
return p.gen.CommitOffsets(p.mp)
}
func (p *publication) Error() error {
@ -74,7 +72,7 @@ func (s *subscriber) Topic() string {
}
// Unsubscribe ends the subscription by closing the underlying Kafka handle
// and returns the error from that Close call.
// NOTE(review): this listing is a rendered diff without +/- markers, so two
// return statements appear back to back. Presumably s.reader.Close() is the
// removed line and s.group.Close() is its replacement — confirm against the
// repository.
func (s *subscriber) Unsubscribe() error {
return s.reader.Close()
return s.group.Close()
}
func (k *kBroker) Address() string {
@ -197,73 +195,99 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
o(&opt)
}
k.Lock()
reader, ok := k.readers[topic]
if !ok {
cfg := k.readerConfig
cfg.Topic = topic
cfg.GroupID = opt.Queue
if err := cfg.Validate(); err != nil {
k.Unlock()
return nil, err
}
reader = kafka.NewReader(cfg)
k.readers[topic] = reader
gcfg := kafka.ConsumerGroupConfig{
ID: opt.Queue,
WatchPartitionChanges: true,
Brokers: k.readerConfig.Brokers,
Topics: []string{topic},
}
k.Unlock()
if err := gcfg.Validate(); err != nil {
return nil, err
}
group, err := kafka.NewConsumerGroup(gcfg)
if err != nil {
return nil, err
}
sub := &subscriber{group: group, opts: opt, t: topic}
go func() {
for {
select {
case <-k.opts.Context.Done():
gen, err := group.Next(k.opts.Context)
if err == kafka.ErrGroupClosed {
return
default:
msg, err := reader.FetchMessage(k.opts.Context)
if err != nil && err != io.EOF {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka] subscribe error: %v", err)
return
}
} else if err == io.EOF {
// reader closed
return
} else if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka] subscribe error: %v", err)
}
return
}
assignments := gen.Assignments[topic]
for _, assignment := range assignments {
partition, offset := assignment.ID, assignment.Offset
p := &publication{t: topic, ctx: k.opts.Context, gen: gen}
p.mp = map[string]map[int]int64{p.t: {partition: offset}}
var m broker.Message
p := &publication{m: &m, t: msg.Topic, km: msg, ctx: k.opts.Context, reader: reader}
eh := k.opts.ErrorHandler
if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
p.err = err
p.m.Body = msg.Value
if eh != nil {
eh(p)
} else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: failed to unmarshal: %v", err)
gen.Start(func(ctx context.Context) {
// create reader for this partition.
reader := kafka.NewReader(kafka.ReaderConfig{
//GroupID: gen.GroupID,
Brokers: gcfg.Brokers,
Topic: topic,
Partition: partition,
})
defer reader.Close()
// seek to the last committed offset for this partition.
reader.SetOffset(offset)
for {
msg, err := reader.ReadMessage(ctx)
switch err {
case kafka.ErrGenerationEnded:
// generation has ended
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("[kafka] subscription closed")
}
return
case nil:
var m broker.Message
eh := k.opts.ErrorHandler
p.m = &m
if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
p.err = err
p.m.Body = msg.Value
if eh != nil {
eh(p)
} else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: failed to unmarshal: %v", err)
}
}
continue
}
err = handler(p)
if err == nil && opt.AutoAck {
if err = p.Ack(); err != nil {
logger.Errorf("[kafka]: unable to commit msg: %v", err)
}
} else if err != nil {
p.err = err
if eh != nil {
eh(p)
} else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: subscriber error: %v", err)
}
}
}
}
}
continue
}
err = handler(p)
if err == nil && opt.AutoAck {
if err = reader.CommitMessages(k.opts.Context, msg); err != nil {
logger.Errorf("[kafka]: unable to commit msg: %v", err)
}
} else if err != nil {
p.err = err
if eh != nil {
eh(p)
} else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: subscriber error: %v", err)
}
}
}
})
}
}
}()
return &subscriber{reader: reader, opts: opt, t: topic}, nil
return sub, nil
}
func (k *kBroker) String() string {

View File

@ -2,9 +2,11 @@ package segmentio
import (
"os"
"strings"
"testing"
"github.com/micro/go-micro/v2/broker"
segjson "github.com/micro/go-plugins/codec/segmentio/v2"
)
var (
@ -18,7 +20,15 @@ func TestPublish(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Skip()
}
b := NewBroker(broker.Addrs("127.0.0.1:9092"))
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
b := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := b.Connect(); err != nil {
t.Fatal(err)
}
@ -28,7 +38,9 @@ func TestPublish(t *testing.T) {
}
}()
done := make(chan bool, 1)
fn := func(msg broker.Event) error {
done <- true
return msg.Ack()
}
@ -45,14 +57,22 @@ func TestPublish(t *testing.T) {
if err := b.Publish("test_topic", bm); err != nil {
t.Fatal(err)
}
select {}
<-done
}
func BenchmarkSegmentioPublish(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
brk := NewBroker(broker.Addrs("127.0.0.1:9092"))
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
@ -76,7 +96,15 @@ func BenchmarkSegmentioSubscribe(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
brk := NewBroker(broker.Addrs("127.0.0.1:9092"))
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
@ -93,12 +121,13 @@ func BenchmarkSegmentioSubscribe(b *testing.B) {
b.ResetTimer()
}
cnt++
if cnt == 10000 {
if cnt == b.N {
close(done)
}
return msg.Ack()
}
for i := 0; i < 10000; i++ {
for i := 0; i < b.N; i++ {
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
@ -113,7 +142,6 @@ func BenchmarkSegmentioSubscribe(b *testing.B) {
b.Fatal(err)
}
}()
<-done
}