diff --git a/broker/broker.go b/broker/broker.go index 55faa7a4..6eaa9e2b 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -18,14 +18,6 @@ var ( ErrDisconnected = errors.New("broker disconnected") ) -type BatchBroker interface { - Broker - // BatchPublish messages to broker with multiple topics - BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error - // BatchSubscribe subscribes to topic messages via handler - BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) -} - // Broker is an interface used for asynchronous messaging. type Broker interface { // Name returns broker instance name @@ -44,6 +36,10 @@ type Broker interface { Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error // Subscribe subscribes to topic message via handler Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) + // BatchPublish messages to broker with multiple topics + BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error + // BatchSubscribe subscribes to topic messages via handler + BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string } diff --git a/broker/memory.go b/broker/memory.go index 2b31408f..39eaaa1c 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -96,8 +96,9 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, } m.RUnlock() + options := NewPublishOptions(opts...) vs := make([]msgWrapper, 0, 1) - if m.opts.Codec == nil { + if m.opts.Codec == nil || options.BodyOnly { topic, _ := msg.Header.Get(metadata.HeaderTopic) vs = append(vs, msgWrapper{topic: topic, body: msg}) } else { @@ -125,8 +126,9 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . } m.RUnlock() + options := NewPublishOptions(opts...) vs := make([]msgWrapper, 0, len(msgs)) - if m.opts.Codec == nil { + if m.opts.Codec == nil || options.BodyOnly { for _, msg := range msgs { topic, _ := msg.Header.Get(metadata.HeaderTopic) vs = append(vs, msgWrapper{topic: topic, body: msg}) @@ -360,7 +362,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) BatchBroker { +func NewBroker(opts ...Option) Broker { return &memoryBroker{ opts: NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber), diff --git a/broker/options.go b/broker/options.go index d081adbe..94db357f 100644 --- a/broker/options.go +++ b/broker/options.go @@ -3,6 +3,7 @@ package broker import ( "context" "crypto/tls" + "time" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" @@ -73,11 +74,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions { options := PublishOptions{ Context: context.Background(), } - for _, o := range opts { o(&options) } - return options } @@ -95,6 +94,10 @@ type SubscribeOptions struct { AutoAck bool // BodyOnly flag specifies that message contains only body bytes without header BodyOnly bool + // BatchSize flag specifies max batch size + BatchSize int + // BatchWait flag specifies max wait time for batch filling + BatchWait time.Duration } // Option func @@ -117,23 +120,6 @@ func PublishContext(ctx context.Context) PublishOption { } } -// SubscribeOption func -type SubscribeOption func(*SubscribeOptions) - -// NewSubscribeOptions creates new SubscribeOptions -func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { - options := SubscribeOptions{ - AutoAck: true, - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - // Addrs sets the host addresses to be used by the broker func Addrs(addrs ...string) Option { return func(o *Options) { @@ -149,28 +135,6 @@ func Codec(c codec.Codec) Option { } } -// DisableAutoAck disables auto ack -func DisableAutoAck() SubscribeOption { - return func(o *SubscribeOptions) { - o.AutoAck = false - } -} - -// SubscribeAutoAck will disable auto acking of messages -// after they have been handled. -func SubscribeAutoAck(b bool) SubscribeOption { - return func(o *SubscribeOptions) { - o.AutoAck = b - } -} - -// SubscribeBodyOnly consumes only body of the message -func SubscribeBodyOnly(b bool) SubscribeOption { - return func(o *SubscribeOptions) { - o.BodyOnly = b - } -} - // ErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors func ErrorHandler(h Handler) Option { @@ -266,3 +230,55 @@ func SubscribeContext(ctx context.Context) SubscribeOption { o.Context = ctx } } + +// DisableAutoAck disables auto ack +// DEPRECATED +func DisableAutoAck() SubscribeOption { + return func(o *SubscribeOptions) { + o.AutoAck = false + } +} + +// SubscribeAutoAck contol auto acking of messages +// after they have been handled. +func SubscribeAutoAck(b bool) SubscribeOption { + return func(o *SubscribeOptions) { + o.AutoAck = b + } +} + +// SubscribeBodyOnly consumes only body of the message +func SubscribeBodyOnly(b bool) SubscribeOption { + return func(o *SubscribeOptions) { + o.BodyOnly = b + } +} + +// SubscribeBatchSize specifies max batch size +func SubscribeBatchSize(n int) SubscribeOption { + return func(o *SubscribeOptions) { + o.BatchSize = n + } +} + +// SubscribeBatchWait specifies max batch wait time +func SubscribeBatchWait(td time.Duration) SubscribeOption { + return func(o *SubscribeOptions) { + o.BatchWait = td + } +} + +// SubscribeOption func +type SubscribeOption func(*SubscribeOptions) + +// NewSubscribeOptions creates new SubscribeOptions +func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { + options := SubscribeOptions{ + AutoAck: true, + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + return options +}