handle codec and transport errors

This commit is contained in:
Asim Aslam 2016-12-06 19:40:44 +00:00
parent e10259940b
commit 095793ee96
3 changed files with 25 additions and 13 deletions

View File

@ -78,7 +78,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
var grr error var grr error
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout)) c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("connection error: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
@ -154,7 +154,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout)) c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("connection error: %v", err))
} }
stream := &rpcStream{ stream := &rpcStream{

View File

@ -2,11 +2,12 @@ package client
import ( import (
"bytes" "bytes"
"errors" errs "errors"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
@ -24,7 +25,7 @@ func (e serverError) Error() string {
// errShutdown holds the specific error for closing/closed connections // errShutdown holds the specific error for closing/closed connections
var ( var (
errShutdown = errors.New("connection is shut down") errShutdown = errs.New("connection is shut down")
) )
type rpcPlusCodec struct { type rpcPlusCodec struct {
@ -112,19 +113,22 @@ func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
Header: map[string]string{}, Header: map[string]string{},
} }
if err := c.codec.Write(m, body); err != nil { if err := c.codec.Write(m, body); err != nil {
return err return errors.InternalServerError("go.micro.client.codec", err.Error())
} }
c.req.Body = c.buf.wbuf.Bytes() c.req.Body = c.buf.wbuf.Bytes()
for k, v := range m.Header { for k, v := range m.Header {
c.req.Header[k] = v c.req.Header[k] = v
} }
return c.client.Send(c.req) if err := c.client.Send(c.req); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
return nil
} }
func (c *rpcPlusCodec) ReadResponseHeader(r *response) error { func (c *rpcPlusCodec) ReadResponseHeader(r *response) error {
var m transport.Message var m transport.Message
if err := c.client.Recv(&m); err != nil { if err := c.client.Recv(&m); err != nil {
return err return errors.InternalServerError("go.micro.client.transport", err.Error())
} }
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body) c.buf.rbuf.Write(m.Body)
@ -133,15 +137,24 @@ func (c *rpcPlusCodec) ReadResponseHeader(r *response) error {
r.ServiceMethod = me.Method r.ServiceMethod = me.Method
r.Seq = me.Id r.Seq = me.Id
r.Error = me.Error r.Error = me.Error
return err if err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
return nil
} }
func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error { func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error {
return c.codec.ReadBody(b) if err := c.codec.ReadBody(b); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
return nil
} }
func (c *rpcPlusCodec) Close() error { func (c *rpcPlusCodec) Close() error {
c.buf.Close() c.buf.Close()
c.codec.Close() c.codec.Close()
return c.client.Close() if err := c.client.Close(); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
return nil
} }

View File

@ -1,7 +1,6 @@
package client package client
import ( import (
"errors"
"io" "io"
"sync" "sync"
@ -91,11 +90,11 @@ func (r *rpcStream) Recv(msg interface{}) error {
r.err = io.EOF r.err = io.EOF
} }
if err := r.codec.ReadResponseBody(nil); err != nil { if err := r.codec.ReadResponseBody(nil); err != nil {
r.err = errors.New("reading error payload: " + err.Error()) r.err = err
} }
default: default:
if err := r.codec.ReadResponseBody(msg); err != nil { if err := r.codec.ReadResponseBody(msg); err != nil {
r.err = errors.New("reading body " + err.Error()) r.err = err
} }
} }