Compare commits
7 Commits
v3.8.49
...
937c9d5720
| Author | SHA1 | Date | |
|---|---|---|---|
| 937c9d5720 | |||
|
|
951fba55fa | ||
|
|
ec5238ed14 | ||
| d8f44a924e | |||
|
|
ffe9e5d952 | ||
| 8c362fd6ae | |||
| 90365a455c |
26
.gitea/workflows/job_lint.yml
Normal file
26
.gitea/workflows/job_lint.yml
Normal file
@@ -0,0 +1,26 @@
|
||||
name: lint
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: setup-go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 'stable'
|
||||
- name: deps
|
||||
run: go get -v -d ./...
|
||||
- name: lint
|
||||
uses: https://github.com/golangci/golangci-lint-action@v6
|
||||
with:
|
||||
version: 'latest'
|
||||
31
.gitea/workflows/job_test.yml
Normal file
31
.gitea/workflows/job_test.yml
Normal file
@@ -0,0 +1,31 @@
|
||||
name: test
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: setup-go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 'stable'
|
||||
- name: deps
|
||||
run: go get -v -d ./...
|
||||
- name: test
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
run: go test -mod readonly -v ./...
|
||||
47
.github/workflows/build.yml
vendored
47
.github/workflows/build.yml
vendored
@@ -1,47 +0,0 @@
|
||||
name: build
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
jobs:
|
||||
test:
|
||||
name: test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: cache
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
|
||||
restore-keys: ${{ runner.os }}-go-
|
||||
- name: deps
|
||||
run: go get -v -t -d ./...
|
||||
- name: test
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
run: go test -mod readonly -v ./...
|
||||
lint:
|
||||
name: lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
# args: --issues-exit-code=0
|
||||
# Optional: show only new issues if it's a pull request. The default value is `false`.
|
||||
# only-new-issues: true
|
||||
47
.github/workflows/pr.yml
vendored
47
.github/workflows/pr.yml
vendored
@@ -1,47 +0,0 @@
|
||||
name: prbuild
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
jobs:
|
||||
test:
|
||||
name: test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: cache
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
|
||||
restore-keys: ${{ runner.os }}-go-
|
||||
- name: deps
|
||||
run: go get -v -t -d ./...
|
||||
- name: test
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
run: go test -mod readonly -v ./...
|
||||
lint:
|
||||
name: lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
# args: --issues-exit-code=0
|
||||
# Optional: show only new issues if it's a pull request. The default value is `false`.
|
||||
# only-new-issues: true
|
||||
5
.golangci.yml
Normal file
5
.golangci.yml
Normal file
@@ -0,0 +1,5 @@
|
||||
run:
|
||||
concurrency: 8
|
||||
deadline: 5m
|
||||
issues-exit-code: 1
|
||||
tests: true
|
||||
10
go.mod
10
go.mod
@@ -1,8 +1,8 @@
|
||||
module go.unistack.org/micro-broker-kgo/v3
|
||||
|
||||
go 1.22
|
||||
go 1.22.7
|
||||
|
||||
toolchain go1.23.1
|
||||
toolchain go1.23.3
|
||||
|
||||
require (
|
||||
github.com/google/uuid v1.6.0
|
||||
@@ -10,7 +10,7 @@ require (
|
||||
github.com/twmb/franz-go/pkg/kadm v1.14.0
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0
|
||||
go.opentelemetry.io/otel v1.32.0
|
||||
go.unistack.org/micro/v3 v3.10.101
|
||||
go.unistack.org/micro/v3 v3.11.0
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -19,7 +19,7 @@ require (
|
||||
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
||||
golang.org/x/crypto v0.29.0 // indirect
|
||||
golang.org/x/sys v0.27.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect
|
||||
google.golang.org/grpc v1.67.1 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
|
||||
google.golang.org/grpc v1.68.0 // indirect
|
||||
google.golang.org/protobuf v1.35.2 // indirect
|
||||
)
|
||||
|
||||
26
go.sum
26
go.sum
@@ -1,17 +1,9 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
|
||||
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
|
||||
github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
|
||||
@@ -22,21 +14,15 @@ go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
|
||||
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
|
||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
||||
go.unistack.org/micro/v3 v3.10.101 h1:CwMg7f2Mnsy+tRcsY0RTcAMkTVp+GMUgPU6pPaG8gpw=
|
||||
go.unistack.org/micro/v3 v3.10.101/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g=
|
||||
go.unistack.org/micro/v3 v3.11.0 h1:usQ+8wQuOWpQd4+DGhFXSgZ+e+wOBjuT3W5GJZ02bSs=
|
||||
go.unistack.org/micro/v3 v3.11.0/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g=
|
||||
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
|
||||
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
|
||||
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
|
||||
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
|
||||
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
|
||||
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
|
||||
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
|
||||
google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
|
||||
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
|
||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
|
||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
38
kgo.go
38
kgo.go
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -60,12 +61,24 @@ type Broker struct {
|
||||
init bool
|
||||
c *kgo.Client
|
||||
kopts []kgo.Opt
|
||||
connected bool
|
||||
connected *atomic.Uint32
|
||||
sync.RWMutex
|
||||
opts broker.Options
|
||||
subs []*Subscriber
|
||||
}
|
||||
|
||||
func (r *Broker) Live() bool {
|
||||
return r.connected.Load() == 1
|
||||
}
|
||||
|
||||
func (r *Broker) Ready() bool {
|
||||
return r.connected.Load() == 1
|
||||
}
|
||||
|
||||
func (r *Broker) Health() bool {
|
||||
return r.connected.Load() == 1
|
||||
}
|
||||
|
||||
func (k *Broker) Address() string {
|
||||
return strings.Join(k.opts.Addrs, ",")
|
||||
}
|
||||
@@ -125,12 +138,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho
|
||||
}
|
||||
|
||||
func (k *Broker) Connect(ctx context.Context) error {
|
||||
k.RLock()
|
||||
if k.connected {
|
||||
k.RUnlock()
|
||||
if k.connected.Load() == 1 {
|
||||
return nil
|
||||
}
|
||||
k.RUnlock()
|
||||
|
||||
nctx := k.opts.Context
|
||||
if ctx != nil {
|
||||
@@ -144,19 +154,16 @@ func (k *Broker) Connect(ctx context.Context) error {
|
||||
|
||||
k.Lock()
|
||||
k.c = c
|
||||
k.connected = true
|
||||
k.connected.Store(1)
|
||||
k.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *Broker) Disconnect(ctx context.Context) error {
|
||||
k.RLock()
|
||||
if !k.connected {
|
||||
k.RUnlock()
|
||||
if k.connected.Load() == 0 {
|
||||
return nil
|
||||
}
|
||||
k.RUnlock()
|
||||
|
||||
nctx := k.opts.Context
|
||||
if ctx != nil {
|
||||
@@ -186,7 +193,7 @@ func (k *Broker) Disconnect(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
k.connected = false
|
||||
k.connected.Store(0)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -241,14 +248,14 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
|
||||
|
||||
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
k.Lock()
|
||||
if !k.connected {
|
||||
if k.connected.Load() == 0 {
|
||||
c, _, err := k.connect(ctx, k.kopts...)
|
||||
if err != nil {
|
||||
k.Unlock()
|
||||
return err
|
||||
}
|
||||
k.c = c
|
||||
k.connected = true
|
||||
k.connected.Store(1)
|
||||
}
|
||||
k.Unlock()
|
||||
|
||||
@@ -453,7 +460,8 @@ func NewBroker(opts ...broker.Option) *Broker {
|
||||
}
|
||||
|
||||
return &Broker{
|
||||
opts: options,
|
||||
kopts: kopts,
|
||||
connected: &atomic.Uint32{},
|
||||
opts: options,
|
||||
kopts: kopts,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user