diff --git a/server/rpc_request.go b/server/rpc_request.go index d01de930..31e2bb80 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -2,13 +2,16 @@ package server import ( "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/transport" ) type rpcRequest struct { service string method string contentType string + socket transport.Socket codec codec.Codec + header map[string]string body []byte stream bool } @@ -35,8 +38,26 @@ func (r *rpcRequest) Method() string { return r.method } -func (r *rpcRequest) Body() []byte { - return r.body +func (r *rpcRequest) Header() map[string]string { + return r.header +} + +func (r *rpcRequest) Read() ([]byte, error) { + // got a body + if r.body != nil { + b := r.body + r.body = nil + return b, nil + } + + var msg transport.Message + err := r.socket.Recv(&msg) + if err != nil { + return nil, err + } + r.header = msg.Header + + return msg.Body, nil } func (r *rpcRequest) Stream() bool { diff --git a/server/rpc_response.go b/server/rpc_response.go new file mode 100644 index 00000000..92f974a1 --- /dev/null +++ b/server/rpc_response.go @@ -0,0 +1,29 @@ +package server + +import ( + "net/http" + + "github.com/micro/go-micro/transport" +) + +type rpcResponse struct { + header map[string]string + socket transport.Socket +} + +func (r *rpcResponse) WriteHeader(hdr map[string]string) { + for k, v := range hdr { + r.header[k] = v + } +} + +func (r *rpcResponse) Write(b []byte) error { + if _, ok := r.header["Content-Type"]; !ok { + r.header["Content-Type"] = http.DetectContentType(b) + } + + return r.socket.Send(&transport.Message{ + Header: r.header, + Body: b, + }) +} diff --git a/server/rpc_router.go b/server/rpc_router.go index 477d5acb..67db4f20 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -18,7 +18,6 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/transport" ) var ( @@ -449,7 +448,7 @@ func (router *router) Handle(h Handler) error { return nil } -func (router *router) ServeRequest(ctx context.Context, r Request, s transport.Socket) error { +func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error { cc := r.Codec() sending := new(sync.Mutex) service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r) diff --git a/server/rpc_server.go b/server/rpc_server.go index d0ed7715..1fc445a6 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -116,20 +116,26 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { method: msg.Header["X-Micro-Method"], contentType: ct, codec: newRpcCodec(&msg, sock, cf), + header: msg.Header, body: msg.Body, + socket: sock, stream: true, } - // set router - var r Router - r = s.router + // internal response + response := &rpcResponse{ + header: make(map[string]string), + socket: sock, + } - if s.opts.Router != nil { - r = s.opts.Router + // set router + r := s.opts.Router + if s.opts.Router == nil { + r = s.router } // TODO: needs better error handling - if err := r.ServeRequest(ctx, request, sock); err != nil { + if err := r.ServeRequest(ctx, request, response); err != nil { s.wg.Done() log.Logf("Unexpected error serving request, closing socket: %v", err) return diff --git a/server/server.go b/server/server.go index 2bc4a99a..4b390f0a 100644 --- a/server/server.go +++ b/server/server.go @@ -11,7 +11,6 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/transport" ) // Server is a simple micro server abstraction @@ -32,7 +31,7 @@ type Server interface { // Router handle serving messages type Router interface { // ServeRequest processes a request to completion - ServeRequest(context.Context, Request, transport.Socket) error + ServeRequest(context.Context, Request, Response) error } // Message is an async message interface @@ -48,16 +47,26 @@ type Request interface { Service() string // Method name requested Method() string - // The initial request body - Body() []byte // Content type provided ContentType() string - // The codec for encoding/decoding messages + // Header of the request + Header() map[string]string + // Read the undecoded request body + Read() ([]byte, error) + // The encoded message stream Codec() codec.Codec // Indicates whether its a stream Stream() bool } +// Response is the response writer for unencoded messages +type Response interface { + // Write the header + WriteHeader(map[string]string) + // write a response directly to the client + Write([]byte) error +} + // Stream represents a stream established with a client. // A stream can be bidirectional which is indicated by the request. // The last error will be left in Error().