From d5d1e26d7bceeef5041f5ef910610d6c8aed4410 Mon Sep 17 00:00:00 2001 From: pugnack Date: Thu, 10 Jul 2025 12:49:49 +0500 Subject: [PATCH] remove noisy logs from event hooks and add tests (#192) --- errors.go | 10 + go.mod | 17 +- go.sum | 24 +-- broker.go => hook_event.go | 40 ++-- hook_event_test.go | 362 +++++++++++++++++++++++++++++++++++++ 5 files changed, 416 insertions(+), 37 deletions(-) create mode 100644 errors.go rename broker.go => hook_event.go (72%) create mode 100644 hook_event_test.go diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..ef173af --- /dev/null +++ b/errors.go @@ -0,0 +1,10 @@ +package kgo + +import ( + "context" + "errors" +) + +func isContextError(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} diff --git a/go.mod b/go.mod index 1da867e..543740d 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,30 @@ module go.unistack.org/micro-broker-kgo/v3 -go 1.23.0 +go 1.23.8 + +toolchain go1.24.3 require ( github.com/google/uuid v1.6.0 - github.com/twmb/franz-go v1.18.1 - github.com/twmb/franz-go/pkg/kadm v1.15.0 - github.com/twmb/franz-go/pkg/kmsg v1.9.0 + github.com/stretchr/testify v1.10.0 + github.com/twmb/franz-go v1.19.5 + github.com/twmb/franz-go/pkg/kadm v1.16.0 + github.com/twmb/franz-go/pkg/kmsg v1.11.2 go.opentelemetry.io/otel v1.34.0 go.unistack.org/micro/v3 v3.11.44 ) require ( github.com/ash3in/uuidv8 v1.2.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/crypto v0.35.0 // indirect - golang.org/x/sys v0.30.0 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/sys v0.33.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 // indirect google.golang.org/grpc v1.70.0 // indirect google.golang.org/protobuf v1.36.5 // indirect diff --git a/go.sum b/go.sum index 4af0b07..7105789 100644 --- a/go.sum +++ b/go.sum @@ -29,26 +29,26 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= -github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= -github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= -github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= -github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= -github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= +github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= +github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= +github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= +github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= +github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= go.unistack.org/micro/v3 v3.11.44 h1:A+T8zVcL2vlL66kn/Y4rqhtBybLO829wFEYZJYorDOU= go.unistack.org/micro/v3 v3.11.44/go.mod h1:13EFW2ps3BN9mpYbp9K0oQu/VDjEN6LJ4wwdom7hcXQ= -golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= -golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms= google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= diff --git a/broker.go b/hook_event.go similarity index 72% rename from broker.go rename to hook_event.go index bd045c4..3372f42 100644 --- a/broker.go +++ b/hook_event.go @@ -26,46 +26,48 @@ var ( ) func (m *hookEvent) OnGroupManageError(err error) { - if err != nil { - // m.connected.Store(0) - // if m.fatalOnError { + switch { + case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err): + return + default: m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) - //} } } func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { - if err != nil { - // m.connected.Store(0) - // if m.fatalOnError { + switch { + case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err): + return + default: m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) - //} } } -func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { - // m.connected.Store(0) -} +func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {} func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { - if err != nil { - // m.connected.Store(0) - // if m.fatalOnError { + switch { + case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err): + return + default: m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err) - //} } } func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { - if err != nil { - // m.connected.Store(0) + switch { + case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err): + return + default: m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) } } func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { - if err != nil { - // m.connected.Store(0) + switch { + case err == nil || isContextError(err) || kgo.IsRetryableBrokerErr(err): + return + default: m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err) } } diff --git a/hook_event_test.go b/hook_event_test.go new file mode 100644 index 0000000..ceffaea --- /dev/null +++ b/hook_event_test.go @@ -0,0 +1,362 @@ +package kgo + +import ( + "context" + "errors" + "io" + "net" + "os" + "testing" + + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" + "go.unistack.org/micro/v3/logger" +) + +func TestHookEvent_OnGroupManageError(t *testing.T) { + tests := []struct { + name string + inputErr error + expectedErrorIsCalled bool + expectedErrorMsg string + }{ + { + name: "error is nil", + inputErr: nil, + expectedErrorIsCalled: false, + }, + { + name: "context canceled", + inputErr: context.Canceled, + expectedErrorIsCalled: false, + }, + { + name: "context deadline exceeded", + inputErr: context.DeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: deadline exceeded (os package)", + inputErr: os.ErrDeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: EOF (io package)", + inputErr: io.EOF, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: closed network connection (net package)", + inputErr: net.ErrClosed, + expectedErrorIsCalled: false, + }, + { + name: "some error", + inputErr: errors.New("some error"), + expectedErrorIsCalled: true, + expectedErrorMsg: "kgo.OnGroupManageError", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + log := &mockLogger{} + he := &hookEvent{log: log} + he.OnGroupManageError(tt.inputErr) + require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled) + require.Equal(t, tt.expectedErrorMsg, log.errorMsg) + }) + } +} + +func TestHookEvent_OnBrokerConnect(t *testing.T) { + tests := []struct { + name string + inputErr error + expectedErrorIsCalled bool + expectedErrorMsg string + }{ + { + name: "error is nil", + inputErr: nil, + expectedErrorIsCalled: false, + }, + { + name: "context canceled", + inputErr: context.Canceled, + expectedErrorIsCalled: false, + }, + { + name: "context deadline exceeded", + inputErr: context.DeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: deadline exceeded (os package)", + inputErr: os.ErrDeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: EOF (io package)", + inputErr: io.EOF, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: closed network connection (net package)", + inputErr: net.ErrClosed, + expectedErrorIsCalled: false, + }, + { + name: "some error", + inputErr: errors.New("some error"), + expectedErrorIsCalled: true, + expectedErrorMsg: "kgo.OnBrokerConnect", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + log := &mockLogger{} + he := &hookEvent{log: log} + he.OnBrokerConnect(kgo.BrokerMetadata{}, 0, nil, tt.inputErr) + require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled) + require.Equal(t, tt.expectedErrorMsg, log.errorMsg) + }) + } +} + +func TestHookEvent_OnBrokerWrite(t *testing.T) { + tests := []struct { + name string + inputErr error + expectedErrorIsCalled bool + expectedErrorMsg string + }{ + { + name: "error is nil", + inputErr: nil, + expectedErrorIsCalled: false, + }, + { + name: "context canceled", + inputErr: context.Canceled, + expectedErrorIsCalled: false, + }, + { + name: "context deadline exceeded", + inputErr: context.DeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: deadline exceeded (os package)", + inputErr: os.ErrDeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: EOF (io package)", + inputErr: io.EOF, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: closed network connection (net package)", + inputErr: net.ErrClosed, + expectedErrorIsCalled: false, + }, + { + name: "some error", + inputErr: errors.New("some error"), + expectedErrorIsCalled: true, + expectedErrorMsg: "kgo.OnBrokerWrite", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + log := &mockLogger{} + he := &hookEvent{log: log} + he.OnBrokerWrite(kgo.BrokerMetadata{}, 0, 0, 0, 0, tt.inputErr) + require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled) + require.Equal(t, tt.expectedErrorMsg, log.errorMsg) + }) + } +} + +func TestHookEvent_OnBrokerRead(t *testing.T) { + tests := []struct { + name string + inputErr error + expectedErrorIsCalled bool + expectedErrorMsg string + }{ + { + name: "error is nil", + inputErr: nil, + expectedErrorIsCalled: false, + }, + { + name: "context canceled", + inputErr: context.Canceled, + expectedErrorIsCalled: false, + }, + { + name: "context deadline exceeded", + inputErr: context.DeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: deadline exceeded (os package)", + inputErr: os.ErrDeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: EOF (io package)", + inputErr: io.EOF, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: closed network connection (net package)", + inputErr: net.ErrClosed, + expectedErrorIsCalled: false, + }, + { + name: "some error", + inputErr: errors.New("some error"), + expectedErrorIsCalled: true, + expectedErrorMsg: "kgo.OnBrokerRead", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + log := &mockLogger{} + he := &hookEvent{log: log} + he.OnBrokerRead(kgo.BrokerMetadata{}, 0, 0, 0, 0, tt.inputErr) + require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled) + require.Equal(t, tt.expectedErrorMsg, log.errorMsg) + }) + } +} + +func TestHookEvent_OnProduceRecordUnbuffered(t *testing.T) { + tests := []struct { + name string + inputErr error + expectedErrorIsCalled bool + expectedErrorMsg string + }{ + { + name: "error is nil", + inputErr: nil, + expectedErrorIsCalled: false, + }, + { + name: "context canceled", + inputErr: context.Canceled, + expectedErrorIsCalled: false, + }, + { + name: "context deadline exceeded", + inputErr: context.DeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: deadline exceeded (os package)", + inputErr: os.ErrDeadlineExceeded, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: EOF (io package)", + inputErr: io.EOF, + expectedErrorIsCalled: false, + }, + { + name: "retryable error: closed network connection (net package)", + inputErr: net.ErrClosed, + expectedErrorIsCalled: false, + }, + { + name: "some error", + inputErr: errors.New("some error"), + expectedErrorIsCalled: true, + expectedErrorMsg: "kgo.OnProduceRecordUnbuffered", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + log := &mockLogger{} + he := &hookEvent{log: log} + he.OnProduceRecordUnbuffered(&kgo.Record{}, tt.inputErr) + require.Equal(t, tt.expectedErrorIsCalled, log.errorIsCalled) + require.Equal(t, tt.expectedErrorMsg, log.errorMsg) + }) + } +} + +// Mocks + +type mockLogger struct { + errorIsCalled bool + errorMsg string +} + +func (m *mockLogger) Init(...logger.Option) error { + panic("implement me") +} + +func (m *mockLogger) Clone(...logger.Option) logger.Logger { + panic("implement me") +} + +func (m *mockLogger) V(logger.Level) bool { + panic("implement me") +} + +func (m *mockLogger) Level(logger.Level) { + panic("implement me") +} + +func (m *mockLogger) Options() logger.Options { + panic("implement me") +} + +func (m *mockLogger) Fields(...interface{}) logger.Logger { + panic("implement me") +} + +func (m *mockLogger) Info(context.Context, string, ...interface{}) { + panic("implement me") +} + +func (m *mockLogger) Trace(context.Context, string, ...interface{}) { + panic("implement me") +} + +func (m *mockLogger) Debug(context.Context, string, ...interface{}) { + panic("implement me") +} + +func (m *mockLogger) Warn(context.Context, string, ...interface{}) { + panic("implement me") +} + +func (m *mockLogger) Error(ctx context.Context, msg string, args ...interface{}) { + m.errorIsCalled = true + m.errorMsg = msg +} + +func (m *mockLogger) Fatal(context.Context, string, ...interface{}) { + panic("implement me") +} + +func (m *mockLogger) Log(context.Context, logger.Level, string, ...interface{}) { + panic("implement me") +} + +func (m *mockLogger) Name() string { + panic("implement me") +} + +func (m *mockLogger) String() string { + panic("implement me") +}