broker: add BatchBroker interface to avoid breaking older brokers

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-23 12:55:36 +03:00
parent d18429e024
commit e64269b2a8
2 changed files with 9 additions and 7 deletions

View File

@ -18,6 +18,14 @@ var (
ErrDisconnected = errors.New("broker disconnected")
)
type BatchBroker interface {
Broker
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
}
// Broker is an interface used for asynchronous messaging.
type Broker interface {
// Name returns broker instance name
@ -34,12 +42,8 @@ type Broker interface {
Disconnect(ctx context.Context) error
// Publish message to broker topic
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker
String() string
}

View File

@ -303,8 +303,6 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
}()
return sub, nil
return nil, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
@ -421,7 +419,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
}
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
func NewBroker(opts ...Option) BatchBroker {
return &memoryBroker{
opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber),