diff --git a/broker/broker.go b/broker/broker.go index d511cccf..55faa7a4 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -18,6 +18,14 @@ var ( ErrDisconnected = errors.New("broker disconnected") ) +type BatchBroker interface { + Broker + // BatchPublish messages to broker with multiple topics + BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error + // BatchSubscribe subscribes to topic messages via handler + BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) +} + // Broker is an interface used for asynchronous messaging. type Broker interface { // Name returns broker instance name @@ -34,12 +42,8 @@ type Broker interface { Disconnect(ctx context.Context) error // Publish message to broker topic Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error - // BatchPublish messages to broker with multiple topics - BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error // Subscribe subscribes to topic message via handler Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) - // BatchSubscribe subscribes to topic messages via handler - BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string } diff --git a/broker/memory.go b/broker/memory.go index 10420cc8..15e0c907 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -303,8 +303,6 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler }() return sub, nil - - return nil, nil } func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { @@ -421,7 +419,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) Broker { +func NewBroker(opts ...Option) BatchBroker { return &memoryBroker{ opts: NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber),