client/grpc: dont use codec for raw bytes payload (#1847)

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-07-16 15:35:06 +03:00 committed by GitHub
parent 7d41c2224e
commit 3627e47f04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -618,6 +618,16 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
var options client.PublishOptions var options client.PublishOptions
var body []byte
// fail early on connect error
if !g.once.Load().(bool) {
if err := g.opts.Broker.Connect(); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
g.once.Store(true)
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@ -629,17 +639,15 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
md["Content-Type"] = p.ContentType() md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic() md["Micro-Topic"] = p.Topic()
cf, err := g.newGRPCCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
var body []byte
// passed in raw data // passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok { if d, ok := p.Payload().(*raw.Frame); ok {
body = d.Data body = d.Data
} else { } else {
// use codec for payload
cf, err := g.newGRPCCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body // set the body
b, err := cf.Marshal(p.Payload()) b, err := cf.Marshal(p.Payload())
if err != nil { if err != nil {
@ -648,13 +656,6 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
body = b body = b
} }
if !g.once.Load().(bool) {
if err = g.opts.Broker.Connect(); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
g.once.Store(true)
}
topic := p.Topic() topic := p.Topic()
// get the exchange // get the exchange