Merge pull request #396 from micro/error

Fix #394 invalid error handling in rpc_router ServeRequest
This commit is contained in:
Asim Aslam 2019-01-22 14:28:41 +00:00 committed by GitHub
commit d090a97a3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 28 deletions

View File

@ -162,27 +162,23 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
} }
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) { func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error {
msg := new(codec.Message) msg := new(codec.Message)
msg.Type = codec.Response msg.Type = codec.Response
resp := router.getResponse() resp := router.getResponse()
resp.msg = msg resp.msg = msg
// Encode the response header
resp.msg.Endpoint = req.msg.Endpoint
if errmsg != "" {
resp.msg.Error = errmsg
reply = invalidRequest
}
resp.msg.Id = req.msg.Id resp.msg.Id = req.msg.Id
sending.Lock() sending.Lock()
err = cc.Write(resp.msg, reply) err := cc.Write(resp.msg, reply)
sending.Unlock() sending.Unlock()
router.freeResponse(resp) router.freeResponse(resp)
return err return err
} }
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) { func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
defer router.freeRequest(req)
function := mtype.method.Func function := mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
@ -206,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil return nil
} }
errmsg := "" // execute handler
err := fn(ctx, r, replyv.Interface()) if err := fn(ctx, r, replyv.Interface()); err != nil {
if err != nil { return err
errmsg = err.Error()
} }
err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true) // send response
if err != nil { return router.sendResponse(sending, req, replyv.Interface(), cc, true)
log.Log("rpc call: unable to send response: ", err)
}
router.freeRequest(req)
return
} }
// declare a local error to see if we errored out already // declare a local error to see if we errored out already
@ -250,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
// client.Stream request // client.Stream request
r.stream = true r.stream = true
errmsg := "" // execute handler
if err := fn(ctx, r, stream); err != nil { if err := fn(ctx, r, stream); err != nil {
errmsg = err.Error() return err
} }
// this is the last packet, we don't do anything with // this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it // the error here (well sendStreamResponse will log it
// already) // already)
router.sendResponse(sending, req, nil, cc, errmsg, true) return router.sendResponse(sending, req, nil, cc, true)
router.freeRequest(req)
} }
func (m *methodType) prepareContext(ctx context.Context) reflect.Value { func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
@ -448,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
} }
// send a response if we actually managed to read a header. // send a response if we actually managed to read a header.
if req != nil { if req != nil {
router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
router.freeRequest(req) router.freeRequest(req)
} }
return err return err
} }
service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
return nil
} }

View File

@ -158,12 +158,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// TODO: handle error better // TODO: handle error better
if err := handler(ctx, request, response); err != nil { if err := handler(ctx, request, response); err != nil {
// write an error response // write an error response
rcodec.Write(&codec.Message{ err = rcodec.Write(&codec.Message{
Header: msg.Header, Header: msg.Header,
Error: err.Error(), Error: err.Error(),
Type: codec.Error, Type: codec.Error,
}, nil) }, nil)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
s.wg.Done() s.wg.Done()
return return
} }