broker: improve option naming, move BatchBroker to Broker interface

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-24 16:16:18 +03:00
parent 9f3957d101
commit 5c9b3dae33
3 changed files with 66 additions and 52 deletions

View File

@ -18,14 +18,6 @@ var (
ErrDisconnected = errors.New("broker disconnected") ErrDisconnected = errors.New("broker disconnected")
) )
type BatchBroker interface {
Broker
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
}
// Broker is an interface used for asynchronous messaging. // Broker is an interface used for asynchronous messaging.
type Broker interface { type Broker interface {
// Name returns broker instance name // Name returns broker instance name
@ -44,6 +36,10 @@ type Broker interface {
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
} }

View File

@ -96,8 +96,9 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
} }
m.RUnlock() m.RUnlock()
options := NewPublishOptions(opts...)
vs := make([]msgWrapper, 0, 1) vs := make([]msgWrapper, 0, 1)
if m.opts.Codec == nil { if m.opts.Codec == nil || options.BodyOnly {
topic, _ := msg.Header.Get(metadata.HeaderTopic) topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg}) vs = append(vs, msgWrapper{topic: topic, body: msg})
} else { } else {
@ -125,8 +126,9 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
} }
m.RUnlock() m.RUnlock()
options := NewPublishOptions(opts...)
vs := make([]msgWrapper, 0, len(msgs)) vs := make([]msgWrapper, 0, len(msgs))
if m.opts.Codec == nil { if m.opts.Codec == nil || options.BodyOnly {
for _, msg := range msgs { for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic) topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg}) vs = append(vs, msgWrapper{topic: topic, body: msg})
@ -360,7 +362,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
} }
// NewBroker return new memory broker // NewBroker return new memory broker
func NewBroker(opts ...Option) BatchBroker { func NewBroker(opts ...Option) Broker {
return &memoryBroker{ return &memoryBroker{
opts: NewOptions(opts...), opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber), subscribers: make(map[string][]*memorySubscriber),

View File

@ -3,6 +3,7 @@ package broker
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"time"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
@ -73,11 +74,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
options := PublishOptions{ options := PublishOptions{
Context: context.Background(), Context: context.Background(),
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
return options return options
} }
@ -95,6 +94,10 @@ type SubscribeOptions struct {
AutoAck bool AutoAck bool
// BodyOnly flag specifies that message contains only body bytes without header // BodyOnly flag specifies that message contains only body bytes without header
BodyOnly bool BodyOnly bool
// BatchSize flag specifies max batch size
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
} }
// Option func // Option func
@ -117,23 +120,6 @@ func PublishContext(ctx context.Context) PublishOption {
} }
} }
// SubscribeOption func
type SubscribeOption func(*SubscribeOptions)
// NewSubscribeOptions creates new SubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// Addrs sets the host addresses to be used by the broker // Addrs sets the host addresses to be used by the broker
func Addrs(addrs ...string) Option { func Addrs(addrs ...string) Option {
return func(o *Options) { return func(o *Options) {
@ -149,28 +135,6 @@ func Codec(c codec.Codec) Option {
} }
} }
// DisableAutoAck disables auto ack
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// SubscribeAutoAck will disable auto acking of messages
// after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = b
}
}
// SubscribeBodyOnly consumes only body of the message
func SubscribeBodyOnly(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.BodyOnly = b
}
}
// ErrorHandler will catch all broker errors that cant be handled // ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func ErrorHandler(h Handler) Option { func ErrorHandler(h Handler) Option {
@ -266,3 +230,55 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
o.Context = ctx o.Context = ctx
} }
} }
// DisableAutoAck disables auto ack
// DEPRECATED
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// SubscribeAutoAck contol auto acking of messages
// after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = b
}
}
// SubscribeBodyOnly consumes only body of the message
func SubscribeBodyOnly(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.BodyOnly = b
}
}
// SubscribeBatchSize specifies max batch size
func SubscribeBatchSize(n int) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchSize = n
}
}
// SubscribeBatchWait specifies max batch wait time
func SubscribeBatchWait(td time.Duration) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchWait = td
}
}
// SubscribeOption func
type SubscribeOption func(*SubscribeOptions)
// NewSubscribeOptions creates new SubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}