From 9f3957d101ff9d256bb9ac15675018fbcd478fe7 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 24 Jul 2021 16:14:42 +0300 Subject: [PATCH] client: improve option naming, add BatchPublish to noop client Signed-off-by: Vasiliy Tolstov --- client/client.go | 1 + client/noop.go | 76 +++++++++++++++++++++++++++-------------------- client/options.go | 24 +++++++++++++++ 3 files changed, 69 insertions(+), 32 deletions(-) diff --git a/client/client.go b/client/client.go index 16f049c3..dfb5abad 100644 --- a/client/client.go +++ b/client/client.go @@ -40,6 +40,7 @@ type Client interface { Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Publish(ctx context.Context, msg Message, opts ...PublishOption) error + BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error String() string } diff --git a/client/noop.go b/client/noop.go index b6ae8fbb..ec81a860 100644 --- a/client/noop.go +++ b/client/noop.go @@ -181,47 +181,59 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption return &noopStream{}, nil } -func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { - var body []byte +func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { + return n.publish(ctx, ps, opts...) +} +func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { + return n.publish(ctx, []Message{p}, opts...) +} + +func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error { options := NewPublishOptions(opts...) - md, ok := metadata.FromOutgoingContext(ctx) - if !ok { - md = metadata.New(0) - } - md[metadata.HeaderContentType] = p.ContentType() - md[metadata.HeaderTopic] = p.Topic() + msgs := make([]*broker.Message, 0, len(ps)) - // passed in raw data - if d, ok := p.Payload().(*codec.Frame); ok { - body = d.Data - } else { - // use codec for payload - cf, err := n.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + for _, p := range ps { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(0) + } + md[metadata.HeaderContentType] = p.ContentType() + + topic := p.Topic() + + // get the exchange + if len(options.Exchange) > 0 { + topic = options.Exchange } - // set the body - b, err := cf.Marshal(p.Payload()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + md[metadata.HeaderTopic] = topic + + var body []byte + + // passed in raw data + if d, ok := p.Payload().(*codec.Frame); ok { + body = d.Data + } else { + // use codec for payload + cf, err := n.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b } - body = b + + msgs = append(msgs, &broker.Message{Header: md, Body: body}) } - topic := p.Topic() - - // get the exchange - if len(options.Exchange) > 0 { - topic = options.Exchange - } - - return n.opts.Broker.Publish(ctx, topic, &broker.Message{ - Header: md, - Body: body, - }, + return n.opts.Broker.BatchPublish(ctx, msgs, broker.PublishContext(options.Context), broker.PublishBodyOnly(options.BodyOnly), ) diff --git a/client/options.go b/client/options.go index 65914545..47c2a463 100644 --- a/client/options.go +++ b/client/options.go @@ -373,19 +373,35 @@ func DialTimeout(d time.Duration) Option { } // WithExchange sets the exchange to route a message through +// DEPRECATED func WithExchange(e string) PublishOption { return func(o *PublishOptions) { o.Exchange = e } } +// PublishExchange sets the exchange to route a message through +func PublishExchange(e string) PublishOption { + return func(o *PublishOptions) { + o.Exchange = e + } +} + // WithBodyOnly publish only message body +// DERECATED func WithBodyOnly(b bool) PublishOption { return func(o *PublishOptions) { o.BodyOnly = b } } +// PublishBodyOnly publish only message body +func PublishBodyOnly(b bool) PublishOption { + return func(o *PublishOptions) { + o.BodyOnly = b + } +} + // PublishContext sets the context in publish options func PublishContext(ctx context.Context) PublishOption { return func(o *PublishOptions) { @@ -498,12 +514,20 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption { } // WithMessageContentType sets the message content type +// DEPRECATED func WithMessageContentType(ct string) MessageOption { return func(o *MessageOptions) { o.ContentType = ct } } +// MessageContentType sets the message content type +func MessageContentType(ct string) MessageOption { + return func(o *MessageOptions) { + o.ContentType = ct + } +} + // StreamingRequest specifies that request is streaming func StreamingRequest(b bool) RequestOption { return func(o *RequestOptions) {