From 80f53ab17600e3efde94dc1a79961a1ff4e9199b Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 18:40:32 +0000 Subject: [PATCH] Add json codec --- client/rpc_codec.go | 5 +- codec/json/client.go | 97 +++++++++++++++++++++++++++++++++++++ codec/json/json.go | 88 ++++++++++++++++++++++++++++++++++ codec/json/server.go | 111 +++++++++++++++++++++++++++++++++++++++++++ server/rpc_codec.go | 5 +- 5 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 codec/json/client.go create mode 100644 codec/json/json.go create mode 100644 codec/json/server.go diff --git a/client/rpc_codec.go b/client/rpc_codec.go index d151844f..c75f0029 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" @@ -26,8 +27,8 @@ var ( defaultContentType = "application/octet-stream" defaultCodecs = map[string]codec.NewCodec{ - // "application/json": jsonrpc.NewClientCodec, - // "application/json-rpc": jsonrpc.NewClientCodec, + "application/json": json.NewCodec, + "application/json-rpc": json.NewCodec, "application/protobuf": proto.NewCodec, "application/proto-rpc": proto.NewCodec, "application/octet-stream": proto.NewCodec, diff --git a/codec/json/client.go b/codec/json/client.go new file mode 100644 index 00000000..7a26bd9f --- /dev/null +++ b/codec/json/client.go @@ -0,0 +1,97 @@ +package json + +import ( + "encoding/json" + "fmt" + "io" + "sync" + + "github.com/micro/go-micro/codec" +) + +type clientCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req clientRequest + resp clientResponse + + sync.Mutex + pending map[uint64]string +} + +type clientRequest struct { + Method string `json:"method"` + Params [1]interface{} `json:"params"` + ID uint64 `json:"id"` +} + +type clientResponse struct { + ID uint64 `json:"id"` + Result *json.RawMessage `json:"result"` + Error interface{} `json:"error"` +} + +func newClientCodec(conn io.ReadWriteCloser) *clientCodec { + return &clientCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]string), + } +} + +func (c *clientCodec) Write(m *codec.Message, b interface{}) error { + c.Lock() + c.pending[m.Id] = m.Method + c.Unlock() + c.req.Method = m.Method + c.req.Params[0] = b + c.req.ID = m.Id + return c.enc.Encode(&c.req) +} + +func (r *clientResponse) reset() { + r.ID = 0 + r.Result = nil + r.Error = nil +} + +func (c *clientCodec) ReadHeader(m *codec.Message) error { + c.resp.reset() + if err := c.dec.Decode(&c.resp); err != nil { + return err + } + + c.Lock() + m.Method = c.pending[c.resp.ID] + delete(c.pending, c.resp.ID) + c.Unlock() + + m.Error = "" + m.Id = c.resp.ID + if c.resp.Error != nil { + x, ok := c.resp.Error.(string) + if !ok { + return fmt.Errorf("invalid error %v", c.resp.Error) + } + if x == "" { + x = "unspecified error" + } + m.Error = x + } + return nil +} + +func (c *clientCodec) ReadBody(x interface{}) error { + if x == nil { + return nil + } + return json.Unmarshal(*c.resp.Result, x) +} + +func (c *clientCodec) Close() error { + return c.c.Close() +} diff --git a/codec/json/json.go b/codec/json/json.go new file mode 100644 index 00000000..d2909b04 --- /dev/null +++ b/codec/json/json.go @@ -0,0 +1,88 @@ +package json + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/micro/go-micro/codec" +) + +type jsonCodec struct { + buf *bytes.Buffer + mt codec.MessageType + rwc io.ReadWriteCloser + c *clientCodec + s *serverCodec +} + +func (j *jsonCodec) Close() error { + j.buf.Reset() + return j.rwc.Close() +} + +func (j *jsonCodec) String() string { + return "json" +} + +func (j *jsonCodec) Write(m *codec.Message, b interface{}) error { + switch m.Type { + case codec.Request: + return j.c.Write(m, b) + case codec.Response: + return j.s.Write(m, b) + case codec.Publication: + data, err := json.Marshal(b) + if err != nil { + return err + } + _, err = j.rwc.Write(data) + return err + default: + return fmt.Errorf("Unrecognised message type: %v", m.Type) + } + return nil +} + +func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + j.buf.Reset() + j.mt = mt + + switch mt { + case codec.Request: + return j.s.ReadHeader(m) + case codec.Response: + return j.c.ReadHeader(m) + case codec.Publication: + io.Copy(j.buf, j.rwc) + default: + return fmt.Errorf("Unrecognised message type: %v", mt) + } + return nil +} + +func (j *jsonCodec) ReadBody(b interface{}) error { + switch j.mt { + case codec.Request: + return j.s.ReadBody(b) + case codec.Response: + return j.c.ReadBody(b) + case codec.Publication: + if b != nil { + return json.Unmarshal(j.buf.Bytes(), b) + } + default: + return fmt.Errorf("Unrecognised message type: %v", j.mt) + } + return nil +} + +func NewCodec(rwc io.ReadWriteCloser) codec.Codec { + return &jsonCodec{ + buf: bytes.NewBuffer(nil), + rwc: rwc, + c: newClientCodec(rwc), + s: newServerCodec(rwc), + } +} diff --git a/codec/json/server.go b/codec/json/server.go new file mode 100644 index 00000000..ee035fcb --- /dev/null +++ b/codec/json/server.go @@ -0,0 +1,111 @@ +package json + +import ( + "encoding/json" + "errors" + "io" + "sync" + + "github.com/micro/go-micro/codec" +) + +type serverCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req serverRequest + resp serverResponse + + sync.Mutex + seq uint64 + pending map[uint64]*json.RawMessage +} + +type serverRequest struct { + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + ID *json.RawMessage `json:"id"` +} + +type serverResponse struct { + ID *json.RawMessage `json:"id"` + Result interface{} `json:"result"` + Error interface{} `json:"error"` +} + +func newServerCodec(conn io.ReadWriteCloser) *serverCodec { + return &serverCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]*json.RawMessage), + } +} + +func (r *serverRequest) reset() { + r.Method = "" + if r.Params != nil { + *r.Params = (*r.Params)[0:0] + } + if r.ID != nil { + *r.ID = (*r.ID)[0:0] + } +} + +func (c *serverCodec) ReadHeader(m *codec.Message) error { + c.req.reset() + if err := c.dec.Decode(&c.req); err != nil { + return err + } + m.Method = c.req.Method + + c.Lock() + c.seq++ + c.pending[c.seq] = c.req.ID + c.req.ID = nil + m.Id = c.seq + c.Unlock() + + return nil +} + +func (c *serverCodec) ReadBody(x interface{}) error { + if x == nil { + return nil + } + var params [1]interface{} + params[0] = x + return json.Unmarshal(*c.req.Params, ¶ms) +} + +var null = json.RawMessage([]byte("null")) + +func (c *serverCodec) Write(m *codec.Message, x interface{}) error { + var resp serverResponse + c.Lock() + b, ok := c.pending[m.Id] + if !ok { + c.Unlock() + return errors.New("invalid sequence number in response") + } + c.Unlock() + + if b == nil { + // Invalid request so no id. Use JSON null. + b = &null + } + resp.ID = b + resp.Result = x + if m.Error == "" { + resp.Error = nil + } else { + resp.Error = m.Error + } + return c.enc.Encode(resp) +} + +func (c *serverCodec) Close() error { + return c.c.Close() +} diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 7ec7cdef..9d567210 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" @@ -24,8 +25,8 @@ type readWriteCloser struct { var ( defaultCodecs = map[string]codec.NewCodec{ - // "application/json": jsonrpc.NewServerCodec, - // "application/json-rpc": jsonrpc.NewServerCodec, + "application/json": json.NewCodec, + "application/json-rpc": json.NewCodec, "application/protobuf": proto.NewCodec, "application/proto-rpc": proto.NewCodec, "application/octet-stream": proto.NewCodec,