From e64269b2a884cd0a7a230d856f3c4dc605a734d3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 23 Jul 2021 12:55:36 +0300 Subject: [PATCH] broker: add BatchBroker interface to avoid breaking older brokers Signed-off-by: Vasiliy Tolstov --- broker/broker.go | 12 ++++++++---- broker/memory.go | 4 +--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index d511cccf..55faa7a4 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -18,6 +18,14 @@ var ( ErrDisconnected = errors.New("broker disconnected") ) +type BatchBroker interface { + Broker + // BatchPublish messages to broker with multiple topics + BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error + // BatchSubscribe subscribes to topic messages via handler + BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) +} + // Broker is an interface used for asynchronous messaging. type Broker interface { // Name returns broker instance name @@ -34,12 +42,8 @@ type Broker interface { Disconnect(ctx context.Context) error // Publish message to broker topic Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error - // BatchPublish messages to broker with multiple topics - BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error // Subscribe subscribes to topic message via handler Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) - // BatchSubscribe subscribes to topic messages via handler - BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string } diff --git a/broker/memory.go b/broker/memory.go index 10420cc8..15e0c907 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -303,8 +303,6 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler }() return sub, nil - - return nil, nil } func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { @@ -421,7 +419,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) Broker { +func NewBroker(opts ...Option) BatchBroker { return &memoryBroker{ opts: NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber),