From c32a17b69bf483c1354777604d3425f140830b60 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 16 Dec 2025 08:25:59 +0300 Subject: [PATCH] broker: add ErrorHandler Signed-off-by: Vasiliy Tolstov --- broker/broker.go | 8 ++++---- broker/options.go | 17 ++++++++++------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 582d6deb..92c56db4 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -41,11 +41,11 @@ type Broker interface { // Disconnect disconnect from broker Disconnect(ctx context.Context) error // NewMessage create new broker message to publish. - NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) + NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error) // Publish message to broker topic Publish(ctx context.Context, topic string, messages ...Message) error // Subscribe subscribes to topic message via handler - Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) + Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string // Live returns broker liveness @@ -59,7 +59,7 @@ type Broker interface { type ( FuncPublish func(ctx context.Context, topic string, messages ...Message) error HookPublish func(next FuncPublish) FuncPublish - FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) + FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error) HookSubscribe func(next FuncSubscribe) FuncSubscribe ) @@ -75,7 +75,7 @@ type Message interface { Body() []byte // Unmarshal try to decode message body to dst. // This is helper method that uses codec.Unmarshal. - Unmarshal(dst interface{}, opts ...codec.Option) error + Unmarshal(dst any, opts ...codec.Option) error // Ack acknowledge message if supported. Ack() error } diff --git a/broker/options.go b/broker/options.go index c1f3588c..2138078d 100644 --- a/broker/options.go +++ b/broker/options.go @@ -18,7 +18,6 @@ import ( type Options struct { // Name holds the broker name Name string - // Tracer used for tracing Tracer tracer.Tracer // Register can be used for clustering @@ -31,23 +30,20 @@ type Options struct { Meter meter.Meter // Context holds external options Context context.Context - // Wait waits for a collection of goroutines to finish Wait *sync.WaitGroup // TLSConfig holds tls.TLSConfig options TLSConfig *tls.Config - // Addrs holds the broker address Addrs []string - // Hooks can be run before broker Publish/BatchPublish and - // Subscribe/BatchSubscribe methods + // Hooks can be run before broker Publishing and message processing in Subscribe Hooks options.Hooks - // GracefulTimeout contains time to wait to finish in flight requests GracefulTimeout time.Duration - // ContentType will be used if no content-type set when creating message ContentType string + // ErrorHandler specifies handler for all broker errors handling subscriber + ErrorHandler any } // NewOptions create new Options @@ -93,6 +89,13 @@ func ContentType(ct string) Option { } } +// ErrorHandler handles errors in broker +func ErrorHandler(h any) Option { + return func(o *Options) { + o.ErrorHandler = h + } +} + // MessageOptions struct type MessageOptions struct { // ContentType for message body