From a03e2e267a941190ebc2658da8540802e9477fee Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 15 Apr 2020 04:07:17 +0300 Subject: [PATCH] segmentio improvements for broker and codec (#531) * segmentio broker and codec improvements * broker/segmentio: return error on subscribe Signed-off-by: Vasiliy Tolstov --- broker_test.go | 60 +++++++++ go.mod | 3 + go.sum | 23 ++++ sarama_test.go | 203 +++++++++++++++++++++++++++++ kafka.go => segmentio.go | 33 +++-- kafka_test.go => segmentio_test.go | 184 ++++++++++++-------------- 6 files changed, 387 insertions(+), 119 deletions(-) create mode 100644 broker_test.go create mode 100644 sarama_test.go rename kafka.go => segmentio.go (93%) rename kafka_test.go => segmentio_test.go (59%) diff --git a/broker_test.go b/broker_test.go new file mode 100644 index 0000000..8dfa80f --- /dev/null +++ b/broker_test.go @@ -0,0 +1,60 @@ +package segmentio_test + +import ( + "os" + "strings" + "testing" + + "github.com/micro/go-micro/v2/broker" + segmentio "github.com/micro/go-plugins/broker/segmentio/v2" +) + +var ( + bm = &broker.Message{ + Header: map[string]string{"hkey": "hval"}, + Body: []byte("body"), + } +) + +func TestPubSub(t *testing.T) { + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + t.Skip() + } + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + b := segmentio.NewBroker(broker.Addrs(addrs...)) + if err := b.Connect(); err != nil { + t.Fatal(err) + } + defer func() { + if err := b.Disconnect(); err != nil { + t.Fatal(err) + } + }() + + done := make(chan bool, 1) + fn := func(msg broker.Event) error { + done <- true + return msg.Ack() + } + + sub, err := b.Subscribe("test_topic", fn, broker.Queue("test")) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := sub.Unsubscribe(); err != nil { + t.Fatal(err) + } + }() + if err := b.Publish("test_topic", bm); err != nil { + t.Fatal(err) + } + <-done +} diff --git a/go.mod b/go.mod index d84b4a1..9d1b844 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,11 @@ require ( github.com/frankban/quicktest v1.4.1 // indirect github.com/google/uuid v1.1.1 github.com/micro/go-micro/v2 v2.3.0 + github.com/micro/go-plugins/broker/kafka/v2 v2.3.0 github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 github.com/pierrec/lz4 v2.2.6+incompatible // indirect github.com/segmentio/kafka-go v0.3.5 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect ) + +replace github.com/micro/go-plugins/codec/segmentio/v2 => ../../codec/segmentio diff --git a/go.sum b/go.sum index 375c2af..c079539 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tT github.com/Microsoft/hcsshim v0.8.7-0.20191101173118-65519b62243c/go.mod h1:7xhjOwRV2+0HXGmM0jxaEu+ZiXJFoVZOTfL/dmqbrD8= github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/sarama v1.25.0 h1:ch1ywjRLjfJtU+EaiJ+l0rWffQ6TRpyYmW4DX7Cb2SU= +github.com/Shopify/sarama v1.25.0/go.mod h1:y/CFFTO9eaMTNriwu/Q+W4eioLqiDMGkA1W+gmdfj8w= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/akamai/AkamaiOPEN-edgegrid-golang v0.9.0/go.mod h1:zpDJeKyp9ScW4NNrbdr+Eyxvry3ilGPewKoXw3XGN1k= @@ -123,6 +125,7 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c h1:pBgVXWDXju1m8W4lnEeIqTHPOzhTUO81a7yknM/xQR4= github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c/go.mod h1:pFdJbAhRf7rh6YYMUdIQGyzne6zYL1tCUW8QV2B3UfY= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.4.1 h1:Wv2VwvNn73pAdFIVUQRXYDFp31lXKbqblIXo/Q5GPSg= github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -208,6 +211,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= @@ -219,6 +224,8 @@ github.com/iij/doapi v0.0.0-20190504054126-0bbf12d6d7df/go.mod h1:QMZY7/J/KSQEhK github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= @@ -236,6 +243,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA= +github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.0 h1:NMpwD2G9JSFOE1/TJjGSo5zG7Yb2bTe7eq1jH+irmeE= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/kolo/xmlrpc v0.0.0-20190717152603-07c4ee3fd181/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ= @@ -274,6 +283,10 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM= github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg= github.com/micro/go-micro/v2 v2.3.0 h1:3seJJ7/pbhleZNe6gGHFJjOsAqvYGcy2ivc3P5PYnVQ= github.com/micro/go-micro/v2 v2.3.0/go.mod h1:GR69d1AXMg/WjMNf/7K1VO6hCBJDIpqCqnVYNTV6M5w= +github.com/micro/go-plugins v1.5.1 h1:swcFD7ynCTUo98APqIEIbPu2XMd6yVGTnI8PqdnCwOQ= +github.com/micro/go-plugins/broker/kafka v0.0.0-20200119172437-4fe21aa238fd h1:heuPI2eEEovTaHpq1OFDRAY/3QKvqFOyB/A++Cq0/ZE= +github.com/micro/go-plugins/broker/kafka/v2 v2.3.0 h1:VuMxNMOeIPgklgNXz9kZ9h+Xsg4W+iSngR6Sk3ep9Xc= +github.com/micro/go-plugins/broker/kafka/v2 v2.3.0/go.mod h1:PXFX3ioNEVO3PT21BC1j06aOtzfUnX0qj9jhHEwLzmM= github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 h1:VKWhtEHd1x0PYuU1YoGeBHgAs06aiThleV2v0LruK+g= github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0/go.mod h1:sblO7/JViOU+cTq4VvqzzWVbwEZvX2hoBgnIZ/cf+HI= github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE= @@ -450,6 +463,7 @@ golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -624,6 +638,15 @@ gopkg.in/go-playground/validator.v9 v9.31.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWd gopkg.in/h2non/gock.v1 v1.0.15/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.44.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/ns1/ns1-go.v2 v2.0.0-20190730140822-b51389932cbc/go.mod h1:VV+3haRsgDiVLxyifmMBrBIuCWFBPYKbRssXB9z67Hw= gopkg.in/resty.v1 v1.9.1/go.mod h1:vo52Hzryw9PnPHcJfPsBiFW62XhNx5OczbV9y+IMpgc= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/sarama_test.go b/sarama_test.go new file mode 100644 index 0000000..61070c0 --- /dev/null +++ b/sarama_test.go @@ -0,0 +1,203 @@ +package segmentio_test + +import ( + "os" + "strings" + "sync/atomic" + "testing" + + "github.com/micro/go-micro/v2/broker" + sarama "github.com/micro/go-plugins/broker/kafka/v2" + segjson "github.com/micro/go-plugins/codec/segmentio/v2" +) + +func BenchmarkSaramaCodecJsonPublish(b *testing.B) { + b.Skip() + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + b.Skip() + } + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := sarama.NewBroker(broker.Addrs(addrs...)) + if err := brk.Connect(); err != nil { + b.Fatal(err) + } + defer func() { + if err := brk.Disconnect(); err != nil { + b.Fatal(err) + } + }() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkSaramaCodecSegmentioPublish(b *testing.B) { + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + b.Skip() + } + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := sarama.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) + if err := brk.Connect(); err != nil { + b.Fatal(err) + } + defer func() { + if err := brk.Disconnect(); err != nil { + b.Fatal(err) + } + }() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } + } + +} + +func BenchmarkSaramaCodecJsonSubscribe(b *testing.B) { + b.Skip() + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + b.Skip() + } + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := sarama.NewBroker(broker.Addrs(addrs...)) + if err := brk.Connect(); err != nil { + b.Fatal(err) + } + defer func() { + if err := brk.Disconnect(); err != nil { + b.Fatal(err) + } + }() + + cnt := 0 + var done atomic.Value + done.Store(false) + exit := make(chan struct{}) + fn := func(msg broker.Event) error { + if cnt == 0 { + b.ResetTimer() + } + cnt++ + if cnt >= b.N { + if v, ok := done.Load().(bool); ok && !v { + done.Store(true) + close(exit) + } + } + return msg.Ack() + } + + go func() { + for i := 0; i < b.N; i++ { + if v := done.Load().(bool); v { + return + } + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } + } + }() + + sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test")) + if err != nil { + b.Fatal(err) + } + defer func() { + if err := sub.Unsubscribe(); err != nil { + b.Fatal(err) + } + }() + <-exit +} + +func BenchmarkSaramaCodecSegmentioSubscribe(b *testing.B) { + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + b.Skip() + } + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := sarama.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) + if err := brk.Connect(); err != nil { + b.Fatal(err) + } + defer func() { + if err := brk.Disconnect(); err != nil { + b.Fatal(err) + } + }() + + cnt := 0 + var done atomic.Value + done.Store(false) + exit := make(chan struct{}) + fn := func(msg broker.Event) error { + if cnt == 0 { + b.ResetTimer() + } + cnt++ + if cnt >= b.N { + if v, ok := done.Load().(bool); ok && !v { + done.Store(true) + close(exit) + } + } + return msg.Ack() + } + + go func() { + for i := 0; i < b.N; i++ { + if v := done.Load().(bool); v { + return + } + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } + } + }() + + sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test")) + if err != nil { + b.Fatal(err) + } + defer func() { + if err := sub.Unsubscribe(); err != nil { + b.Fatal(err) + } + }() + <-exit +} diff --git a/kafka.go b/segmentio.go similarity index 93% rename from kafka.go rename to segmentio.go index d579317..b039e16 100644 --- a/kafka.go +++ b/segmentio.go @@ -21,7 +21,6 @@ type kBroker struct { writerConfig kafka.WriterConfig writers map[string]*kafka.Writer - readers map[string]*kafka.Reader connected bool sync.RWMutex @@ -128,11 +127,6 @@ func (k *kBroker) Disconnect() error { k.Lock() defer k.Unlock() - for _, reader := range k.readers { - if err := reader.Close(); err != nil { - return err - } - } for _, writer := range k.writers { if err := writer.Close(); err != nil { return err @@ -214,17 +208,20 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker sub := &subscriber{group: group, opts: opt, t: topic} + chErr := make(chan error) + go func() { for { gen, err := group.Next(k.opts.Context) if err == kafka.ErrGroupClosed { + chErr <- nil return } else if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[kafka] subscribe error: %v", err) - } + chErr <- err return } + chErr <- nil + assignments := gen.Assignments[topic] for _, assignment := range assignments { partition, offset := assignment.ID, assignment.Offset @@ -233,12 +230,10 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker gen.Start(func(ctx context.Context) { // create reader for this partition. - reader := kafka.NewReader(kafka.ReaderConfig{ - //GroupID: gen.GroupID, - Brokers: gcfg.Brokers, - Topic: topic, - Partition: partition, - }) + cfg := k.readerConfig + cfg.Topic = topic + cfg.Partition = partition + reader := kafka.NewReader(cfg) defer reader.Close() // seek to the last committed offset for this partition. reader.SetOffset(offset) @@ -289,6 +284,11 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker } }() + err = <-chErr + if err != nil { + return nil, err + } + return sub, nil } @@ -327,7 +327,7 @@ func NewBroker(opts ...broker.Option) broker.Broker { } readerConfig.WatchPartitionChanges = true - writerConfig := kafka.WriterConfig{} + writerConfig := kafka.WriterConfig{CompressionCodec: nil} if cfg, ok := options.Context.Value(writerConfigKey{}).(kafka.WriterConfig); ok { writerConfig = cfg } @@ -340,7 +340,6 @@ func NewBroker(opts ...broker.Option) broker.Broker { readerConfig: readerConfig, writerConfig: writerConfig, writers: make(map[string]*kafka.Writer), - readers: make(map[string]*kafka.Reader), addrs: cAddrs, opts: options, } diff --git a/kafka_test.go b/segmentio_test.go similarity index 59% rename from kafka_test.go rename to segmentio_test.go index d78dd61..b46d7e3 100644 --- a/kafka_test.go +++ b/segmentio_test.go @@ -1,66 +1,18 @@ -package segmentio +package segmentio_test import ( "os" "strings" + "sync/atomic" "testing" "github.com/micro/go-micro/v2/broker" + segmentio "github.com/micro/go-plugins/broker/segmentio/v2" segjson "github.com/micro/go-plugins/codec/segmentio/v2" ) -var ( - bm = &broker.Message{ - Header: map[string]string{"hkey": "hval"}, - Body: []byte("body"), - } -) - -func TestPublish(t *testing.T) { - if tr := os.Getenv("TRAVIS"); len(tr) > 0 { - t.Skip() - } - - var addrs []string - if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { - addrs = []string{"127.0.0.1:9092"} - } else { - addrs = strings.Split(addr, ",") - } - - b := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) - if err := b.Connect(); err != nil { - t.Fatal(err) - } - defer func() { - if err := b.Disconnect(); err != nil { - t.Fatal(err) - } - }() - - done := make(chan bool, 1) - fn := func(msg broker.Event) error { - done <- true - return msg.Ack() - } - - sub, err := b.Subscribe("test_topic", fn, broker.Queue("test")) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := sub.Unsubscribe(); err != nil { - t.Fatal(err) - } - }() - - if err := b.Publish("test_topic", bm); err != nil { - t.Fatal(err) - } - <-done -} - -func BenchmarkSegmentioPublish(b *testing.B) { +func BenchmarkSegmentioCodecJsonPublish(b *testing.B) { + b.Skip() if tr := os.Getenv("TRAVIS"); len(tr) > 0 { b.Skip() } @@ -72,7 +24,7 @@ func BenchmarkSegmentioPublish(b *testing.B) { addrs = strings.Split(addr, ",") } - brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) + brk := segmentio.NewBroker(broker.Addrs(addrs...)) if err := brk.Connect(); err != nil { b.Fatal(err) } @@ -83,16 +35,13 @@ func BenchmarkSegmentioPublish(b *testing.B) { }() b.ResetTimer() - for i := 0; i < b.N; i++ { if err := brk.Publish("test_topic", bm); err != nil { b.Fatal(err) } } - } - -func BenchmarkSegmentioSubscribe(b *testing.B) { +func BenchmarkSegmentioCodecSegmentioPublish(b *testing.B) { if tr := os.Getenv("TRAVIS"); len(tr) > 0 { b.Skip() } @@ -104,7 +53,39 @@ func BenchmarkSegmentioSubscribe(b *testing.B) { addrs = strings.Split(addr, ",") } - brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) + brk := segmentio.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) + if err := brk.Connect(); err != nil { + b.Fatal(err) + } + defer func() { + if err := brk.Disconnect(); err != nil { + b.Fatal(err) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } + } + +} + +func BenchmarkSegmentioCodecJsonSubscribe(b *testing.B) { + b.Skip() + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + b.Skip() + } + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + brk := segmentio.NewBroker(broker.Addrs(addrs...)) if err := brk.Connect(); err != nil { b.Fatal(err) } @@ -115,23 +96,33 @@ func BenchmarkSegmentioSubscribe(b *testing.B) { }() cnt := 0 - done := make(chan struct{}) + var done atomic.Value + done.Store(false) + exit := make(chan struct{}) fn := func(msg broker.Event) error { if cnt == 0 { b.ResetTimer() } cnt++ if cnt == b.N { - close(done) + if v := done.Load().(bool); !v { + done.Store(true) + close(exit) + } } return msg.Ack() } - for i := 0; i < b.N; i++ { - if err := brk.Publish("test_topic", bm); err != nil { - b.Fatal(err) + go func() { + for i := 0; i < b.N; i++ { + if v, ok := done.Load().(bool); ok && v { + return + } + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } } - } + }() sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test")) if err != nil { @@ -142,40 +133,22 @@ func BenchmarkSegmentioSubscribe(b *testing.B) { b.Fatal(err) } }() - <-done + <-exit } -/* -func BenchmarkSaramaPublish(b *testing.B) { +func BenchmarkSegmentioCodecSegmentioSubscribe(b *testing.B) { if tr := os.Getenv("TRAVIS"); len(tr) > 0 { b.Skip() } - brk := sarama.NewBroker(broker.Addrs("127.0.0.1:9092")) - if err := brk.Connect(); err != nil { - b.Fatal(err) - } - defer func() { - if err := brk.Disconnect(); err != nil { - b.Fatal(err) - } - }() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - if err := brk.Publish("test_topic", bm); err != nil { - b.Fatal(err) - } + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") } -} - - -func BenchmarkSaramaSubscribe(b *testing.B) { - if tr := os.Getenv("TRAVIS"); len(tr) > 0 { - b.Skip() - } - brk := sarama.NewBroker(broker.Addrs("127.0.0.1:9092")) + brk := segmentio.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) if err := brk.Connect(); err != nil { b.Fatal(err) } @@ -186,24 +159,33 @@ func BenchmarkSaramaSubscribe(b *testing.B) { }() cnt := 0 - done := make(chan struct{}) + var done atomic.Value + done.Store(false) + exit := make(chan struct{}) fn := func(msg broker.Event) error { if cnt == 0 { b.ResetTimer() } - cnt++ - if cnt == 10000 { - close(done) + if cnt == b.N { + if v, ok := done.Load().(bool); ok && !v { + done.Store(true) + close(exit) + } } return msg.Ack() } - for i := 0; i < 10000; i++ { - if err := brk.Publish("test_topic", bm); err != nil { - b.Fatal(err) + go func() { + for i := 0; i < b.N; i++ { + if v := done.Load().(bool); v { + return + } + if err := brk.Publish("test_topic", bm); err != nil { + b.Fatal(err) + } } - } + }() sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test")) if err != nil { @@ -214,7 +196,5 @@ func BenchmarkSaramaSubscribe(b *testing.B) { b.Fatal(err) } }() - - <-done + <-exit } -*/