Merge pull request #679 from micro/grpc-codec
Force grpc client/server to use grpc codec for broker
This commit is contained in:
		| @@ -18,7 +18,6 @@ import ( | |||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| 	"github.com/micro/go-micro/transport" | 	"github.com/micro/go-micro/transport" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/util/buf" |  | ||||||
| 	"google.golang.org/grpc" | 	"google.golang.org/grpc" | ||||||
| 	"google.golang.org/grpc/credentials" | 	"google.golang.org/grpc/credentials" | ||||||
| 	"google.golang.org/grpc/encoding" | 	"google.golang.org/grpc/encoding" | ||||||
| @@ -491,14 +490,13 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie | |||||||
| 	} | 	} | ||||||
| 	md["Content-Type"] = p.ContentType() | 	md["Content-Type"] = p.ContentType() | ||||||
|  |  | ||||||
| 	cf, err := g.newCodec(p.ContentType()) | 	cf, err := g.newGRPCCodec(p.ContentType()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.client", err.Error()) | 		return errors.InternalServerError("go.micro.client", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	b := buf.New(nil) | 	b, err := cf.Marshal(p.Payload()) | ||||||
|  | 	if err != nil { | ||||||
| 	if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil { |  | ||||||
| 		return errors.InternalServerError("go.micro.client", err.Error()) | 		return errors.InternalServerError("go.micro.client", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -508,7 +506,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie | |||||||
|  |  | ||||||
| 	return g.opts.Broker.Publish(p.Topic(), &broker.Message{ | 	return g.opts.Broker.Publish(p.Topic(), &broker.Message{ | ||||||
| 		Header: md, | 		Header: md, | ||||||
| 		Body:   b.Bytes(), | 		Body:   b, | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,18 +1,15 @@ | |||||||
| package grpc | package grpc | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"bytes" |  | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"strings" | 	"strings" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/broker" | 	"github.com/micro/go-micro/broker" | ||||||
| 	"github.com/micro/go-micro/codec" |  | ||||||
| 	"github.com/micro/go-micro/metadata" | 	"github.com/micro/go-micro/metadata" | ||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| 	"github.com/micro/go-micro/server" | 	"github.com/micro/go-micro/server" | ||||||
| 	"github.com/micro/go-micro/util/buf" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -175,7 +172,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | |||||||
| 			msg.Header["Content-Type"] = defaultContentType | 			msg.Header["Content-Type"] = defaultContentType | ||||||
| 			ct = defaultContentType | 			ct = defaultContentType | ||||||
| 		} | 		} | ||||||
| 		cf, err := g.newCodec(ct) | 		cf, err := g.newGRPCCodec(ct) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| @@ -205,15 +202,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | |||||||
| 				req = req.Elem() | 				req = req.Elem() | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			b := buf.New(bytes.NewBuffer(msg.Body)) | 			if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil { | ||||||
| 			co := cf(b) |  | ||||||
| 			defer co.Close() |  | ||||||
|  |  | ||||||
| 			if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if err := co.ReadBody(req.Interface()); err != nil { |  | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user