v3_tombstone_panic #158

Open
devstigneev wants to merge 2 commits from devstigneev/micro-broker-kgo:v3_tombstone_panic into v3
6 changed files with 376 additions and 147 deletions

30
go.mod
View File

@@ -2,31 +2,47 @@ module go.unistack.org/micro-broker-kgo/v3
go 1.24.0 go 1.24.0
toolchain go1.24.3
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.20.2 github.com/twmb/franz-go v1.20.4
github.com/twmb/franz-go/pkg/kadm v1.17.1 github.com/twmb/franz-go/pkg/kadm v1.17.1
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1
github.com/twmb/franz-go/pkg/kmsg v1.12.0 github.com/twmb/franz-go/pkg/kmsg v1.12.0
go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel v1.38.0
go.unistack.org/micro-codec-jsonpb/v3 v3.10.6
go.unistack.org/micro-codec-proto/v3 v3.10.3
go.unistack.org/micro-wrapper-requestid/v3 v3.9.4
go.unistack.org/micro/v3 v3.11.48 go.unistack.org/micro/v3 v3.11.48
) )
require ( require (
dario.cat/mergo v1.0.1 // indirect
github.com/KimMachineGun/automemlimit v0.6.1 // indirect
github.com/ash3in/uuidv8 v1.2.0 // indirect github.com/ash3in/uuidv8 v1.2.0 // indirect
github.com/cilium/ebpf v0.16.0 // indirect
github.com/containerd/cgroups/v3 v3.0.4 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/klauspost/compress v1.18.1 // indirect github.com/klauspost/compress v1.18.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/matoous/go-nanoid v1.5.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
go.unistack.org/micro-proto/v3 v3.4.1 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect
golang.org/x/crypto v0.43.0 // indirect golang.org/x/crypto v0.47.0 // indirect
golang.org/x/sys v0.37.0 // indirect golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect
golang.org/x/sys v0.40.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect
google.golang.org/grpc v1.76.0 // indirect google.golang.org/grpc v1.76.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

101
go.sum
View File

@@ -1,18 +1,40 @@
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8=
github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY=
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok=
github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE=
github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4=
github.com/containerd/cgroups/v3 v3.0.4/go.mod h1:SA5DLYnXO8pTGYiAHXz94qvLQTKfVM5GEVisn4jpins=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM=
github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -21,54 +43,79 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g=
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g=
github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk=
github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= github.com/twmb/franz-go v1.20.4 h1:1wTvyLTOxS0oJh5ro/DVt2JHVdx7/kGNtmtFhbcr0O0=
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= github.com/twmb/franz-go v1.20.4/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
github.com/twmb/franz-go v1.20.2 h1:CiwhyKZHW6vqSHJkh+RTxFAJkio0jBjM/JQhx/HZ72A=
github.com/twmb/franz-go v1.20.2/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw=
github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc=
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw= github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg= github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1 h1:z21wmCm1zk2DZoi3khCD3OWA05FVj1fsEJOBIxpIJH0=
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1/go.mod h1:2W79ILYghTbIIi4y4j0k3PmV2mCxWoj6D7PtQlZmH3E=
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc= github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY= github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.unistack.org/micro-codec-jsonpb/v3 v3.10.6 h1:9vZGETgEqysuz+ZPyph772uTHaQbRFkAwR/3Zr1+ckk=
go.unistack.org/micro-codec-jsonpb/v3 v3.10.6/go.mod h1:sUXFkjeDz7FE+Zj2zEbVROMxhxIdQnp0VslUylZSzaY=
go.unistack.org/micro-codec-proto/v3 v3.10.3 h1:Olny5WDBl+RhrzEPzm29wtCK/a1d4Ife2hkL2jR5MZs=
go.unistack.org/micro-codec-proto/v3 v3.10.3/go.mod h1:tMt+0xZAl7rnoLEs13TIN6ycG1s8DlTGTN8m79sapQY=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
go.unistack.org/micro/v3 v3.11.45 h1:fjTLZYWgsVf9FIMZBxOg8ios2/tmyimnjZrsrxEUeXU= go.unistack.org/micro-wrapper-requestid/v3 v3.9.4 h1:P3guOZmD10fW0OT7ZiqAXD95WgYyb8WpAZ9MLTmfBiU=
go.unistack.org/micro/v3 v3.11.45/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= go.unistack.org/micro-wrapper-requestid/v3 v3.9.4/go.mod h1:+2+dT3tqgUCmQL63VGqtUrCz30uXc2G3MQHaOvVDeLI=
go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w= go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w=
go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM= golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU=
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff h1:A90eA31Wq6HOMIQlLfzFwzqGKBTuaVztYu/g8sn+8Zc= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk= google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -2,79 +2,149 @@ package kgo_test
import ( import (
"context" "context"
"crypto/tls"
"fmt"
"os" "os"
"strings"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"
kg "github.com/twmb/franz-go/pkg/kgo" kg "github.com/twmb/franz-go/pkg/kgo"
kgo "go.unistack.org/micro-broker-kgo/v3" kgo "go.unistack.org/micro-broker-kgo/v3"
jsonpb "go.unistack.org/micro-codec-jsonpb/v3"
proto "go.unistack.org/micro-codec-proto/v3"
requestid "go.unistack.org/micro-wrapper-requestid/v3"
"go.unistack.org/micro/v3"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/logger/slog"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
) )
var ( var (
msgcnt = int64(10) msgcnt = int64(10)
group = "38" group = "38"
prefill = true prefill = true
loglevel = logger.DebugLevel loglevel = logger.ErrorLevel
cluster *kfake.Cluster
) )
var bm = &broker.Message{ func TestMain(m *testing.M) {
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, cluster = kfake.MustCluster(
Body: []byte(`"body"`), kfake.AllowAutoTopicCreation(),
)
defer cluster.Close()
os.Exit(m.Run())
} }
func TestFail(t *testing.T) { func helperCreateBroker(t *testing.T) *kgo.Broker {
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { t.Helper()
t.Skip()
}
logger.DefaultLogger = slog.NewLogger()
if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil {
t.Fatal(err)
}
ctx := context.Background()
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
b := kgo.NewBroker( b := kgo.NewBroker(
broker.Addrs(addrs...), broker.Addrs(cluster.ListenAddrs()...),
kgo.CommitInterval(5*time.Second), kgo.CommitInterval(5*time.Second),
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024), kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024),
kg.AllowAutoTopicCreation(), kg.AllowAutoTopicCreation(),
), ),
kgo.CommitInterval(1*time.Second),
broker.ErrorHandler(func(event broker.Event) error {
msg := event.Message()
log := logger.DefaultLogger.
Fields("topic", event.Topic(),
"header", msg.Header,
"body", msg.Body)
err := event.Ack()
if err != nil {
log.Fields("ack_error", err).Error(context.Background(), fmt.Sprintf("brokerHandlerErr: Ack error | %v", event.Error()))
return err
}
log.Error(context.Background(), fmt.Sprintf("brokerHandlerErr: %v", event.Error()))
return nil
}),
) )
t.Logf("broker init") return b
if err := b.Init(); err != nil { }
t.Fatal(err)
func helperCreateService(t *testing.T, ctx context.Context, b *kgo.Broker) micro.Service {
t.Helper()
rh := requestid.NewHook()
sopts := []server.Option{
server.Name("test"),
server.Version("latest"),
server.Context(ctx),
server.Codec("application/json", jsonpb.NewCodec()),
server.Codec("application/protobuf", proto.NewCodec()),
server.Codec("application/grpc", proto.NewCodec()),
server.Codec("application/grpc+proto", proto.NewCodec()),
server.Codec("application/grpc+json", jsonpb.NewCodec()),
server.Broker(b),
server.Hooks(
server.HookHandler(rh.ServerHandler),
server.HookSubHandler(rh.ServerSubscriber),
),
} }
t.Logf("broker connect") copts := []client.Option{
if err := b.Connect(ctx); err != nil { client.Codec("application/json", jsonpb.NewCodec()),
t.Fatal(err) client.Codec("application/protobuf", proto.NewCodec()),
client.Codec("application/grpc", proto.NewCodec()),
client.Codec("application/grpc+proto", proto.NewCodec()),
client.Codec("application/grpc+json", jsonpb.NewCodec()),
client.ContentType("application/grpc"),
client.Retries(0),
client.TLSConfig(&tls.Config{InsecureSkipVerify: true}),
client.Broker(b),
client.Hooks(
client.HookPublish(rh.ClientPublish),
client.HookCall(rh.ClientCall),
client.HookBatchPublish(rh.ClientBatchPublish),
client.HookStream(rh.ClientStream),
),
} }
return micro.NewService(
micro.Server(server.NewServer(sopts...)),
micro.Client(client.NewClient(copts...)),
micro.Broker(b),
micro.Context(ctx),
micro.Name("test"),
micro.Version("latest"),
)
}
func TestFail(t *testing.T) {
ctx := context.Background()
err := logger.DefaultLogger.Init(logger.WithLevel(loglevel))
require.Nil(t, err)
b := helperCreateBroker(t)
t.Logf("broker init")
require.Nil(t, b.Init())
t.Logf("broker connect")
require.Nil(t, b.Connect(ctx))
defer func() { defer func() {
t.Logf("broker disconnect") t.Logf("broker disconnect")
if err := b.Disconnect(ctx); err != nil { require.Nil(t, b.Disconnect(ctx))
t.Fatal(err)
}
}() }()
t.Logf("broker health %v", b.Health()) t.Logf("broker health %v", b.Health())
msgs := make([]*broker.Message, 0, msgcnt) msgs := make([]*broker.Message, 0, msgcnt)
for i := int64(0); i < msgcnt; i++ { for i := int64(0); i < msgcnt; i++ {
msgs = append(msgs, bm) msgs = append(msgs, &broker.Message{
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"},
Body: []byte(`"body"`),
})
} }
for _, msg := range msgs { for _, msg := range msgs {
@@ -83,7 +153,6 @@ func TestFail(t *testing.T) {
break break
} }
} }
// t.Skip()
idx := int64(0) idx := int64(0)
fn := func(msg broker.Event) error { fn := func(msg broker.Event) error {
@@ -97,13 +166,11 @@ func TestFail(t *testing.T) {
broker.SubscribeAutoAck(true), broker.SubscribeAutoAck(true),
broker.SubscribeGroup(group), broker.SubscribeGroup(group),
broker.SubscribeBodyOnly(true)) broker.SubscribeBodyOnly(true))
if err != nil {
t.Fatal(err) require.Nil(t, err)
}
defer func() { defer func() {
if err := sub.Unsubscribe(ctx); err != nil { require.Nil(t, sub.Unsubscribe(ctx))
t.Fatal(err)
}
}() }()
for { for {
@@ -117,92 +184,54 @@ func TestFail(t *testing.T) {
} }
func TestConnect(t *testing.T) { func TestConnect(t *testing.T) {
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
t.Skip()
}
var addrs []string
ctx := context.TODO() ctx := context.TODO()
b := kgo.NewBroker( b := helperCreateBroker(t)
broker.Addrs(addrs...),
kgo.CommitInterval(5*time.Second),
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)),
)
if err := b.Init(); err != nil {
t.Fatal(err)
}
if err := b.Connect(ctx); err != nil { require.Nil(t, b.Init())
t.Fatal(err)
} require.Nil(t, b.Connect(ctx))
} }
func TestPubSub(t *testing.T) { func TestPubSub(t *testing.T) {
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
t.Skip()
}
if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil {
t.Fatal(err)
}
ctx := context.Background() ctx := context.Background()
err := logger.DefaultLogger.Init(logger.WithLevel(loglevel))
require.Nil(t, err)
var addrs []string b := helperCreateBroker(t)
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:29091", "127.0.0.2:29092", "127.0.0.3:29093"}
} else {
addrs = strings.Split(addr, ",")
}
b := kgo.NewBroker(
broker.Addrs(addrs...),
kgo.CommitInterval(5*time.Second),
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)),
)
if err := b.Init(); err != nil {
t.Fatal(err)
}
if err := b.Connect(ctx); err != nil {
t.Fatal(err)
}
require.Nil(t, b.Init())
require.Nil(t, b.Connect(ctx))
defer func() { defer func() {
if err := b.Disconnect(ctx); err != nil { require.Nil(t, b.Disconnect(ctx))
t.Fatal(err)
}
}() }()
if prefill {
msgs := make([]*broker.Message, 0, msgcnt)
for i := int64(0); i < msgcnt; i++ {
msgs = append(msgs, bm)
}
if err := b.BatchPublish(ctx, msgs); err != nil { if prefill {
t.Fatal(err) var msgs []*broker.Message
for i := int64(0); i < msgcnt; i++ {
msgs = append(msgs, &broker.Message{
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test.pubsub"},
Body: []byte(`"body"`),
})
} }
// t.Skip() require.Nil(t, b.BatchPublish(ctx, msgs))
} }
done := make(chan bool, 1) done := make(chan bool, 1)
idx := int64(0) idx := int64(0)
fn := func(msg broker.Event) error { fn := func(msg broker.Event) error {
atomic.AddInt64(&idx, 1) atomic.AddInt64(&idx, 1)
// time.Sleep(200 * time.Millisecond)
return msg.Ack() return msg.Ack()
} }
sub, err := b.Subscribe(ctx, "test", fn, sub, err := b.Subscribe(ctx, "test.pubsub", fn,
broker.SubscribeAutoAck(true), broker.SubscribeAutoAck(true),
broker.SubscribeGroup(group), broker.SubscribeGroup(group),
broker.SubscribeBodyOnly(true)) broker.SubscribeBodyOnly(true),
if err != nil { )
t.Fatal(err)
} require.Nil(t, err)
defer func() { defer func() {
if err := sub.Unsubscribe(ctx); err != nil { require.Nil(t, sub.Unsubscribe(ctx))
t.Fatal(err)
}
}() }()
ticker := time.NewTicker(2 * time.Minute) ticker := time.NewTicker(2 * time.Minute)
@@ -210,14 +239,16 @@ func TestPubSub(t *testing.T) {
pticker := time.NewTicker(1 * time.Second) pticker := time.NewTicker(1 * time.Second)
defer pticker.Stop() defer pticker.Stop()
go func() { go func() {
for { for {
select { select {
case <-pticker.C: case <-pticker.C:
if prc := atomic.LoadInt64(&idx); prc == msgcnt { if prc := atomic.LoadInt64(&idx); prc == msgcnt {
t.Log("everything is read")
close(done) close(done)
} else { } else {
t.Logf("processed %v\n", prc) t.Logf("processed %v of %v\n", prc, msgcnt)
} }
case <-ticker.C: case <-ticker.C:
close(done) close(done)

View File

@@ -56,7 +56,7 @@ const (
labelNode = "node_id" labelNode = "node_id"
labelSuccess = "success" labelSuccess = "success"
labelFaulure = "failure" labelFailure = "failure"
labelStatus = "status" labelStatus = "status"
labelTopic = "topic" labelTopic = "topic"
) )
@@ -68,7 +68,7 @@ func (m *hookMeter) OnGroupManageError(_ error) {
func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
if err != nil { if err != nil {
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc() m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFailure).Inc()
return return
} }
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc() m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()

View File

@@ -139,7 +139,11 @@ func (s *Subscriber) poll(ctx context.Context) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) { fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tps := tp{p.Topic, p.Partition} tps := tp{p.Topic, p.Partition}
s.consumers[tps].recs <- p s.mu.Lock()
if c := s.consumers[tps]; c != nil {
c.recs <- p
}
s.mu.Unlock()
}) })
s.c.AllowRebalance() s.c.AllowRebalance()
} }
@@ -153,17 +157,39 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
for topic, partitions := range lost { for topic, partitions := range lost {
for _, partition := range partitions { for _, partition := range partitions {
tps := tp{topic, partition} tps := tp{topic, partition}
s.mu.Lock()
pc, ok := s.consumers[tps] pc, ok := s.consumers[tps]
if ok {
delete(s.consumers, tps)
close(pc.quit)
}
s.mu.Unlock()
if !ok || pc == nil { if !ok || pc == nil {
continue continue
} }
delete(s.consumers, tps)
close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) { if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition))
} }
wg.Add(1) wg.Add(1)
go func() { <-pc.done; wg.Done() }() go func(c *consumer, t string, p int32) {
defer wg.Done()
timeout := time.NewTimer(s.kopts.GracefulTimeout)
defer timeout.Stop()
select {
case <-c.done:
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
}
}
}(pc, topic, partition)
} }
} }
} }

109
tombstone_test.go Normal file
View File

@@ -0,0 +1,109 @@
package kgo_test
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
kg "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/server"
)
func TestSubscriberHandlesTombstoneMessages(t *testing.T) {
ctx := context.Background()
err := logger.DefaultLogger.Init(logger.WithLevel(loglevel))
require.Nil(t, err)
cl := cluster
_ = cl
b := helperCreateBroker(t)
require.Nil(t, b.Init())
require.Nil(t, b.Connect(ctx))
defer func() {
require.Nil(t, b.Disconnect(ctx))
}()
svc := helperCreateService(t, ctx, b)
require.Nil(t, svc.Init())
var (
msgsPublished = 1000
done = make(chan bool, 1)
counterMsgs = atomic.Int64{}
topicName = "test.tombstone.topic"
// INCORRECT: func(ctx context.Context, req any) error
fnHandler = func(ctx context.Context, req *codec.Frame) error {
counterMsgs.Add(1)
return nil
}
)
err = micro.RegisterSubscriber(
topicName,
svc.Server(),
fnHandler,
server.SubscriberQueue("queue"),
server.SubscriberAck(true),
server.SubscriberBodyOnly(true),
)
require.Nil(t, err)
{ // PUBLISH tombstones via franz-go client
client, err := kg.NewClient(
kg.SeedBrokers(cluster.ListenAddrs()...),
kg.AllowAutoTopicCreation(),
)
require.Nil(t, err)
defer client.Close()
var records []*kg.Record
for i := 0; i < msgsPublished; i++ {
records = append(records, &kg.Record{
Topic: topicName,
Key: []byte("tombstone-key"),
})
}
results := client.ProduceSync(ctx, records...)
for _, r := range results {
require.Nil(t, r.Err)
}
}
require.Nil(t, svc.Start())
go func() {
require.Nil(t, svc.Run())
}()
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
pticker := time.NewTicker(1 * time.Second)
defer pticker.Stop()
go func() {
for {
select {
case <-pticker.C:
if prc := counterMsgs.Load(); prc == int64(msgsPublished) {
t.Log("everything is read:", prc)
close(done)
} else {
t.Logf("processed %v of %v\n", prc, msgsPublished)
}
case <-ticker.C:
close(done)
}
}
}()
<-done
require.Nil(t, svc.Stop())
}