From 3627e47f04ebc5e1f54535d465a0e6c187525103 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 16 Jul 2020 15:35:06 +0300 Subject: [PATCH] client/grpc: dont use codec for raw bytes payload (#1847) Signed-off-by: Vasiliy Tolstov --- client/grpc/grpc.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index d1be62ba..aaaad0f3 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -618,6 +618,16 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { var options client.PublishOptions + var body []byte + + // fail early on connect error + if !g.once.Load().(bool) { + if err := g.opts.Broker.Connect(); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + g.once.Store(true) + } + for _, o := range opts { o(&options) } @@ -629,17 +639,15 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie md["Content-Type"] = p.ContentType() md["Micro-Topic"] = p.Topic() - cf, err := g.newGRPCCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - var body []byte - // passed in raw data if d, ok := p.Payload().(*raw.Frame); ok { body = d.Data } else { + // use codec for payload + cf, err := g.newGRPCCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } // set the body b, err := cf.Marshal(p.Payload()) if err != nil { @@ -648,13 +656,6 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie body = b } - if !g.once.Load().(bool) { - if err = g.opts.Broker.Connect(); err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - g.once.Store(true) - } - topic := p.Topic() // get the exchange