Compare commits
6 Commits
b796c43257
...
v4.1.10
| Author | SHA1 | Date | |
|---|---|---|---|
| 63d82be92a | |||
| cced3b84c1 | |||
|
|
82c1bda50f | ||
| 4b0b70e7d0 | |||
|
|
da88718e03 | ||
| aed304b7e3 |
@@ -1,2 +1,2 @@
|
|||||||
# micro-broker-kgo
|
# micro-broker-kgo
|
||||||

|

|
||||||
|
|||||||
10
errors.go
Normal file
10
errors.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package kgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func isContextError(err error) bool {
|
||||||
|
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||||
|
}
|
||||||
4
go.mod
4
go.mod
@@ -3,6 +3,7 @@ module go.unistack.org/micro-broker-kgo/v4
|
|||||||
go 1.23.8
|
go 1.23.8
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/stretchr/testify v1.10.0
|
||||||
github.com/twmb/franz-go v1.19.5
|
github.com/twmb/franz-go v1.19.5
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.16.0
|
github.com/twmb/franz-go/pkg/kadm v1.16.0
|
||||||
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3
|
||||||
@@ -12,12 +13,15 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/klauspost/compress v1.18.0 // indirect
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/matoous/go-nanoid v1.5.1 // indirect
|
github.com/matoous/go-nanoid v1.5.1 // indirect
|
||||||
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
github.com/spf13/cast v1.9.2 // indirect
|
github.com/spf13/cast v1.9.2 // indirect
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
|
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
|
||||||
golang.org/x/crypto v0.39.0 // indirect
|
golang.org/x/crypto v0.39.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.6 // indirect
|
google.golang.org/protobuf v1.36.6 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
3
go.sum
3
go.sum
@@ -42,5 +42,8 @@ golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
|||||||
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
||||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||||
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|||||||
@@ -26,50 +26,83 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *hookEvent) OnGroupManageError(err error) {
|
func (m *hookEvent) OnGroupManageError(err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnGroupManageError"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
|
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnBrokerConnect"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
|
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {}
|
||||||
// m.connected.Store(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnBrokerWrite"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnBrokerRead"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnBrokerRead", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
|
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
|
||||||
if err != nil && !kgo.IsRetryableBrokerErr(err) {
|
switch {
|
||||||
m.connected.Store(0)
|
case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err):
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
ctx := context.TODO()
|
||||||
|
logMsg := "kgo.OnProduceRecordUnbuffered"
|
||||||
|
|
||||||
if m.fatalOnError {
|
if m.fatalOnError {
|
||||||
m.log.Fatal(context.TODO(), "kgo.OnProduceRecordUnbuffered", err)
|
m.log.Fatal(ctx, logMsg, err)
|
||||||
|
} else {
|
||||||
|
m.log.Error(ctx, logMsg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
471
hook_event_test.go
Normal file
471
hook_event_test.go
Normal file
@@ -0,0 +1,471 @@
|
|||||||
|
package kgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
|
"go.unistack.org/micro/v4/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHookEvent_OnGroupManageError(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnGroupManageError",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnGroupManageError",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnGroupManageError(tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnBrokerConnect(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnBrokerConnect",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnBrokerConnect",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnBrokerConnect(kgo.BrokerMetadata{}, 0, nil, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnBrokerWrite(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnBrokerWrite",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnBrokerWrite",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnBrokerWrite(kgo.BrokerMetadata{}, 0, 0, 0, 0, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnBrokerRead(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnBrokerRead",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnBrokerRead",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnBrokerRead(kgo.BrokerMetadata{}, 0, 0, 0, 0, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHookEvent_OnProduceRecordUnbuffered(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputErr error
|
||||||
|
fatalOnError bool
|
||||||
|
expectedErrorIsCalled bool
|
||||||
|
expectedErrorMsg string
|
||||||
|
expectedFatalIsCalled bool
|
||||||
|
expectedFatalMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error is nil",
|
||||||
|
inputErr: nil,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled",
|
||||||
|
inputErr: context.Canceled,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context deadline exceeded",
|
||||||
|
inputErr: context.DeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: deadline exceeded (os package)",
|
||||||
|
inputErr: os.ErrDeadlineExceeded,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: EOF (io package)",
|
||||||
|
inputErr: io.EOF,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retryable error: closed network connection (net package)",
|
||||||
|
inputErr: net.ErrClosed,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (non-fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: false,
|
||||||
|
expectedErrorIsCalled: true,
|
||||||
|
expectedErrorMsg: "kgo.OnProduceRecordUnbuffered",
|
||||||
|
expectedFatalIsCalled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "some error (fatal)",
|
||||||
|
inputErr: errors.New("some error"),
|
||||||
|
fatalOnError: true,
|
||||||
|
expectedErrorIsCalled: false,
|
||||||
|
expectedFatalIsCalled: true,
|
||||||
|
expectedFatalMsg: "kgo.OnProduceRecordUnbuffered",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
log := &mockLogger{}
|
||||||
|
he := &hookEvent{log: log, fatalOnError: tt.fatalOnError}
|
||||||
|
he.OnProduceRecordUnbuffered(&kgo.Record{}, tt.inputErr)
|
||||||
|
require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled)
|
||||||
|
require.Equal(t, tt.expectedErrorMsg, log.errorMsg)
|
||||||
|
require.Equal(t, tt.expectedFatalIsCalled, log.fatalIsCalled)
|
||||||
|
require.Equal(t, tt.expectedFatalMsg, log.fatalMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mocks
|
||||||
|
|
||||||
|
type mockLogger struct {
|
||||||
|
errorIsCalled bool
|
||||||
|
errorMsg string
|
||||||
|
|
||||||
|
fatalIsCalled bool
|
||||||
|
fatalMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Init(...logger.Option) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Clone(...logger.Option) logger.Logger {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) V(logger.Level) bool {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Level(logger.Level) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Options() logger.Options {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Fields(...interface{}) logger.Logger {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Info(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Trace(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Debug(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Warn(context.Context, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Error(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
m.errorIsCalled = true
|
||||||
|
m.errorMsg = msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Fatal(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
m.fatalIsCalled = true
|
||||||
|
m.fatalMsg = msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Log(context.Context, logger.Level, string, ...interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) Name() string {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockLogger) String() string {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
62
kgo.go
62
kgo.go
@@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
options.ContentType = b.opts.ContentType
|
options.ContentType = b.opts.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options}
|
m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options}
|
||||||
c, err := b.newCodec(m.opts.ContentType)
|
c, err := b.newCodec(m.opts.ContentType)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.body, err = c.Marshal(body)
|
m.body, err = c.Marshal(body)
|
||||||
@@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType)
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -344,40 +346,44 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||||
records := make([]*kgo.Record, 0, len(messages))
|
var records []*kgo.Record
|
||||||
var errs []string
|
|
||||||
var key []byte
|
|
||||||
var promise func(*kgo.Record, error)
|
|
||||||
|
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
if mctx := msg.Context(); mctx != nil {
|
|
||||||
if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil {
|
|
||||||
key = k
|
|
||||||
}
|
|
||||||
if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
|
||||||
promise = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rec := &kgo.Record{
|
rec := &kgo.Record{
|
||||||
Context: ctx,
|
Context: msg.Context(),
|
||||||
Key: key,
|
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
Value: msg.Body(),
|
Value: msg.Body(),
|
||||||
}
|
}
|
||||||
|
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
var promise func(*kgo.Record, error)
|
||||||
|
if rec.Context != nil {
|
||||||
|
if k, ok := rec.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := rec.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kmsg, ok := msg.(*kgoMessage)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if kmsg.opts.Context != nil {
|
||||||
|
if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setHeaders(rec, msg.Header())
|
setHeaders(rec, msg.Header())
|
||||||
|
|
||||||
records = append(records, rec)
|
|
||||||
}
|
|
||||||
|
|
||||||
ts := time.Now()
|
|
||||||
|
|
||||||
if promise != nil {
|
if promise != nil {
|
||||||
|
ts := time.Now()
|
||||||
for _, rec := range records {
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
||||||
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
||||||
@@ -390,12 +396,17 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|||||||
}
|
}
|
||||||
promise(r, err)
|
promise(r, err)
|
||||||
})
|
})
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
records = append(records, rec)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(records) > 0 {
|
||||||
|
var errs []string
|
||||||
|
ts := time.Now()
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Set(uint64(len(records)))
|
||||||
results := b.c.ProduceSync(ctx, records...)
|
results := b.c.ProduceSync(ctx, records...)
|
||||||
|
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
||||||
@@ -412,6 +423,7 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ func TestFail(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
// t.Logf("broker publish")
|
// t.Logf("broker publish")
|
||||||
if err := b.Publish(ctx, "test", msg); err != nil {
|
if err := b.Publish(ctx, "test.fail", msg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -96,7 +96,7 @@ func TestFail(t *testing.T) {
|
|||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.fail", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
@@ -184,7 +184,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
msgs = append(msgs, m)
|
msgs = append(msgs, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.Publish(ctx, "test", msgs...); err != nil {
|
if err := b.Publish(ctx, "test.pubsub", msgs...); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// t.Skip()
|
// t.Skip()
|
||||||
@@ -197,7 +197,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.pubsub", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
|
|||||||
@@ -116,10 +116,3 @@ type subscribeMessagePoolKey struct{}
|
|||||||
func SubscribeMessagePool(b bool) broker.SubscribeOption {
|
func SubscribeMessagePool(b bool) broker.SubscribeOption {
|
||||||
return broker.SetSubscribeOption(subscribeMessagePoolKey{}, b)
|
return broker.SetSubscribeOption(subscribeMessagePoolKey{}, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
type subscribeMessagePoolKey struct{}
|
|
||||||
|
|
||||||
// SubscribeMessagePool optionaly enabled/disable message pool
|
|
||||||
func SubscribeMessagePool(b bool) broker.SubscribeOption {
|
|
||||||
return broker.SetSubscribeOption(subscribeMessagePoolKey{}, b)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ type consumer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Subscriber struct {
|
type Subscriber struct {
|
||||||
<<<<<<< HEAD
|
|
||||||
consumers map[tp]*consumer
|
consumers map[tp]*consumer
|
||||||
c *kgo.Client
|
c *kgo.Client
|
||||||
htracer *hookTracer
|
htracer *hookTracer
|
||||||
@@ -272,7 +271,11 @@ func (pc *consumer) consume() {
|
|||||||
for _, hdr := range record.Headers {
|
for _, hdr := range record.Headers {
|
||||||
pm.hdr.Set(hdr.Key, string(hdr.Value))
|
pm.hdr.Set(hdr.Key, string(hdr.Value))
|
||||||
}
|
}
|
||||||
|
pm.hdr.Set("Micro-Offset", strconv.FormatInt(record.Offset, 10))
|
||||||
|
pm.hdr.Set("Micro-Partition", strconv.FormatInt(int64(record.Partition), 10))
|
||||||
|
pm.hdr.Set("Micro-Topic", record.Topic)
|
||||||
|
pm.hdr.Set("Micro-Key", string(record.Key))
|
||||||
|
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
|
||||||
switch h := pc.handler.(type) {
|
switch h := pc.handler.(type) {
|
||||||
case func(broker.Message) error:
|
case func(broker.Message) error:
|
||||||
err = h(pm)
|
err = h(pm)
|
||||||
|
|||||||
Reference in New Issue
Block a user