diff --git a/client/grpc/codec.go b/client/grpc/codec.go index 4994db70..b5e2a7bc 100644 --- a/client/grpc/codec.go +++ b/client/grpc/codec.go @@ -2,13 +2,16 @@ package grpc import ( "fmt" + "strings" "github.com/golang/protobuf/proto" "github.com/json-iterator/go" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/protorpc" "google.golang.org/grpc/encoding" + "google.golang.org/grpc" ) type jsonCodec struct{} @@ -52,7 +55,28 @@ func (w wrapCodec) String() string { return w.Codec.Name() } +func (w wrapCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } + return w.Codec.Marshal(v) +} + +func (w wrapCodec) Unmarshal(data []byte, v interface{}) error { + b, ok := v.(*bytes.Frame) + if ok { + b.Data = data + return nil + } + return w.Codec.Unmarshal(data, v) +} + func (protoCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } return proto.Marshal(v.(proto.Message)) } @@ -96,3 +120,68 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error { func (jsonCodec) Name() string { return "json" } + +type grpcCodec struct { + // headers + id string + target string + method string + endpoint string + + s grpc.ClientStream + c encoding.Codec +} + +func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + md, err := g.s.Header() + if err != nil { + return err + } + if m == nil { + m = new(codec.Message) + } + if m.Header == nil { + m.Header = make(map[string]string) + } + for k, v := range md { + m.Header[k] = strings.Join(v, ",") + } + m.Id = g.id + m.Target = g.target + m.Method = g.method + m.Endpoint = g.endpoint + return nil +} + +func (g *grpcCodec) ReadBody(v interface{}) error { + frame := &bytes.Frame{} + if err := g.s.RecvMsg(frame); err != nil { + return err + } + return g.c.Unmarshal(frame.Data, v) +} + +func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { + // if we don't have a body + if len(m.Body) == 0 { + b, err := g.c.Marshal(v) + if err != nil { + return err + } + m.Body = b + } + + // create an encoded frame + frame := &bytes.Frame{m.Body} + // write the body using the framing codec + return g.s.SendMsg(frame) +} + +func (g *grpcCodec) Close() error { + return g.s.CloseSend() +} + +func (g *grpcCodec) String() string { + return g.c.Name() +} + diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 9ed206c6..ebdbed5a 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -177,7 +177,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client dialCtx, cancel = context.WithCancel(ctx) } defer cancel() - cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure()) + + wc := wrapCodec{cf} + + cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure()) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -193,6 +196,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } + // set request codec + if r, ok := req.(*grpcRequest); ok { + r.codec = &grpcCodec{ + s: st, + c: wc, + } + } + rsp := &response{ conn: cc, stream: st, diff --git a/server/grpc/codec.go b/server/grpc/codec.go index 50a96ef4..681a104f 100644 --- a/server/grpc/codec.go +++ b/server/grpc/codec.go @@ -6,6 +6,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/protorpc" "google.golang.org/grpc/encoding" @@ -14,6 +15,7 @@ import ( type jsonCodec struct{} type bytesCodec struct{} type protoCodec struct{} +type wrapCodec struct { encoding.Codec } var ( defaultGRPCCodecs = map[string]encoding.Codec{ @@ -36,6 +38,27 @@ var ( } ) +func (w wrapCodec) String() string { + return w.Codec.Name() +} + +func (w wrapCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } + return w.Codec.Marshal(v) +} + +func (w wrapCodec) Unmarshal(data []byte, v interface{}) error { + b, ok := v.(*bytes.Frame) + if ok { + b.Data = data + return nil + } + return w.Codec.Unmarshal(data, v) +} + func (protoCodec) Marshal(v interface{}) ([]byte, error) { return proto.Marshal(v.(proto.Message)) } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index bde2e4c6..1b330812 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -56,8 +56,8 @@ type grpcServer struct { } func init() { - encoding.RegisterCodec(jsonCodec{}) - encoding.RegisterCodec(bytesCodec{}) + encoding.RegisterCodec(wrapCodec{jsonCodec{}}) + encoding.RegisterCodec(wrapCodec{bytesCodec{}}) } func newGRPCServer(opts ...server.Option) server.Server {