v3_tombstone_panic #158
30
go.mod
30
go.mod
@@ -2,31 +2,47 @@ module go.unistack.org/micro-broker-kgo/v3
|
|||||||
|
|
||||||
go 1.24.0
|
go 1.24.0
|
||||||
|
|
||||||
toolchain go1.24.3
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/stretchr/testify v1.11.1
|
github.com/stretchr/testify v1.11.1
|
||||||
github.com/twmb/franz-go v1.20.2
|
github.com/twmb/franz-go v1.20.4
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.17.1
|
github.com/twmb/franz-go/pkg/kadm v1.17.1
|
||||||
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.12.0
|
github.com/twmb/franz-go/pkg/kmsg v1.12.0
|
||||||
go.opentelemetry.io/otel v1.38.0
|
go.opentelemetry.io/otel v1.38.0
|
||||||
|
go.unistack.org/micro-codec-jsonpb/v3 v3.10.6
|
||||||
|
go.unistack.org/micro-codec-proto/v3 v3.10.3
|
||||||
|
go.unistack.org/micro-wrapper-requestid/v3 v3.9.4
|
||||||
go.unistack.org/micro/v3 v3.11.48
|
go.unistack.org/micro/v3 v3.11.48
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
dario.cat/mergo v1.0.1 // indirect
|
||||||
|
github.com/KimMachineGun/automemlimit v0.6.1 // indirect
|
||||||
github.com/ash3in/uuidv8 v1.2.0 // indirect
|
github.com/ash3in/uuidv8 v1.2.0 // indirect
|
||||||
|
github.com/cilium/ebpf v0.16.0 // indirect
|
||||||
|
github.com/containerd/cgroups/v3 v3.0.4 // indirect
|
||||||
|
github.com/containerd/log v0.1.0 // indirect
|
||||||
|
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
|
github.com/docker/go-units v0.5.0 // indirect
|
||||||
|
github.com/godbus/dbus/v5 v5.1.0 // indirect
|
||||||
github.com/klauspost/compress v1.18.1 // indirect
|
github.com/klauspost/compress v1.18.1 // indirect
|
||||||
github.com/kr/pretty v0.3.1 // indirect
|
|
||||||
github.com/matoous/go-nanoid v1.5.1 // indirect
|
github.com/matoous/go-nanoid v1.5.1 // indirect
|
||||||
|
github.com/moby/sys/userns v0.1.0 // indirect
|
||||||
|
github.com/opencontainers/runtime-spec v1.2.0 // indirect
|
||||||
|
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||||
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 // indirect
|
||||||
|
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||||
|
go.uber.org/automaxprocs v1.6.0 // indirect
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
||||||
golang.org/x/crypto v0.43.0 // indirect
|
golang.org/x/crypto v0.47.0 // indirect
|
||||||
golang.org/x/sys v0.37.0 // indirect
|
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect
|
||||||
|
golang.org/x/sys v0.40.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect
|
||||||
google.golang.org/grpc v1.76.0 // indirect
|
google.golang.org/grpc v1.76.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.10 // indirect
|
google.golang.org/protobuf v1.36.11 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
101
go.sum
101
go.sum
@@ -1,18 +1,40 @@
|
|||||||
|
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
|
||||||
|
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
||||||
|
github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8=
|
||||||
|
github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY=
|
||||||
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
||||||
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
||||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok=
|
||||||
|
github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE=
|
||||||
|
github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4=
|
||||||
|
github.com/containerd/cgroups/v3 v3.0.4/go.mod h1:SA5DLYnXO8pTGYiAHXz94qvLQTKfVM5GEVisn4jpins=
|
||||||
|
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
|
||||||
|
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
|
||||||
|
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
|
||||||
|
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||||
|
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||||
|
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
|
||||||
|
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
|
||||||
|
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||||
|
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
|
||||||
|
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
|
||||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||||
|
github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM=
|
||||||
|
github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE=
|
||||||
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
|
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
|
||||||
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
|
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
@@ -21,54 +43,79 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
|||||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
|
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
|
||||||
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
|
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
|
||||||
|
github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g=
|
||||||
|
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
|
||||||
|
github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
|
||||||
|
github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
|
||||||
|
github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g=
|
||||||
|
github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
|
||||||
|
github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk=
|
||||||
|
github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
|
||||||
|
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
|
||||||
|
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
|
||||||
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
|
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
|
||||||
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
|
||||||
|
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
|
||||||
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||||
|
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||||
|
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
|
github.com/twmb/franz-go v1.20.4 h1:1wTvyLTOxS0oJh5ro/DVt2JHVdx7/kGNtmtFhbcr0O0=
|
||||||
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
github.com/twmb/franz-go v1.20.4/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
|
||||||
github.com/twmb/franz-go v1.20.2 h1:CiwhyKZHW6vqSHJkh+RTxFAJkio0jBjM/JQhx/HZ72A=
|
|
||||||
github.com/twmb/franz-go v1.20.2/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
|
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw=
|
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc=
|
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
|
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
|
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1 h1:z21wmCm1zk2DZoi3khCD3OWA05FVj1fsEJOBIxpIJH0=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251227070528-0c71f7e25fa1/go.mod h1:2W79ILYghTbIIi4y4j0k3PmV2mCxWoj6D7PtQlZmH3E=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
|
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
|
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
|
||||||
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
|
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
|
||||||
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
|
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
|
||||||
|
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
|
||||||
|
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
|
go.unistack.org/micro-codec-jsonpb/v3 v3.10.6 h1:9vZGETgEqysuz+ZPyph772uTHaQbRFkAwR/3Zr1+ckk=
|
||||||
|
go.unistack.org/micro-codec-jsonpb/v3 v3.10.6/go.mod h1:sUXFkjeDz7FE+Zj2zEbVROMxhxIdQnp0VslUylZSzaY=
|
||||||
|
go.unistack.org/micro-codec-proto/v3 v3.10.3 h1:Olny5WDBl+RhrzEPzm29wtCK/a1d4Ife2hkL2jR5MZs=
|
||||||
|
go.unistack.org/micro-codec-proto/v3 v3.10.3/go.mod h1:tMt+0xZAl7rnoLEs13TIN6ycG1s8DlTGTN8m79sapQY=
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
||||||
go.unistack.org/micro/v3 v3.11.45 h1:fjTLZYWgsVf9FIMZBxOg8ios2/tmyimnjZrsrxEUeXU=
|
go.unistack.org/micro-wrapper-requestid/v3 v3.9.4 h1:P3guOZmD10fW0OT7ZiqAXD95WgYyb8WpAZ9MLTmfBiU=
|
||||||
go.unistack.org/micro/v3 v3.11.45/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4=
|
go.unistack.org/micro-wrapper-requestid/v3 v3.9.4/go.mod h1:+2+dT3tqgUCmQL63VGqtUrCz30uXc2G3MQHaOvVDeLI=
|
||||||
go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w=
|
go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w=
|
||||||
go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4=
|
go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4=
|
||||||
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
|
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
|
||||||
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
|
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
|
||||||
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
|
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU=
|
||||||
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
|
golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
|
||||||
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
|
||||||
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
|
||||||
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
|
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
|
||||||
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
|
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff h1:A90eA31Wq6HOMIQlLfzFwzqGKBTuaVztYu/g8sn+8Zc=
|
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251007200510-49b9836ed3ff/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
|
||||||
|
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||||
|
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
|
||||||
|
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||||
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
|
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
|
||||||
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
|
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
|
||||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|||||||
243
kgo_test.go
243
kgo_test.go
@@ -2,79 +2,149 @@ package kgo_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/twmb/franz-go/pkg/kfake"
|
||||||
kg "github.com/twmb/franz-go/pkg/kgo"
|
kg "github.com/twmb/franz-go/pkg/kgo"
|
||||||
kgo "go.unistack.org/micro-broker-kgo/v3"
|
kgo "go.unistack.org/micro-broker-kgo/v3"
|
||||||
|
jsonpb "go.unistack.org/micro-codec-jsonpb/v3"
|
||||||
|
proto "go.unistack.org/micro-codec-proto/v3"
|
||||||
|
requestid "go.unistack.org/micro-wrapper-requestid/v3"
|
||||||
|
"go.unistack.org/micro/v3"
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/logger/slog"
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
"go.unistack.org/micro/v3/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
msgcnt = int64(10)
|
msgcnt = int64(10)
|
||||||
group = "38"
|
group = "38"
|
||||||
prefill = true
|
prefill = true
|
||||||
loglevel = logger.DebugLevel
|
loglevel = logger.ErrorLevel
|
||||||
|
cluster *kfake.Cluster
|
||||||
)
|
)
|
||||||
|
|
||||||
var bm = &broker.Message{
|
func TestMain(m *testing.M) {
|
||||||
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"},
|
cluster = kfake.MustCluster(
|
||||||
Body: []byte(`"body"`),
|
kfake.AllowAutoTopicCreation(),
|
||||||
|
)
|
||||||
|
defer cluster.Close()
|
||||||
|
os.Exit(m.Run())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFail(t *testing.T) {
|
func helperCreateBroker(t *testing.T) *kgo.Broker {
|
||||||
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
|
t.Helper()
|
||||||
t.Skip()
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.DefaultLogger = slog.NewLogger()
|
|
||||||
if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
var addrs []string
|
|
||||||
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
|
|
||||||
addrs = []string{"127.0.0.1:9092"}
|
|
||||||
} else {
|
|
||||||
addrs = strings.Split(addr, ",")
|
|
||||||
}
|
|
||||||
|
|
||||||
b := kgo.NewBroker(
|
b := kgo.NewBroker(
|
||||||
broker.Addrs(addrs...),
|
broker.Addrs(cluster.ListenAddrs()...),
|
||||||
kgo.CommitInterval(5*time.Second),
|
kgo.CommitInterval(5*time.Second),
|
||||||
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024),
|
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024),
|
||||||
kg.AllowAutoTopicCreation(),
|
kg.AllowAutoTopicCreation(),
|
||||||
),
|
),
|
||||||
|
kgo.CommitInterval(1*time.Second),
|
||||||
|
broker.ErrorHandler(func(event broker.Event) error {
|
||||||
|
msg := event.Message()
|
||||||
|
log := logger.DefaultLogger.
|
||||||
|
Fields("topic", event.Topic(),
|
||||||
|
"header", msg.Header,
|
||||||
|
"body", msg.Body)
|
||||||
|
err := event.Ack()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fields("ack_error", err).Error(context.Background(), fmt.Sprintf("brokerHandlerErr: Ack error | %v", event.Error()))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error(context.Background(), fmt.Sprintf("brokerHandlerErr: %v", event.Error()))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
t.Logf("broker init")
|
return b
|
||||||
if err := b.Init(); err != nil {
|
}
|
||||||
t.Fatal(err)
|
|
||||||
|
func helperCreateService(t *testing.T, ctx context.Context, b *kgo.Broker) micro.Service {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
rh := requestid.NewHook()
|
||||||
|
|
||||||
|
sopts := []server.Option{
|
||||||
|
server.Name("test"),
|
||||||
|
server.Version("latest"),
|
||||||
|
server.Context(ctx),
|
||||||
|
server.Codec("application/json", jsonpb.NewCodec()),
|
||||||
|
server.Codec("application/protobuf", proto.NewCodec()),
|
||||||
|
server.Codec("application/grpc", proto.NewCodec()),
|
||||||
|
server.Codec("application/grpc+proto", proto.NewCodec()),
|
||||||
|
server.Codec("application/grpc+json", jsonpb.NewCodec()),
|
||||||
|
server.Broker(b),
|
||||||
|
server.Hooks(
|
||||||
|
server.HookHandler(rh.ServerHandler),
|
||||||
|
server.HookSubHandler(rh.ServerSubscriber),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("broker connect")
|
copts := []client.Option{
|
||||||
if err := b.Connect(ctx); err != nil {
|
client.Codec("application/json", jsonpb.NewCodec()),
|
||||||
t.Fatal(err)
|
client.Codec("application/protobuf", proto.NewCodec()),
|
||||||
|
client.Codec("application/grpc", proto.NewCodec()),
|
||||||
|
client.Codec("application/grpc+proto", proto.NewCodec()),
|
||||||
|
client.Codec("application/grpc+json", jsonpb.NewCodec()),
|
||||||
|
client.ContentType("application/grpc"),
|
||||||
|
client.Retries(0),
|
||||||
|
client.TLSConfig(&tls.Config{InsecureSkipVerify: true}),
|
||||||
|
client.Broker(b),
|
||||||
|
client.Hooks(
|
||||||
|
client.HookPublish(rh.ClientPublish),
|
||||||
|
client.HookCall(rh.ClientCall),
|
||||||
|
client.HookBatchPublish(rh.ClientBatchPublish),
|
||||||
|
client.HookStream(rh.ClientStream),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return micro.NewService(
|
||||||
|
micro.Server(server.NewServer(sopts...)),
|
||||||
|
micro.Client(client.NewClient(copts...)),
|
||||||
|
micro.Broker(b),
|
||||||
|
micro.Context(ctx),
|
||||||
|
micro.Name("test"),
|
||||||
|
micro.Version("latest"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFail(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
err := logger.DefaultLogger.Init(logger.WithLevel(loglevel))
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
b := helperCreateBroker(t)
|
||||||
|
|
||||||
|
t.Logf("broker init")
|
||||||
|
require.Nil(t, b.Init())
|
||||||
|
|
||||||
|
t.Logf("broker connect")
|
||||||
|
require.Nil(t, b.Connect(ctx))
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
t.Logf("broker disconnect")
|
t.Logf("broker disconnect")
|
||||||
if err := b.Disconnect(ctx); err != nil {
|
require.Nil(t, b.Disconnect(ctx))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
t.Logf("broker health %v", b.Health())
|
t.Logf("broker health %v", b.Health())
|
||||||
msgs := make([]*broker.Message, 0, msgcnt)
|
msgs := make([]*broker.Message, 0, msgcnt)
|
||||||
for i := int64(0); i < msgcnt; i++ {
|
for i := int64(0); i < msgcnt; i++ {
|
||||||
msgs = append(msgs, bm)
|
msgs = append(msgs, &broker.Message{
|
||||||
|
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"},
|
||||||
|
Body: []byte(`"body"`),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
@@ -83,7 +153,6 @@ func TestFail(t *testing.T) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// t.Skip()
|
|
||||||
|
|
||||||
idx := int64(0)
|
idx := int64(0)
|
||||||
fn := func(msg broker.Event) error {
|
fn := func(msg broker.Event) error {
|
||||||
@@ -97,13 +166,11 @@ func TestFail(t *testing.T) {
|
|||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
require.Nil(t, err)
|
||||||
}
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := sub.Unsubscribe(ctx); err != nil {
|
require.Nil(t, sub.Unsubscribe(ctx))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -117,92 +184,54 @@ func TestFail(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestConnect(t *testing.T) {
|
func TestConnect(t *testing.T) {
|
||||||
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
|
|
||||||
t.Skip()
|
|
||||||
}
|
|
||||||
|
|
||||||
var addrs []string
|
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
b := kgo.NewBroker(
|
b := helperCreateBroker(t)
|
||||||
broker.Addrs(addrs...),
|
|
||||||
kgo.CommitInterval(5*time.Second),
|
|
||||||
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)),
|
|
||||||
)
|
|
||||||
if err := b.Init(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := b.Connect(ctx); err != nil {
|
require.Nil(t, b.Init())
|
||||||
t.Fatal(err)
|
|
||||||
}
|
require.Nil(t, b.Connect(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub(t *testing.T) {
|
func TestPubSub(t *testing.T) {
|
||||||
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
|
|
||||||
t.Skip()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
err := logger.DefaultLogger.Init(logger.WithLevel(loglevel))
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
var addrs []string
|
b := helperCreateBroker(t)
|
||||||
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
|
|
||||||
addrs = []string{"127.0.0.1:29091", "127.0.0.2:29092", "127.0.0.3:29093"}
|
|
||||||
} else {
|
|
||||||
addrs = strings.Split(addr, ",")
|
|
||||||
}
|
|
||||||
|
|
||||||
b := kgo.NewBroker(
|
|
||||||
broker.Addrs(addrs...),
|
|
||||||
kgo.CommitInterval(5*time.Second),
|
|
||||||
kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err := b.Init(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := b.Connect(ctx); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
require.Nil(t, b.Init())
|
||||||
|
require.Nil(t, b.Connect(ctx))
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := b.Disconnect(ctx); err != nil {
|
require.Nil(t, b.Disconnect(ctx))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
if prefill {
|
|
||||||
msgs := make([]*broker.Message, 0, msgcnt)
|
|
||||||
for i := int64(0); i < msgcnt; i++ {
|
|
||||||
msgs = append(msgs, bm)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
if prefill {
|
||||||
t.Fatal(err)
|
var msgs []*broker.Message
|
||||||
|
for i := int64(0); i < msgcnt; i++ {
|
||||||
|
msgs = append(msgs, &broker.Message{
|
||||||
|
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test.pubsub"},
|
||||||
|
Body: []byte(`"body"`),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
// t.Skip()
|
require.Nil(t, b.BatchPublish(ctx, msgs))
|
||||||
}
|
}
|
||||||
done := make(chan bool, 1)
|
done := make(chan bool, 1)
|
||||||
idx := int64(0)
|
idx := int64(0)
|
||||||
fn := func(msg broker.Event) error {
|
fn := func(msg broker.Event) error {
|
||||||
atomic.AddInt64(&idx, 1)
|
atomic.AddInt64(&idx, 1)
|
||||||
// time.Sleep(200 * time.Millisecond)
|
|
||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.pubsub", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true),
|
||||||
if err != nil {
|
)
|
||||||
t.Fatal(err)
|
|
||||||
}
|
require.Nil(t, err)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := sub.Unsubscribe(ctx); err != nil {
|
require.Nil(t, sub.Unsubscribe(ctx))
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
ticker := time.NewTicker(2 * time.Minute)
|
ticker := time.NewTicker(2 * time.Minute)
|
||||||
@@ -210,14 +239,16 @@ func TestPubSub(t *testing.T) {
|
|||||||
|
|
||||||
pticker := time.NewTicker(1 * time.Second)
|
pticker := time.NewTicker(1 * time.Second)
|
||||||
defer pticker.Stop()
|
defer pticker.Stop()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-pticker.C:
|
case <-pticker.C:
|
||||||
if prc := atomic.LoadInt64(&idx); prc == msgcnt {
|
if prc := atomic.LoadInt64(&idx); prc == msgcnt {
|
||||||
|
t.Log("everything is read")
|
||||||
close(done)
|
close(done)
|
||||||
} else {
|
} else {
|
||||||
t.Logf("processed %v\n", prc)
|
t.Logf("processed %v of %v\n", prc, msgcnt)
|
||||||
}
|
}
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
close(done)
|
close(done)
|
||||||
|
|||||||
4
meter.go
4
meter.go
@@ -56,7 +56,7 @@ const (
|
|||||||
|
|
||||||
labelNode = "node_id"
|
labelNode = "node_id"
|
||||||
labelSuccess = "success"
|
labelSuccess = "success"
|
||||||
labelFaulure = "failure"
|
labelFailure = "failure"
|
||||||
labelStatus = "status"
|
labelStatus = "status"
|
||||||
labelTopic = "topic"
|
labelTopic = "topic"
|
||||||
)
|
)
|
||||||
@@ -68,7 +68,7 @@ func (m *hookMeter) OnGroupManageError(_ error) {
|
|||||||
func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
|
func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
|
||||||
node := strconv.Itoa(int(meta.NodeID))
|
node := strconv.Itoa(int(meta.NodeID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc()
|
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFailure).Inc()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()
|
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()
|
||||||
|
|||||||
@@ -139,7 +139,11 @@ func (s *Subscriber) poll(ctx context.Context) {
|
|||||||
|
|
||||||
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
||||||
tps := tp{p.Topic, p.Partition}
|
tps := tp{p.Topic, p.Partition}
|
||||||
s.consumers[tps].recs <- p
|
s.mu.Lock()
|
||||||
|
if c := s.consumers[tps]; c != nil {
|
||||||
|
c.recs <- p
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
})
|
})
|
||||||
s.c.AllowRebalance()
|
s.c.AllowRebalance()
|
||||||
}
|
}
|
||||||
@@ -153,17 +157,39 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
|||||||
for topic, partitions := range lost {
|
for topic, partitions := range lost {
|
||||||
for _, partition := range partitions {
|
for _, partition := range partitions {
|
||||||
tps := tp{topic, partition}
|
tps := tp{topic, partition}
|
||||||
|
s.mu.Lock()
|
||||||
pc, ok := s.consumers[tps]
|
pc, ok := s.consumers[tps]
|
||||||
|
if ok {
|
||||||
|
delete(s.consumers, tps)
|
||||||
|
close(pc.quit)
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
if !ok || pc == nil {
|
if !ok || pc == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
delete(s.consumers, tps)
|
|
||||||
close(pc.quit)
|
|
||||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))
|
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition))
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() { <-pc.done; wg.Done() }()
|
go func(c *consumer, t string, p int32) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
timeout := time.NewTimer(s.kopts.GracefulTimeout)
|
||||||
|
defer timeout.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.done:
|
||||||
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
|
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
|
||||||
|
}
|
||||||
|
case <-timeout.C:
|
||||||
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
|
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(pc, topic, partition)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
109
tombstone_test.go
Normal file
109
tombstone_test.go
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
package kgo_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
kg "github.com/twmb/franz-go/pkg/kgo"
|
||||||
|
"go.unistack.org/micro/v3"
|
||||||
|
"go.unistack.org/micro/v3/codec"
|
||||||
|
"go.unistack.org/micro/v3/logger"
|
||||||
|
"go.unistack.org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSubscriberHandlesTombstoneMessages(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
err := logger.DefaultLogger.Init(logger.WithLevel(loglevel))
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
cl := cluster
|
||||||
|
_ = cl
|
||||||
|
|
||||||
|
b := helperCreateBroker(t)
|
||||||
|
require.Nil(t, b.Init())
|
||||||
|
require.Nil(t, b.Connect(ctx))
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t, b.Disconnect(ctx))
|
||||||
|
}()
|
||||||
|
|
||||||
|
svc := helperCreateService(t, ctx, b)
|
||||||
|
require.Nil(t, svc.Init())
|
||||||
|
|
||||||
|
var (
|
||||||
|
msgsPublished = 1000
|
||||||
|
done = make(chan bool, 1)
|
||||||
|
counterMsgs = atomic.Int64{}
|
||||||
|
topicName = "test.tombstone.topic"
|
||||||
|
|
||||||
|
// INCORRECT: func(ctx context.Context, req any) error
|
||||||
|
fnHandler = func(ctx context.Context, req *codec.Frame) error {
|
||||||
|
counterMsgs.Add(1)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
err = micro.RegisterSubscriber(
|
||||||
|
topicName,
|
||||||
|
svc.Server(),
|
||||||
|
fnHandler,
|
||||||
|
server.SubscriberQueue("queue"),
|
||||||
|
server.SubscriberAck(true),
|
||||||
|
server.SubscriberBodyOnly(true),
|
||||||
|
)
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
{ // PUBLISH tombstones via franz-go client
|
||||||
|
client, err := kg.NewClient(
|
||||||
|
kg.SeedBrokers(cluster.ListenAddrs()...),
|
||||||
|
kg.AllowAutoTopicCreation(),
|
||||||
|
)
|
||||||
|
require.Nil(t, err)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var records []*kg.Record
|
||||||
|
for i := 0; i < msgsPublished; i++ {
|
||||||
|
records = append(records, &kg.Record{
|
||||||
|
Topic: topicName,
|
||||||
|
Key: []byte("tombstone-key"),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
results := client.ProduceSync(ctx, records...)
|
||||||
|
for _, r := range results {
|
||||||
|
require.Nil(t, r.Err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Nil(t, svc.Start())
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
require.Nil(t, svc.Run())
|
||||||
|
}()
|
||||||
|
|
||||||
|
ticker := time.NewTicker(2 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
pticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer pticker.Stop()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-pticker.C:
|
||||||
|
if prc := counterMsgs.Load(); prc == int64(msgsPublished) {
|
||||||
|
t.Log("everything is read:", prc)
|
||||||
|
close(done)
|
||||||
|
} else {
|
||||||
|
t.Logf("processed %v of %v\n", prc, msgsPublished)
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
close(done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
<-done
|
||||||
|
require.Nil(t, svc.Stop())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user