client: improve option naming, add BatchPublish to noop client

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-24 16:14:42 +03:00
parent 8fd8bdcb39
commit 9f3957d101
3 changed files with 69 additions and 32 deletions

@ -40,6 +40,7 @@ type Client interface {
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
String() string
}

@ -181,17 +181,36 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
return &noopStream{}, nil
}
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
var body []byte
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
return n.publish(ctx, ps, opts...)
}
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
return n.publish(ctx, []Message{p}, opts...)
}
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
options := NewPublishOptions(opts...)
msgs := make([]*broker.Message, 0, len(ps))
for _, p := range ps {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
md[metadata.HeaderContentType] = p.ContentType()
md[metadata.HeaderTopic] = p.Topic()
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
md[metadata.HeaderTopic] = topic
var body []byte
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
@ -211,17 +230,10 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti
body = b
}
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
return n.opts.Broker.Publish(ctx, topic, &broker.Message{
Header: md,
Body: body,
},
return n.opts.Broker.BatchPublish(ctx, msgs,
broker.PublishContext(options.Context),
broker.PublishBodyOnly(options.BodyOnly),
)

@ -373,19 +373,35 @@ func DialTimeout(d time.Duration) Option {
}
// WithExchange sets the exchange to route a message through
// DEPRECATED
func WithExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// PublishExchange sets the exchange to route a message through
func PublishExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// WithBodyOnly publish only message body
// DERECATED
func WithBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) {
o.BodyOnly = b
}
}
// PublishBodyOnly publish only message body
func PublishBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) {
o.BodyOnly = b
}
}
// PublishContext sets the context in publish options
func PublishContext(ctx context.Context) PublishOption {
return func(o *PublishOptions) {
@ -498,12 +514,20 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
}
// WithMessageContentType sets the message content type
// DEPRECATED
func WithMessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// MessageContentType sets the message content type
func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// StreamingRequest specifies that request is streaming
func StreamingRequest(b bool) RequestOption {
return func(o *RequestOptions) {