diff --git a/server/rpc_router.go b/server/rpc_router.go index 111b3fde..5d23d5b8 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -162,27 +162,23 @@ func prepareMethod(method reflect.Method) *methodType { return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} } -func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) { +func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error { msg := new(codec.Message) msg.Type = codec.Response resp := router.getResponse() resp.msg = msg - // Encode the response header - resp.msg.Endpoint = req.msg.Endpoint - if errmsg != "" { - resp.msg.Error = errmsg - reply = invalidRequest - } resp.msg.Id = req.msg.Id sending.Lock() - err = cc.Write(resp.msg, reply) + err := cc.Write(resp.msg, reply) sending.Unlock() router.freeResponse(resp) return err } -func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) { +func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error { + defer router.freeRequest(req) + function := mtype.method.Func var returnValues []reflect.Value @@ -206,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, return nil } - errmsg := "" - err := fn(ctx, r, replyv.Interface()) - if err != nil { - errmsg = err.Error() + // execute handler + if err := fn(ctx, r, replyv.Interface()); err != nil { + return err } - err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true) - if err != nil { - log.Log("rpc call: unable to send response: ", err) - } - router.freeRequest(req) - return + // send response + return router.sendResponse(sending, req, replyv.Interface(), cc, true) } // declare a local error to see if we errored out already @@ -250,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, // client.Stream request r.stream = true - errmsg := "" + // execute handler if err := fn(ctx, r, stream); err != nil { - errmsg = err.Error() + return err } // this is the last packet, we don't do anything with // the error here (well sendStreamResponse will log it // already) - router.sendResponse(sending, req, nil, cc, errmsg, true) - router.freeRequest(req) + return router.sendResponse(sending, req, nil, cc, true) } func (m *methodType) prepareContext(ctx context.Context) reflect.Value { @@ -448,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) } // send a response if we actually managed to read a header. if req != nil { - router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true) router.freeRequest(req) } return err } - service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) - return nil + return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) } diff --git a/server/rpc_server.go b/server/rpc_server.go index 469a4bc4..41462723 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -158,12 +158,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // TODO: handle error better if err := handler(ctx, request, response); err != nil { // write an error response - rcodec.Write(&codec.Message{ + err = rcodec.Write(&codec.Message{ Header: msg.Header, Error: err.Error(), Type: codec.Error, }, nil) - + // could not write the error response + if err != nil { + log.Logf("rpc: unable to write error response: %v", err) + } s.wg.Done() return }