Fix client rpc codec
This commit is contained in:
		| @@ -15,39 +15,58 @@ type rpcPlusCodec struct { | |||||||
| 	codec  rpc.ClientCodec | 	codec  rpc.ClientCodec | ||||||
|  |  | ||||||
| 	req *transport.Message | 	req *transport.Message | ||||||
|  | 	buf *readWriteCloser | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type readWriteCloser struct { | ||||||
| 	wbuf *bytes.Buffer | 	wbuf *bytes.Buffer | ||||||
| 	rbuf *bytes.Buffer | 	rbuf *bytes.Buffer | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { | ||||||
|  | 	return rwc.rbuf.Read(p) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rwc *readWriteCloser) Write(p []byte) (n int, err error) { | ||||||
|  | 	return rwc.wbuf.Write(p) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rwc *readWriteCloser) Close() error { | ||||||
|  | 	rwc.rbuf.Reset() | ||||||
|  | 	rwc.wbuf.Reset() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func newRpcPlusCodec(req *transport.Message, client transport.Client) *rpcPlusCodec { | func newRpcPlusCodec(req *transport.Message, client transport.Client) *rpcPlusCodec { | ||||||
| 	return &rpcPlusCodec{ | 	r := &rpcPlusCodec{ | ||||||
| 		req:    req, | 		req:    req, | ||||||
| 		client: client, | 		client: client, | ||||||
| 		wbuf:   bytes.NewBuffer(nil), | 		buf: &readWriteCloser{ | ||||||
| 		rbuf:   bytes.NewBuffer(nil), | 			wbuf: bytes.NewBuffer(nil), | ||||||
|  | 			rbuf: bytes.NewBuffer(nil), | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	switch req.Header["Content-Type"] { | ||||||
|  | 	case "application/octet-stream": | ||||||
|  | 		r.codec = pb.NewClientCodec(r.buf) | ||||||
|  | 	case "application/json": | ||||||
|  | 		r.codec = js.NewClientCodec(r.buf) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return r | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error { | func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error { | ||||||
| 	c.wbuf.Reset() | 	if c.codec == nil { | ||||||
| 	buf := &buffer{c.wbuf} |  | ||||||
|  |  | ||||||
| 	var cc rpc.ClientCodec |  | ||||||
| 	switch c.req.Header["Content-Type"] { |  | ||||||
| 	case "application/octet-stream": |  | ||||||
| 		cc = pb.NewClientCodec(buf) |  | ||||||
| 	case "application/json": |  | ||||||
| 		cc = js.NewClientCodec(buf) |  | ||||||
| 	default: |  | ||||||
| 		return fmt.Errorf("unsupported request type: %s", c.req.Header["Content-Type"]) | 		return fmt.Errorf("unsupported request type: %s", c.req.Header["Content-Type"]) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := cc.WriteRequest(req, body); err != nil { | 	if err := c.codec.WriteRequest(req, body); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	c.req.Body = c.wbuf.Bytes() | 	c.req.Body = c.buf.wbuf.Bytes() | ||||||
| 	return c.client.Send(c.req) | 	return c.client.Send(c.req) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -58,19 +77,12 @@ func (c *rpcPlusCodec) ReadResponseHeader(r *rpc.Response) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	c.rbuf.Reset() | 	if c.codec == nil { | ||||||
| 	c.rbuf.Write(m.Body) |  | ||||||
| 	buf := &buffer{c.rbuf} |  | ||||||
|  |  | ||||||
| 	switch m.Header["Content-Type"] { |  | ||||||
| 	case "application/octet-stream": |  | ||||||
| 		c.codec = pb.NewClientCodec(buf) |  | ||||||
| 	case "application/json": |  | ||||||
| 		c.codec = js.NewClientCodec(buf) |  | ||||||
| 	default: |  | ||||||
| 		return fmt.Errorf("%s", string(m.Body)) | 		return fmt.Errorf("%s", string(m.Body)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	c.buf.rbuf.Reset() | ||||||
|  | 	c.buf.rbuf.Write(m.Body) | ||||||
| 	return c.codec.ReadResponseHeader(r) | 	return c.codec.ReadResponseHeader(r) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -79,7 +91,6 @@ func (c *rpcPlusCodec) ReadResponseBody(r interface{}) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (c *rpcPlusCodec) Close() error { | func (c *rpcPlusCodec) Close() error { | ||||||
| 	c.rbuf.Reset() | 	c.buf.Close() | ||||||
| 	c.wbuf.Reset() |  | ||||||
| 	return c.client.Close() | 	return c.client.Close() | ||||||
| } | } | ||||||
|   | |||||||
| @@ -37,14 +37,14 @@ func (rwc *readWriteCloser) Close() error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func newRpcPlusCodec(req *transport.Message, socket transport.Socket) rpc.ServerCodec { | func newRpcPlusCodec(req *transport.Message, socket transport.Socket) rpc.ServerCodec { | ||||||
|         r := &rpcPlusCodec{ | 	r := &rpcPlusCodec{ | ||||||
|                 socket: socket, | 		socket: socket, | ||||||
|                 req:    req, | 		req:    req, | ||||||
| 		buf: &readWriteCloser{ | 		buf: &readWriteCloser{ | ||||||
| 			rbuf: bytes.NewBuffer(req.Body), | 			rbuf: bytes.NewBuffer(req.Body), | ||||||
| 			wbuf: bytes.NewBuffer(nil), | 			wbuf: bytes.NewBuffer(nil), | ||||||
| 		}, | 		}, | ||||||
|         } | 	} | ||||||
|  |  | ||||||
| 	switch req.Header["Content-Type"] { | 	switch req.Header["Content-Type"] { | ||||||
| 	case "application/octet-stream": | 	case "application/octet-stream": | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user