Process header/body in one call

This commit is contained in:
Asim Aslam 2018-12-31 17:53:16 +00:00
parent dcf7a56f9b
commit 386ced576a
2 changed files with 12 additions and 15 deletions

View File

@ -43,9 +43,7 @@ type readWriteCloser struct {
type clientCodec interface { type clientCodec interface {
WriteRequest(*request, interface{}) error WriteRequest(*request, interface{}) error
ReadResponseHeader(*response) error ReadResponse(*response, interface{}) error
ReadResponseBody(interface{}) error
Close() error Close() error
} }
@ -129,28 +127,33 @@ func (c *rpcCodec) WriteRequest(req *request, body interface{}) error {
return nil return nil
} }
func (c *rpcCodec) ReadResponseHeader(r *response) error { func (c *rpcCodec) ReadResponse(r *response, b interface{}) error {
var m transport.Message var m transport.Message
if err := c.client.Recv(&m); err != nil { if err := c.client.Recv(&m); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error()) return errors.InternalServerError("go.micro.client.transport", err.Error())
} }
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body) c.buf.rbuf.Write(m.Body)
var me codec.Message var me codec.Message
// set headers
me.Header = m.Header
// read header
err := c.codec.ReadHeader(&me, codec.Response) err := c.codec.ReadHeader(&me, codec.Response)
r.ServiceMethod = me.Method r.ServiceMethod = me.Method
r.Seq = me.Id r.Seq = me.Id
r.Error = me.Error r.Error = me.Error
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error()) return errors.InternalServerError("go.micro.client.codec", err.Error())
} }
return nil
}
func (c *rpcCodec) ReadResponseBody(b interface{}) error { // read body
if err := c.codec.ReadBody(b); err != nil { if err := c.codec.ReadBody(b); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error()) return errors.InternalServerError("go.micro.client.codec", err.Error())
} }
return nil return nil
} }

View File

@ -68,7 +68,8 @@ func (r *rpcStream) Recv(msg interface{}) error {
} }
var resp response var resp response
if err := r.codec.ReadResponseHeader(&resp); err != nil {
if err := r.codec.ReadResponse(&resp, msg); err != nil {
if err == io.EOF && !r.isClosed() { if err == io.EOF && !r.isClosed() {
r.err = io.ErrUnexpectedEOF r.err = io.ErrUnexpectedEOF
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
@ -87,13 +88,6 @@ func (r *rpcStream) Recv(msg interface{}) error {
} else { } else {
r.err = io.EOF r.err = io.EOF
} }
if err := r.codec.ReadResponseBody(nil); err != nil {
r.err = err
}
default:
if err := r.codec.ReadResponseBody(msg); err != nil {
r.err = err
}
} }
return r.err return r.err