diff --git a/go.mod b/go.mod index fe40005..bbb3d40 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,10 @@ module github.com/unistack-org/micro-broker-kgo/v3 go 1.16 require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/snappy v0.0.4 // indirect - github.com/klauspost/compress v1.13.4 // indirect - github.com/pierrec/lz4/v4 v4.1.8 // indirect - github.com/twmb/franz-go v0.10.2-0.20210823212011-0d01f7456b4d + github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b + github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd // indirect github.com/unistack-org/micro-codec-json/v3 v3.2.5 + github.com/unistack-org/micro-proto v0.0.8 // indirect github.com/unistack-org/micro/v3 v3.6.3 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/go.sum b/go.sum index d259dad..ec79e04 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,34 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -22,55 +42,91 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= -github.com/pierrec/lz4/v4 v4.1.7/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/twmb/franz-go v0.10.2-0.20210823212011-0d01f7456b4d h1:O7QsIa7lVn+P60nsOBAjkajGKcOpaN3zsOGYboCgfs4= -github.com/twmb/franz-go v0.10.2-0.20210823212011-0d01f7456b4d/go.mod h1:Qa6npC7EIi4WoLmnUz9Ue/sPL0k+ex4OyHCYJ2pCAqw= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f h1:Opx7EKsXb4IOPj1ammVNksPFpbXx6aaxdIn4hGjiIpk= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b h1:t7+Qdd/GHtUBN6D71CwwOEfsZmK0QQ/up3M6gaoaHQ4= +github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b/go.mod h1:Txc5/v0DIKGcdCa1VZhyasECMJ4svN/LHUKYUPYihL0= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd h1:8TvJbg0dnOaq4Oex1wnYejPokyUQ4zG2OQUfD76yq4M= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= github.com/unistack-org/micro-codec-json/v3 v3.2.5 h1:WOilhbL0YSu58iIQIIxpawRYZyx6CR16tCpbX4ai3Vc= github.com/unistack-org/micro-codec-json/v3 v3.2.5/go.mod h1:LSzfrD9GYWCl6KOyihywx1wlbOgStrpyy3NVHNZAvHA= -github.com/unistack-org/micro-proto v0.0.5 h1:DIC97Hufa2nGjuvTsfToD9laEOKddWMRTzeCfBwJ1j8= github.com/unistack-org/micro-proto v0.0.5/go.mod h1:EuI7UlfGXmT1hy6WacULib9LbNgRnDYQvTCFoLgKM2I= +github.com/unistack-org/micro-proto v0.0.8 h1:g4UZGQGeYGI3CFJtjuEm47aouYPviG8SDhSifl0831w= +github.com/unistack-org/micro-proto v0.0.8/go.mod h1:GYO53DWmeldRIo90cAdQx8bLr/WJMxW62W4ja74p1Ac= github.com/unistack-org/micro/v3 v3.3.19/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= github.com/unistack-org/micro/v3 v3.6.3 h1:CvC4B2kOgwzhPx+w9fcbEIJkgzxS7i84PRB+vL6Vz0U= github.com/unistack-org/micro/v3 v3.6.3/go.mod h1:Hp+DmX1Bt+/z+ihk5coYwNtXhPMwr4SJuCFRy2kyUl8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= @@ -79,3 +135,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/kgo.go b/kgo.go index 56cf581..0eda025 100644 --- a/kgo.go +++ b/kgo.go @@ -5,14 +5,12 @@ import ( "context" "fmt" "math/rand" - "net" "strings" "sync" "sync/atomic" "time" kgo "github.com/twmb/franz-go/pkg/kgo" - sasl "github.com/twmb/franz-go/pkg/sasl" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" @@ -201,92 +199,14 @@ func (k *kBroker) Init(opts ...broker.Option) error { return err } - var kopts []kgo.Opt - if k.opts.Context != nil { if v, ok := k.opts.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 { - kopts = append(kopts, v...) - } - if v, ok := k.opts.Context.Value(clientIDKey{}).(string); ok && v != "" { - kopts = append(kopts, kgo.ClientID(v)) - } - if v, ok := k.opts.Context.Value(maxReadBytesKey{}).(int32); ok { - kopts = append(kopts, kgo.BrokerMaxReadBytes(v)) - } - if v, ok := k.opts.Context.Value(maxWriteBytesKey{}).(int32); ok { - kopts = append(kopts, kgo.BrokerMaxWriteBytes(v)) - } - if v, ok := k.opts.Context.Value(connIdleTimeoutKey{}).(time.Duration); ok { - kopts = append(kopts, kgo.ConnIdleTimeout(v)) - } - if v, ok := k.opts.Context.Value(connTimeoutOverheadKey{}).(time.Duration); ok { - kopts = append(kopts, kgo.ConnTimeoutOverhead(v)) - } - if v, ok := k.opts.Context.Value(dialerKey{}).(func(ctx context.Context, network, host string) (net.Conn, error)); ok { - kopts = append(kopts, kgo.Dialer(v)) - } - if v, ok := k.opts.Context.Value(metadataMaxAgeKey{}).(time.Duration); ok { - kopts = append(kopts, kgo.MetadataMaxAge(v)) - } - if v, ok := k.opts.Context.Value(metadataMinAgeKey{}).(time.Duration); ok { - kopts = append(kopts, kgo.MetadataMinAge(v)) - } - // if v, ok := k.opts.Context.Value(produceRetriesKey{}).(int); ok { - // kopts = append(kopts, kgo.ProduceRetries(v)) - // } - if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok { - kopts = append(kopts, kgo.RequestRetries(v)) - } - if v, ok := k.opts.Context.Value(retryBackoffFnKey{}).(func(int) time.Duration); ok { - kopts = append(kopts, kgo.RetryBackoffFn(v)) - } else { - kopts = append(kopts, kgo.RetryBackoffFn( - func() func(int) time.Duration { - var rng mrand.Rand - return func(fails int) time.Duration { - const ( - min = 250 * time.Millisecond - max = 2 * time.Second - ) - if fails <= 0 { - return min - } - if fails > 10 { - return max - } - - backoff := min * time.Duration(1<<(fails-1)) - jitter := 0.8 + 0.4*rng.Float64() - backoff = time.Duration(float64(backoff) * jitter) - - if backoff > max { - return max - } - return backoff - } - }(), - )) - } - if v, ok := k.opts.Context.Value(retryTimeoutFnKey{}).(func(int16) time.Duration); ok { - kopts = append(kopts, kgo.RetryTimeoutFn(v)) - } - if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(time.Duration); ok { - kopts = append(kopts, kgo.RetryTimeout(v)) - } - if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok { - kopts = append(kopts, kgo.SASL(v...)) - } - if v, ok := k.opts.Context.Value(hooksKey{}).([]kgo.Hook); ok { - kopts = append(kopts, kgo.WithHooks(v...)) + k.kopts = append(k.kopts, v...) } } - kopts = append(kopts, - kgo.WithLogger(&mlogger{l: k.opts.Logger, ctx: k.opts.Context}), - kgo.RequiredAcks(kgo.AllISRAcks()), - // kgo.RecordPartitioner(), - ) - k.kopts = kopts + // kgo.RecordPartitioner(), + k.init = true return nil @@ -306,12 +226,23 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message } func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) var errs []string + var err error + var buf []byte for _, msg := range msgs { + if options.BodyOnly { + buf = msg.Body + } else { + buf, err = k.opts.Codec.Marshal(msg) + if err != nil { + return err + } + } topic, _ := msg.Header.Get(metadata.HeaderTopic) - rec := &kgo.Record{Value: msg.Body, Topic: topic} + rec := &kgo.Record{Value: buf, Topic: topic} records = append(records, rec) } @@ -328,10 +259,6 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b return nil } -func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - return nil, nil -} - type mlogger struct { l logger.Logger ctx context.Context @@ -353,7 +280,11 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) { default: return } - l.l.Fields(args...).Log(l.ctx, mlvl, msg) + fields := make(map[string]interface{}, int(len(args)/2)) + for i := 0; i < len(args)/2; i += 2 { + fields[fmt.Sprintf("%v", args[i])] = args[i+1] + } + l.l.Fields(fields).Log(l.ctx, mlvl, msg) } func (l *mlogger) Level() kgo.LogLevel { @@ -370,6 +301,10 @@ func (l *mlogger) Level() kgo.LogLevel { return kgo.LogLevelNone } +func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return nil, nil +} + func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { options := broker.NewSubscribeOptions(opts...) @@ -440,21 +375,15 @@ func (s *subscriber) run(ctx context.Context) { if err := s.handleFetches(ctx, fetches); err != nil { s.kopts.Logger.Errorf(ctx, "fetch handler err: %v", err) // TODO: fatal ? - //return + // return } } } } func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) error { - var batch bool var err error - if s.batchhandler != nil { - batch = true - } - _ = batch - mprecords := make(map[int32][]*kgo.Record) cnt := int64(0) @@ -500,7 +429,9 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err mu.Unlock() return err case <-ticker.C: - if atomic.LoadInt64(&cnt) == 0 { + v := atomic.LoadInt64(&cnt) + s.kopts.Logger.Debugf(ctx, "records to commit: %v", v) + if v == 0 { return nil } mu.Lock() @@ -508,8 +439,9 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err mu.Unlock() return err } + s.kopts.Logger.Debugf(ctx, "records to need process after commit: %v", v-int64(len(crecords))) atomic.AddInt64(&cnt, -int64(len(crecords))) - crecords = crecords[:] + crecords = nil mu.Unlock() } } @@ -591,7 +523,46 @@ func (k *kBroker) String() string { } func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.NewOptions(opts...) + kopts := []kgo.Opt{ + kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}), + kgo.RequiredAcks(kgo.AllISRAcks()), + kgo.RetryBackoffFn( + func() func(int) time.Duration { + var rng mrand.Rand + return func(fails int) time.Duration { + const ( + min = 250 * time.Millisecond + max = 2 * time.Second + ) + if fails <= 0 { + return min + } + if fails > 10 { + return max + } + + backoff := min * time.Duration(1<<(fails-1)) + jitter := 0.8 + 0.4*rng.Float64() + backoff = time.Duration(float64(backoff) * jitter) + + if backoff > max { + return max + } + return backoff + } + }(), + ), + } + + if options.Context != nil { + if v, ok := options.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 { + kopts = append(kopts, v...) + } + } + return &kBroker{ - opts: broker.NewOptions(opts...), + opts: options, + kopts: kopts, } } diff --git a/kgo_test.go b/kgo_test.go index 86bb214..8f11fb3 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + kg "github.com/twmb/franz-go/pkg/kgo" kgo "github.com/unistack-org/micro-broker-kgo/v3" jsoncodec "github.com/unistack-org/micro-codec-json/v3" "github.com/unistack-org/micro/v3/broker" @@ -16,12 +17,10 @@ import ( "github.com/unistack-org/micro/v3/metadata" ) -var ( - bm = &broker.Message{ - Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, - Body: []byte(`"body"`), - } -) +var bm = &broker.Message{ + Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, + Body: []byte(`"body"`), +} func TestPubSub(t *testing.T) { if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { @@ -43,6 +42,7 @@ func TestPubSub(t *testing.T) { broker.Addrs(addrs...), kgo.ClientID("test"), kgo.CommitInterval(1*time.Second), + kgo.Options(kg.FetchMaxBytes(10*1024)), ) if err := b.Init(); err != nil { t.Fatal(err) @@ -75,11 +75,11 @@ func TestPubSub(t *testing.T) { idx := int64(0) fn := func(msg broker.Event) error { atomic.AddInt64(&idx, 1) - //time.Sleep(200 * time.Millisecond) + // time.Sleep(200 * time.Millisecond) return msg.Ack() } - sub, err := b.Subscribe(ctx, "test", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup("test14"), broker.SubscribeBodyOnly(true)) + sub, err := b.Subscribe(ctx, "test", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup("test17"), broker.SubscribeBodyOnly(true)) if err != nil { t.Fatal(err) } diff --git a/options.go b/options.go index 7720000..cdaaefc 100644 --- a/options.go +++ b/options.go @@ -2,19 +2,15 @@ package kgo import ( "context" - "net" "time" kgo "github.com/twmb/franz-go/pkg/kgo" - sasl "github.com/twmb/franz-go/pkg/sasl" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/client" ) -var ( - // DefaultCommitInterval specifies how fast send commit offsets to kafka - DefaultCommitInterval = 5 * time.Second -) +// DefaultCommitInterval specifies how fast send commit offsets to kafka +var DefaultCommitInterval = 5 * time.Second type subscribeContextKey struct{} @@ -35,116 +31,21 @@ func ClientPublishKey(key []byte) client.PublishOption { return client.SetPublishOption(publishKey{}, key) } -type clientIDKey struct{} - -// ClientID sets the kafka client id -func ClientID(id string) broker.Option { - return broker.SetOption(clientIDKey{}, id) -} - -type maxReadBytesKey struct{} - -// MaxReadBytes limit max bytes to read -func MaxReadBytes(n int32) broker.Option { - return broker.SetOption(maxReadBytesKey{}, n) -} - -type maxWriteBytesKey struct{} - -// MaxWriteBytes limit max bytes to write -func MaxWriteBytes(n int32) broker.Option { - return broker.SetOption(maxWriteBytesKey{}, n) -} - -type connIdleTimeoutKey struct{} - -// ConnIdleTimeout limit timeout for connection -func ConnIdleTimeout(td time.Duration) broker.Option { - return broker.SetOption(connIdleTimeoutKey{}, td) -} - -type connTimeoutOverheadKey struct{} - -// ConnTimeoutOverhead ... -func ConnTimeoutOverhead(td time.Duration) broker.Option { - return broker.SetOption(connTimeoutOverheadKey{}, td) -} - -type dialerKey struct{} - -// Dialer pass dialer -func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) broker.Option { - return broker.SetOption(dialerKey{}, fn) -} - -type metadataMaxAgeKey struct{} - -// MetadataMaxAge limit metadata max age -func MetadataMaxAge(td time.Duration) broker.Option { - return broker.SetOption(metadataMaxAgeKey{}, td) -} - -type metadataMinAgeKey struct{} - -// MetadataMinAge limit metadata min age -func MetadataMinAge(td time.Duration) broker.Option { - return broker.SetOption(metadataMinAgeKey{}, td) -} - -type produceRetriesKey struct{} - -// ProduceRetries limit number of retries -func ProduceRetries(n int) broker.Option { - return broker.SetOption(produceRetriesKey{}, n) -} - -type requestRetriesKey struct{} - -// RequestRetries limit number of retries -func RequestRetries(n int) broker.Option { - return broker.SetOption(requestRetriesKey{}, n) -} - -type retryBackoffFnKey struct{} - -// RetryBackoffFn set backoff func for retry -func RetryBackoffFn(fn func(int) time.Duration) broker.Option { - return broker.SetOption(retryBackoffFnKey{}, fn) -} - -type retryTimeoutKey struct{} - -// RetryTimeout limit retry timeout -func RetryTimeout(td time.Duration) broker.Option { - return broker.SetOption(retryTimeoutKey{}, td) -} - -type retryTimeoutFnKey struct{} - -// RetryTimeoutFn set func limit retry timeout -func RetryTimeoutFn(fn func(int16) time.Duration) broker.Option { - return broker.SetOption(retryTimeoutFnKey{}, fn) -} - -type saslKey struct{} - -// SASL pass sasl mechanism to auth -func SASL(sasls ...sasl.Mechanism) broker.Option { - return broker.SetOption(saslKey{}, sasls) -} - -type hooksKey struct{} - -// Hooks pass hooks (useful for tracing and logging) -func Hooks(hooks ...kgo.Hook) broker.Option { - return broker.SetOption(hooksKey{}, hooks) -} - type optionsKey struct{} // Options pass additional options to broker func Options(opts ...kgo.Opt) broker.Option { - return broker.SetOption(optionsKey{}, opts) + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt) + if !ok { + options = make([]kgo.Opt, 0, len(opts)) + } + options = append(options, opts...) + o.Context = context.WithValue(o.Context, optionsKey{}, options) + } } type commitIntervalKey struct{}