From 3759fa2f7c2f611797563f721ebef8f744ca4757 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 15 Jan 2026 11:46:54 +0300 Subject: [PATCH] refactor tests && add test for tombstone && fix concurent read\write map --- go.mod | 19 +++++++- go.sum | 63 +++++++++++++++++++++++++-- kgo_test.go | 84 +++++++++++++++++++++++++++++++++-- subscriber.go | 9 ++-- tombstone_test.go | 109 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 273 insertions(+), 11 deletions(-) create mode 100644 tombstone_test.go diff --git a/go.mod b/go.mod index 9a94c69..10dceb0 100644 --- a/go.mod +++ b/go.mod @@ -10,19 +10,36 @@ require ( github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1 github.com/twmb/franz-go/pkg/kmsg v1.12.0 go.opentelemetry.io/otel v1.38.0 + go.unistack.org/micro-codec-jsonpb/v3 v3.10.6 + go.unistack.org/micro-codec-proto/v3 v3.10.3 + go.unistack.org/micro-wrapper-requestid/v3 v3.9.4 go.unistack.org/micro/v3 v3.11.48 ) require ( + dario.cat/mergo v1.0.1 // indirect + github.com/KimMachineGun/automemlimit v0.6.1 // indirect github.com/ash3in/uuidv8 v1.2.0 // indirect + github.com/cilium/ebpf v0.16.0 // indirect + github.com/containerd/cgroups/v3 v3.0.4 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/klauspost/compress v1.18.1 // indirect - github.com/kr/pretty v0.3.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/opencontainers/runtime-spec v1.2.0 // indirect + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect golang.org/x/crypto v0.47.0 // indirect + golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect golang.org/x/sys v0.40.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect google.golang.org/grpc v1.76.0 // indirect diff --git a/go.sum b/go.sum index 224139d..3b1c9da 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,40 @@ +dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= +dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= +github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8= +github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= +github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= +github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4= +github.com/containerd/cgroups/v3 v3.0.4/go.mod h1:SA5DLYnXO8pTGYiAHXz94qvLQTKfVM5GEVisn4jpins= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM= +github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -19,14 +43,31 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= +github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= +github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= +github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= +github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= +github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= +github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk= +github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/twmb/franz-go v1.20.4 h1:1wTvyLTOxS0oJh5ro/DVt2JHVdx7/kGNtmtFhbcr0O0= @@ -39,14 +80,29 @@ github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75 github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.unistack.org/micro-codec-jsonpb/v3 v3.10.6 h1:9vZGETgEqysuz+ZPyph772uTHaQbRFkAwR/3Zr1+ckk= +go.unistack.org/micro-codec-jsonpb/v3 v3.10.6/go.mod h1:sUXFkjeDz7FE+Zj2zEbVROMxhxIdQnp0VslUylZSzaY= +go.unistack.org/micro-codec-proto/v3 v3.10.3 h1:Olny5WDBl+RhrzEPzm29wtCK/a1d4Ife2hkL2jR5MZs= +go.unistack.org/micro-codec-proto/v3 v3.10.3/go.mod h1:tMt+0xZAl7rnoLEs13TIN6ycG1s8DlTGTN8m79sapQY= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= +go.unistack.org/micro-wrapper-requestid/v3 v3.9.4 h1:P3guOZmD10fW0OT7ZiqAXD95WgYyb8WpAZ9MLTmfBiU= +go.unistack.org/micro-wrapper-requestid/v3 v3.9.4/go.mod h1:+2+dT3tqgUCmQL63VGqtUrCz30uXc2G3MQHaOvVDeLI= go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w= go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU= +golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= @@ -60,5 +116,6 @@ google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo_test.go b/kgo_test.go index 2a208c3..5c34a9f 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -2,6 +2,8 @@ package kgo_test import ( "context" + "crypto/tls" + "fmt" "os" "sync/atomic" "testing" @@ -11,9 +13,15 @@ import ( "github.com/twmb/franz-go/pkg/kfake" kg "github.com/twmb/franz-go/pkg/kgo" kgo "go.unistack.org/micro-broker-kgo/v3" + jsonpb "go.unistack.org/micro-codec-jsonpb/v3" + proto "go.unistack.org/micro-codec-proto/v3" + requestid "go.unistack.org/micro-wrapper-requestid/v3" + "go.unistack.org/micro/v3" "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/server" ) var ( @@ -32,24 +40,92 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func helperCreateBroker() *kgo.Broker { +func helperCreateBroker(t *testing.T) *kgo.Broker { + t.Helper() b := kgo.NewBroker( broker.Addrs(cluster.ListenAddrs()...), kgo.CommitInterval(5*time.Second), kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024), kg.AllowAutoTopicCreation(), ), + kgo.CommitInterval(1*time.Second), + broker.ErrorHandler(func(event broker.Event) error { + msg := event.Message() + log := logger.DefaultLogger. + Fields("topic", event.Topic(), + "header", msg.Header, + "body", msg.Body) + err := event.Ack() + + if err != nil { + log.Fields("ack_error", err).Error(context.Background(), fmt.Sprintf("brokerHandlerErr: Ack error | %v", event.Error())) + return err + } + + log.Error(context.Background(), fmt.Sprintf("brokerHandlerErr: %v", event.Error())) + + return nil + }), ) return b } +func helperCreateService(t *testing.T, ctx context.Context, b *kgo.Broker) micro.Service { + t.Helper() + + rh := requestid.NewHook() + + sopts := []server.Option{ + server.Name("test"), + server.Version("latest"), + server.Context(ctx), + server.Codec("application/json", jsonpb.NewCodec()), + server.Codec("application/protobuf", proto.NewCodec()), + server.Codec("application/grpc", proto.NewCodec()), + server.Codec("application/grpc+proto", proto.NewCodec()), + server.Codec("application/grpc+json", jsonpb.NewCodec()), + server.Broker(b), + server.Hooks( + server.HookHandler(rh.ServerHandler), + server.HookSubHandler(rh.ServerSubscriber), + ), + } + + copts := []client.Option{ + client.Codec("application/json", jsonpb.NewCodec()), + client.Codec("application/protobuf", proto.NewCodec()), + client.Codec("application/grpc", proto.NewCodec()), + client.Codec("application/grpc+proto", proto.NewCodec()), + client.Codec("application/grpc+json", jsonpb.NewCodec()), + client.ContentType("application/grpc"), + client.Retries(0), + client.TLSConfig(&tls.Config{InsecureSkipVerify: true}), + client.Broker(b), + client.Hooks( + client.HookPublish(rh.ClientPublish), + client.HookCall(rh.ClientCall), + client.HookBatchPublish(rh.ClientBatchPublish), + client.HookStream(rh.ClientStream), + ), + } + + return micro.NewService( + micro.Server(server.NewServer(sopts...)), + micro.Client(client.NewClient(copts...)), + micro.Broker(b), + micro.Context(ctx), + micro.Name("test"), + micro.Version("latest"), + ) +} + func TestFail(t *testing.T) { ctx := context.Background() err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)) require.Nil(t, err) - b := helperCreateBroker() + b := helperCreateBroker(t) t.Logf("broker init") require.Nil(t, b.Init()) @@ -109,7 +185,7 @@ func TestFail(t *testing.T) { func TestConnect(t *testing.T) { ctx := context.TODO() - b := helperCreateBroker() + b := helperCreateBroker(t) require.Nil(t, b.Init()) @@ -121,7 +197,7 @@ func TestPubSub(t *testing.T) { err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)) require.Nil(t, err) - b := helperCreateBroker() + b := helperCreateBroker(t) require.Nil(t, b.Init()) require.Nil(t, b.Connect(ctx)) diff --git a/subscriber.go b/subscriber.go index e665cec..0900c74 100644 --- a/subscriber.go +++ b/subscriber.go @@ -139,7 +139,11 @@ func (s *Subscriber) poll(ctx context.Context) { fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.consumers[tps].recs <- p + s.mu.Lock() + if c := s.consumers[tps]; c != nil { + c.recs <- p + } + s.mu.Unlock() }) s.c.AllowRebalance() } @@ -157,6 +161,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) pc, ok := s.consumers[tps] if ok { delete(s.consumers, tps) + close(pc.quit) } s.mu.Unlock() if !ok || pc == nil { @@ -167,8 +172,6 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition)) } - close(pc.quit) - wg.Add(1) go func(c *consumer, t string, p int32) { defer wg.Done() diff --git a/tombstone_test.go b/tombstone_test.go new file mode 100644 index 0000000..f465708 --- /dev/null +++ b/tombstone_test.go @@ -0,0 +1,109 @@ +package kgo_test + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + kg "github.com/twmb/franz-go/pkg/kgo" + "go.unistack.org/micro/v3" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/server" +) + +func TestSubscriberHandlesTombstoneMessages(t *testing.T) { + ctx := context.Background() + err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)) + require.Nil(t, err) + + cl := cluster + _ = cl + + b := helperCreateBroker(t) + require.Nil(t, b.Init()) + require.Nil(t, b.Connect(ctx)) + defer func() { + require.Nil(t, b.Disconnect(ctx)) + }() + + svc := helperCreateService(t, ctx, b) + require.Nil(t, svc.Init()) + + var ( + msgsPublished = 1000 + done = make(chan bool, 1) + counterMsgs = atomic.Int64{} + topicName = "test.tombstone.topic" + + // INCORRECT: func(ctx context.Context, req any) error + fnHandler = func(ctx context.Context, req *codec.Frame) error { + counterMsgs.Add(1) + return nil + } + ) + + err = micro.RegisterSubscriber( + topicName, + svc.Server(), + fnHandler, + server.SubscriberQueue("queue"), + server.SubscriberAck(true), + server.SubscriberBodyOnly(true), + ) + require.Nil(t, err) + + { // PUBLISH tombstones via franz-go client + client, err := kg.NewClient( + kg.SeedBrokers(cluster.ListenAddrs()...), + kg.AllowAutoTopicCreation(), + ) + require.Nil(t, err) + defer client.Close() + + var records []*kg.Record + for i := 0; i < msgsPublished; i++ { + records = append(records, &kg.Record{ + Topic: topicName, + Key: []byte("tombstone-key"), + }) + } + + results := client.ProduceSync(ctx, records...) + for _, r := range results { + require.Nil(t, r.Err) + } + } + + require.Nil(t, svc.Start()) + + go func() { + require.Nil(t, svc.Run()) + }() + + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + + pticker := time.NewTicker(1 * time.Second) + defer pticker.Stop() + + go func() { + for { + select { + case <-pticker.C: + if prc := counterMsgs.Load(); prc == int64(msgsPublished) { + t.Log("everything is read:", prc) + close(done) + } else { + t.Logf("processed %v of %v\n", prc, msgsPublished) + } + case <-ticker.C: + close(done) + } + } + }() + <-done + require.Nil(t, svc.Stop()) +}