Compare commits

...

21 Commits

Author SHA1 Message Date
bc0d2026ad Merge pull request #26 from unistack-org/master
add additional options for server and broke
2021-12-20 08:48:09 +03:00
1fc59df95b Merge pull request #25 from unistack-org/options
add additional options and metadata fixes
2021-12-20 08:47:34 +03:00
ff82f9f960 Merge pull request #22 from unistack-org/dependabot/go_modules/github.com/twmb/franz-go-1.2.5
Bump github.com/twmb/franz-go from 1.2.4 to 1.2.5
2021-11-29 16:25:46 +03:00
dependabot[bot]
221f07ca97 Bump github.com/twmb/franz-go from 1.2.4 to 1.2.5
Bumps [github.com/twmb/franz-go](https://github.com/twmb/franz-go) from 1.2.4 to 1.2.5.
- [Release notes](https://github.com/twmb/franz-go/releases)
- [Changelog](https://github.com/twmb/franz-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/twmb/franz-go/compare/v1.2.4...v1.2.5)

---
updated-dependencies:
- dependency-name: github.com/twmb/franz-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-11-29 11:21:34 +00:00
44b9a59aab Merge pull request #19 from unistack-org/dependabot/go_modules/github.com/twmb/franz-go-1.2.4
Bump github.com/twmb/franz-go from 1.2.3 to 1.2.4
2021-11-24 14:49:02 +03:00
dependabot[bot]
81cd4c1ec6 Bump github.com/twmb/franz-go from 1.2.3 to 1.2.4
Bumps [github.com/twmb/franz-go](https://github.com/twmb/franz-go) from 1.2.3 to 1.2.4.
- [Release notes](https://github.com/twmb/franz-go/releases)
- [Changelog](https://github.com/twmb/franz-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/twmb/franz-go/compare/v1.2.3...v1.2.4)

---
updated-dependencies:
- dependency-name: github.com/twmb/franz-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-11-24 11:25:36 +00:00
50d1343adb Merge pull request #21 from unistack-org/dependabot/go_modules/go.unistack.org/micro/v3-3.8.11
Bump go.unistack.org/micro/v3 from 3.8.10 to 3.8.11
2021-11-24 14:22:11 +03:00
dependabot[bot]
4c6b057a90 Bump go.unistack.org/micro/v3 from 3.8.10 to 3.8.11
Bumps [go.unistack.org/micro/v3](https://github.com/unistack-org/micro) from 3.8.10 to 3.8.11.
- [Release notes](https://github.com/unistack-org/micro/releases)
- [Commits](https://github.com/unistack-org/micro/compare/v3.8.10...v3.8.11)

---
updated-dependencies:
- dependency-name: go.unistack.org/micro/v3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-11-24 11:20:08 +00:00
06acad41fe Merge pull request #20 from unistack-org/dependabot/go_modules/go.unistack.org/micro/v3-3.8.10
Bump go.unistack.org/micro/v3 from 3.8.7 to 3.8.10
2021-11-19 14:22:39 +03:00
dependabot[bot]
0ac121099f Bump go.unistack.org/micro/v3 from 3.8.7 to 3.8.10
Bumps [go.unistack.org/micro/v3](https://github.com/unistack-org/micro) from 3.8.7 to 3.8.10.
- [Release notes](https://github.com/unistack-org/micro/releases)
- [Commits](https://github.com/unistack-org/micro/compare/v3.8.7...v3.8.10)

---
updated-dependencies:
- dependency-name: go.unistack.org/micro/v3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-11-19 11:20:35 +00:00
d30f3d200b fix panic on publish with not connected broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-13 18:58:21 +03:00
4373f485ce Merge pull request #18 from unistack-org/dependabot/go_modules/github.com/twmb/franz-go-1.2.3
Bump github.com/twmb/franz-go from 1.2.2 to 1.2.3
2021-11-08 15:19:49 +03:00
dependabot[bot]
9faac6a407 Bump github.com/twmb/franz-go from 1.2.2 to 1.2.3
Bumps [github.com/twmb/franz-go](https://github.com/twmb/franz-go) from 1.2.2 to 1.2.3.
- [Release notes](https://github.com/twmb/franz-go/releases)
- [Changelog](https://github.com/twmb/franz-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/twmb/franz-go/compare/v1.2.2...v1.2.3)

---
updated-dependencies:
- dependency-name: github.com/twmb/franz-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-11-08 11:22:04 +00:00
02134eacc2 Merge pull request #17 from unistack-org/dependabot/go_modules/go.unistack.org/micro/v3-3.8.7
Bump go.unistack.org/micro/v3 from 3.8.6 to 3.8.7
2021-10-28 14:22:12 +03:00
dependabot[bot]
be302e0816 Bump go.unistack.org/micro/v3 from 3.8.6 to 3.8.7
Bumps [go.unistack.org/micro/v3](https://github.com/unistack-org/micro) from 3.8.6 to 3.8.7.
- [Release notes](https://github.com/unistack-org/micro/releases)
- [Commits](https://github.com/unistack-org/micro/compare/v3.8.6...v3.8.7)

---
updated-dependencies:
- dependency-name: go.unistack.org/micro/v3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-10-28 11:20:05 +00:00
cfd4fb611e Merge pull request #15 from unistack-org/dependabot/go_modules/github.com/twmb/franz-go-1.2.2
Bump github.com/twmb/franz-go from 1.1.1 to 1.2.2
2021-10-27 18:03:34 +03:00
dependabot[bot]
619d9d0d70 Bump github.com/twmb/franz-go from 1.1.1 to 1.2.2
Bumps [github.com/twmb/franz-go](https://github.com/twmb/franz-go) from 1.1.1 to 1.2.2.
- [Release notes](https://github.com/twmb/franz-go/releases)
- [Changelog](https://github.com/twmb/franz-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/twmb/franz-go/compare/v1.1.1...v1.2.2)

---
updated-dependencies:
- dependency-name: github.com/twmb/franz-go
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-10-27 14:50:15 +00:00
ebad3e1369 Merge pull request #16 from unistack-org/dependabot/go_modules/go.unistack.org/micro/v3-3.8.6
Bump go.unistack.org/micro/v3 from 3.8.0 to 3.8.6
2021-10-27 17:49:14 +03:00
dependabot[bot]
485de5136f Bump go.unistack.org/micro/v3 from 3.8.0 to 3.8.6
Bumps [go.unistack.org/micro/v3](https://github.com/unistack-org/micro) from 3.8.0 to 3.8.6.
- [Release notes](https://github.com/unistack-org/micro/releases)
- [Commits](https://github.com/unistack-org/micro/compare/v3.8.0...v3.8.6)

---
updated-dependencies:
- dependency-name: go.unistack.org/micro/v3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2021-10-27 14:48:54 +00:00
6ce9ed8e84 update workflows
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-10-27 17:47:25 +03:00
2c74b3232b support key publish option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-10-17 18:29:05 +03:00
8 changed files with 130 additions and 86 deletions

View File

@@ -9,7 +9,7 @@
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
name: "codeql"
on:
workflow_run:
@@ -17,16 +17,16 @@ on:
types:
- completed
push:
branches: [ master ]
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: Analyze
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
@@ -42,11 +42,14 @@ jobs:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
- name: checkout
uses: actions/checkout@v2
- name: setup
uses: actions/setup-go@v2
with:
go-version: 1.16
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
- name: init
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
@@ -57,7 +60,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
- name: autobuild
uses: github/codeql-action/autobuild@v1
# Command-line programs to run using the OS shell.
@@ -71,5 +74,5 @@ jobs:
# make bootstrap
# make release
- name: Perform CodeQL Analysis
- name: analyze
uses: github/codeql-action/analyze@v1

View File

@@ -1,66 +1,31 @@
name: "prautomerge"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
contents: write
pull-requests: write
contents: write
jobs:
Dependabot-Automerge:
dependabot:
runs-on: ubuntu-latest
# Contains workaround to execute if dependabot updates the PR by checking for the base branch in the linked PR
# The the github.event.workflow_run.event value is 'push' and not 'pull_request'
# dont work with multiple workflows when last returns success
if: >-
github.event.workflow_run.conclusion == 'success'
&& github.actor == 'dependabot[bot]'
&& github.event.sender.login == 'dependabot[bot]'
&& github.event.sender.type == 'Bot'
&& (github.event.workflow_run.event == 'pull_request'
|| (github.event.workflow_run.event == 'push' && github.event.workflow_run.pull_requests[0].base.ref == github.event.repository.default_branch ))
if: ${{ github.actor == 'dependabot[bot]' }}
steps:
- name: Approve Changes and Merge changes if label 'dependencies' is set
uses: actions/github-script@v5
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.1.1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
console.log(context.payload.workflow_run);
var labelNames = await github.paginate(
github.issues.listLabelsOnIssue,
{
repo: context.repo.repo,
owner: context.repo.owner,
issue_number: context.payload.workflow_run.pull_requests[0].number,
},
(response) => response.data.map(
(label) => label.name
)
);
console.log(labelNames);
if (labelNames.includes('dependencies')) {
console.log('Found label');
await github.pulls.createReview({
repo: context.repo.repo,
owner: context.repo.owner,
pull_number: context.payload.workflow_run.pull_requests[0].number,
event: 'APPROVE'
});
console.log('Approved PR');
await github.pulls.merge({
repo: context.repo.repo,
owner: context.repo.owner,
pull_number: context.payload.workflow_run.pull_requests[0].number,
});
console.log('Merged PR');
}
github-token: "${{ secrets.TOKEN }}"
- name: approve
run: gh pr review --approve "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}
- name: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@@ -1,2 +1,9 @@
# micro-broker-kgo
yet another micro kafka broker alternative
TODO:
* dont always append options from context on Init and New
* add SubscriberOptions(...kgo.Opt)
* add ServerSubscribeOptions(...kgo.Opt)
* check PublisherOptions(...kgo.Opt)
* check ClientPublisherOptions(...kgo.Opt)

8
go.mod
View File

@@ -3,8 +3,8 @@ module go.unistack.org/micro-broker-kgo/v3
go 1.16
require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/twmb/franz-go v1.1.2
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210930203137-7fbfd17de279
go.unistack.org/micro/v3 v3.8.0
github.com/pierrec/lz4/v4 v4.1.12 // indirect
github.com/twmb/franz-go v1.2.6
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b
go.unistack.org/micro/v3 v3.8.12
)

32
go.sum
View File

@@ -15,36 +15,37 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8=
github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/twmb/franz-go v1.1.2 h1:i9lCDEcXWpRd3YXFlffIc/koBl+JijI13F5sg69NPRk=
github.com/twmb/franz-go v1.1.2/go.mod h1:KerrVhzNpasYrWJLr2Yj6Cui43f1BxH4U9SJEDVOjqQ=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210930203137-7fbfd17de279 h1:58GX0Ju/PCG1zWVdsLEE5OUu70KlfOrxY7djPszuqiw=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210930203137-7fbfd17de279/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go v1.2.6 h1:WVub2Sml7LqER9VU0WxsiOTom4LBK7YMj+7jbqadE3U=
github.com/twmb/franz-go v1.2.6/go.mod h1:P+i2DnBaec1o0z9EI8CyAM/WAjG99CHI3oCAhZDoy48=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211127185622-3b34db0c6d1e/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b h1:rCe006NN/89GvtdXqRbgsqtL/nVTj83/dQ9ok8DJFcM=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
github.com/unistack-org/micro-proto v0.0.9 h1:KrWLS4FUX7UAWNAilQf70uad6ZPf/0EudeddCXllRVc=
github.com/unistack-org/micro-proto v0.0.9/go.mod h1:Cckwmzd89gvS7ThxzZp9kQR/EOdksFQcsTAtDDyKwrg=
go.unistack.org/micro/v3 v3.8.0 h1:9k6C40xdJf65VW6cVB8sOJVSJhyz1UfFOhX7/II2ivo=
go.unistack.org/micro/v3 v3.8.0/go.mod h1:Tkteri0wiiybbH6aPqay26pZHFIAwL9LXJc2x1Jkakk=
go.unistack.org/micro-proto/v3 v3.1.0 h1:q39FwjFiRZn+Ux/tt+d3bJTmDtsQQWa+3SLYVo1vLfA=
go.unistack.org/micro-proto/v3 v3.1.0/go.mod h1:DpRhYCBXlmSJ/AAXTmntvlh7kQkYU6eFvlmYAx4BQS8=
go.unistack.org/micro/v3 v3.8.12 h1:ACaHE8ZIHFXqEGPSRvXzND4hcqCSQf04WkzOFY6Y1gQ=
go.unistack.org/micro/v3 v3.8.12/go.mod h1:KMMmOmbgo/D52/rCAbqeKbBsgEEbSKM69he54J3ZIuA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -52,7 +53,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

16
kgo.go
View File

@@ -243,13 +243,26 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
}
func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock()
if !k.connected {
k.RUnlock()
return broker.ErrNotConnected
}
k.RUnlock()
options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs))
var errs []string
var err error
var key []byte
if options.Context != nil {
if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil {
key = k
}
}
for _, msg := range msgs {
rec := &kgo.Record{}
rec := &kgo.Record{Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if options.BodyOnly {
rec.Value = msg.Body
@@ -273,6 +286,7 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
errs = append(errs, result.Err.Error())
}
}
if len(errs) > 0 {
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
}

View File

@@ -7,6 +7,7 @@ import (
kgo "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/server"
)
// DefaultCommitInterval specifies how fast send commit offsets to kafka
@@ -48,6 +49,36 @@ func Options(opts ...kgo.Opt) broker.Option {
}
}
// SubscribeOptions pass additional options to broker
func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
return func(o *broker.SubscribeOptions) {
if o.Context == nil {
o.Context = context.Background()
}
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
if !ok {
options = make([]kgo.Opt, 0, len(opts))
}
options = append(options, opts...)
o.Context = context.WithValue(o.Context, optionsKey{}, options)
}
}
// SubscriberOptions pass additional options to broker
func SubscriberOptions(opts ...kgo.Opt) server.SubscriberOption {
return func(o *server.SubscriberOptions) {
if o.Context == nil {
o.Context = context.Background()
}
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
if !ok {
options = make([]kgo.Opt, 0, len(opts))
}
options = append(options, opts...)
o.Context = context.WithValue(o.Context, optionsKey{}, options)
}
}
type commitIntervalKey struct{}
// CommitInterval specifies interval to send commits
@@ -59,7 +90,7 @@ var DefaultSubscribeMaxInflight = 1000
type subscribeMaxInflightKey struct{}
// SubscribeMaxInFlight specifies interval to send commits
// SubscribeMaxInFlight max queued messages
func SubscribeMaxInFlight(n int) broker.SubscribeOption {
return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
}

24
util.go
View File

@@ -158,6 +158,14 @@ func (w *worker) handle() {
p.ack = false
if w.opts.BodyOnly {
p.msg.Body = record.Value
if l := len(record.Headers); l > 0 {
if p.msg.Header == nil {
p.msg.Header = metadata.New(l)
}
for _, h := range record.Headers {
p.msg.Header.Set(h.Key, string(h.Value))
}
}
} else if w.kopts.Codec.String() == "noop" {
p.msg.Body = record.Value
p.msg.Header = metadata.New(len(record.Headers))
@@ -168,6 +176,14 @@ func (w *worker) handle() {
if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
p.err = err
p.msg.Body = record.Value
if l := len(record.Headers); l > 0 {
if p.msg.Header == nil {
p.msg.Header = metadata.New(l)
}
for _, h := range record.Headers {
p.msg.Header.Set(h.Key, string(h.Value))
}
}
if eh != nil {
_ = eh(p)
if p.ack {
@@ -188,6 +204,14 @@ func (w *worker) handle() {
w.cherr <- err
return
}
if l := len(record.Headers); l > 0 {
if p.msg.Header == nil {
p.msg.Header = metadata.New(l)
}
for _, h := range record.Headers {
p.msg.Header.Set(h.Key, string(h.Value))
}
}
}
err = w.handler(p)
if err == nil && w.opts.AutoAck {