diff --git a/client/client.go b/client/client.go index 74ec51be..32cb9a6b 100644 --- a/client/client.go +++ b/client/client.go @@ -4,6 +4,8 @@ package client import ( "context" "time" + + "github.com/micro/go-micro/codec" ) // Client is the interface used to make requests to services. @@ -20,6 +22,11 @@ type Client interface { String() string } +// Router manages request routing +type Router interface { + SendRequest(context.Context, Request) (Response, error) +} + // Message is the interface for publishing asynchronously type Message interface { Topic() string @@ -29,14 +36,30 @@ type Message interface { // Request is the interface for a synchronous request used by Call or Stream type Request interface { + // The service to call Service() string + // The method to call Method() string + // The content type ContentType() string - Request() interface{} + // The unencoded request body + Body() interface{} + // Write to the encoded request writer. This is nil before a call is made + Codec() codec.Writer // indicates whether the request will be a streaming one rather than unary Stream() bool } +// Response is the response received from a service +type Response interface { + // Read the response + Codec() codec.Reader + // read the header + Header() map[string]string + // Read the undecoded response + Read() ([]byte, error) +} + // Stream is the inteface for a bidirectional synchronous stream type Stream interface { Context() context.Context diff --git a/client/rpc_client.go b/client/rpc_client.go index 04be269e..336dd34f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -97,7 +97,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp request: req, closed: make(chan bool), codec: newRpcCodec(msg, c, cf), - seq: fmt.Sprintf("%v", seq), + id: fmt.Sprintf("%v", seq), } defer stream.Close() @@ -111,7 +111,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp }() // send request - if err := stream.Send(req.Request()); err != nil { + if err := stream.Send(req.Body()); err != nil { ch <- err return } @@ -183,7 +183,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt ch := make(chan error, 1) go func() { - ch <- stream.Send(req.Request()) + ch <- stream.Send(req.Body()) }() var grr error diff --git a/client/rpc_codec.go b/client/rpc_codec.go index d269392f..fe694380 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -3,7 +3,6 @@ package client import ( "bytes" errs "errors" - "fmt" "github.com/micro/go-micro/codec" raw "github.com/micro/go-micro/codec/bytes" @@ -85,7 +84,7 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcCodec { +func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil), @@ -99,34 +98,45 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod return r } -func (c *rpcCodec) Write(req *request, body interface{}) error { +func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error { c.buf.wbuf.Reset() m := &codec.Message{ - Id: req.Seq, - Target: req.Service, - Method: req.ServiceMethod, + Id: wm.Id, + Target: wm.Target, + Method: wm.Method, Type: codec.Request, Header: map[string]string{ - "X-Micro-Id": fmt.Sprintf("%v", req.Seq), - "X-Micro-Service": req.Service, - "X-Micro-Method": req.ServiceMethod, + "X-Micro-Id": wm.Id, + "X-Micro-Service": wm.Target, + "X-Micro-Method": wm.Method, }, } + if err := c.codec.Write(m, body); err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } - c.req.Body = c.buf.wbuf.Bytes() + + // set body + if len(wm.Body) > 0 { + c.req.Body = wm.Body + } else { + c.req.Body = c.buf.wbuf.Bytes() + } + + // set header for k, v := range m.Header { c.req.Header[k] = v } + + // send the request if err := c.client.Send(c.req); err != nil { return errors.InternalServerError("go.micro.client.transport", err.Error()) } return nil } -func (c *rpcCodec) Read(r *response, b interface{}) error { +func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { var m transport.Message if err := c.client.Recv(&m); err != nil { return errors.InternalServerError("go.micro.client.transport", err.Error()) @@ -139,34 +149,38 @@ func (c *rpcCodec) Read(r *response, b interface{}) error { me.Header = m.Header // read header - err := c.codec.ReadHeader(&me, codec.Response) - r.ServiceMethod = me.Method - r.Seq = me.Id - r.Error = me.Error + err := c.codec.ReadHeader(&me, r) + wm.Method = me.Method + wm.Id = me.Id + wm.Error = me.Error // check error in header if len(me.Error) == 0 { - r.Error = me.Header["X-Micro-Error"] + wm.Error = me.Header["X-Micro-Error"] } // check method in header if len(me.Method) == 0 { - r.ServiceMethod = me.Header["X-Micro-Method"] + wm.Method = me.Header["X-Micro-Method"] } if len(me.Id) == 0 { - r.Seq = me.Header["X-Micro-Id"] + wm.Id = me.Header["X-Micro-Id"] } + // return header error if err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } + return nil +} + +func (c *rpcCodec) ReadBody(b interface{}) error { // read body if err := c.codec.ReadBody(b); err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } - return nil } @@ -178,3 +192,7 @@ func (c *rpcCodec) Close() error { } return nil } + +func (c *rpcCodec) String() string { + return "rpc" +} diff --git a/client/rpc_request.go b/client/rpc_request.go index 04ce490a..b7f9695e 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -1,10 +1,15 @@ package client +import ( + "github.com/micro/go-micro/codec" +) + type rpcRequest struct { service string method string contentType string - request interface{} + codec codec.Codec + body interface{} opts RequestOptions } @@ -23,7 +28,7 @@ func newRequest(service, method string, request interface{}, contentType string, return &rpcRequest{ service: service, method: method, - request: request, + body: request, contentType: contentType, opts: opts, } @@ -41,8 +46,12 @@ func (r *rpcRequest) Method() string { return r.method } -func (r *rpcRequest) Request() interface{} { - return r.request +func (r *rpcRequest) Body() interface{} { + return r.body +} + +func (r *rpcRequest) Codec() codec.Writer { + return r.codec } func (r *rpcRequest) Stream() bool { diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 334b1fc0..838172d4 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -4,16 +4,18 @@ import ( "context" "io" "sync" + + "github.com/micro/go-micro/codec" ) // Implements the streamer interface type rpcStream struct { sync.RWMutex - seq string + id string closed chan bool err error request Request - codec *rpcCodec + codec codec.Codec context context.Context } @@ -43,18 +45,18 @@ func (r *rpcStream) Send(msg interface{}) error { return errShutdown } - seq := r.seq - - req := request{ - Service: r.request.Service(), - Seq: seq, - ServiceMethod: r.request.Method(), + req := codec.Message{ + Id: r.id, + Target: r.request.Service(), + Method: r.request.Method(), + Type: codec.Request, } if err := r.codec.Write(&req, msg); err != nil { r.err = err return err } + return nil } @@ -67,9 +69,9 @@ func (r *rpcStream) Recv(msg interface{}) error { return errShutdown } - var resp response + var resp codec.Message - if err := r.codec.Read(&resp, msg); err != nil { + if err := r.codec.ReadHeader(&resp, codec.Response); err != nil { if err == io.EOF && !r.isClosed() { r.err = io.ErrUnexpectedEOF return io.ErrUnexpectedEOF @@ -81,13 +83,20 @@ func (r *rpcStream) Recv(msg interface{}) error { switch { case len(resp.Error) > 0: // We've got an error response. Give this to the request; - // any subsequent requests will get the ReadBody + // any subsequent requests will get the ReadResponseBody // error if there is one. if resp.Error != lastStreamResponseError { r.err = serverError(resp.Error) } else { r.err = io.EOF } + if err := r.codec.ReadBody(nil); err != nil { + r.err = err + } + default: + if err := r.codec.ReadBody(msg); err != nil { + r.err = err + } } return r.err