Merge pull request #362 from micro/codec

Make json/protobuf/grpc codecs
This commit is contained in:
Asim Aslam 2019-01-02 18:01:34 +00:00 committed by GitHub
commit ce36d0156d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 305 additions and 7 deletions

View File

@ -5,7 +5,9 @@ import (
errs "errors" errs "errors"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
@ -65,9 +67,9 @@ var (
defaultContentType = "application/octet-stream" defaultContentType = "application/octet-stream"
defaultCodecs = map[string]codec.NewCodec{ defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec, "application/protobuf": proto.NewCodec,
"application/json": json.NewCodec,
"application/json-rpc": jsonrpc.NewCodec, "application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec, "application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec,
} }
@ -110,7 +112,7 @@ func (c *rpcCodec) WriteRequest(req *request, body interface{}) error {
Method: req.ServiceMethod, Method: req.ServiceMethod,
Type: codec.Request, Type: codec.Request,
Header: map[string]string{ Header: map[string]string{
"X-Micro-Target": req.Service, "X-Micro-Service": req.Service,
"X-Micro-Method": req.ServiceMethod, "X-Micro-Method": req.ServiceMethod,
}, },
} }

119
codec/grpc/grpc.go Normal file
View File

@ -0,0 +1,119 @@
// Package grpc provides a grpc codec
package grpc
import (
"encoding/json"
"errors"
"io"
"strings"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
ContentType string
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")
if len(parts) != 3 {
return errors.New("Unknown request path")
}
service := strings.Split(parts[1], ".")
m.Method = strings.Join([]string{service[len(service)-1], parts[2]}, ".")
m.Target = strings.Join(service[:len(service)-1], ".")
}
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
// no body
if b == nil {
return nil
}
_, buf, err := decode(c.Conn)
if err != nil {
return err
}
switch c.ContentType {
case "application/grpc+json":
return json.Unmarshal(buf, b)
case "application/grpc+proto", "application/grpc":
return proto.Unmarshal(buf, b.(proto.Message))
}
return errors.New("Unsupported Content-Type")
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
var buf []byte
var err error
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
m.Header["Trailer"] = "grpc-status, grpc-message"
switch c.ContentType {
case "application/grpc+json":
buf, err = json.Marshal(b)
case "application/grpc+proto", "application/grpc":
pb, ok := b.(proto.Message)
if ok {
buf, err = proto.Marshal(pb)
}
default:
err = errors.New("Unsupported Content-Type")
}
if err != nil {
m.Header["grpc-status"] = "8"
m.Header["grpc-message"] = err.Error()
return err
}
m.Header["grpc-status"] = "0"
m.Header["grpc-message"] = ""
return encode(0, buf, c.Conn)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "grpc"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
ContentType: "application/grpc",
}
}

70
codec/grpc/util.go Normal file
View File

@ -0,0 +1,70 @@
package grpc
import (
"encoding/binary"
"fmt"
"io"
)
var (
maxMessageSize = 1024 * 1024 * 4
maxInt = int(^uint(0) >> 1)
)
func decode(r io.Reader) (uint8, []byte, error) {
header := make([]byte, 5)
// read the header
if _, err := r.Read(header[:]); err != nil {
return uint8(0), nil, err
}
// get encoding format e.g compressed
cf := uint8(header[0])
// get message length
length := binary.BigEndian.Uint32(header[1:])
// no encoding format
if length == 0 {
return cf, nil, nil
}
//
if int64(length) > int64(maxInt) {
return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > maxMessageSize {
return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, maxMessageSize)
}
msg := make([]byte, int(length))
if _, err := r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return cf, nil, err
}
return cf, msg, nil
}
func encode(cf uint8, buf []byte, w io.Writer) error {
header := make([]byte, 5)
// set compression
header[0] = byte(cf)
// write length as header
binary.BigEndian.PutUint32(header[1:], uint32(len(buf)))
// read the header
if _, err := w.Write(header[:]); err != nil {
return err
}
// write the buffer
_, err := w.Write(buf)
return err
}

43
codec/json/json.go Normal file
View File

@ -0,0 +1,43 @@
// Package json provides a json codec
package json
import (
"encoding/json"
"io"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
Encoder *json.Encoder
Decoder *json.Decoder
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
return c.Decoder.Decode(b)
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
return c.Encoder.Encode(b)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "json"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
Decoder: json.NewDecoder(c),
Encoder: json.NewEncoder(c),
}
}

49
codec/proto/proto.go Normal file
View File

@ -0,0 +1,49 @@
// Package proto provides a proto codec
package proto
import (
"io"
"io/ioutil"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
buf, err := ioutil.ReadAll(c.Conn)
if err != nil {
return err
}
return proto.Unmarshal(buf, b.(proto.Message))
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
buf, err := proto.Marshal(b.(proto.Message))
if err != nil {
return err
}
_, err = c.Conn.Write(buf)
return err
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "proto"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
}
}

View File

@ -4,7 +4,10 @@ import (
"bytes" "bytes"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/grpc"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -25,9 +28,12 @@ type readWriteCloser struct {
var ( var (
defaultCodecs = map[string]codec.NewCodec{ defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec, "application/grpc": grpc.NewCodec,
"application/grpc+json": grpc.NewCodec,
"application/grpc+proto": grpc.NewCodec,
"application/json": json.NewCodec,
"application/json-rpc": jsonrpc.NewCodec, "application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec, "application/protobuf": proto.NewCodec,
"application/proto-rpc": protorpc.NewCodec, "application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec,
} }
@ -77,6 +83,10 @@ func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error {
m.Header = tm.Header m.Header = tm.Header
} }
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
err := c.codec.ReadHeader(&m, codec.Request) err := c.codec.ReadHeader(&m, codec.Request)
r.ServiceMethod = m.Method r.ServiceMethod = m.Method
r.Seq = m.Id r.Seq = m.Id

View File

@ -1,7 +1,6 @@
package transport package transport
import ( import (
//"fmt"
"bufio" "bufio"
"bytes" "bytes"
"crypto/tls" "crypto/tls"
@ -246,6 +245,9 @@ func (h *httpTransportSocket) Recv(m *Message) error {
} }
} }
// set path
m.Header[":path"] = h.r.URL.Path
// return early early // return early early
return nil return nil
} }
@ -277,6 +279,9 @@ func (h *httpTransportSocket) Recv(m *Message) error {
} }
} }
// set path
m.Header[":path"] = h.r.URL.Path
return nil return nil
} }