package broker import ( "context" "crypto/tls" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/registry" ) type Options struct { Addrs []string Secure bool Codec codec.Marshaler // Logger Logger logger.Logger // Handler executed when errors occur processing messages ErrorHandler Handler TLSConfig *tls.Config // Registry used for clustering Registry registry.Registry // Other options for implementations of the interface // can be stored in a context Context context.Context } func NewOptions(opts ...Option) Options { options := Options{ Registry: registry.DefaultRegistry, Logger: logger.DefaultLogger, Context: context.Background(), } for _, o := range opts { o(&options) } return options } type PublishOptions struct { // Other options for implementations of the interface // can be stored in a context Context context.Context } type SubscribeOptions struct { // AutoAck ack messages if handler returns nil err AutoAck bool // Handler executed when errors occur processing messages ErrorHandler Handler // Subscribers with the same group name // will create a shared subscription where each // receives a subset of messages. Group string // Other options for implementations of the interface // can be stored in a context Context context.Context } type Option func(*Options) type PublishOption func(*PublishOptions) // PublishContext set context func PublishContext(ctx context.Context) PublishOption { return func(o *PublishOptions) { o.Context = ctx } } type SubscribeOption func(*SubscribeOptions) func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { opt := SubscribeOptions{ AutoAck: true, Context: context.Background(), } for _, o := range opts { o(&opt) } return opt } // Addrs sets the host addresses to be used by the broker func Addrs(addrs ...string) Option { return func(o *Options) { o.Addrs = addrs } } // Codec sets the codec used for encoding/decoding used where // a broker does not support headers func Codec(c codec.Marshaler) Option { return func(o *Options) { o.Codec = c } } func DisableAutoAck() SubscribeOption { return func(o *SubscribeOptions) { o.AutoAck = false } } // SubscribeAutoAck will disable auto acking of messages // after they have been handled. func SubscribeAutoAck(b bool) SubscribeOption { return func(o *SubscribeOptions) { o.AutoAck = b } } // ErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors func ErrorHandler(h Handler) Option { return func(o *Options) { o.ErrorHandler = h } } // SubscribeErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors func SubscribeErrorHandler(h Handler) SubscribeOption { return func(o *SubscribeOptions) { o.ErrorHandler = h } } func Queue(name string) SubscribeOption { return func(o *SubscribeOptions) { o.Group = name } } // SubscribeGroup sets the name of the queue to share messages on func SubscribeGroup(name string) SubscribeOption { return func(o *SubscribeOptions) { o.Group = name } } func Registry(r registry.Registry) Option { return func(o *Options) { o.Registry = r } } // Secure communication with the broker func Secure(b bool) Option { return func(o *Options) { o.Secure = b } } // Specify TLS Config func TLSConfig(t *tls.Config) Option { return func(o *Options) { o.TLSConfig = t } } // Logger sets the logger func Logger(l logger.Logger) Option { return func(o *Options) { o.Logger = l } } // SubscribeContext set context func SubscribeContext(ctx context.Context) SubscribeOption { return func(o *SubscribeOptions) { o.Context = ctx } }