Merge pull request #396 from micro/error
Fix #394 invalid error handling in rpc_router ServeRequest
This commit is contained in:
		| @@ -162,27 +162,23 @@ func prepareMethod(method reflect.Method) *methodType { | ||||
| 	return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} | ||||
| } | ||||
|  | ||||
| func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) { | ||||
| func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error { | ||||
| 	msg := new(codec.Message) | ||||
| 	msg.Type = codec.Response | ||||
| 	resp := router.getResponse() | ||||
| 	resp.msg = msg | ||||
|  | ||||
| 	// Encode the response header | ||||
| 	resp.msg.Endpoint = req.msg.Endpoint | ||||
| 	if errmsg != "" { | ||||
| 		resp.msg.Error = errmsg | ||||
| 		reply = invalidRequest | ||||
| 	} | ||||
| 	resp.msg.Id = req.msg.Id | ||||
| 	sending.Lock() | ||||
| 	err = cc.Write(resp.msg, reply) | ||||
| 	err := cc.Write(resp.msg, reply) | ||||
| 	sending.Unlock() | ||||
| 	router.freeResponse(resp) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) { | ||||
| func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error { | ||||
| 	defer router.freeRequest(req) | ||||
|  | ||||
| 	function := mtype.method.Func | ||||
| 	var returnValues []reflect.Value | ||||
|  | ||||
| @@ -206,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		errmsg := "" | ||||
| 		err := fn(ctx, r, replyv.Interface()) | ||||
| 		if err != nil { | ||||
| 			errmsg = err.Error() | ||||
| 		// execute handler | ||||
| 		if err := fn(ctx, r, replyv.Interface()); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true) | ||||
| 		if err != nil { | ||||
| 			log.Log("rpc call: unable to send response: ", err) | ||||
| 		} | ||||
| 		router.freeRequest(req) | ||||
| 		return | ||||
| 		// send response | ||||
| 		return router.sendResponse(sending, req, replyv.Interface(), cc, true) | ||||
| 	} | ||||
|  | ||||
| 	// declare a local error to see if we errored out already | ||||
| @@ -250,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, | ||||
| 	// client.Stream request | ||||
| 	r.stream = true | ||||
|  | ||||
| 	errmsg := "" | ||||
| 	// execute handler | ||||
| 	if err := fn(ctx, r, stream); err != nil { | ||||
| 		errmsg = err.Error() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// this is the last packet, we don't do anything with | ||||
| 	// the error here (well sendStreamResponse will log it | ||||
| 	// already) | ||||
| 	router.sendResponse(sending, req, nil, cc, errmsg, true) | ||||
| 	router.freeRequest(req) | ||||
| 	return router.sendResponse(sending, req, nil, cc, true) | ||||
| } | ||||
|  | ||||
| func (m *methodType) prepareContext(ctx context.Context) reflect.Value { | ||||
| @@ -448,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) | ||||
| 		} | ||||
| 		// send a response if we actually managed to read a header. | ||||
| 		if req != nil { | ||||
| 			router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true) | ||||
| 			router.freeRequest(req) | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
| 	service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) | ||||
| 	return nil | ||||
| 	return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) | ||||
| } | ||||
|   | ||||
| @@ -158,12 +158,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { | ||||
| 		// TODO: handle error better | ||||
| 		if err := handler(ctx, request, response); err != nil { | ||||
| 			// write an error response | ||||
| 			rcodec.Write(&codec.Message{ | ||||
| 			err = rcodec.Write(&codec.Message{ | ||||
| 				Header: msg.Header, | ||||
| 				Error:  err.Error(), | ||||
| 				Type:   codec.Error, | ||||
| 			}, nil) | ||||
|  | ||||
| 			// could not write the error response | ||||
| 			if err != nil { | ||||
| 				log.Logf("rpc: unable to write error response: %v", err) | ||||
| 			} | ||||
| 			s.wg.Done() | ||||
| 			return | ||||
| 		} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user