diff --git a/codec/codec.go b/codec/codec.go index 7909d3f8..c41cfaa9 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -39,5 +39,8 @@ type Message struct { Target string Method string Error string + + // The values read from the socket Header map[string]string + Body []byte } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 8cf209dd..abceda03 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -73,7 +73,11 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod } func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { - m := codec.Message{Header: c.req.Header} + // the initieal message + m := codec.Message{ + Header: c.req.Header, + Body: c.req.Body, + } // if its a follow on request read it if !c.first { @@ -93,6 +97,8 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { // set the message header m.Header = tm.Header + // set the message body + m.Body = tm.Body } // no longer first read @@ -117,7 +123,7 @@ func (c *rpcCodec) ReadBody(b interface{}) error { return c.codec.ReadBody(b) } -func (c *rpcCodec) Write(r *codec.Message, body interface{}) error { +func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { c.buf.wbuf.Reset() // create a new message @@ -134,22 +140,33 @@ func (c *rpcCodec) Write(r *codec.Message, body interface{}) error { }, } - // write to the body - if err := c.codec.Write(m, body); err != nil { + // the body being sent + var body []byte + + // if we have encoded data just send it + if len(r.Body) > 0 { + body = r.Body + // write to the body + } else if err := c.codec.Write(m, b); err != nil { c.buf.wbuf.Reset() // write an error if it failed m.Error = errors.Wrapf(err, "Unable to encode body").Error() m.Header["X-Micro-Error"] = m.Error + // no body to write if err := c.codec.Write(m, nil); err != nil { return err } + // write the body + } else { + // set the body + body = c.buf.wbuf.Bytes() } // send on the socket return c.socket.Send(&transport.Message{ Header: m.Header, - Body: c.buf.wbuf.Bytes(), + Body: body, }) } diff --git a/server/rpc_request.go b/server/rpc_request.go index b96286c2..d01de930 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -1,10 +1,15 @@ package server +import ( + "github.com/micro/go-micro/codec" +) + type rpcRequest struct { service string method string contentType string - request interface{} + codec codec.Codec + body []byte stream bool } @@ -14,6 +19,10 @@ type rpcMessage struct { payload interface{} } +func (r *rpcRequest) Codec() codec.Codec { + return r.codec +} + func (r *rpcRequest) ContentType() string { return r.contentType } @@ -26,8 +35,8 @@ func (r *rpcRequest) Method() string { return r.method } -func (r *rpcRequest) Request() interface{} { - return r.request +func (r *rpcRequest) Body() []byte { + return r.body } func (r *rpcRequest) Stream() bool { diff --git a/server/rpc_router.go b/server/rpc_router.go index 291e3bac..477d5acb 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -18,6 +18,7 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/transport" ) var ( @@ -184,21 +185,20 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte return err } -func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec codec.Codec) { +func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Codec) { function := mtype.method.Func var returnValues []reflect.Value r := &rpcRequest{ - service: router.name, + service: req.msg.Target, contentType: req.msg.Header["Content-Type"], method: req.msg.Method, + body: req.msg.Body, } if !mtype.stream { - r.request = argv.Interface() - fn := func(ctx context.Context, req Request, rsp interface{}) error { - returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)}) + returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) // The return value for the method is an error. if err := returnValues[0].Interface(); err != nil { @@ -218,7 +218,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, errmsg = err.Error() } - err = router.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true) + err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true) if err != nil { log.Log("rpc call: unable to send response: ", err) } @@ -233,7 +233,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, stream := &rpcStream{ context: ctx, - codec: codec, + codec: cc, request: r, id: req.msg.Id, } @@ -268,7 +268,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, // this is the last packet, we don't do anything with // the error here (well sendStreamResponse will log it // already) - router.sendResponse(sending, req, nil, codec, errmsg, true) + router.sendResponse(sending, req, nil, cc, errmsg, true) router.freeRequest(req) } @@ -319,7 +319,9 @@ func (router *router) freeResponse(resp *response) { router.respLock.Unlock() } -func (router *router) readRequest(cc codec.Codec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) { +func (router *router) readRequest(r Request) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) { + cc := r.Codec() + service, mtype, req, keepReading, err = router.readHeader(cc) if err != nil { if !keepReading { @@ -447,9 +449,10 @@ func (router *router) Handle(h Handler) error { return nil } -func (router *router) ServeRequest(ctx context.Context, cc codec.Codec) error { +func (router *router) ServeRequest(ctx context.Context, r Request, s transport.Socket) error { + cc := r.Codec() sending := new(sync.Mutex) - service, mtype, req, argv, replyv, keepReading, err := router.readRequest(cc) + service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r) if err != nil { if !keepReading { return err diff --git a/server/rpc_server.go b/server/rpc_server.go index 6d73f63b..d0ed7715 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -110,8 +110,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { return } - // create the internal server codec - codec := newRpcCodec(&msg, sock, cf) + // internal request + request := &rpcRequest{ + service: msg.Header["X-Micro-Service"], + method: msg.Header["X-Micro-Method"], + contentType: ct, + codec: newRpcCodec(&msg, sock, cf), + body: msg.Body, + stream: true, + } // set router var r Router @@ -122,7 +129,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } // TODO: needs better error handling - if err := r.ServeRequest(ctx, codec); err != nil { + if err := r.ServeRequest(ctx, request, sock); err != nil { s.wg.Done() log.Logf("Unexpected error serving request, closing socket: %v", err) return diff --git a/server/server.go b/server/server.go index 5e454824..2bc4a99a 100644 --- a/server/server.go +++ b/server/server.go @@ -11,6 +11,7 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/transport" ) // Server is a simple micro server abstraction @@ -30,7 +31,8 @@ type Server interface { // Router handle serving messages type Router interface { - ServeRequest(context.Context, codec.Codec) error + // ServeRequest processes a request to completion + ServeRequest(context.Context, Request, transport.Socket) error } // Message is an async message interface @@ -42,11 +44,17 @@ type Message interface { // Request is a synchronous request interface type Request interface { + // Service name requested Service() string + // Method name requested Method() string + // The initial request body + Body() []byte + // Content type provided ContentType() string - Request() interface{} - // indicates whether the request will be streamed + // The codec for encoding/decoding messages + Codec() codec.Codec + // Indicates whether its a stream Stream() bool }