Compare commits

..

2 Commits

Author SHA1 Message Date
98da69fbe8 Merge pull request 'rwfix' (#127) from rwfix into master
Some checks failed
build / test (push) Failing after 1m27s
build / lint (push) Failing after 2m45s
codeql / analyze (go) (push) Failing after 2m41s
Reviewed-on: #127
2023-12-20 22:57:32 +03:00
d6d2483d8d fixup panic
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m41s
prbuild / test (pull_request) Failing after 1m28s
prbuild / lint (pull_request) Failing after 2m37s
autoapprove / autoapprove (pull_request) Failing after 1m25s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-12-20 22:56:28 +03:00
18 changed files with 104 additions and 71 deletions

View File

@@ -1,26 +0,0 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: setup-go
uses: actions/setup-go@v5
with:
go-version: 'stable'
- name: checkout
uses: actions/checkout@v3
- name: deps
run: go get -v -d ./...
- name: lint
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@@ -1,31 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: setup-go
uses: actions/setup-go@v5
with:
go-version: 'stable'
- name: checkout
uses: actions/checkout@v3
- name: deps
run: go get -v -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

47
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,47 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

47
.github/workflows/pr.yml vendored Normal file
View File

@@ -0,0 +1,47 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@@ -1,5 +0,0 @@
run:
concurrency: 8
deadline: 5m
issues-exit-code: 1
tests: true

4
go.mod
View File

@@ -4,11 +4,11 @@ go 1.19
require (
github.com/twmb/franz-go v1.11.5
github.com/twmb/franz-go/pkg/kmsg v1.3.0
go.unistack.org/micro/v4 v4.0.2
go.unistack.org/micro/v4 v4.0.1
)
require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.3.0 // indirect
)

4
go.sum
View File

@@ -6,8 +6,8 @@ github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE=
github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78=
github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw=
github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
go.unistack.org/micro/v4 v4.0.2 h1:2LeG6jslE50c72f1XwJhfTiidy67xklIC3saptLoUys=
go.unistack.org/micro/v4 v4.0.2/go.mod h1:+wBa98rSf+mRXb/MuSVFPXtDrqN0k8rzPQiC8wRCwCo=
go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo=
go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

7
kgo.go
View File

@@ -196,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock()
if !k.connected {
k.RUnlock()
ok := k.connected
k.RUnlock()
if !ok {
k.Lock()
c, err := k.connect(ctx, k.kopts...)
if err != nil {
@@ -208,7 +210,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.connected = true
k.Unlock()
}
k.RUnlock()
options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs))

View File

@@ -22,7 +22,7 @@ type consumer struct {
partition int32
opts broker.SubscribeOptions
kopts broker.Options
handler interface{}
handler broker.Handler
quit chan struct{}
done chan struct{}
recs chan kgo.FetchTopicPartition
@@ -33,7 +33,7 @@ type subscriber struct {
topic string
opts broker.SubscribeOptions
kopts broker.Options
handler interface{}
handler broker.Handler
closed bool
done chan struct{}
consumers map[tp]*consumer