Compare commits

..

37 Commits

Author SHA1 Message Date
c1fa2f639d minimize memory usage, name all traces with sdk.broker
Some checks failed
build / lint (push) Failing after 8s
build / test (push) Failing after 7s
codeql / analyze (go) (push) Failing after 11s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-27 13:28:17 +03:00
8e3f2c67d7 update deps
Some checks failed
build / test (push) Failing after 7s
build / lint (push) Failing after 8s
codeql / analyze (go) (push) Failing after 10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 18:05:20 +03:00
e66194695e improve tracing
Some checks failed
build / test (push) Failing after 25s
build / lint (push) Successful in 22s
codeql / analyze (go) (push) Failing after 46s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-22 23:45:23 +03:00
894d6f4f20 tracing fixes
Some checks failed
build / lint (push) Successful in 27s
build / test (push) Failing after 29s
codeql / analyze (go) (push) Failing after 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-22 01:11:33 +03:00
d404fa31ab export Subscriber
Some checks failed
build / test (push) Failing after 1m38s
codeql / analyze (go) (push) Failing after 1m59s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-22 18:28:51 +03:00
88777a29ad add helper funcs
Some checks failed
build / test (push) Failing after 1m39s
codeql / analyze (go) (push) Failing after 2m8s
build / lint (push) Successful in 9m13s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-22 17:49:36 +03:00
23c2903c21 fixup tracing
Some checks failed
build / test (push) Failing after 1m36s
codeql / analyze (go) (push) Failing after 1m42s
build / lint (push) Successful in 9m13s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-06 08:20:27 +03:00
8fcc23f639 fixup tracing
Some checks failed
build / test (push) Failing after 1m46s
codeql / analyze (go) (push) Failing after 1m45s
build / lint (push) Successful in 9m12s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-06 07:30:17 +03:00
25dda1f34c fixup tracing
Some checks failed
build / test (push) Failing after 1m31s
codeql / analyze (go) (push) Failing after 1m49s
build / lint (push) Successful in 9m17s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-05 20:20:34 +03:00
fe66086c40 fixup tracing
Some checks failed
build / test (push) Failing after 2m10s
codeql / analyze (go) (push) Failing after 2m7s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-05 16:20:05 +03:00
7329bc23bc export lag for all partition, not only owned
Some checks failed
build / test (push) Failing after 1m14s
build / lint (push) Successful in 9m28s
codeql / analyze (go) (push) Failing after 14m55s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-02 23:01:04 +03:00
c240631cdb fixup panic
Some checks failed
build / test (push) Failing after 1m32s
codeql / analyze (go) (push) Failing after 2m37s
build / lint (push) Successful in 9m31s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-02 10:32:33 +03:00
Кирилл Горбунов
6a68533824 #133 fix race. (#134)
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Co-authored-by: Gorbunov Kirill Andreevich <kgorbunov@mtsbank.ru>
Reviewed-on: #134
Co-authored-by: Кирилл Горбунов <kirya_gorbunov_2015@mail.ru>
Co-committed-by: Кирилл Горбунов <kirya_gorbunov_2015@mail.ru>
2024-04-19 19:26:06 +03:00
058b6354c0 fixup tracing
Some checks failed
build / test (push) Failing after 1m27s
codeql / analyze (go) (push) Failing after 1m43s
build / lint (push) Successful in 9m20s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-14 23:17:38 +03:00
1f4cf11afe fix group lag exporter
Some checks failed
build / test (push) Failing after 1m36s
codeql / analyze (go) (push) Failing after 1m37s
build / lint (push) Successful in 9m14s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 02:40:45 +03:00
39177da1d0 massive meter usage
Some checks failed
build / test (push) Failing after 1m33s
codeql / analyze (go) (push) Failing after 1m39s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 02:24:16 +03:00
d559db4050 fixup logger caller skip count
Some checks failed
build / test (push) Failing after 1m17s
codeql / analyze (go) (push) Failing after 8m52s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 00:53:09 +03:00
aa946c469a fixup logger caller skip count
Some checks failed
codeql / analyze (go) (push) Failing after 1m51s
build / test (push) Failing after 1m55s
build / lint (push) Successful in 9m14s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 00:40:12 +03:00
9c4d88bb69 fixup for attrs
Some checks failed
build / test (push) Failing after 1m31s
codeql / analyze (go) (push) Failing after 1m53s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-15 09:38:31 +03:00
56288f46b1 cleanup tracing
Some checks failed
build / test (push) Failing after 1m30s
codeql / analyze (go) (push) Failing after 1m57s
build / lint (push) Successful in 9m19s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-14 23:25:19 +03:00
81dcef8b28 fixup tracer span labels
Some checks failed
build / test (push) Failing after 1m29s
codeql / analyze (go) (push) Failing after 1m53s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-14 16:47:44 +03:00
ec7a22b2dc fix double init
Some checks failed
build / test (push) Failing after 1m34s
codeql / analyze (go) (push) Failing after 2m5s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-07 09:06:33 +03:00
d2ac0c1360 update deps
Some checks failed
build / test (push) Failing after 1m49s
codeql / analyze (go) (push) Failing after 2m6s
build / lint (push) Successful in 9m18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-07 00:46:24 +03:00
69dd8c4eea fixup fields again
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-22 09:04:02 +03:00
27a6a923cd gomod
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-21 15:45:13 +03:00
0a395235d6 backport
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-21 15:08:58 +03:00
23f0ad0f2f Merge pull request 'change log fields' (#129) from fix into v3
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Reviewed-on: #129
2024-02-15 10:45:37 +03:00
4fcbe0a770 change log fields
Some checks failed
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
autoapprove / autoapprove (pull_request) Has been cancelled
automerge / automerge (pull_request) Has been cancelled
dependabot-automerge / automerge (pull_request) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-15 10:44:59 +03:00
28c9865121 Merge pull request 'logger fix' (#128) from logger_fix into v3
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Reviewed-on: #128
2024-02-15 10:21:06 +03:00
697413d829 logger fix
Some checks failed
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
autoapprove / autoapprove (pull_request) Has been cancelled
automerge / automerge (pull_request) Has been cancelled
dependabot-automerge / automerge (pull_request) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-15 10:20:15 +03:00
8a64e8c5cc Merge pull request 'fixup panic' (#126) from rwfix into v3
Some checks failed
build / test (push) Failing after 1m27s
build / lint (push) Failing after 2m41s
codeql / analyze (go) (push) Failing after 2m44s
Reviewed-on: #126
2023-12-20 22:54:47 +03:00
2c8ca8d14f fixup panic
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m42s
prbuild / test (pull_request) Failing after 1m29s
prbuild / lint (pull_request) Failing after 2m37s
autoapprove / autoapprove (pull_request) Failing after 1m24s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-12-20 22:54:10 +03:00
769ac6322f Merge pull request 'allow rebalance on unsubscribe' (#125) from unsubfix into v3
Reviewed-on: #125
2023-05-25 15:12:54 +03:00
52318d68b8 allow rebalance on unsubscribe
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-25 15:11:53 +03:00
5c4332ffc4 Merge pull request 'fix graceful shutdown' (#124) from graceful into v3
Reviewed-on: #124
2023-05-13 15:13:17 +03:00
3a86d4c0f4 fix graceful shutdown
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 15:12:26 +03:00
8bbcc30d04 Merge pull request 'improve performance and correctness' (#122) from improve into v3
Reviewed-on: #122
2023-03-14 09:31:58 +03:00
28 changed files with 936 additions and 286 deletions

View File

@@ -1,29 +0,0 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@@ -1,34 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

View File

@@ -1,53 +0,0 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

19
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

20
.github/workflows/autoapprove.yml vendored Normal file
View File

@@ -0,0 +1,20 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

21
.github/workflows/automerge.yml vendored Normal file
View File

@@ -0,0 +1,21 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

47
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,47 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

78
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,78 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@@ -0,0 +1,27 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

47
.github/workflows/pr.yml vendored Normal file
View File

@@ -0,0 +1,47 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@@ -1,5 +0,0 @@
run:
concurrency: 8
deadline: 5m
issues-exit-code: 1
tests: true

View File

@@ -1,9 +1,2 @@
# micro-broker-kgo # broker-kgo
yet another micro kafka broker alternative
TODO:
* dont always append options from context on Init and New
* add SubscriberOptions(...kgo.Opt)
* add ServerSubscribeOptions(...kgo.Opt)
* check PublisherOptions(...kgo.Opt)
* check ClientPublisherOptions(...kgo.Opt)

76
carrier.go Normal file
View File

@@ -0,0 +1,76 @@
package kgo
import (
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/metadata"
)
// RecordCarrier injects and extracts traces from a kgo.Record.
//
// This type exists to satisfy the otel/propagation.TextMapCarrier interface.
type RecordCarrier struct {
record *kgo.Record
}
// NewRecordCarrier creates a new RecordCarrier.
func NewRecordCarrier(record *kgo.Record) RecordCarrier {
return RecordCarrier{record: record}
}
// Get retrieves a single value for a given key if it exists.
func (c RecordCarrier) Get(key string) string {
for _, h := range c.record.Headers {
if h.Key == key {
return string(h.Value)
}
}
return ""
}
// Set sets a header.
func (c RecordCarrier) Set(key, val string) {
// Check if key already exists.
for i, h := range c.record.Headers {
if h.Key == key {
// Key exist, update the value.
c.record.Headers[i].Value = []byte(val)
return
}
}
// Key does not exist, append new header.
c.record.Headers = append(c.record.Headers, kgo.RecordHeader{
Key: key,
Value: []byte(val),
})
}
// Keys returns a slice of all key identifiers in the carrier.
func (c RecordCarrier) Keys() []string {
out := make([]string, len(c.record.Headers))
for i, h := range c.record.Headers {
out[i] = h.Key
}
return out
}
func setHeaders(r *kgo.Record, md metadata.Metadata) {
seen := make(map[string]struct{})
loop:
for k, v := range md {
for i := 0; i < len(r.Headers); i++ {
if r.Headers[i].Key == k {
// Key exist, update the value.
r.Headers[i].Value = []byte(v)
continue loop
} else if _, ok := seen[k]; ok {
continue loop
}
// Key does not exist, append new header.
r.Headers = append(r.Headers, kgo.RecordHeader{
Key: k,
Value: []byte(v),
})
seen[k] = struct{}{}
}
}
}

View File

@@ -1,12 +1,14 @@
package kgo package kgo
import ( import (
"context"
"sync" "sync"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v3/broker"
) )
type event struct { type event struct {
ctx context.Context
topic string topic string
err error err error
sync.RWMutex sync.RWMutex
@@ -14,6 +16,10 @@ type event struct {
ack bool ack bool
} }
func (p *event) Context() context.Context {
return p.ctx
}
func (p *event) Topic() string { func (p *event) Topic() string {
return p.topic return p.topic
} }

23
go.mod
View File

@@ -1,14 +1,25 @@
module go.unistack.org/micro-broker-kgo/v4 module go.unistack.org/micro-broker-kgo/v3
go 1.19 go 1.22
toolchain go1.23.1
require ( require (
github.com/twmb/franz-go v1.16.1 github.com/google/uuid v1.6.0
github.com/twmb/franz-go/pkg/kmsg v1.7.0 github.com/twmb/franz-go v1.17.1
go.unistack.org/micro/v4 v4.0.17 github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kmsg v1.8.0
go.opentelemetry.io/otel v1.30.0
go.unistack.org/micro/v3 v3.10.91
) )
require ( require (
github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/compress v1.17.9 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
) )

63
go.sum
View File

@@ -1,11 +1,64 @@
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.11.0 h1:FfeWJ0qadntFpAcQt8JzNXW4dijjytZNLrzJuzzzuxA=
github.com/twmb/franz-go/pkg/kadm v1.11.0/go.mod h1:qrhkdH+SWS3ivmbqOgHbpgVHamhaKcjH0UM+uOp0M1A=
github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60=
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
go.unistack.org/micro/v4 v4.0.17 h1:mF7uM+J4ILdG+1fcwzKYCwDlxhdbF/e1WnGzKKLnIXc= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
go.unistack.org/micro/v4 v4.0.17/go.mod h1:ZDgU9931vm2l7X6RN/6UuwRIVp24GRdmQ7dKmegArk4= github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k=
go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
go.unistack.org/micro/v3 v3.10.59 h1:eneYXJLgyu5MZpSvyI0K17CeXvgOoUCN5dWZaPV5lI4=
go.unistack.org/micro/v3 v3.10.59/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
go.unistack.org/micro/v3 v3.10.91 h1:vuJY4tXwpqimwIkEJ3TozMYNVQQs+C5QMlQWPgSY/YM=
go.unistack.org/micro/v3 v3.10.91/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

65
kadmtest.go Normal file
View File

@@ -0,0 +1,65 @@
//go:build ignore
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kversion"
//"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/pkg/sasl/plain"
)
func die(msg string, args ...any) {
fmt.Fprintf(os.Stderr, msg, args...)
os.Exit(1)
}
func main() {
seeds := []string{"vm-kafka-ump01tn.mbrd.ru:9092", "vm-kafka-ump02tn.mbrd.ru:9092", "vm-kafka-ump03tn.mbrd.ru:9092"}
pass := "XXXXX"
user := "XXXXX"
var adminClient *kadm.Client
{
client, err := kgo.NewClient(
kgo.SeedBrokers(seeds...),
// kgo.SASL((scram.Auth{User: user, Pass: pass}).AsSha512Mechanism()),
kgo.SASL((plain.Auth{User: user, Pass: pass}).AsMechanism()),
// Do not try to send requests newer than 2.4.0 to avoid breaking changes in the request struct.
// Sometimes there are breaking changes for newer versions where more properties are required to set.
kgo.MaxVersions(kversion.V2_4_0()),
)
if err != nil {
panic(err)
}
defer client.Close()
adminClient = kadm.NewClient(client)
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
dg, err := adminClient.DescribeGroups(ctx, "interestrate_loader")
if err != nil {
die("failed to describe group: %v", err)
}
for _, m := range dg["interestrate_loader"].Members {
mc, _ := m.Assigned.AsConsumer()
for _, mt := range mc.Topics {
for _, p := range mt.Partitions {
fmt.Printf("client:%s\tpartitions: %d\n", m.ClientID, p)
}
}
}
}

130
kgo.go
View File

@@ -1,21 +1,25 @@
// Package kgo provides a kafka broker using kgo // Package kgo provides a kafka broker using kgo
package kgo // import "go.unistack.org/micro-broker-kgo/v4" package kgo
import ( import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v3/logger"
id "go.unistack.org/micro/v4/util/id" "go.unistack.org/micro/v3/metadata"
mrand "go.unistack.org/micro/v4/util/rand" "go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
mrand "go.unistack.org/micro/v3/util/rand"
) )
var _ broker.Broker = (*Broker)(nil) var _ broker.Broker = (*Broker)(nil)
@@ -59,7 +63,7 @@ type Broker struct {
connected bool connected bool
sync.RWMutex sync.RWMutex
opts broker.Options opts broker.Options
subs []*subscriber subs []*Subscriber
} }
func (k *Broker) Address() string { func (k *Broker) Address() string {
@@ -70,23 +74,54 @@ func (k *Broker) Name() string {
return k.opts.Name return k.opts.Name
} }
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, error) { func (k *Broker) Client() *kgo.Client {
return k.c
}
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
var c *kgo.Client var c *kgo.Client
var err error var err error
sp, _ := tracer.SpanFromContext(ctx)
clientID := "kgo"
group := ""
if k.opts.Context != nil {
if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
clientID = id
}
if id, ok := k.opts.Context.Value(groupKey{}).(string); ok {
group = id
}
}
htracer := &hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer}
opts = append(opts,
kgo.WithHooks(&hookMeter{meter: k.opts.Meter}),
kgo.WithHooks(htracer),
)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() if ctx.Err() != nil {
if sp != nil {
sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
}
}
return nil, nil, ctx.Err()
default: default:
c, err = kgo.NewClient(opts...) c, err = kgo.NewClient(opts...)
if err == nil { if err == nil {
err = c.Ping(ctx) // check connectivity to cluster err = c.Ping(ctx) // check connectivity to cluster
} }
if err != nil { if err != nil {
return nil, err if sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
return nil, nil, err
} }
} }
return c, nil return c, htracer, nil
} }
func (k *Broker) Connect(ctx context.Context) error { func (k *Broker) Connect(ctx context.Context) error {
@@ -102,7 +137,7 @@ func (k *Broker) Connect(ctx context.Context) error {
nctx = ctx nctx = ctx
} }
c, err := k.connect(nctx, k.kopts...) c, _, err := k.connect(nctx, k.kopts...)
if err != nil { if err != nil {
return err return err
} }
@@ -127,6 +162,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
if ctx != nil { if ctx != nil {
nctx = ctx nctx = ctx
} }
var span tracer.Span
ctx, span = k.opts.Tracer.Start(ctx, "Disconnect")
defer span.Finish()
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
@@ -184,6 +222,7 @@ func (k *Broker) Init(opts ...broker.Option) error {
} }
k.init = true k.init = true
return nil return nil
} }
@@ -201,21 +240,17 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
} }
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock()
ok := k.connected
k.RUnlock()
if !ok {
k.Lock() k.Lock()
c, err := k.connect(ctx, k.kopts...) if !k.connected {
c, _, err := k.connect(ctx, k.kopts...)
if err != nil { if err != nil {
k.Unlock() k.Unlock()
return err return err
} }
k.c = c k.c = c
k.connected = true k.connected = true
k.Unlock()
} }
k.Unlock()
options := broker.NewPublishOptions(opts...) options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs)) records := make([]*kgo.Record, 0, len(msgs))
@@ -236,13 +271,12 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
for _, msg := range msgs { for _, msg := range msgs {
rec := &kgo.Record{Context: ctx, Key: key} rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc() msg.Header.Del(metadata.HeaderTopic)
if options.BodyOnly { k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
rec.Value = msg.Body if options.BodyOnly || k.opts.Codec.String() == "noop" {
} else if k.opts.Codec.String() == "noop" {
rec.Value = msg.Body rec.Value = msg.Body
for k, v := range msg.Header { for k, v := range msg.Header {
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)}) rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: http.CanonicalHeaderKey(k), Value: []byte(v)})
} }
} else { } else {
rec.Value, err = k.opts.Codec.Marshal(msg) rec.Value, err = k.opts.Codec.Marshal(msg)
@@ -258,13 +292,13 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
for _, rec := range records { for _, rec := range records {
k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { k.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
te := time.Since(ts) te := time.Since(ts)
k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds()) k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
if err != nil { if err != nil {
k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc()
} else { } else {
k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
} }
promise(r, err) promise(r, err)
}) })
@@ -275,14 +309,14 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
results := k.c.ProduceSync(ctx, records...) results := k.c.ProduceSync(ctx, records...)
te := time.Since(ts) te := time.Since(ts)
for _, result := range results { for _, result := range results {
k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds()) k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec()
if result.Err != nil { if result.Err != nil {
k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
errs = append(errs, result.Err.Error()) errs = append(errs, result.Err.Error())
} else { } else {
k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
} }
} }
@@ -293,6 +327,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
return nil return nil
} }
func (k *Broker) TopicExists(ctx context.Context, topic string) error {
mdreq := kmsg.NewMetadataRequest()
mdreq.Topics = []kmsg.MetadataRequestTopic{
{Topic: &topic},
}
mdrsp, err := mdreq.RequestWith(ctx, k.c)
if err != nil {
return err
} else if mdrsp.Topics[0].ErrorCode != 0 {
return fmt.Errorf("topic %s not exists or permission error", topic)
}
return nil
}
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil return nil, nil
} }
@@ -301,11 +351,11 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
options := broker.NewSubscribeOptions(opts...) options := broker.NewSubscribeOptions(opts...)
if options.Group == "" { if options.Group == "" {
uid, err := id.New() uid, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, err return nil, err
} }
options.Group = uid options.Group = uid.String()
} }
commitInterval := DefaultCommitInterval commitInterval := DefaultCommitInterval
@@ -315,7 +365,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
} }
sub := &subscriber{ sub := &Subscriber{
topic: topic, topic: topic,
opts: options, opts: options,
handler: handler, handler: handler,
@@ -342,7 +392,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
} }
c, err := k.connect(ctx, kopts...) c, htracer, err := k.connect(ctx, kopts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -360,6 +410,8 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
sub.c = c sub.c = c
sub.htracer = htracer
go sub.poll(ctx) go sub.poll(ctx)
k.Lock() k.Lock()
@@ -385,9 +437,7 @@ func NewBroker(opts ...broker.Option) *Broker {
kgo.DialTimeout(3 * time.Second), kgo.DialTimeout(3 * time.Second),
kgo.DisableIdempotentWrite(), kgo.DisableIdempotentWrite(),
kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.ProducerBatchCompression(kgo.NoCompression()),
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}), kgo.WithLogger(&mlogger{l: options.Logger.Clone(logger.WithCallerSkipCount(options.Logger.Options().CallerSkipCount + 2)), ctx: options.Context}),
// kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, func() string { return time.Now().Format(time.StampMilli) })),
kgo.WithHooks(&metrics{meter: options.Meter}),
kgo.SeedBrokers(kaddrs...), kgo.SeedBrokers(kaddrs...),
kgo.RetryBackoffFn(DefaultRetryBackoffFn), kgo.RetryBackoffFn(DefaultRetryBackoffFn),
kgo.BlockRebalanceOnPoll(), kgo.BlockRebalanceOnPoll(),

View File

@@ -10,10 +10,10 @@ import (
"time" "time"
kg "github.com/twmb/franz-go/pkg/kgo" kg "github.com/twmb/franz-go/pkg/kgo"
kgo "go.unistack.org/micro-broker-kgo/v4" kgo "go.unistack.org/micro-broker-kgo/v3"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v3/metadata"
) )
var ( var (

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v3/logger"
) )
type mlogger struct { type mlogger struct {

View File

@@ -6,57 +6,29 @@ import (
"time" "time"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v3/meter"
) )
/* type hookMeter struct {
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}
*/
type metrics struct {
meter meter.Meter meter meter.Meter
} }
var ( var (
_ kgo.HookBrokerConnect = &metrics{} _ kgo.HookBrokerConnect = &hookMeter{}
_ kgo.HookBrokerDisconnect = &metrics{} _ kgo.HookBrokerDisconnect = &hookMeter{}
_ kgo.HookBrokerRead = &metrics{} // HookBrokerE2E
_ kgo.HookBrokerThrottle = &metrics{} _ kgo.HookBrokerRead = &hookMeter{}
_ kgo.HookBrokerWrite = &metrics{} _ kgo.HookBrokerThrottle = &hookMeter{}
_ kgo.HookFetchBatchRead = &metrics{} _ kgo.HookBrokerWrite = &hookMeter{}
_ kgo.HookProduceBatchWritten = &metrics{} _ kgo.HookFetchBatchRead = &hookMeter{}
_ kgo.HookGroupManageError = &metrics{} // HookFetchRecordBuffered
// HookFetchRecordUnbuffered
_ kgo.HookGroupManageError = &hookMeter{}
// HookNewClient
_ kgo.HookProduceBatchWritten = &hookMeter{}
// HookProduceRecordBuffered
// HookProduceRecordPartitioned
// HookProduceRecordUnbuffered
) )
const ( const (
@@ -89,11 +61,11 @@ const (
labelTopic = "topic" labelTopic = "topic"
) )
func (m *metrics) OnGroupManageError(err error) { func (m *hookMeter) OnGroupManageError(err error) {
m.meter.Counter(metricBrokerGroupErrors).Inc() m.meter.Counter(metricBrokerGroupErrors).Inc()
} }
func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
if err != nil { if err != nil {
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc() m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc()
@@ -102,12 +74,12 @@ func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ ne
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc() m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()
} }
func (m *metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { func (m *hookMeter) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc() m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc()
} }
func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { func (m *hookMeter) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
if err != nil { if err != nil {
m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc() m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc()
@@ -118,7 +90,7 @@ func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten i
m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds()) m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds())
} }
func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) { func (m *hookMeter) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
if err != nil { if err != nil {
m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc() m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc()
@@ -130,18 +102,18 @@ func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int,
m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds()) m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds())
} }
func (m *metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) { func (m *hookMeter) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds()) m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds())
} }
func (m *metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) { func (m *hookMeter) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes) m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes) m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)
} }
func (m *metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) { func (m *hookMeter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID)) node := strconv.Itoa(int(meta.NodeID))
m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes) m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes) m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)

View File

@@ -5,12 +5,21 @@ import (
"time" "time"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v3/client"
) )
var (
// DefaultCommitInterval specifies how fast send commit offsets to kafka // DefaultCommitInterval specifies how fast send commit offsets to kafka
var DefaultCommitInterval = 5 * time.Second DefaultCommitInterval = 5 * time.Second
// DefaultStatsInterval specifies how fast check consumer lag
DefaultStatsInterval = 5 * time.Second
// DefaultSubscribeMaxInflight specifies how much messages keep inflight
DefaultSubscribeMaxInflight = 100
)
type subscribeContextKey struct{} type subscribeContextKey struct{}
@@ -63,6 +72,18 @@ func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
} }
} }
type clientIDKey struct{}
func ClientID(id string) broker.Option {
return broker.SetOption(clientIDKey{}, id)
}
type groupKey struct{}
func Group(id string) broker.Option {
return broker.SetOption(groupKey{}, id)
}
type commitIntervalKey struct{} type commitIntervalKey struct{}
// CommitInterval specifies interval to send commits // CommitInterval specifies interval to send commits
@@ -70,8 +91,6 @@ func CommitInterval(td time.Duration) broker.Option {
return broker.SetOption(commitIntervalKey{}, td) return broker.SetOption(commitIntervalKey{}, td)
} }
var DefaultSubscribeMaxInflight = 10
type subscribeMaxInflightKey struct{} type subscribeMaxInflightKey struct{}
// SubscribeMaxInFlight max queued messages // SubscribeMaxInFlight max queued messages

View File

@@ -2,13 +2,17 @@ package kgo
import ( import (
"context" "context"
"strconv"
"sync" "sync"
"time" "time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
) )
type tp struct { type tp struct {
@@ -20,6 +24,7 @@ type consumer struct {
c *kgo.Client c *kgo.Client
topic string topic string
partition int32 partition int32
htracer *hookTracer
opts broker.SubscribeOptions opts broker.SubscribeOptions
kopts broker.Options kopts broker.Options
handler broker.Handler handler broker.Handler
@@ -28,9 +33,10 @@ type consumer struct {
recs chan kgo.FetchTopicPartition recs chan kgo.FetchTopicPartition
} }
type subscriber struct { type Subscriber struct {
c *kgo.Client c *kgo.Client
topic string topic string
htracer *hookTracer
opts broker.SubscribeOptions opts broker.SubscribeOptions
kopts broker.Options kopts broker.Options
handler broker.Handler handler broker.Handler
@@ -40,23 +46,28 @@ type subscriber struct {
sync.RWMutex sync.RWMutex
} }
func (s *subscriber) Options() broker.SubscribeOptions { func (s *Subscriber) Client() *kgo.Client {
return s.c
}
func (s *Subscriber) Options() broker.SubscribeOptions {
return s.opts return s.opts
} }
func (s *subscriber) Topic() string { func (s *Subscriber) Topic() string {
return s.topic return s.topic
} }
func (s *subscriber) Unsubscribe(ctx context.Context) error { func (s *Subscriber) Unsubscribe(ctx context.Context) error {
if s.closed { if s.closed {
return nil return nil
} }
select { select {
case <-ctx.Done(): // case <-ctx.Done():
return ctx.Err() // return ctx.Err()
default: default:
s.c.PauseFetchTopics(s.topic) s.c.PauseFetchTopics(s.topic)
s.c.CloseAllowingRebalance()
kc := make(map[string][]int32) kc := make(map[string][]int32)
for ctp := range s.consumers { for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p) kc[ctp.t] = append(kc[ctp.t], ctp.p)
@@ -64,24 +75,59 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
s.killConsumers(ctx, kc) s.killConsumers(ctx, kc)
close(s.done) close(s.done)
s.closed = true s.closed = true
s.c.ResumeFetchTopics(s.topic)
} }
return nil return nil
} }
func (s *subscriber) poll(ctx context.Context) { func (s *Subscriber) poll(ctx context.Context) {
maxInflight := DefaultSubscribeMaxInflight maxInflight := DefaultSubscribeMaxInflight
if s.opts.Context != nil { if s.opts.Context != nil {
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
maxInflight = n maxInflight = n
} }
} }
go func() {
ac := kadm.NewClient(s.c)
ticker := time.NewTicker(DefaultStatsInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
dgls, err := ac.Lag(ctx, s.opts.Group)
if err != nil || !dgls.Ok() {
continue
}
dgl, ok := dgls[s.opts.Group]
if !ok {
continue
}
lmap, ok := dgl.Lag[s.topic]
if !ok {
continue
}
s.Lock()
for p, l := range lmap {
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag)))
}
s.Unlock()
}
}
}()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
s.c.CloseAllowingRebalance() s.c.CloseAllowingRebalance()
return return
case <-s.done: case <-s.done:
s.c.CloseAllowingRebalance()
return return
default: default:
fetches := s.c.PollRecords(ctx, maxInflight) fetches := s.c.PollRecords(ctx, maxInflight)
@@ -102,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) {
} }
} }
func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) { func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
@@ -119,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
} }
} }
func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost) s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
s.killConsumers(ctx, lost) s.killConsumers(ctx, lost)
} }
func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked) s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
s.killConsumers(ctx, revoked) s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil { if err := c.CommitMarkedOffsets(ctx); err != nil {
@@ -132,22 +178,24 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
} }
} }
func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
for topic, partitions := range assigned { for topic, partitions := range assigned {
for _, partition := range partitions { for _, partition := range partitions {
pc := &consumer{ pc := &consumer{
c: c, c: c,
topic: topic, topic: topic,
partition: partition, partition: partition,
htracer: s.htracer,
quit: make(chan struct{}), quit: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
recs: make(chan kgo.FetchTopicPartition, 4), recs: make(chan kgo.FetchTopicPartition, 100),
handler: s.handler, handler: s.handler,
kopts: s.kopts, kopts: s.kopts,
opts: s.opts, opts: s.opts,
} }
s.Lock()
s.consumers[tp{topic, partition}] = pc s.consumers[tp{topic, partition}] = pc
s.Unlock()
go pc.consume() go pc.consume()
} }
} }
@@ -169,30 +217,36 @@ func (pc *consumer) consume() {
return return
case p := <-pc.recs: case p := <-pc.recs:
for _, record := range p.Records { for _, record := range p.Records {
ctx, sp := pc.htracer.WithProcessSpan(record)
ts := time.Now() ts := time.Now()
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
p := eventPool.Get().(*event) p := eventPool.Get().(*event)
p.msg.Header = nil p.msg.Header = nil
p.msg.Body = nil p.msg.Body = nil
p.topic = record.Topic p.topic = record.Topic
p.err = nil p.err = nil
p.ack = false p.ack = false
if pc.kopts.Codec.String() == "noop" {
p.msg.Header = metadata.New(len(record.Headers)) p.msg.Header = metadata.New(len(record.Headers))
p.ctx = ctx
for _, hdr := range record.Headers { for _, hdr := range record.Headers {
p.msg.Header.Set(hdr.Key, string(hdr.Value)) p.msg.Header.Set(hdr.Key, string(hdr.Value))
} }
if pc.kopts.Codec.String() == "noop" {
p.msg.Body = record.Value p.msg.Body = record.Value
} else if pc.opts.BodyOnly { } else if pc.opts.BodyOnly {
p.msg.Body = record.Value p.msg.Body = record.Value
} else { } else {
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { sp.AddEvent("codec unmarshal start")
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() err := pc.kopts.Codec.Unmarshal(record.Value, p.msg)
sp.AddEvent("codec unmarshal stop")
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
p.err = err p.err = err
p.msg.Body = record.Value p.msg.Body = record.Value
if eh != nil { if eh != nil {
_ = eh(p) _ = eh(p)
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
if p.ack { if p.ack {
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
} else { } else {
@@ -202,8 +256,8 @@ func (pc *consumer) consume() {
} }
eventPool.Put(p) eventPool.Put(p)
te := time.Since(ts) te := time.Since(ts)
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
continue continue
} else { } else {
if pc.kopts.Logger.V(logger.ErrorLevel) { if pc.kopts.Logger.V(logger.ErrorLevel) {
@@ -211,27 +265,33 @@ func (pc *consumer) consume() {
} }
} }
te := time.Since(ts) te := time.Since(ts)
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
eventPool.Put(p) eventPool.Put(p)
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
sp.Finish()
return return
} }
} }
sp.AddEvent("handler start")
err := pc.handler(p) err := pc.handler(p)
sp.AddEvent("handler stop")
if err == nil { if err == nil {
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
} else { } else {
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() sp.SetStatus(tracer.SpanStatusError, err.Error())
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
} }
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
if err == nil && pc.opts.AutoAck { if err == nil && pc.opts.AutoAck {
p.ack = true p.ack = true
} else if err != nil { } else if err != nil {
p.err = err p.err = err
if eh != nil { if eh != nil {
sp.AddEvent("error handler start")
_ = eh(p) _ = eh(p)
sp.AddEvent("error handler stop")
} else { } else {
if pc.kopts.Logger.V(logger.ErrorLevel) { if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err) pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err)
@@ -239,16 +299,19 @@ func (pc *consumer) consume() {
} }
} }
te := time.Since(ts) te := time.Since(ts)
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
if p.ack { if p.ack {
eventPool.Put(p) eventPool.Put(p)
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
} else { } else {
eventPool.Put(p) eventPool.Put(p)
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
sp.Finish()
return return
} }
sp.Finish()
} }
} }
} }

204
tracer.go Normal file
View File

@@ -0,0 +1,204 @@
package kgo
import (
"context"
"unicode/utf8"
"github.com/twmb/franz-go/pkg/kgo"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/tracer"
)
type hookTracer struct {
clientID string
group string
tracer tracer.Tracer
}
var messagingSystem = semconv.MessagingSystemKey.String("kafka")
var (
_ kgo.HookProduceRecordBuffered = (*hookTracer)(nil)
_ kgo.HookProduceRecordUnbuffered = (*hookTracer)(nil)
_ kgo.HookFetchRecordBuffered = (*hookTracer)(nil)
_ kgo.HookFetchRecordUnbuffered = (*hookTracer)(nil)
)
// OnProduceRecordBuffered starts a new span for the "publish" operation on a
// buffered record.
//
// It sets span options and injects the span context into record and updates
// the record's context, so it can be ended in the OnProduceRecordUnbuffered
// hook.
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
// Set up span options.
attrs := []interface{}{
messagingSystem,
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(r.Topic),
semconv.MessagingOperationPublish,
}
attrs = maybeKeyAttr(attrs, r)
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindProducer),
}
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromOutgoingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
}
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
if !ok {
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), "sdk.broker", opts...)
} else {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
}
md, _ = metadata.FromOutgoingContext(r.Context)
setHeaders(r, md)
}
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
// unbuffered record.
//
// It sets attributes with values unset when producing and records any error
// that occurred during the publish operation.
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
if span, ok := tracer.SpanFromContext(r.Context); ok {
span.AddLabels(
semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
)
if err != nil {
span.SetStatus(tracer.SpanStatusError, err.Error())
}
span.Finish()
}
}
// OnFetchRecordBuffered starts a new span for the "receive" operation on a
// buffered record.
//
// It sets the span options and extracts the span context from the record,
// updates the record's context to ensure it can be ended in the
// OnFetchRecordUnbuffered hook and can be used in downstream consumer
// processing.
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
// Set up the span options.
attrs := []interface{}{
messagingSystem,
semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic),
semconv.MessagingOperationReceive,
semconv.MessagingKafkaSourcePartition(int(r.Partition)),
}
attrs = maybeKeyAttr(attrs, r)
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
if m.group != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer),
}
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromIncomingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
}
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
if !ok {
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), "sdk.broker", opts...)
} else {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
}
md, _ = metadata.FromIncomingContext(r.Context)
setHeaders(r, md)
}
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
// unbuffered record.
func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
span, _ := tracer.SpanFromContext(r.Context)
span.Finish()
}
// WithProcessSpan starts a new span for the "process" operation on a consumer
// record.
//
// It sets up the span options. The user's application code is responsible for
// ending the span.
//
// This should only ever be called within a polling loop of a consumed record and
// not a record which has been created for producing, so call this at the start of each
// iteration of your processing for the record.
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
// Set up the span options.
attrs := []interface{}{
messagingSystem,
semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic),
semconv.MessagingOperationProcess,
semconv.MessagingKafkaSourcePartition(int(r.Partition)),
semconv.MessagingKafkaMessageOffset(int(r.Offset)),
}
attrs = maybeKeyAttr(attrs, r)
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
if m.group != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(attrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer),
}
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromIncomingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
}
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
// Start a new span using the provided context and options.
return m.tracer.Start(r.Context, "sdk.broker", opts...)
}
func maybeKeyAttr(attrs []interface{}, r *kgo.Record) []interface{} {
if r.Key == nil {
return attrs
}
var keykey string
if !utf8.Valid(r.Key) {
return attrs
}
keykey = string(r.Key)
return append(attrs, semconv.MessagingKafkaMessageKeyKey.String(keykey))
}