allow to publish message via broker on noop client

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-10-09 13:43:04 +03:00
parent 24be220f91
commit 62bfe9c06e

View File

@ -2,18 +2,24 @@ package client
import (
"context"
"sync/atomic"
raw "github.com/unistack-org/micro-codec-bytes"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/metadata"
)
type noopClient struct {
once atomic.Value
opts Options
}
type noopMessage struct {
topic string
payload interface{}
contentType string
opts MessageOptions
}
type noopRequest struct {
@ -110,7 +116,7 @@ func (n *noopMessage) Payload() interface{} {
}
func (n *noopMessage) ContentType() string {
return n.contentType
return n.opts.ContentType
}
func (n *noopClient) Init(opts ...Option) error {
@ -137,14 +143,72 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts
}
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
return &noopMessage{}
options := MessageOptions{}
for _, o := range opts {
o(&options)
}
return &noopMessage{topic: topic, payload: msg, opts: options}
}
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
return &noopStream{}, nil
}
func (n *noopClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
var options PublishOptions
var body []byte
// fail early on connect error
if !n.once.Load().(bool) {
if err := n.opts.Broker.Connect(); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
n.once.Store(true)
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
// passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok {
body = d.Data
} else {
/*
// use codec for payload
cf, err := n.opts.Codecs[p.ContentType()]
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
*/
// set the body
b, err := n.opts.Broker.Options().Codec.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return n.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: body,
}, broker.PublishContext(options.Context))
return nil
}