segmentio improvements for broker and codec (#531)

* segmentio broker and codec improvements
* broker/segmentio: return error on subscribe

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-15 04:07:17 +03:00
parent 9353991c21
commit a03e2e267a
6 changed files with 387 additions and 119 deletions

60
broker_test.go Normal file
View File

@ -0,0 +1,60 @@
package segmentio_test
import (
"os"
"strings"
"testing"
"github.com/micro/go-micro/v2/broker"
segmentio "github.com/micro/go-plugins/broker/segmentio/v2"
)
var (
bm = &broker.Message{
Header: map[string]string{"hkey": "hval"},
Body: []byte("body"),
}
)
func TestPubSub(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
b := segmentio.NewBroker(broker.Addrs(addrs...))
if err := b.Connect(); err != nil {
t.Fatal(err)
}
defer func() {
if err := b.Disconnect(); err != nil {
t.Fatal(err)
}
}()
done := make(chan bool, 1)
fn := func(msg broker.Event) error {
done <- true
return msg.Ack()
}
sub, err := b.Subscribe("test_topic", fn, broker.Queue("test"))
if err != nil {
t.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
t.Fatal(err)
}
}()
if err := b.Publish("test_topic", bm); err != nil {
t.Fatal(err)
}
<-done
}

3
go.mod
View File

@ -6,8 +6,11 @@ require (
github.com/frankban/quicktest v1.4.1 // indirect github.com/frankban/quicktest v1.4.1 // indirect
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/micro/go-micro/v2 v2.3.0 github.com/micro/go-micro/v2 v2.3.0
github.com/micro/go-plugins/broker/kafka/v2 v2.3.0
github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0
github.com/pierrec/lz4 v2.2.6+incompatible // indirect github.com/pierrec/lz4 v2.2.6+incompatible // indirect
github.com/segmentio/kafka-go v0.3.5 github.com/segmentio/kafka-go v0.3.5
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
) )
replace github.com/micro/go-plugins/codec/segmentio/v2 => ../../codec/segmentio

23
go.sum
View File

@ -35,6 +35,8 @@ github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tT
github.com/Microsoft/hcsshim v0.8.7-0.20191101173118-65519b62243c/go.mod h1:7xhjOwRV2+0HXGmM0jxaEu+ZiXJFoVZOTfL/dmqbrD8= github.com/Microsoft/hcsshim v0.8.7-0.20191101173118-65519b62243c/go.mod h1:7xhjOwRV2+0HXGmM0jxaEu+ZiXJFoVZOTfL/dmqbrD8=
github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks= github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.25.0 h1:ch1ywjRLjfJtU+EaiJ+l0rWffQ6TRpyYmW4DX7Cb2SU=
github.com/Shopify/sarama v1.25.0/go.mod h1:y/CFFTO9eaMTNriwu/Q+W4eioLqiDMGkA1W+gmdfj8w=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/akamai/AkamaiOPEN-edgegrid-golang v0.9.0/go.mod h1:zpDJeKyp9ScW4NNrbdr+Eyxvry3ilGPewKoXw3XGN1k= github.com/akamai/AkamaiOPEN-edgegrid-golang v0.9.0/go.mod h1:zpDJeKyp9ScW4NNrbdr+Eyxvry3ilGPewKoXw3XGN1k=
@ -123,6 +125,7 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c h1:pBgVXWDXju1m8W4lnEeIqTHPOzhTUO81a7yknM/xQR4= github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c h1:pBgVXWDXju1m8W4lnEeIqTHPOzhTUO81a7yknM/xQR4=
github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c/go.mod h1:pFdJbAhRf7rh6YYMUdIQGyzne6zYL1tCUW8QV2B3UfY= github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c/go.mod h1:pFdJbAhRf7rh6YYMUdIQGyzne6zYL1tCUW8QV2B3UfY=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.4.1 h1:Wv2VwvNn73pAdFIVUQRXYDFp31lXKbqblIXo/Q5GPSg= github.com/frankban/quicktest v1.4.1 h1:Wv2VwvNn73pAdFIVUQRXYDFp31lXKbqblIXo/Q5GPSg=
github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@ -208,6 +211,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
@ -219,6 +224,8 @@ github.com/iij/doapi v0.0.0-20190504054126-0bbf12d6d7df/go.mod h1:QMZY7/J/KSQEhK
github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
@ -236,6 +243,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v1.2.0 h1:NMpwD2G9JSFOE1/TJjGSo5zG7Yb2bTe7eq1jH+irmeE= github.com/klauspost/cpuid v1.2.0 h1:NMpwD2G9JSFOE1/TJjGSo5zG7Yb2bTe7eq1jH+irmeE=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/kolo/xmlrpc v0.0.0-20190717152603-07c4ee3fd181/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ= github.com/kolo/xmlrpc v0.0.0-20190717152603-07c4ee3fd181/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ=
@ -274,6 +283,10 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM=
github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg= github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg=
github.com/micro/go-micro/v2 v2.3.0 h1:3seJJ7/pbhleZNe6gGHFJjOsAqvYGcy2ivc3P5PYnVQ= github.com/micro/go-micro/v2 v2.3.0 h1:3seJJ7/pbhleZNe6gGHFJjOsAqvYGcy2ivc3P5PYnVQ=
github.com/micro/go-micro/v2 v2.3.0/go.mod h1:GR69d1AXMg/WjMNf/7K1VO6hCBJDIpqCqnVYNTV6M5w= github.com/micro/go-micro/v2 v2.3.0/go.mod h1:GR69d1AXMg/WjMNf/7K1VO6hCBJDIpqCqnVYNTV6M5w=
github.com/micro/go-plugins v1.5.1 h1:swcFD7ynCTUo98APqIEIbPu2XMd6yVGTnI8PqdnCwOQ=
github.com/micro/go-plugins/broker/kafka v0.0.0-20200119172437-4fe21aa238fd h1:heuPI2eEEovTaHpq1OFDRAY/3QKvqFOyB/A++Cq0/ZE=
github.com/micro/go-plugins/broker/kafka/v2 v2.3.0 h1:VuMxNMOeIPgklgNXz9kZ9h+Xsg4W+iSngR6Sk3ep9Xc=
github.com/micro/go-plugins/broker/kafka/v2 v2.3.0/go.mod h1:PXFX3ioNEVO3PT21BC1j06aOtzfUnX0qj9jhHEwLzmM=
github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 h1:VKWhtEHd1x0PYuU1YoGeBHgAs06aiThleV2v0LruK+g= github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0 h1:VKWhtEHd1x0PYuU1YoGeBHgAs06aiThleV2v0LruK+g=
github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0/go.mod h1:sblO7/JViOU+cTq4VvqzzWVbwEZvX2hoBgnIZ/cf+HI= github.com/micro/go-plugins/codec/segmentio/v2 v2.3.0/go.mod h1:sblO7/JViOU+cTq4VvqzzWVbwEZvX2hoBgnIZ/cf+HI=
github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE= github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE=
@ -450,6 +463,7 @@ golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -624,6 +638,15 @@ gopkg.in/go-playground/validator.v9 v9.31.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWd
gopkg.in/h2non/gock.v1 v1.0.15/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE= gopkg.in/h2non/gock.v1 v1.0.15/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.44.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.44.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
gopkg.in/ns1/ns1-go.v2 v2.0.0-20190730140822-b51389932cbc/go.mod h1:VV+3haRsgDiVLxyifmMBrBIuCWFBPYKbRssXB9z67Hw= gopkg.in/ns1/ns1-go.v2 v2.0.0-20190730140822-b51389932cbc/go.mod h1:VV+3haRsgDiVLxyifmMBrBIuCWFBPYKbRssXB9z67Hw=
gopkg.in/resty.v1 v1.9.1/go.mod h1:vo52Hzryw9PnPHcJfPsBiFW62XhNx5OczbV9y+IMpgc= gopkg.in/resty.v1 v1.9.1/go.mod h1:vo52Hzryw9PnPHcJfPsBiFW62XhNx5OczbV9y+IMpgc=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=

203
sarama_test.go Normal file
View File

@ -0,0 +1,203 @@
package segmentio_test
import (
"os"
"strings"
"sync/atomic"
"testing"
"github.com/micro/go-micro/v2/broker"
sarama "github.com/micro/go-plugins/broker/kafka/v2"
segjson "github.com/micro/go-plugins/codec/segmentio/v2"
)
func BenchmarkSaramaCodecJsonPublish(b *testing.B) {
b.Skip()
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := sarama.NewBroker(broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
defer func() {
if err := brk.Disconnect(); err != nil {
b.Fatal(err)
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
}
}
func BenchmarkSaramaCodecSegmentioPublish(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := sarama.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
defer func() {
if err := brk.Disconnect(); err != nil {
b.Fatal(err)
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
}
}
func BenchmarkSaramaCodecJsonSubscribe(b *testing.B) {
b.Skip()
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := sarama.NewBroker(broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
defer func() {
if err := brk.Disconnect(); err != nil {
b.Fatal(err)
}
}()
cnt := 0
var done atomic.Value
done.Store(false)
exit := make(chan struct{})
fn := func(msg broker.Event) error {
if cnt == 0 {
b.ResetTimer()
}
cnt++
if cnt >= b.N {
if v, ok := done.Load().(bool); ok && !v {
done.Store(true)
close(exit)
}
}
return msg.Ack()
}
go func() {
for i := 0; i < b.N; i++ {
if v := done.Load().(bool); v {
return
}
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
}
}()
sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test"))
if err != nil {
b.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
b.Fatal(err)
}
}()
<-exit
}
func BenchmarkSaramaCodecSegmentioSubscribe(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := sarama.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
defer func() {
if err := brk.Disconnect(); err != nil {
b.Fatal(err)
}
}()
cnt := 0
var done atomic.Value
done.Store(false)
exit := make(chan struct{})
fn := func(msg broker.Event) error {
if cnt == 0 {
b.ResetTimer()
}
cnt++
if cnt >= b.N {
if v, ok := done.Load().(bool); ok && !v {
done.Store(true)
close(exit)
}
}
return msg.Ack()
}
go func() {
for i := 0; i < b.N; i++ {
if v := done.Load().(bool); v {
return
}
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
}
}()
sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test"))
if err != nil {
b.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
b.Fatal(err)
}
}()
<-exit
}

View File

@ -21,7 +21,6 @@ type kBroker struct {
writerConfig kafka.WriterConfig writerConfig kafka.WriterConfig
writers map[string]*kafka.Writer writers map[string]*kafka.Writer
readers map[string]*kafka.Reader
connected bool connected bool
sync.RWMutex sync.RWMutex
@ -128,11 +127,6 @@ func (k *kBroker) Disconnect() error {
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
for _, reader := range k.readers {
if err := reader.Close(); err != nil {
return err
}
}
for _, writer := range k.writers { for _, writer := range k.writers {
if err := writer.Close(); err != nil { if err := writer.Close(); err != nil {
return err return err
@ -214,17 +208,20 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
sub := &subscriber{group: group, opts: opt, t: topic} sub := &subscriber{group: group, opts: opt, t: topic}
chErr := make(chan error)
go func() { go func() {
for { for {
gen, err := group.Next(k.opts.Context) gen, err := group.Next(k.opts.Context)
if err == kafka.ErrGroupClosed { if err == kafka.ErrGroupClosed {
chErr <- nil
return return
} else if err != nil { } else if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) { chErr <- err
logger.Errorf("[kafka] subscribe error: %v", err)
}
return return
} }
chErr <- nil
assignments := gen.Assignments[topic] assignments := gen.Assignments[topic]
for _, assignment := range assignments { for _, assignment := range assignments {
partition, offset := assignment.ID, assignment.Offset partition, offset := assignment.ID, assignment.Offset
@ -233,12 +230,10 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
gen.Start(func(ctx context.Context) { gen.Start(func(ctx context.Context) {
// create reader for this partition. // create reader for this partition.
reader := kafka.NewReader(kafka.ReaderConfig{ cfg := k.readerConfig
//GroupID: gen.GroupID, cfg.Topic = topic
Brokers: gcfg.Brokers, cfg.Partition = partition
Topic: topic, reader := kafka.NewReader(cfg)
Partition: partition,
})
defer reader.Close() defer reader.Close()
// seek to the last committed offset for this partition. // seek to the last committed offset for this partition.
reader.SetOffset(offset) reader.SetOffset(offset)
@ -289,6 +284,11 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
} }
}() }()
err = <-chErr
if err != nil {
return nil, err
}
return sub, nil return sub, nil
} }
@ -327,7 +327,7 @@ func NewBroker(opts ...broker.Option) broker.Broker {
} }
readerConfig.WatchPartitionChanges = true readerConfig.WatchPartitionChanges = true
writerConfig := kafka.WriterConfig{} writerConfig := kafka.WriterConfig{CompressionCodec: nil}
if cfg, ok := options.Context.Value(writerConfigKey{}).(kafka.WriterConfig); ok { if cfg, ok := options.Context.Value(writerConfigKey{}).(kafka.WriterConfig); ok {
writerConfig = cfg writerConfig = cfg
} }
@ -340,7 +340,6 @@ func NewBroker(opts ...broker.Option) broker.Broker {
readerConfig: readerConfig, readerConfig: readerConfig,
writerConfig: writerConfig, writerConfig: writerConfig,
writers: make(map[string]*kafka.Writer), writers: make(map[string]*kafka.Writer),
readers: make(map[string]*kafka.Reader),
addrs: cAddrs, addrs: cAddrs,
opts: options, opts: options,
} }

View File

@ -1,66 +1,18 @@
package segmentio package segmentio_test
import ( import (
"os" "os"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/broker"
segmentio "github.com/micro/go-plugins/broker/segmentio/v2"
segjson "github.com/micro/go-plugins/codec/segmentio/v2" segjson "github.com/micro/go-plugins/codec/segmentio/v2"
) )
var ( func BenchmarkSegmentioCodecJsonPublish(b *testing.B) {
bm = &broker.Message{ b.Skip()
Header: map[string]string{"hkey": "hval"},
Body: []byte("body"),
}
)
func TestPublish(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
b := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := b.Connect(); err != nil {
t.Fatal(err)
}
defer func() {
if err := b.Disconnect(); err != nil {
t.Fatal(err)
}
}()
done := make(chan bool, 1)
fn := func(msg broker.Event) error {
done <- true
return msg.Ack()
}
sub, err := b.Subscribe("test_topic", fn, broker.Queue("test"))
if err != nil {
t.Fatal(err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
t.Fatal(err)
}
}()
if err := b.Publish("test_topic", bm); err != nil {
t.Fatal(err)
}
<-done
}
func BenchmarkSegmentioPublish(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 { if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip() b.Skip()
} }
@ -72,7 +24,7 @@ func BenchmarkSegmentioPublish(b *testing.B) {
addrs = strings.Split(addr, ",") addrs = strings.Split(addr, ",")
} }
brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) brk := segmentio.NewBroker(broker.Addrs(addrs...))
if err := brk.Connect(); err != nil { if err := brk.Connect(); err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -83,16 +35,13 @@ func BenchmarkSegmentioPublish(b *testing.B) {
}() }()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
if err := brk.Publish("test_topic", bm); err != nil { if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }
} }
func BenchmarkSegmentioCodecSegmentioPublish(b *testing.B) {
func BenchmarkSegmentioSubscribe(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 { if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip() b.Skip()
} }
@ -104,7 +53,39 @@ func BenchmarkSegmentioSubscribe(b *testing.B) {
addrs = strings.Split(addr, ",") addrs = strings.Split(addr, ",")
} }
brk := NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...)) brk := segmentio.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
defer func() {
if err := brk.Disconnect(); err != nil {
b.Fatal(err)
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
}
}
func BenchmarkSegmentioCodecJsonSubscribe(b *testing.B) {
b.Skip()
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
brk := segmentio.NewBroker(broker.Addrs(addrs...))
if err := brk.Connect(); err != nil { if err := brk.Connect(); err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -115,23 +96,33 @@ func BenchmarkSegmentioSubscribe(b *testing.B) {
}() }()
cnt := 0 cnt := 0
done := make(chan struct{}) var done atomic.Value
done.Store(false)
exit := make(chan struct{})
fn := func(msg broker.Event) error { fn := func(msg broker.Event) error {
if cnt == 0 { if cnt == 0 {
b.ResetTimer() b.ResetTimer()
} }
cnt++ cnt++
if cnt == b.N { if cnt == b.N {
close(done) if v := done.Load().(bool); !v {
done.Store(true)
close(exit)
}
} }
return msg.Ack() return msg.Ack()
} }
for i := 0; i < b.N; i++ { go func() {
if err := brk.Publish("test_topic", bm); err != nil { for i := 0; i < b.N; i++ {
b.Fatal(err) if v, ok := done.Load().(bool); ok && v {
return
}
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
} }
} }()
sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test")) sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test"))
if err != nil { if err != nil {
@ -142,40 +133,22 @@ func BenchmarkSegmentioSubscribe(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
}() }()
<-done <-exit
} }
/* func BenchmarkSegmentioCodecSegmentioSubscribe(b *testing.B) {
func BenchmarkSaramaPublish(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 { if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip() b.Skip()
} }
brk := sarama.NewBroker(broker.Addrs("127.0.0.1:9092"))
if err := brk.Connect(); err != nil {
b.Fatal(err)
}
defer func() {
if err := brk.Disconnect(); err != nil {
b.Fatal(err)
}
}()
b.ResetTimer() var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
for i := 0; i < b.N; i++ { addrs = []string{"127.0.0.1:9092"}
if err := brk.Publish("test_topic", bm); err != nil { } else {
b.Fatal(err) addrs = strings.Split(addr, ",")
}
} }
} brk := segmentio.NewBroker(broker.Codec(segjson.Marshaler{}), broker.Addrs(addrs...))
func BenchmarkSaramaSubscribe(b *testing.B) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip()
}
brk := sarama.NewBroker(broker.Addrs("127.0.0.1:9092"))
if err := brk.Connect(); err != nil { if err := brk.Connect(); err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -186,24 +159,33 @@ func BenchmarkSaramaSubscribe(b *testing.B) {
}() }()
cnt := 0 cnt := 0
done := make(chan struct{}) var done atomic.Value
done.Store(false)
exit := make(chan struct{})
fn := func(msg broker.Event) error { fn := func(msg broker.Event) error {
if cnt == 0 { if cnt == 0 {
b.ResetTimer() b.ResetTimer()
} }
cnt++ cnt++
if cnt == 10000 { if cnt == b.N {
close(done) if v, ok := done.Load().(bool); ok && !v {
done.Store(true)
close(exit)
}
} }
return msg.Ack() return msg.Ack()
} }
for i := 0; i < 10000; i++ { go func() {
if err := brk.Publish("test_topic", bm); err != nil { for i := 0; i < b.N; i++ {
b.Fatal(err) if v := done.Load().(bool); v {
return
}
if err := brk.Publish("test_topic", bm); err != nil {
b.Fatal(err)
}
} }
} }()
sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test")) sub, err := brk.Subscribe("test_topic", fn, broker.Queue("test"))
if err != nil { if err != nil {
@ -214,7 +196,5 @@ func BenchmarkSaramaSubscribe(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
}() }()
<-exit
<-done
} }
*/