diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 03651bad..dc203bd5 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -3,7 +3,6 @@ package server import ( "bytes" "fmt" - "github.com/myodc/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" js "github.com/youtube/vitess/go/rpcplus/jsonrpc" @@ -15,34 +14,52 @@ type rpcPlusCodec struct { codec rpc.ServerCodec req *transport.Message + buf *readWriteCloser +} +type readWriteCloser struct { wbuf *bytes.Buffer rbuf *bytes.Buffer } -func newRpcPlusCodec(req *transport.Message, socket transport.Socket) *rpcPlusCodec { - return &rpcPlusCodec{ - socket: socket, - req: req, - wbuf: bytes.NewBuffer(nil), - rbuf: bytes.NewBuffer(nil), +func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { + return rwc.rbuf.Read(p) +} + +func (rwc *readWriteCloser) Write(p []byte) (n int, err error) { + return rwc.wbuf.Write(p) +} + +func (rwc *readWriteCloser) Close() error { + rwc.rbuf.Reset() + rwc.wbuf.Reset() + return nil +} + +func newRpcPlusCodec(req *transport.Message, socket transport.Socket) rpc.ServerCodec { + r := &rpcPlusCodec{ + socket: socket, + req: req, + buf: &readWriteCloser{ + rbuf: bytes.NewBuffer(req.Body), + wbuf: bytes.NewBuffer(nil), + }, + } + + switch req.Header["Content-Type"] { + case "application/octet-stream": + r.codec = pb.NewServerCodec(r.buf) + case "application/json": + r.codec = js.NewServerCodec(r.buf) } + + return r } func (c *rpcPlusCodec) ReadRequestHeader(r *rpc.Request) error { - c.rbuf.Reset() - c.rbuf.Write(c.req.Body) - buf := &buffer{c.rbuf} - - switch c.req.Header["Content-Type"] { - case "application/octet-stream": - c.codec = pb.NewServerCodec(buf) - case "application/json": - c.codec = js.NewServerCodec(buf) - default: + if c.codec == nil { return fmt.Errorf("unsupported content type %s", c.req.Header["Content-Type"]) } - return c.codec.ReadRequestHeader(r) } @@ -51,32 +68,22 @@ func (c *rpcPlusCodec) ReadRequestBody(r interface{}) error { } func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error { - c.wbuf.Reset() - buf := &buffer{c.wbuf} - - var cc rpc.ServerCodec - switch c.req.Header["Content-Type"] { - case "application/octet-stream": - cc = pb.NewServerCodec(buf) - case "application/json": - cc = js.NewServerCodec(buf) - default: + if c.codec == nil { return fmt.Errorf("unsupported request type: %s", c.req.Header["Content-Type"]) } - - if err := cc.WriteResponse(r, body, last); err != nil { + c.buf.wbuf.Reset() + if err := c.codec.WriteResponse(r, body, last); err != nil { return err } return c.socket.Send(&transport.Message{ Header: map[string]string{"Content-Type": c.req.Header["Content-Type"]}, - Body: c.wbuf.Bytes(), + Body: c.buf.wbuf.Bytes(), }) } func (c *rpcPlusCodec) Close() error { - c.wbuf.Reset() - c.rbuf.Reset() + c.buf.Close() return c.socket.Close() }