From ed580204a822dfdabc4e36d79fa66d50ba1380fd Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 2 Jan 2019 12:55:06 +0000 Subject: [PATCH] Add grpc codec --- client/rpc_codec.go | 4 +- codec/grpc/grpc.go | 119 ++++++++++++++++++++++++++++++++++++ codec/grpc/util.go | 70 +++++++++++++++++++++ server/rpc_codec.go | 8 ++- transport/http_transport.go | 6 ++ 5 files changed, 203 insertions(+), 4 deletions(-) create mode 100644 codec/grpc/grpc.go create mode 100644 codec/grpc/util.go diff --git a/client/rpc_codec.go b/client/rpc_codec.go index f5c9401f..67959b6a 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -6,8 +6,8 @@ import ( "github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec/json" - "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/transport" @@ -113,7 +113,7 @@ func (c *rpcCodec) WriteRequest(req *request, body interface{}) error { Type: codec.Request, Header: map[string]string{ "X-Micro-Service": req.Service, - "X-Micro-Method": req.ServiceMethod, + "X-Micro-Method": req.ServiceMethod, }, } if err := c.codec.Write(m, body); err != nil { diff --git a/codec/grpc/grpc.go b/codec/grpc/grpc.go new file mode 100644 index 00000000..f3703840 --- /dev/null +++ b/codec/grpc/grpc.go @@ -0,0 +1,119 @@ +// Package grpc provides a grpc codec +package grpc + +import ( + "encoding/json" + "errors" + "io" + "strings" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" +) + +type Codec struct { + Conn io.ReadWriteCloser + ContentType string +} + +func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { + if ct := m.Header["Content-Type"]; len(ct) > 0 { + c.ContentType = ct + } + + if ct := m.Header["content-type"]; len(ct) > 0 { + c.ContentType = ct + } + + // service method + path := m.Header[":path"] + if len(path) == 0 || path[0] != '/' { + m.Target = m.Header["X-Micro-Service"] + m.Method = m.Header["X-Micro-Method"] + } else { + // [ , a.package.Foo, Bar] + parts := strings.Split(path, "/") + if len(parts) != 3 { + return errors.New("Unknown request path") + } + service := strings.Split(parts[1], ".") + m.Method = strings.Join([]string{service[len(service)-1], parts[2]}, ".") + m.Target = strings.Join(service[:len(service)-1], ".") + } + + return nil +} + +func (c *Codec) ReadBody(b interface{}) error { + // no body + if b == nil { + return nil + } + + _, buf, err := decode(c.Conn) + if err != nil { + return err + } + + switch c.ContentType { + case "application/grpc+json": + return json.Unmarshal(buf, b) + case "application/grpc+proto", "application/grpc": + return proto.Unmarshal(buf, b.(proto.Message)) + } + + return errors.New("Unsupported Content-Type") +} + +func (c *Codec) Write(m *codec.Message, b interface{}) error { + var buf []byte + var err error + + if ct := m.Header["Content-Type"]; len(ct) > 0 { + c.ContentType = ct + } + + if ct := m.Header["content-type"]; len(ct) > 0 { + c.ContentType = ct + } + + m.Header["Trailer"] = "grpc-status, grpc-message" + + switch c.ContentType { + case "application/grpc+json": + buf, err = json.Marshal(b) + case "application/grpc+proto", "application/grpc": + pb, ok := b.(proto.Message) + if ok { + buf, err = proto.Marshal(pb) + } + default: + err = errors.New("Unsupported Content-Type") + } + + if err != nil { + m.Header["grpc-status"] = "8" + m.Header["grpc-message"] = err.Error() + return err + } + + m.Header["grpc-status"] = "0" + m.Header["grpc-message"] = "" + + return encode(0, buf, c.Conn) +} + +func (c *Codec) Close() error { + return c.Conn.Close() +} + +func (c *Codec) String() string { + return "grpc" +} + +func NewCodec(c io.ReadWriteCloser) codec.Codec { + return &Codec{ + Conn: c, + ContentType: "application/grpc", + } +} diff --git a/codec/grpc/util.go b/codec/grpc/util.go new file mode 100644 index 00000000..04c5ee38 --- /dev/null +++ b/codec/grpc/util.go @@ -0,0 +1,70 @@ +package grpc + +import ( + "encoding/binary" + "fmt" + "io" +) + +var ( + maxMessageSize = 1024 * 1024 * 4 + maxInt = int(^uint(0) >> 1) +) + +func decode(r io.Reader) (uint8, []byte, error) { + header := make([]byte, 5) + + // read the header + if _, err := r.Read(header[:]); err != nil { + return uint8(0), nil, err + } + + // get encoding format e.g compressed + cf := uint8(header[0]) + + // get message length + length := binary.BigEndian.Uint32(header[1:]) + + // no encoding format + if length == 0 { + return cf, nil, nil + } + + // + if int64(length) > int64(maxInt) { + return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) + } + if int(length) > maxMessageSize { + return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, maxMessageSize) + } + + msg := make([]byte, int(length)) + + if _, err := r.Read(msg); err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return cf, nil, err + } + + return cf, msg, nil +} + +func encode(cf uint8, buf []byte, w io.Writer) error { + header := make([]byte, 5) + + // set compression + header[0] = byte(cf) + + // write length as header + binary.BigEndian.PutUint32(header[1:], uint32(len(buf))) + + // read the header + if _, err := w.Write(header[:]); err != nil { + return err + } + + // write the buffer + _, err := w.Write(buf) + return err +} diff --git a/server/rpc_codec.go b/server/rpc_codec.go index fee01940..f19b2782 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -4,9 +4,10 @@ import ( "bytes" "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/codec/json" - "github.com/micro/go-micro/codec/proto" + "github.com/micro/go-micro/codec/grpc" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" "github.com/pkg/errors" @@ -27,6 +28,9 @@ type readWriteCloser struct { var ( defaultCodecs = map[string]codec.NewCodec{ + "application/grpc": grpc.NewCodec, + "application/grpc+json": grpc.NewCodec, + "application/grpc+proto": grpc.NewCodec, "application/json": json.NewCodec, "application/json-rpc": jsonrpc.NewCodec, "application/protobuf": proto.NewCodec, diff --git a/transport/http_transport.go b/transport/http_transport.go index 653b74b9..206bdec5 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -245,6 +245,9 @@ func (h *httpTransportSocket) Recv(m *Message) error { } } + // set path + m.Header[":path"] = h.r.URL.Path + // return early early return nil } @@ -276,6 +279,9 @@ func (h *httpTransportSocket) Recv(m *Message) error { } } + // set path + m.Header[":path"] = h.r.URL.Path + return nil }