diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 95ff1da1..67959b6a 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -5,7 +5,9 @@ import ( errs "errors" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/transport" @@ -65,9 +67,9 @@ var ( defaultContentType = "application/octet-stream" defaultCodecs = map[string]codec.NewCodec{ - "application/json": jsonrpc.NewCodec, + "application/protobuf": proto.NewCodec, + "application/json": json.NewCodec, "application/json-rpc": jsonrpc.NewCodec, - "application/protobuf": protorpc.NewCodec, "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec, } @@ -110,8 +112,8 @@ func (c *rpcCodec) WriteRequest(req *request, body interface{}) error { Method: req.ServiceMethod, Type: codec.Request, Header: map[string]string{ - "X-Micro-Target": req.Service, - "X-Micro-Method": req.ServiceMethod, + "X-Micro-Service": req.Service, + "X-Micro-Method": req.ServiceMethod, }, } if err := c.codec.Write(m, body); err != nil { diff --git a/codec/grpc/grpc.go b/codec/grpc/grpc.go new file mode 100644 index 00000000..f3703840 --- /dev/null +++ b/codec/grpc/grpc.go @@ -0,0 +1,119 @@ +// Package grpc provides a grpc codec +package grpc + +import ( + "encoding/json" + "errors" + "io" + "strings" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" +) + +type Codec struct { + Conn io.ReadWriteCloser + ContentType string +} + +func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { + if ct := m.Header["Content-Type"]; len(ct) > 0 { + c.ContentType = ct + } + + if ct := m.Header["content-type"]; len(ct) > 0 { + c.ContentType = ct + } + + // service method + path := m.Header[":path"] + if len(path) == 0 || path[0] != '/' { + m.Target = m.Header["X-Micro-Service"] + m.Method = m.Header["X-Micro-Method"] + } else { + // [ , a.package.Foo, Bar] + parts := strings.Split(path, "/") + if len(parts) != 3 { + return errors.New("Unknown request path") + } + service := strings.Split(parts[1], ".") + m.Method = strings.Join([]string{service[len(service)-1], parts[2]}, ".") + m.Target = strings.Join(service[:len(service)-1], ".") + } + + return nil +} + +func (c *Codec) ReadBody(b interface{}) error { + // no body + if b == nil { + return nil + } + + _, buf, err := decode(c.Conn) + if err != nil { + return err + } + + switch c.ContentType { + case "application/grpc+json": + return json.Unmarshal(buf, b) + case "application/grpc+proto", "application/grpc": + return proto.Unmarshal(buf, b.(proto.Message)) + } + + return errors.New("Unsupported Content-Type") +} + +func (c *Codec) Write(m *codec.Message, b interface{}) error { + var buf []byte + var err error + + if ct := m.Header["Content-Type"]; len(ct) > 0 { + c.ContentType = ct + } + + if ct := m.Header["content-type"]; len(ct) > 0 { + c.ContentType = ct + } + + m.Header["Trailer"] = "grpc-status, grpc-message" + + switch c.ContentType { + case "application/grpc+json": + buf, err = json.Marshal(b) + case "application/grpc+proto", "application/grpc": + pb, ok := b.(proto.Message) + if ok { + buf, err = proto.Marshal(pb) + } + default: + err = errors.New("Unsupported Content-Type") + } + + if err != nil { + m.Header["grpc-status"] = "8" + m.Header["grpc-message"] = err.Error() + return err + } + + m.Header["grpc-status"] = "0" + m.Header["grpc-message"] = "" + + return encode(0, buf, c.Conn) +} + +func (c *Codec) Close() error { + return c.Conn.Close() +} + +func (c *Codec) String() string { + return "grpc" +} + +func NewCodec(c io.ReadWriteCloser) codec.Codec { + return &Codec{ + Conn: c, + ContentType: "application/grpc", + } +} diff --git a/codec/grpc/util.go b/codec/grpc/util.go new file mode 100644 index 00000000..04c5ee38 --- /dev/null +++ b/codec/grpc/util.go @@ -0,0 +1,70 @@ +package grpc + +import ( + "encoding/binary" + "fmt" + "io" +) + +var ( + maxMessageSize = 1024 * 1024 * 4 + maxInt = int(^uint(0) >> 1) +) + +func decode(r io.Reader) (uint8, []byte, error) { + header := make([]byte, 5) + + // read the header + if _, err := r.Read(header[:]); err != nil { + return uint8(0), nil, err + } + + // get encoding format e.g compressed + cf := uint8(header[0]) + + // get message length + length := binary.BigEndian.Uint32(header[1:]) + + // no encoding format + if length == 0 { + return cf, nil, nil + } + + // + if int64(length) > int64(maxInt) { + return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) + } + if int(length) > maxMessageSize { + return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, maxMessageSize) + } + + msg := make([]byte, int(length)) + + if _, err := r.Read(msg); err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return cf, nil, err + } + + return cf, msg, nil +} + +func encode(cf uint8, buf []byte, w io.Writer) error { + header := make([]byte, 5) + + // set compression + header[0] = byte(cf) + + // write length as header + binary.BigEndian.PutUint32(header[1:], uint32(len(buf))) + + // read the header + if _, err := w.Write(header[:]); err != nil { + return err + } + + // write the buffer + _, err := w.Write(buf) + return err +} diff --git a/codec/json/json.go b/codec/json/json.go new file mode 100644 index 00000000..6389541c --- /dev/null +++ b/codec/json/json.go @@ -0,0 +1,43 @@ +// Package json provides a json codec +package json + +import ( + "encoding/json" + "io" + + "github.com/micro/go-micro/codec" +) + +type Codec struct { + Conn io.ReadWriteCloser + Encoder *json.Encoder + Decoder *json.Decoder +} + +func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { + return nil +} + +func (c *Codec) ReadBody(b interface{}) error { + return c.Decoder.Decode(b) +} + +func (c *Codec) Write(m *codec.Message, b interface{}) error { + return c.Encoder.Encode(b) +} + +func (c *Codec) Close() error { + return c.Conn.Close() +} + +func (c *Codec) String() string { + return "json" +} + +func NewCodec(c io.ReadWriteCloser) codec.Codec { + return &Codec{ + Conn: c, + Decoder: json.NewDecoder(c), + Encoder: json.NewEncoder(c), + } +} diff --git a/codec/proto/proto.go b/codec/proto/proto.go new file mode 100644 index 00000000..339c08c1 --- /dev/null +++ b/codec/proto/proto.go @@ -0,0 +1,49 @@ +// Package proto provides a proto codec +package proto + +import ( + "io" + "io/ioutil" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" +) + +type Codec struct { + Conn io.ReadWriteCloser +} + +func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { + return nil +} + +func (c *Codec) ReadBody(b interface{}) error { + buf, err := ioutil.ReadAll(c.Conn) + if err != nil { + return err + } + return proto.Unmarshal(buf, b.(proto.Message)) +} + +func (c *Codec) Write(m *codec.Message, b interface{}) error { + buf, err := proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + _, err = c.Conn.Write(buf) + return err +} + +func (c *Codec) Close() error { + return c.Conn.Close() +} + +func (c *Codec) String() string { + return "proto" +} + +func NewCodec(c io.ReadWriteCloser) codec.Codec { + return &Codec{ + Conn: c, + } +} diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 87a32795..f19b2782 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -4,7 +4,10 @@ import ( "bytes" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/grpc" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" "github.com/pkg/errors" @@ -25,9 +28,12 @@ type readWriteCloser struct { var ( defaultCodecs = map[string]codec.NewCodec{ - "application/json": jsonrpc.NewCodec, + "application/grpc": grpc.NewCodec, + "application/grpc+json": grpc.NewCodec, + "application/grpc+proto": grpc.NewCodec, + "application/json": json.NewCodec, "application/json-rpc": jsonrpc.NewCodec, - "application/protobuf": protorpc.NewCodec, + "application/protobuf": proto.NewCodec, "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec, } @@ -77,6 +83,10 @@ func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error { m.Header = tm.Header } + // set some internal things + m.Target = m.Header["X-Micro-Service"] + m.Method = m.Header["X-Micro-Method"] + err := c.codec.ReadHeader(&m, codec.Request) r.ServiceMethod = m.Method r.Seq = m.Id diff --git a/transport/http_transport.go b/transport/http_transport.go index 4128404c..885459aa 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -1,7 +1,6 @@ package transport import ( - //"fmt" "bufio" "bytes" "crypto/tls" @@ -246,6 +245,9 @@ func (h *httpTransportSocket) Recv(m *Message) error { } } + // set path + m.Header[":path"] = h.r.URL.Path + // return early early return nil } @@ -277,6 +279,9 @@ func (h *httpTransportSocket) Recv(m *Message) error { } } + // set path + m.Header[":path"] = h.r.URL.Path + return nil }