Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
63d82be92a | |||
cced3b84c1 | |||
|
82c1bda50f | ||
4b0b70e7d0 | |||
|
da88718e03 | ||
aed304b7e3 | |||
b796c43257 | |||
b0691edf82 | |||
d43b59f2f3 | |||
f976516d6b | |||
8f5c03ad60 | |||
|
9e9c1434ff |
@@ -1,2 +1,2 @@
|
|||||||
# micro-broker-kgo
|
# micro-broker-kgo
|
||||||

|

|
||||||
|
10
errors.go
Normal file
10
errors.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package kgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func isContextError(err error) bool {
|
||||||
|
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||||
|
}
|
4
go.mod
4
go.mod
@@ -3,6 +3,7 @@ module go.unistack.org/micro-broker-kgo/v4
|
|||||||
go 1.23.8
|
go 1.23.8
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/stretchr/testify v1.10.0
|
||||||
github.com/twmb/franz-go v1.19.5
|
github.com/twmb/franz-go v1.19.5
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.16.0
|
github.com/twmb/franz-go/pkg/kadm v1.16.0
|
||||||
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3
|
||||||
@@ -12,12 +13,15 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/klauspost/compress v1.18.0 // indirect
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/matoous/go-nanoid v1.5.1 // indirect
|
github.com/matoous/go-nanoid v1.5.1 // indirect
|
||||||
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
github.com/spf13/cast v1.9.2 // indirect
|
github.com/spf13/cast v1.9.2 // indirect
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
|
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
|
||||||
golang.org/x/crypto v0.39.0 // indirect
|
golang.org/x/crypto v0.39.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.6 // indirect
|
google.golang.org/protobuf v1.36.6 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
15
go.sum
15
go.sum
@@ -20,16 +20,10 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
|
|||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
github.com/spf13/cast v1.8.0 h1:gEN9K4b8Xws4EX0+a0reLmhq8moKn7ntRlQYgjPeCDk=
|
|
||||||
github.com/spf13/cast v1.8.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
|
||||||
github.com/spf13/cast v1.9.2 h1:SsGfm7M8QOFtEzumm7UZrZdLLquNdzFYfIbEXntcFbE=
|
github.com/spf13/cast v1.9.2 h1:SsGfm7M8QOFtEzumm7UZrZdLLquNdzFYfIbEXntcFbE=
|
||||||
github.com/spf13/cast v1.9.2/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
|
github.com/spf13/cast v1.9.2/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
|
||||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
github.com/twmb/franz-go v1.19.1 h1:cOhDFUkGvUFHSQ7UYW6bO77BJa2fYEk5mA2AX+1NIdE=
|
|
||||||
github.com/twmb/franz-go v1.19.1/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
|
||||||
github.com/twmb/franz-go v1.19.4 h1:0ktflzm5YU7+YYdie8RQWFcU9uDJ03xLefplO1iMwO4=
|
|
||||||
github.com/twmb/franz-go v1.19.4/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
|
||||||
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
|
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
|
||||||
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE=
|
github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE=
|
||||||
@@ -38,21 +32,18 @@ github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3 h1:p24opKW
|
|||||||
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3/go.mod h1:7uQs3Ae6HkWT1Y9elMbqtAcNFCI0y6+iS+Phw49L49U=
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3/go.mod h1:7uQs3Ae6HkWT1Y9elMbqtAcNFCI0y6+iS+Phw49L49U=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
|
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
|
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
|
||||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
|
||||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
|
||||||
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
|
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
|
||||||
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
|
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
||||||
go.unistack.org/micro/v4 v4.1.14 h1:6EotPq9kz/gaFb5YulHdKuuUwmj/7Hk44DpOlzh/A6k=
|
|
||||||
go.unistack.org/micro/v4 v4.1.14/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs=
|
|
||||||
go.unistack.org/micro/v4 v4.1.17 h1:26QDtRSYVpozYuassyvLP4sEQRo3dxgD3sVILRXmIPo=
|
go.unistack.org/micro/v4 v4.1.17 h1:26QDtRSYVpozYuassyvLP4sEQRo3dxgD3sVILRXmIPo=
|
||||||
go.unistack.org/micro/v4 v4.1.17/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs=
|
go.unistack.org/micro/v4 v4.1.17/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs=
|
||||||
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
|
|
||||||
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
|
|
||||||
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
||||||
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
||||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||||
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
@@ -26,50 +26,83 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *hookEvent) OnGroupManageError(err error) {
|
func (m *hookEvent) OnGroupManageError(err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnGroupManageError"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
|
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnBrokerConnect"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
|
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {}
|
||||||
// m.connected.Store(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnBrokerWrite"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnBrokerRead"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnBrokerRead", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
|
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnProduceRecordUnbuffered"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnProduceRecordUnbuffered", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
471
hook_event_test.go
Normal file
471
hook_event_test.go
Normal file
@@ -0,0 +1,471 @@
|
|||||||
|
package kgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
|
"go.unistack.org/micro/v4/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHookEvent_OnGroupManageError(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnGroupManageError",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnGroupManageError",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnGroupManageError(tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnBrokerConnect(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnBrokerConnect",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnBrokerConnect",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnBrokerConnect(kgo.BrokerMetadata{}, 0, nil, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnBrokerWrite(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnBrokerWrite",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnBrokerWrite",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnBrokerWrite(kgo.BrokerMetadata{}, 0, 0, 0, 0, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnBrokerRead(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnBrokerRead",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnBrokerRead",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnBrokerRead(kgo.BrokerMetadata{}, 0, 0, 0, 0, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnProduceRecordUnbuffered(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnProduceRecordUnbuffered",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnProduceRecordUnbuffered",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnProduceRecordUnbuffered(&kgo.Record{}, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mocks
|
||||||
|
|
||||||
|
type mockLogger struct {
|
||||||
|
errorIsCalled bool
|
||||||
|
errorMsg string
|
||||||
|
|
||||||
|
fatalIsCalled bool
|
||||||
|
fatalMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Init(...logger.Option) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Clone(...logger.Option) logger.Logger {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) V(logger.Level) bool {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Level(logger.Level) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Options() logger.Options {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Fields(...interface{}) logger.Logger {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Info(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Trace(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Debug(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Warn(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Error(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
m.errorIsCalled = true
|
||||||
|
m.errorMsg = msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Fatal(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
m.fatalIsCalled = true
|
||||||
|
m.fatalMsg = msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Log(context.Context, logger.Level, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Name() string {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) String() string {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
62
kgo.go
62
kgo.go
@@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
options.ContentType = b.opts.ContentType
|
options.ContentType = b.opts.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options}
|
m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options}
|
||||||
c, err := b.newCodec(m.opts.ContentType)
|
c, err := b.newCodec(m.opts.ContentType)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.body, err = c.Marshal(body)
|
m.body, err = c.Marshal(body)
|
||||||
@@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType)
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -344,40 +346,44 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||||
records := make([]*kgo.Record, 0, len(messages))
|
var records []*kgo.Record
|
||||||
var errs []string
|
|
||||||
var key []byte
|
|
||||||
var promise func(*kgo.Record, error)
|
|
||||||
|
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
if mctx := msg.Context(); mctx != nil {
|
|
||||||
if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil {
|
|
||||||
key = k
|
|
||||||
}
|
|
||||||
if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
|
||||||
promise = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rec := &kgo.Record{
|
rec := &kgo.Record{
|
||||||
Context: ctx,
|
Context: msg.Context(),
|
||||||
Key: key,
|
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
Value: msg.Body(),
|
Value: msg.Body(),
|
||||||
}
|
}
|
||||||
|
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
var promise func(*kgo.Record, error)
|
||||||
|
if rec.Context != nil {
|
||||||
|
if k, ok := rec.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := rec.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kmsg, ok := msg.(*kgoMessage)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if kmsg.opts.Context != nil {
|
||||||
|
if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setHeaders(rec, msg.Header())
|
setHeaders(rec, msg.Header())
|
||||||
|
|
||||||
records = append(records, rec)
|
|
||||||
}
|
|
||||||
|
|
||||||
ts := time.Now()
|
|
||||||
|
|
||||||
if promise != nil {
|
if promise != nil {
|
||||||
|
ts := time.Now()
|
||||||
for _, rec := range records {
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
||||||
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
||||||
@@ -390,12 +396,17 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|||||||
}
|
}
|
||||||
promise(r, err)
|
promise(r, err)
|
||||||
})
|
})
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
records = append(records, rec)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(records) > 0 {
|
||||||
|
var errs []string
|
||||||
|
ts := time.Now()
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Set(uint64(len(records)))
|
||||||
results := b.c.ProduceSync(ctx, records...)
|
results := b.c.ProduceSync(ctx, records...)
|
||||||
|
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
||||||
@@ -412,6 +423,7 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -81,7 +81,7 @@ func TestFail(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
// t.Logf("broker publish")
|
// t.Logf("broker publish")
|
||||||
if err := b.Publish(ctx, "test", msg); err != nil {
|
if err := b.Publish(ctx, "test.fail", msg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -96,7 +96,7 @@ func TestFail(t *testing.T) {
|
|||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.fail", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
@@ -184,7 +184,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
msgs = append(msgs, m)
|
msgs = append(msgs, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.Publish(ctx, "test", msgs...); err != nil {
|
if err := b.Publish(ctx, "test.pubsub", msgs...); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// t.Skip()
|
// t.Skip()
|
||||||
@@ -197,7 +197,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.pubsub", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
|
@@ -48,7 +48,6 @@ type Subscriber struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
kopts broker.Options
|
kopts broker.Options
|
||||||
opts broker.SubscribeOptions
|
opts broker.SubscribeOptions
|
||||||
|
|
||||||
connected *atomic.Uint32
|
connected *atomic.Uint32
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
closed bool
|
closed bool
|
||||||
@@ -272,7 +271,11 @@ func (pc *consumer) consume() {
|
|||||||
for _, hdr := range record.Headers {
|
for _, hdr := range record.Headers {
|
||||||
pm.hdr.Set(hdr.Key, string(hdr.Value))
|
pm.hdr.Set(hdr.Key, string(hdr.Value))
|
||||||
}
|
}
|
||||||
|
pm.hdr.Set("Micro-Offset", strconv.FormatInt(record.Offset, 10))
|
||||||
|
pm.hdr.Set("Micro-Partition", strconv.FormatInt(int64(record.Partition), 10))
|
||||||
|
pm.hdr.Set("Micro-Topic", record.Topic)
|
||||||
|
pm.hdr.Set("Micro-Key", string(record.Key))
|
||||||
|
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
|
||||||
switch h := pc.handler.(type) {
|
switch h := pc.handler.(type) {
|
||||||
case func(broker.Message) error:
|
case func(broker.Message) error:
|
||||||
err = h(pm)
|
err = h(pm)
|
||||||
|
Reference in New Issue
Block a user