diff --git a/go.mod b/go.mod index 22477ef..ef52282 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,8 @@ module github.com/unistack-org/micro-broker-kgo/v3 go 1.16 require ( - github.com/klauspost/compress v1.13.5 // indirect - github.com/twmb/franz-go v0.11.0 - github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901054312-f2002b3e2313 // indirect + github.com/twmb/franz-go v0.11.1 + github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210903012929-a6c6e3b9e991 // indirect github.com/unistack-org/micro-codec-json/v3 v3.2.5 github.com/unistack-org/micro/v3 v3.7.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/go.sum b/go.sum index dc4c3e7..20f16b4 100644 --- a/go.sum +++ b/go.sum @@ -21,8 +21,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -41,7 +39,6 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= @@ -56,11 +53,11 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/twmb/franz-go v0.11.0 h1:WHGZWV4rZVbkQndjqxYL3dXU1VdF4CZ68KgaHLwEtH8= -github.com/twmb/franz-go v0.11.0/go.mod h1:a8tUwwic5WYy32hMT7QUrsuwQ9b/AwN3Ub61PoEQagg= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210829174113-fcaaf3f18f2f/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901054312-f2002b3e2313 h1:iS6Upu5PnhziQaIUHg2eXHLvjHGwmk+1eK1WGIl7gOU= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901054312-f2002b3e2313/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go v0.11.1 h1:9NbczLCWztTtlEhEXQKNvCLZUxsIMoKKJUlRstSNgB0= +github.com/twmb/franz-go v0.11.1/go.mod h1:cdFLk8d/5/ox88y38xgiDKP3Yo338OO0t5QbTEM2K6I= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901051457-3c197a133ddd/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210903012929-a6c6e3b9e991 h1:NzPFcZyF5GhGIpxmFf/KL9XtIko6+q8u68uAi7ayoRU= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210903012929-a6c6e3b9e991/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= github.com/unistack-org/micro-codec-json/v3 v3.2.5 h1:WOilhbL0YSu58iIQIIxpawRYZyx6CR16tCpbX4ai3Vc= diff --git a/kgo_test.go b/kgo_test.go index 2f04028..94702c8 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -17,6 +17,12 @@ import ( "github.com/unistack-org/micro/v3/metadata" ) +var ( + msgcnt = int64(12000000) + group = "32" + prefill = false +) + var bm = &broker.Message{ Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, Body: []byte(`"body"`), @@ -56,18 +62,17 @@ func TestPubSub(t *testing.T) { t.Fatal(err) } }() + if prefill { + msgs := make([]*broker.Message, 0, msgcnt) + for i := int64(0); i < msgcnt; i++ { + msgs = append(msgs, bm) + } - /* - msgs := make([]*broker.Message, 0, 600000) - for i := 0; i < 600000; i++ { - msgs = append(msgs, bm) - } - - if err := b.BatchPublish(ctx, msgs); err != nil { - t.Fatal(err) - } - t.Skip() - */ + if err := b.BatchPublish(ctx, msgs); err != nil { + t.Fatal(err) + } + // t.Skip() + } done := make(chan bool, 1) idx := int64(0) fn := func(msg broker.Event) error { @@ -78,7 +83,7 @@ func TestPubSub(t *testing.T) { sub, err := b.Subscribe(ctx, "test", fn, broker.SubscribeAutoAck(true), - broker.SubscribeGroup("test29"), + broker.SubscribeGroup(group), broker.SubscribeBodyOnly(true)) if err != nil { t.Fatal(err) @@ -98,7 +103,11 @@ func TestPubSub(t *testing.T) { for { select { case <-pticker.C: - fmt.Printf("processed %v\n", atomic.LoadInt64(&idx)) + if prc := atomic.LoadInt64(&idx); prc == msgcnt { + close(done) + } else { + fmt.Printf("processed %v\n", prc) + } case <-ticker.C: close(done) }