Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
8f7d51ed3b | |||
09f4a15ec4 | |||
b445a7eeb7 | |||
c89df95fdc | |||
9ff61bdb1b | |||
2241de6d87 | |||
a856f20f15 | |||
07fd48cd38 | |||
b2975262af | |||
3f74ec025e |
24
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
24
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
---
|
||||||
|
name: Bug report
|
||||||
|
about: For reporting bugs in go-micro
|
||||||
|
title: "[BUG]"
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Describe the bug**
|
||||||
|
|
||||||
|
1. What are you trying to do?
|
||||||
|
2. What did you expect to happen?
|
||||||
|
3. What happens instead?
|
||||||
|
|
||||||
|
**How to reproduce the bug:**
|
||||||
|
|
||||||
|
If possible, please include a minimal code snippet here.
|
||||||
|
|
||||||
|
**Environment:**
|
||||||
|
Go Version: please paste `go version` output here
|
||||||
|
```
|
||||||
|
please paste `go env` output here
|
||||||
|
```
|
17
.github/ISSUE_TEMPLATE/feature-request---enhancement.md
vendored
Normal file
17
.github/ISSUE_TEMPLATE/feature-request---enhancement.md
vendored
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
---
|
||||||
|
name: Feature request / Enhancement
|
||||||
|
about: If you have a need not served by go-micro
|
||||||
|
title: "[FEATURE]"
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Is your feature request related to a problem? Please describe.**
|
||||||
|
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||||
|
|
||||||
|
**Describe the solution you'd like**
|
||||||
|
A clear and concise description of what you want to happen.
|
||||||
|
|
||||||
|
**Additional context**
|
||||||
|
Add any other context or screenshots about the feature request here.
|
14
.github/ISSUE_TEMPLATE/question.md
vendored
Normal file
14
.github/ISSUE_TEMPLATE/question.md
vendored
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
---
|
||||||
|
name: Question
|
||||||
|
about: Ask a question about go-micro
|
||||||
|
title: ''
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
Before asking, please check if your question has already been answered:
|
||||||
|
|
||||||
|
1. Check the documentation - https://micro.mu/docs/
|
||||||
|
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
|
||||||
|
3. Search existing issues
|
9
.github/PULL_REQUEST_TEMPLATE.md
vendored
Normal file
9
.github/PULL_REQUEST_TEMPLATE.md
vendored
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
## Pull Request template
|
||||||
|
Please, go through these steps before clicking submit on this PR.
|
||||||
|
|
||||||
|
1. Give a descriptive title to your PR.
|
||||||
|
2. Provide a description of your changes.
|
||||||
|
3. Make sure you have some relevant tests.
|
||||||
|
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
|
||||||
|
|
||||||
|
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
|
19
.github/dependabot.yml
vendored
Normal file
19
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# To get started with Dependabot version updates, you'll need to specify which
|
||||||
|
# package ecosystems to update and where the package manifests are located.
|
||||||
|
# Please see the documentation for all configuration options:
|
||||||
|
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
|
||||||
|
|
||||||
|
version: 2
|
||||||
|
updates:
|
||||||
|
|
||||||
|
# Maintain dependencies for GitHub Actions
|
||||||
|
- package-ecosystem: "github-actions"
|
||||||
|
directory: "/"
|
||||||
|
schedule:
|
||||||
|
interval: "daily"
|
||||||
|
|
||||||
|
# Maintain dependencies for Golang
|
||||||
|
- package-ecosystem: "gomod"
|
||||||
|
directory: "/"
|
||||||
|
schedule:
|
||||||
|
interval: "daily"
|
46
.github/workflows/build.yml
vendored
Normal file
46
.github/workflows/build.yml
vendored
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
name: build
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: test
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: setup
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: 1.16
|
||||||
|
- name: checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
- name: cache
|
||||||
|
uses: actions/cache@v2
|
||||||
|
with:
|
||||||
|
path: ~/go/pkg/mod
|
||||||
|
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
|
||||||
|
restore-keys: ${{ runner.os }}-go-
|
||||||
|
- name: deps
|
||||||
|
run: go get -v -t -d ./...
|
||||||
|
- name: test
|
||||||
|
env:
|
||||||
|
INTEGRATION_TESTS: yes
|
||||||
|
run: go test -mod readonly -v ./...
|
||||||
|
lint:
|
||||||
|
name: lint
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
- name: lint
|
||||||
|
uses: golangci/golangci-lint-action@v2
|
||||||
|
continue-on-error: true
|
||||||
|
with:
|
||||||
|
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||||
|
version: v1.30
|
||||||
|
# Optional: working directory, useful for monorepos
|
||||||
|
# working-directory: somedir
|
||||||
|
# Optional: golangci-lint command line arguments.
|
||||||
|
# args: --issues-exit-code=0
|
||||||
|
# Optional: show only new issues if it's a pull request. The default value is `false`.
|
||||||
|
# only-new-issues: true
|
71
.github/workflows/codeql-analysis.yml
vendored
Normal file
71
.github/workflows/codeql-analysis.yml
vendored
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
# For most projects, this workflow file will not need changing; you simply need
|
||||||
|
# to commit it to your repository.
|
||||||
|
#
|
||||||
|
# You may wish to alter this file to override the set of languages analyzed,
|
||||||
|
# or to provide custom queries or build logic.
|
||||||
|
#
|
||||||
|
# ******** NOTE ********
|
||||||
|
# We have attempted to detect the languages in your repository. Please check
|
||||||
|
# the `language` matrix defined below to confirm you have the correct set of
|
||||||
|
# supported CodeQL languages.
|
||||||
|
#
|
||||||
|
name: "CodeQL"
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [ master ]
|
||||||
|
pull_request:
|
||||||
|
# The branches below must be a subset of the branches above
|
||||||
|
branches: [ master ]
|
||||||
|
schedule:
|
||||||
|
- cron: '34 1 * * 0'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
analyze:
|
||||||
|
name: Analyze
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
permissions:
|
||||||
|
actions: read
|
||||||
|
contents: read
|
||||||
|
security-events: write
|
||||||
|
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
language: [ 'go' ]
|
||||||
|
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
|
||||||
|
# Learn more:
|
||||||
|
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
# Initializes the CodeQL tools for scanning.
|
||||||
|
- name: Initialize CodeQL
|
||||||
|
uses: github/codeql-action/init@v1
|
||||||
|
with:
|
||||||
|
languages: ${{ matrix.language }}
|
||||||
|
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||||
|
# By default, queries listed here will override any specified in a config file.
|
||||||
|
# Prefix the list here with "+" to use these queries and those in the config file.
|
||||||
|
# queries: ./path/to/local/query, your-org/your-repo/queries@main
|
||||||
|
|
||||||
|
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||||
|
# If this step fails, then you should remove it and run the build manually (see below)
|
||||||
|
- name: Autobuild
|
||||||
|
uses: github/codeql-action/autobuild@v1
|
||||||
|
|
||||||
|
# ℹ️ Command-line programs to run using the OS shell.
|
||||||
|
# 📚 https://git.io/JvXDl
|
||||||
|
|
||||||
|
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
|
||||||
|
# and modify them (or add more) to build your code if your project
|
||||||
|
# uses a compiled language
|
||||||
|
|
||||||
|
#- run: |
|
||||||
|
# make bootstrap
|
||||||
|
# make release
|
||||||
|
|
||||||
|
- name: Perform CodeQL Analysis
|
||||||
|
uses: github/codeql-action/analyze@v1
|
46
.github/workflows/pr.yml
vendored
Normal file
46
.github/workflows/pr.yml
vendored
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
name: prbuild
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: test
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: setup
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: 1.16
|
||||||
|
- name: checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
- name: cache
|
||||||
|
uses: actions/cache@v2
|
||||||
|
with:
|
||||||
|
path: ~/go/pkg/mod
|
||||||
|
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
|
||||||
|
restore-keys: ${{ runner.os }}-go-
|
||||||
|
- name: deps
|
||||||
|
run: go get -v -t -d ./...
|
||||||
|
- name: test
|
||||||
|
env:
|
||||||
|
INTEGRATION_TESTS: yes
|
||||||
|
run: go test -mod readonly -v ./...
|
||||||
|
lint:
|
||||||
|
name: lint
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
- name: lint
|
||||||
|
uses: golangci/golangci-lint-action@v2
|
||||||
|
continue-on-error: true
|
||||||
|
with:
|
||||||
|
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||||
|
version: v1.30
|
||||||
|
# Optional: working directory, useful for monorepos
|
||||||
|
# working-directory: somedir
|
||||||
|
# Optional: golangci-lint command line arguments.
|
||||||
|
# args: --issues-exit-code=0
|
||||||
|
# Optional: show only new issues if it's a pull request. The default value is `false`.
|
||||||
|
# only-new-issues: true
|
10
go.mod
10
go.mod
@@ -3,11 +3,11 @@ module github.com/unistack-org/micro-broker-kgo/v3
|
|||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b
|
github.com/klauspost/compress v1.13.6 // indirect
|
||||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a
|
github.com/twmb/franz-go v1.0.0
|
||||||
|
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914174821-2f676c0a574b // indirect
|
||||||
github.com/unistack-org/micro-codec-json/v3 v3.2.5
|
github.com/unistack-org/micro-codec-json/v3 v3.2.5
|
||||||
github.com/unistack-org/micro-proto v0.0.8 // indirect
|
github.com/unistack-org/micro/v3 v3.7.1
|
||||||
github.com/unistack-org/micro/v3 v3.6.3
|
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
|
||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
|
||||||
google.golang.org/protobuf v1.27.1 // indirect
|
google.golang.org/protobuf v1.27.1 // indirect
|
||||||
)
|
)
|
||||||
|
33
go.sum
33
go.sum
@@ -21,9 +21,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
|
|||||||
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
|
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
|
||||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
|
||||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
|
||||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
|
||||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
@@ -42,8 +39,10 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U
|
|||||||
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
|
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
|
||||||
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
|
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
|
||||||
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
|
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
|
||||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4=
|
||||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||||
|
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
|
||||||
|
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||||
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
|
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
|
||||||
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||||
@@ -56,21 +55,25 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
|
|||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b h1:t7+Qdd/GHtUBN6D71CwwOEfsZmK0QQ/up3M6gaoaHQ4=
|
github.com/twmb/franz-go v1.0.0 h1:JcsqEImDhr7H/eQx/V58fwzjmVi2FRwz6dyylmdJ0NU=
|
||||||
github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b/go.mod h1:Txc5/v0DIKGcdCa1VZhyasECMJ4svN/LHUKYUPYihL0=
|
github.com/twmb/franz-go v1.0.0/go.mod h1:cdFLk8d/5/ox88y38xgiDKP3Yo338OO0t5QbTEM2K6I=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901051457-3c197a133ddd h1:o+cb+mqRFpVKrusJy/Ebt4zTWn94nPbI1ooMeNhovqM=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a h1:UBO58MRI/GH5eD5+fKXZoJMGyEcsUs9LQnHJbNL0yKs=
|
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210901051457-3c197a133ddd/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914174821-2f676c0a574b h1:7d6eRt9HEqXVxMzD2fry9qtJ0kRkgeJ5olqW9K+aXv8=
|
||||||
|
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914174821-2f676c0a574b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||||
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
|
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
|
||||||
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
|
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
|
||||||
github.com/unistack-org/micro-codec-json/v3 v3.2.5 h1:WOilhbL0YSu58iIQIIxpawRYZyx6CR16tCpbX4ai3Vc=
|
github.com/unistack-org/micro-codec-json/v3 v3.2.5 h1:WOilhbL0YSu58iIQIIxpawRYZyx6CR16tCpbX4ai3Vc=
|
||||||
github.com/unistack-org/micro-codec-json/v3 v3.2.5/go.mod h1:LSzfrD9GYWCl6KOyihywx1wlbOgStrpyy3NVHNZAvHA=
|
github.com/unistack-org/micro-codec-json/v3 v3.2.5/go.mod h1:LSzfrD9GYWCl6KOyihywx1wlbOgStrpyy3NVHNZAvHA=
|
||||||
github.com/unistack-org/micro-proto v0.0.5/go.mod h1:EuI7UlfGXmT1hy6WacULib9LbNgRnDYQvTCFoLgKM2I=
|
|
||||||
github.com/unistack-org/micro-proto v0.0.8 h1:g4UZGQGeYGI3CFJtjuEm47aouYPviG8SDhSifl0831w=
|
github.com/unistack-org/micro-proto v0.0.8 h1:g4UZGQGeYGI3CFJtjuEm47aouYPviG8SDhSifl0831w=
|
||||||
github.com/unistack-org/micro-proto v0.0.8/go.mod h1:GYO53DWmeldRIo90cAdQx8bLr/WJMxW62W4ja74p1Ac=
|
github.com/unistack-org/micro-proto v0.0.8/go.mod h1:GYO53DWmeldRIo90cAdQx8bLr/WJMxW62W4ja74p1Ac=
|
||||||
|
github.com/unistack-org/micro-proto v0.0.9 h1:KrWLS4FUX7UAWNAilQf70uad6ZPf/0EudeddCXllRVc=
|
||||||
|
github.com/unistack-org/micro-proto v0.0.9/go.mod h1:Cckwmzd89gvS7ThxzZp9kQR/EOdksFQcsTAtDDyKwrg=
|
||||||
github.com/unistack-org/micro/v3 v3.3.19/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
|
github.com/unistack-org/micro/v3 v3.3.19/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
|
||||||
github.com/unistack-org/micro/v3 v3.6.3 h1:CvC4B2kOgwzhPx+w9fcbEIJkgzxS7i84PRB+vL6Vz0U=
|
github.com/unistack-org/micro/v3 v3.7.0 h1:jEEegoVh1VIgT/+4gHw3TmwI3p7ufAdV37RVNxx9kNc=
|
||||||
github.com/unistack-org/micro/v3 v3.6.3/go.mod h1:Hp+DmX1Bt+/z+ihk5coYwNtXhPMwr4SJuCFRy2kyUl8=
|
github.com/unistack-org/micro/v3 v3.7.0/go.mod h1:xXGbjNQShqlth0hv+q7ijGvciXGVqxUnVdkFm0t95rk=
|
||||||
|
github.com/unistack-org/micro/v3 v3.7.1 h1:gjCon1U8i9upNgw9+iEgbZh2LCeBizDYotQ+THHV0lo=
|
||||||
|
github.com/unistack-org/micro/v3 v3.7.1/go.mod h1:gBoY6gvzeFiJTZ4FgDttGNSs4Y1+1PRg2cV1yTRMSlg=
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||||
@@ -86,13 +89,11 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
|
|||||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||||
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
|
||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
239
kgo.go
239
kgo.go
@@ -4,29 +4,18 @@ package kgo
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
kerr "github.com/twmb/franz-go/pkg/kerr"
|
|
||||||
kgo "github.com/twmb/franz-go/pkg/kgo"
|
kgo "github.com/twmb/franz-go/pkg/kgo"
|
||||||
kmsg "github.com/twmb/franz-go/pkg/kmsg"
|
|
||||||
"github.com/unistack-org/micro/v3/broker"
|
"github.com/unistack-org/micro/v3/broker"
|
||||||
"github.com/unistack-org/micro/v3/logger"
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
"github.com/unistack-org/micro/v3/metadata"
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
"github.com/unistack-org/micro/v3/util/id"
|
"github.com/unistack-org/micro/v3/util/id"
|
||||||
mrand "github.com/unistack-org/micro/v3/util/rand"
|
mrand "github.com/unistack-org/micro/v3/util/rand"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var pPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return &publication{msg: &broker.Message{}}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
type kBroker struct {
|
type kBroker struct {
|
||||||
writer *kgo.Client // used only to push messages
|
writer *kgo.Client // used only to push messages
|
||||||
kopts []kgo.Opt
|
kopts []kgo.Opt
|
||||||
@@ -46,6 +35,7 @@ type subscriber struct {
|
|||||||
batchhandler broker.BatchHandler
|
batchhandler broker.BatchHandler
|
||||||
closed bool
|
closed bool
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
consumers map[string]map[int32]worker
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,10 +47,6 @@ type publication struct {
|
|||||||
ack bool
|
ack bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *publication) Topic() string {
|
func (p *publication) Topic() string {
|
||||||
return p.topic
|
return p.topic
|
||||||
}
|
}
|
||||||
@@ -128,7 +114,8 @@ func (k *kBroker) Connect(ctx context.Context) error {
|
|||||||
kaddrs := k.opts.Addrs
|
kaddrs := k.opts.Addrs
|
||||||
|
|
||||||
// shuffle addrs
|
// shuffle addrs
|
||||||
rand.Shuffle(len(kaddrs), func(i, j int) {
|
var rng mrand.Rand
|
||||||
|
rng.Shuffle(len(kaddrs), func(i, j int) {
|
||||||
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
|
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -327,22 +314,42 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
|||||||
kaddrs := k.opts.Addrs
|
kaddrs := k.opts.Addrs
|
||||||
|
|
||||||
// shuffle addrs
|
// shuffle addrs
|
||||||
rand.Shuffle(len(kaddrs), func(i, j int) {
|
var rng mrand.Rand
|
||||||
|
rng.Shuffle(len(kaddrs), func(i, j int) {
|
||||||
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
|
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
|
||||||
})
|
})
|
||||||
|
|
||||||
|
td := DefaultCommitInterval
|
||||||
|
if k.opts.Context != nil {
|
||||||
|
if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 {
|
||||||
|
td = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub := &subscriber{
|
||||||
|
topic: topic,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
opts: options,
|
||||||
|
handler: handler,
|
||||||
|
kopts: k.opts,
|
||||||
|
consumers: make(map[string]map[int32]worker),
|
||||||
|
}
|
||||||
|
|
||||||
kopts := append(k.kopts,
|
kopts := append(k.kopts,
|
||||||
kgo.SeedBrokers(kaddrs...),
|
kgo.SeedBrokers(kaddrs...),
|
||||||
kgo.ConsumerGroup(options.Group),
|
kgo.ConsumerGroup(options.Group),
|
||||||
kgo.ConsumeTopics(topic),
|
kgo.ConsumeTopics(topic),
|
||||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||||
kgo.DisableAutoCommit(),
|
|
||||||
kgo.FetchMaxWait(1*time.Second),
|
kgo.FetchMaxWait(1*time.Second),
|
||||||
// kgo.KeepControlRecords(),
|
// kgo.KeepControlRecords(),
|
||||||
kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
|
kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
|
||||||
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
|
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
|
||||||
kgo.WithHooks(&metrics{meter: k.opts.Meter}),
|
kgo.WithHooks(&metrics{meter: k.opts.Meter}),
|
||||||
// TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked
|
kgo.AutoCommitMarks(),
|
||||||
|
kgo.AutoCommitInterval(td),
|
||||||
|
kgo.OnPartitionsAssigned(sub.assigned),
|
||||||
|
kgo.OnPartitionsRevoked(sub.revoked),
|
||||||
|
kgo.OnPartitionsLost(sub.revoked),
|
||||||
)
|
)
|
||||||
|
|
||||||
reader, err := kgo.NewClient(kopts...)
|
reader, err := kgo.NewClient(kopts...)
|
||||||
@@ -350,7 +357,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sub := &subscriber{topic: topic, done: make(chan struct{}), opts: options, reader: reader, handler: handler, kopts: k.opts}
|
sub.reader = reader
|
||||||
go sub.run(ctx)
|
go sub.run(ctx)
|
||||||
|
|
||||||
k.Lock()
|
k.Lock()
|
||||||
@@ -359,195 +366,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
|
|||||||
return sub, nil
|
return sub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) run(ctx context.Context) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-s.kopts.Context.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
fetches := s.reader.PollFetches(ctx)
|
|
||||||
if fetches.IsClientClosed() {
|
|
||||||
// TODO: fatal ?
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(fetches.Errors()) > 0 {
|
|
||||||
for _, err := range fetches.Errors() {
|
|
||||||
s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err)
|
|
||||||
}
|
|
||||||
// TODO: fatal ?
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.handleFetches(ctx, fetches); err != nil {
|
|
||||||
s.kopts.Logger.Errorf(ctx, "fetch handler err: %v", err)
|
|
||||||
// TODO: fatal ?
|
|
||||||
// return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) error {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
eh := s.kopts.ErrorHandler
|
|
||||||
if s.opts.ErrorHandler != nil {
|
|
||||||
eh = s.opts.ErrorHandler
|
|
||||||
}
|
|
||||||
|
|
||||||
var mu sync.Mutex
|
|
||||||
|
|
||||||
done := int32(0)
|
|
||||||
doneCh := make(chan struct{})
|
|
||||||
g, gctx := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
td := DefaultCommitInterval
|
|
||||||
if s.kopts.Context != nil {
|
|
||||||
if v, ok := s.kopts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 {
|
|
||||||
td = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ticker for commit offsets
|
|
||||||
ticker := time.NewTicker(td)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
offsets := make(map[string]map[int32]kgo.EpochOffset)
|
|
||||||
offsets[s.topic] = make(map[int32]kgo.EpochOffset)
|
|
||||||
|
|
||||||
fillOffsets := func(off map[string]map[int32]kgo.EpochOffset, rec *kgo.Record) {
|
|
||||||
mu.Lock()
|
|
||||||
if at, ok := off[s.topic][rec.Partition]; ok {
|
|
||||||
if at.Epoch > rec.LeaderEpoch || at.Epoch == rec.LeaderEpoch && at.Offset > rec.Offset {
|
|
||||||
mu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
off[s.topic][rec.Partition] = kgo.EpochOffset{Epoch: rec.LeaderEpoch, Offset: rec.Offset + 1}
|
|
||||||
mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
commitOffsets := func(cl *kgo.Client, ctx context.Context, off map[string]map[int32]kgo.EpochOffset) error {
|
|
||||||
var rerr error
|
|
||||||
|
|
||||||
mu.Lock()
|
|
||||||
offsets := off
|
|
||||||
mu.Unlock()
|
|
||||||
|
|
||||||
cl.CommitOffsetsSync(ctx, offsets, func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
|
|
||||||
if err != nil {
|
|
||||||
rerr = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, topic := range resp.Topics {
|
|
||||||
for _, partition := range topic.Partitions {
|
|
||||||
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
|
|
||||||
rerr = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return rerr
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-gctx.Done():
|
|
||||||
return
|
|
||||||
case <-s.done:
|
|
||||||
atomic.StoreInt32(&done, 1)
|
|
||||||
if err := commitOffsets(s.reader, ctx, offsets); err != nil && s.kopts.Logger.V(logger.ErrorLevel) {
|
|
||||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case <-doneCh:
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := commitOffsets(s.reader, ctx, offsets); err != nil {
|
|
||||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
|
||||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to commit offsets: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for _, fetch := range fetches {
|
|
||||||
for _, ftopic := range fetch.Topics {
|
|
||||||
for _, partition := range ftopic.Partitions {
|
|
||||||
precords := partition.Records
|
|
||||||
g.Go(func() error {
|
|
||||||
for _, record := range precords {
|
|
||||||
if atomic.LoadInt32(&done) == 1 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
p := pPool.Get().(*publication)
|
|
||||||
p.msg.Header = nil
|
|
||||||
p.msg.Body = nil
|
|
||||||
p.topic = s.topic
|
|
||||||
p.err = nil
|
|
||||||
p.ack = false
|
|
||||||
if s.opts.BodyOnly {
|
|
||||||
p.msg.Body = record.Value
|
|
||||||
} else {
|
|
||||||
if err := s.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
|
||||||
p.err = err
|
|
||||||
p.msg.Body = record.Value
|
|
||||||
if eh != nil {
|
|
||||||
_ = eh(p)
|
|
||||||
if p.ack {
|
|
||||||
fillOffsets(offsets, record)
|
|
||||||
}
|
|
||||||
pPool.Put(p)
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
|
||||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pPool.Put(p)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = s.handler(p)
|
|
||||||
if err == nil && s.opts.AutoAck {
|
|
||||||
p.ack = true
|
|
||||||
} else if err != nil {
|
|
||||||
p.err = err
|
|
||||||
if eh != nil {
|
|
||||||
_ = eh(p)
|
|
||||||
} else {
|
|
||||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
|
||||||
s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: subscriber error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if p.ack {
|
|
||||||
fillOffsets(offsets, record)
|
|
||||||
}
|
|
||||||
pPool.Put(p)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err := g.Wait(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close(doneCh)
|
|
||||||
|
|
||||||
return commitOffsets(s.reader, ctx, offsets)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *kBroker) String() string {
|
func (k *kBroker) String() string {
|
||||||
return "kgo"
|
return "kgo"
|
||||||
}
|
}
|
||||||
@@ -555,7 +373,8 @@ func (k *kBroker) String() string {
|
|||||||
func NewBroker(opts ...broker.Option) broker.Broker {
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||||
options := broker.NewOptions(opts...)
|
options := broker.NewOptions(opts...)
|
||||||
kopts := []kgo.Opt{
|
kopts := []kgo.Opt{
|
||||||
kgo.BatchCompression(kgo.NoCompression()),
|
kgo.DisableIdempotentWrite(),
|
||||||
|
kgo.ProducerBatchCompression(kgo.NoCompression()),
|
||||||
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
|
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
|
||||||
kgo.RetryBackoffFn(
|
kgo.RetryBackoffFn(
|
||||||
func() func(int) time.Duration {
|
func() func(int) time.Duration {
|
||||||
|
33
kgo_test.go
33
kgo_test.go
@@ -10,11 +10,19 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
kg "github.com/twmb/franz-go/pkg/kgo"
|
kg "github.com/twmb/franz-go/pkg/kgo"
|
||||||
kgo "github.com/unistack-org/micro-broker-kgo/v3"
|
|
||||||
jsoncodec "github.com/unistack-org/micro-codec-json/v3"
|
jsoncodec "github.com/unistack-org/micro-codec-json/v3"
|
||||||
"github.com/unistack-org/micro/v3/broker"
|
"github.com/unistack-org/micro/v3/broker"
|
||||||
"github.com/unistack-org/micro/v3/logger"
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
"github.com/unistack-org/micro/v3/metadata"
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
|
|
||||||
|
kgo "github.com/unistack-org/micro-broker-kgo/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
msgcnt = int64(12000000)
|
||||||
|
group = "38"
|
||||||
|
prefill = false
|
||||||
|
loglevel = logger.InfoLevel
|
||||||
)
|
)
|
||||||
|
|
||||||
var bm = &broker.Message{
|
var bm = &broker.Message{
|
||||||
@@ -27,7 +35,9 @@ func TestPubSub(t *testing.T) {
|
|||||||
t.Skip()
|
t.Skip()
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel), logger.WithCallerSkipCount(3))
|
if err := logger.DefaultLogger.Init(logger.WithLevel(loglevel), logger.WithCallerSkipCount(3)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
var addrs []string
|
var addrs []string
|
||||||
@@ -56,18 +66,17 @@ func TestPubSub(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
if prefill {
|
||||||
/*
|
msgs := make([]*broker.Message, 0, msgcnt)
|
||||||
msgs := make([]*broker.Message, 0, 600000)
|
for i := int64(0); i < msgcnt; i++ {
|
||||||
for i := 0; i < 600000; i++ {
|
|
||||||
msgs = append(msgs, bm)
|
msgs = append(msgs, bm)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
t.Skip()
|
// t.Skip()
|
||||||
*/
|
}
|
||||||
done := make(chan bool, 1)
|
done := make(chan bool, 1)
|
||||||
idx := int64(0)
|
idx := int64(0)
|
||||||
fn := func(msg broker.Event) error {
|
fn := func(msg broker.Event) error {
|
||||||
@@ -78,7 +87,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup("test29"),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -98,7 +107,11 @@ func TestPubSub(t *testing.T) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-pticker.C:
|
case <-pticker.C:
|
||||||
fmt.Printf("processed %v\n", atomic.LoadInt64(&idx))
|
if prc := atomic.LoadInt64(&idx); prc == msgcnt {
|
||||||
|
close(done)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("processed %v\n", prc)
|
||||||
|
}
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
close(done)
|
close(done)
|
||||||
}
|
}
|
||||||
|
@@ -40,10 +40,10 @@ const (
|
|||||||
|
|
||||||
metricBrokerThrottleLatencies = "broker_throttle_latencies"
|
metricBrokerThrottleLatencies = "broker_throttle_latencies"
|
||||||
|
|
||||||
metricBrokerProduceBytesCompressed = "produce_bytes_compressed_total"
|
metricBrokerProduceBytesCompressed = "broker_produce_bytes_compressed_total"
|
||||||
metricBrokerProduceBytesUncompressed = "produce_bytes_uncompressed_total"
|
metricBrokerProduceBytesUncompressed = "broker_produce_bytes_uncompressed_total"
|
||||||
metricBrokerFetchBytesUncompressed = "broker_fetch_bytes_uncompressed_total"
|
metricBrokerFetchBytesCompressed = "broker_consume_bytes_compressed_total"
|
||||||
metricBrokerFetchBytesCompressed = "broker_fetch_bytes_compressed_total"
|
metricBrokerFetchBytesUncompressed = "broker_consume_bytes_uncompressed_total"
|
||||||
|
|
||||||
metricBrokerGroupErrors = "broker_group_errors_total"
|
metricBrokerGroupErrors = "broker_group_errors_total"
|
||||||
|
|
||||||
|
@@ -54,3 +54,12 @@ type commitIntervalKey struct{}
|
|||||||
func CommitInterval(td time.Duration) broker.Option {
|
func CommitInterval(td time.Duration) broker.Option {
|
||||||
return broker.SetOption(commitIntervalKey{}, td)
|
return broker.SetOption(commitIntervalKey{}, td)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var DefaultSubscribeMaxInflight = 1000
|
||||||
|
|
||||||
|
type subscribeMaxInflightKey struct{}
|
||||||
|
|
||||||
|
// SubscribeMaxInFlight specifies interval to send commits
|
||||||
|
func SubscribeMaxInFlight(n int) broker.SubscribeOption {
|
||||||
|
return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
|
||||||
|
}
|
||||||
|
213
util.go
Normal file
213
util.go
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
package kgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
kgo "github.com/twmb/franz-go/pkg/kgo"
|
||||||
|
"github.com/unistack-org/micro/v3/broker"
|
||||||
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
|
||||||
|
|
||||||
|
var pPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return &publication{msg: &broker.Message{}}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type worker struct {
|
||||||
|
done chan struct{}
|
||||||
|
recs chan []*kgo.Record
|
||||||
|
cherr chan error
|
||||||
|
handler broker.Handler
|
||||||
|
batchHandler broker.BatchHandler
|
||||||
|
opts broker.SubscribeOptions
|
||||||
|
kopts broker.Options
|
||||||
|
tpmap map[string][]int32
|
||||||
|
maxInflight int
|
||||||
|
reader *kgo.Client
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) run(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-s.kopts.Context.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
fetches := s.reader.PollFetches(ctx)
|
||||||
|
if fetches.IsClientClosed() {
|
||||||
|
// TODO: fatal ?
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(fetches.Errors()) > 0 {
|
||||||
|
for _, err := range fetches.Errors() {
|
||||||
|
s.kopts.Logger.Fatalf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err)
|
||||||
|
}
|
||||||
|
// TODO: fatal ?
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
||||||
|
s.Lock()
|
||||||
|
consumers := s.consumers[p.Topic]
|
||||||
|
s.Unlock()
|
||||||
|
if consumers == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w, ok := consumers[p.Partition]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case err := <-w.cherr:
|
||||||
|
s.kopts.Logger.Fatalf(ctx, "handle err: %v", err)
|
||||||
|
return
|
||||||
|
case w.recs <- p.Records:
|
||||||
|
case <-w.done:
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) assigned(ctx context.Context, _ *kgo.Client, assigned map[string][]int32) {
|
||||||
|
maxInflight := DefaultSubscribeMaxInflight
|
||||||
|
|
||||||
|
if s.opts.Context != nil {
|
||||||
|
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
|
||||||
|
maxInflight = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Lock()
|
||||||
|
for topic, partitions := range assigned {
|
||||||
|
if s.consumers[topic] == nil {
|
||||||
|
s.consumers[topic] = make(map[int32]worker)
|
||||||
|
}
|
||||||
|
for _, partition := range partitions {
|
||||||
|
w := worker{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
recs: make(chan []*kgo.Record),
|
||||||
|
cherr: make(chan error),
|
||||||
|
kopts: s.kopts,
|
||||||
|
opts: s.opts,
|
||||||
|
ctx: ctx,
|
||||||
|
tpmap: map[string][]int32{topic: []int32{partition}},
|
||||||
|
reader: s.reader,
|
||||||
|
handler: s.handler,
|
||||||
|
batchHandler: s.batchhandler,
|
||||||
|
maxInflight: maxInflight,
|
||||||
|
}
|
||||||
|
s.consumers[topic][partition] = w
|
||||||
|
go w.handle()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) revoked(_ context.Context, _ *kgo.Client, revoked map[string][]int32) {
|
||||||
|
s.Lock()
|
||||||
|
for topic, partitions := range revoked {
|
||||||
|
ptopics := s.consumers[topic]
|
||||||
|
for _, partition := range partitions {
|
||||||
|
w := ptopics[partition]
|
||||||
|
delete(ptopics, partition)
|
||||||
|
if len(ptopics) == 0 {
|
||||||
|
delete(s.consumers, topic)
|
||||||
|
}
|
||||||
|
close(w.done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) handle() {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
eh := w.kopts.ErrorHandler
|
||||||
|
if w.opts.ErrorHandler != nil {
|
||||||
|
eh = w.opts.ErrorHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
paused := false
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.ctx.Done():
|
||||||
|
w.cherr <- w.ctx.Err()
|
||||||
|
return
|
||||||
|
case <-w.done:
|
||||||
|
return
|
||||||
|
case recs := <-w.recs:
|
||||||
|
if len(recs) >= w.maxInflight {
|
||||||
|
paused = true
|
||||||
|
w.reader.PauseFetchPartitions(w.tpmap)
|
||||||
|
}
|
||||||
|
for _, record := range recs {
|
||||||
|
p := pPool.Get().(*publication)
|
||||||
|
p.msg.Header = nil
|
||||||
|
p.msg.Body = nil
|
||||||
|
p.topic = record.Topic
|
||||||
|
p.err = nil
|
||||||
|
p.ack = false
|
||||||
|
if w.opts.BodyOnly {
|
||||||
|
p.msg.Body = record.Value
|
||||||
|
} else {
|
||||||
|
if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
||||||
|
p.err = err
|
||||||
|
p.msg.Body = record.Value
|
||||||
|
if eh != nil {
|
||||||
|
_ = eh(p)
|
||||||
|
if p.ack {
|
||||||
|
w.reader.MarkCommitRecords(record)
|
||||||
|
} else {
|
||||||
|
w.cherr <- ErrLostMessage
|
||||||
|
pPool.Put(p)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pPool.Put(p)
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
if w.kopts.Logger.V(logger.ErrorLevel) {
|
||||||
|
w.kopts.Logger.Errorf(w.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pPool.Put(p)
|
||||||
|
w.cherr <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = w.handler(p)
|
||||||
|
if err == nil && w.opts.AutoAck {
|
||||||
|
p.ack = true
|
||||||
|
} else if err != nil {
|
||||||
|
p.err = err
|
||||||
|
if eh != nil {
|
||||||
|
_ = eh(p)
|
||||||
|
} else {
|
||||||
|
if w.kopts.Logger.V(logger.ErrorLevel) {
|
||||||
|
w.kopts.Logger.Errorf(w.kopts.Context, "[kgo]: subscriber error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if p.ack {
|
||||||
|
pPool.Put(p)
|
||||||
|
w.reader.MarkCommitRecords(record)
|
||||||
|
} else {
|
||||||
|
pPool.Put(p)
|
||||||
|
w.cherr <- ErrLostMessage
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if paused {
|
||||||
|
paused = false
|
||||||
|
w.reader.ResumeFetchPartitions(w.tpmap)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user