From c44fd633010deec4728cdb8eafd60aec43709cf6 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 18 Aug 2019 11:28:21 +0100 Subject: [PATCH] Force grpc client/server to use grpc codec for broker --- client/grpc/grpc.go | 10 ++++------ server/grpc/subscriber.go | 15 ++------------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 9d33e017..439be459 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -18,7 +18,6 @@ import ( "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" - "github.com/micro/go-micro/util/buf" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" @@ -491,14 +490,13 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie } md["Content-Type"] = p.ContentType() - cf, err := g.newCodec(p.ContentType()) + cf, err := g.newGRPCCodec(p.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } - b := buf.New(nil) - - if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil { + b, err := cf.Marshal(p.Payload()) + if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -508,7 +506,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie return g.opts.Broker.Publish(p.Topic(), &broker.Message{ Header: md, - Body: b.Bytes(), + Body: b, }) } diff --git a/server/grpc/subscriber.go b/server/grpc/subscriber.go index 3bd4abef..1f885cab 100644 --- a/server/grpc/subscriber.go +++ b/server/grpc/subscriber.go @@ -1,18 +1,15 @@ package grpc import ( - "bytes" "context" "fmt" "reflect" "strings" "github.com/micro/go-micro/broker" - "github.com/micro/go-micro/codec" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/server" - "github.com/micro/go-micro/util/buf" ) const ( @@ -175,7 +172,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke msg.Header["Content-Type"] = defaultContentType ct = defaultContentType } - cf, err := g.newCodec(ct) + cf, err := g.newGRPCCodec(ct) if err != nil { return err } @@ -205,15 +202,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke req = req.Elem() } - b := buf.New(bytes.NewBuffer(msg.Body)) - co := cf(b) - defer co.Close() - - if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil { - return err - } - - if err := co.ReadBody(req.Interface()); err != nil { + if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil { return err }