diff --git a/codec/codec.go b/codec/codec.go index c41cfaa9..8277cf51 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -23,13 +23,21 @@ type NewCodec func(io.ReadWriteCloser) Codec // connection. ReadBody may be called with a nil argument to force the // body to be read and discarded. type Codec interface { - ReadHeader(*Message, MessageType) error - ReadBody(interface{}) error - Write(*Message, interface{}) error + Reader + Writer Close() error String() string } +type Reader interface { + ReadHeader(*Message, MessageType) error + ReadBody(interface{}) error +} + +type Writer interface { + Write(*Message, interface{}) error +} + // Message represents detailed information about // the communication, likely followed by the body. // In the case of an error, body may be nil. @@ -42,5 +50,5 @@ type Message struct { // The values read from the socket Header map[string]string - Body []byte + Body []byte } diff --git a/server/rpc_request.go b/server/rpc_request.go index 31e2bb80..cbc4179d 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -22,7 +22,7 @@ type rpcMessage struct { payload interface{} } -func (r *rpcRequest) Codec() codec.Codec { +func (r *rpcRequest) Codec() codec.Reader { return r.codec } diff --git a/server/rpc_response.go b/server/rpc_response.go index 92f974a1..d89fa0b6 100644 --- a/server/rpc_response.go +++ b/server/rpc_response.go @@ -3,12 +3,18 @@ package server import ( "net/http" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/transport" ) type rpcResponse struct { header map[string]string socket transport.Socket + codec codec.Codec +} + +func (r *rpcResponse) Codec() codec.Writer { + return r.codec } func (r *rpcResponse) WriteHeader(hdr map[string]string) { diff --git a/server/rpc_router.go b/server/rpc_router.go index 67db4f20..1043036b 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -164,7 +164,7 @@ func prepareMethod(method reflect.Method) *methodType { return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} } -func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Codec, errmsg string, last bool) (err error) { +func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) { msg := new(codec.Message) msg.Type = codec.Response resp := router.getResponse() @@ -184,7 +184,7 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte return err } -func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Codec) { +func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) { function := mtype.method.Func var returnValues []reflect.Value @@ -232,7 +232,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, stream := &rpcStream{ context: ctx, - codec: cc, + codec: cc.(codec.Codec), request: r, id: req.msg.Id, } @@ -358,7 +358,7 @@ func (router *router) readRequest(r Request) (service *service, mtype *methodTyp return } -func (router *router) readHeader(cc codec.Codec) (service *service, mtype *methodType, req *request, keepReading bool, err error) { +func (router *router) readHeader(cc codec.Reader) (service *service, mtype *methodType, req *request, keepReading bool, err error) { // Grab the request header. msg := new(codec.Message) msg.Type = codec.Request @@ -449,7 +449,6 @@ func (router *router) Handle(h Handler) error { } func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error { - cc := r.Codec() sending := new(sync.Mutex) service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r) if err != nil { @@ -458,11 +457,11 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) } // send a response if we actually managed to read a header. if req != nil { - router.sendResponse(sending, req, invalidRequest, cc, err.Error(), true) + router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true) router.freeRequest(req) } return err } - service.call(ctx, router, sending, mtype, req, argv, replyv, cc) + service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) return nil } diff --git a/server/rpc_server.go b/server/rpc_server.go index 1fc445a6..4481d9bb 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -110,12 +110,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { return } + codec := newRpcCodec(&msg, sock, cf) + // internal request request := &rpcRequest{ service: msg.Header["X-Micro-Service"], method: msg.Header["X-Micro-Method"], contentType: ct, - codec: newRpcCodec(&msg, sock, cf), + codec: codec, header: msg.Header, body: msg.Body, socket: sock, @@ -126,6 +128,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { response := &rpcResponse{ header: make(map[string]string), socket: sock, + codec: codec, } // set router diff --git a/server/server.go b/server/server.go index 4b390f0a..fbfacd0d 100644 --- a/server/server.go +++ b/server/server.go @@ -54,13 +54,15 @@ type Request interface { // Read the undecoded request body Read() ([]byte, error) // The encoded message stream - Codec() codec.Codec + Codec() codec.Reader // Indicates whether its a stream Stream() bool } // Response is the response writer for unencoded messages type Response interface { + // Encoded writer + Codec() codec.Writer // Write the header WriteHeader(map[string]string) // write a response directly to the client