broker: add ErrorHandler
Some checks failed
coverage / build (push) Failing after 4m49s
test / test (push) Failing after 16m1s
sync / sync (push) Failing after 20s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-12-16 08:25:59 +03:00
parent 13b01f59ee
commit 46eb739dff
2 changed files with 14 additions and 11 deletions

View File

@@ -41,11 +41,11 @@ type Broker interface {
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
// Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
// String type of broker
String() string
// Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
type (
FuncPublish func(ctx context.Context, topic string, messages ...Message) error
HookPublish func(next FuncPublish) FuncPublish
FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
HookSubscribe func(next FuncSubscribe) FuncSubscribe
)
@@ -75,7 +75,7 @@ type Message interface {
Body() []byte
// Unmarshal try to decode message body to dst.
// This is helper method that uses codec.Unmarshal.
Unmarshal(dst interface{}, opts ...codec.Option) error
Unmarshal(dst any, opts ...codec.Option) error
// Ack acknowledge message if supported.
Ack() error
}

View File

@@ -18,7 +18,6 @@ import (
type Options struct {
// Name holds the broker name
Name string
// Tracer used for tracing
Tracer tracer.Tracer
// Register can be used for clustering
@@ -31,23 +30,20 @@ type Options struct {
Meter meter.Meter
// Context holds external options
Context context.Context
// Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Addrs holds the broker address
Addrs []string
// Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
// Hooks can be run before broker Publishing and message processing in Subscribe
Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration
// ContentType will be used if no content-type set when creating message
ContentType string
// ErrorHandler specifies handler for all broker errors handling subscriber
ErrorHandler any
}
// NewOptions create new Options
@@ -93,6 +89,13 @@ func ContentType(ct string) Option {
}
}
// ErrorHandler handles errors in broker
func ErrorHandler(h any) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// MessageOptions struct
type MessageOptions struct {
// ContentType for message body