From 62bfe9c06e038b7de0da1cb81265a8eb267a9c68 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 9 Oct 2020 13:43:04 +0300 Subject: [PATCH] allow to publish message via broker on noop client Signed-off-by: Vasiliy Tolstov --- client/noop.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 6 deletions(-) diff --git a/client/noop.go b/client/noop.go index deb43b55..ba3c811f 100644 --- a/client/noop.go +++ b/client/noop.go @@ -2,18 +2,24 @@ package client import ( "context" + "sync/atomic" + raw "github.com/unistack-org/micro-codec-bytes" + "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/metadata" ) type noopClient struct { + once atomic.Value opts Options } type noopMessage struct { - topic string - payload interface{} - contentType string + topic string + payload interface{} + opts MessageOptions } type noopRequest struct { @@ -110,7 +116,7 @@ func (n *noopMessage) Payload() interface{} { } func (n *noopMessage) ContentType() string { - return n.contentType + return n.opts.ContentType } func (n *noopClient) Init(opts ...Option) error { @@ -137,14 +143,72 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts } func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message { - return &noopMessage{} + options := MessageOptions{} + for _, o := range opts { + o(&options) + } + + return &noopMessage{topic: topic, payload: msg, opts: options} } func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { return &noopStream{}, nil } -func (n *noopClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { +func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { + var options PublishOptions + var body []byte + + // fail early on connect error + if !n.once.Load().(bool) { + if err := n.opts.Broker.Connect(); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + n.once.Store(true) + } + + for _, o := range opts { + o(&options) + } + + md, ok := metadata.FromContext(ctx) + if !ok { + md = make(map[string]string) + } + md["Content-Type"] = p.ContentType() + md["Micro-Topic"] = p.Topic() + + // passed in raw data + if d, ok := p.Payload().(*raw.Frame); ok { + body = d.Data + } else { + /* + // use codec for payload + cf, err := n.opts.Codecs[p.ContentType()] + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + */ + // set the body + b, err := n.opts.Broker.Options().Codec.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b + } + + topic := p.Topic() + + // get the exchange + if len(options.Exchange) > 0 { + topic = options.Exchange + } + + return n.opts.Broker.Publish(topic, &broker.Message{ + Header: md, + Body: body, + }, broker.PublishContext(options.Context)) + return nil }