diff --git a/client/client.go b/client/client.go index 494d579d..80d7f49b 100644 --- a/client/client.go +++ b/client/client.go @@ -38,7 +38,9 @@ type Message interface { type Request interface { // The service to call Service() string - // The endpoint to call + // The action to take + Method() string + // The endpoint to invoke Endpoint() string // The content type ContentType() string diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 3853479d..ece77f05 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -104,6 +104,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { // set the mucp headers m.Header["X-Micro-Id"] = m.Id m.Header["X-Micro-Service"] = m.Target + m.Header["X-Micro-Method"] = m.Method m.Header["X-Micro-Endpoint"] = m.Endpoint // if body is bytes Frame don't encode @@ -154,6 +155,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { // read header err := c.codec.ReadHeader(&me, r) wm.Endpoint = me.Endpoint + wm.Method = me.Method wm.Id = me.Id wm.Error = me.Error @@ -162,11 +164,16 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { wm.Error = me.Header["X-Micro-Error"] } - // check method in header + // check endpoint in header if len(me.Endpoint) == 0 { wm.Endpoint = me.Header["X-Micro-Endpoint"] } + // check method in header + if len(me.Method) == 0 { + wm.Method = me.Header["X-Micro-Method"] + } + if len(me.Id) == 0 { wm.Id = me.Header["X-Micro-Id"] } diff --git a/client/rpc_request.go b/client/rpc_request.go index 746b4be5..d23855f0 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -6,6 +6,7 @@ import ( type rpcRequest struct { service string + method string endpoint string contentType string codec codec.Codec @@ -27,6 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin return &rpcRequest{ service: service, + method: endpoint, endpoint: endpoint, body: request, contentType: contentType, @@ -42,6 +44,10 @@ func (r *rpcRequest) Service() string { return r.service } +func (r *rpcRequest) Method() string { + return r.method +} + func (r *rpcRequest) Endpoint() string { return r.endpoint } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 39e82fe4..21b2d2b3 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -53,6 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error { req := codec.Message{ Id: r.id, Target: r.request.Service(), + Method: r.request.Method(), Endpoint: r.request.Endpoint(), Type: codec.Request, } diff --git a/codec/codec.go b/codec/codec.go index 868d2dd1..092caa05 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -53,6 +53,7 @@ type Message struct { Id string Type MessageType Target string + Method string Endpoint string Error string diff --git a/codec/jsonrpc/client.go b/codec/jsonrpc/client.go index 768e297d..f5ec5636 100644 --- a/codec/jsonrpc/client.go +++ b/codec/jsonrpc/client.go @@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec { func (c *clientCodec) Write(m *codec.Message, b interface{}) error { c.Lock() - c.pending[m.Id] = m.Endpoint + c.pending[m.Id] = m.Method c.Unlock() - c.req.Method = m.Endpoint + c.req.Method = m.Method c.req.Params[0] = b c.req.ID = m.Id return c.enc.Encode(&c.req) @@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error { } c.Lock() - m.Endpoint = c.pending[c.resp.ID] + m.Method = c.pending[c.resp.ID] delete(c.pending, c.resp.ID) c.Unlock() diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go index a56b3468..53f681ef 100644 --- a/codec/jsonrpc/server.go +++ b/codec/jsonrpc/server.go @@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error { if err := c.dec.Decode(&c.req); err != nil { return err } - m.Endpoint = c.req.Method + m.Method = c.req.Method m.Id = fmt.Sprintf("%v", c.req.ID) c.req.ID = nil return nil diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index f207e5ae..4732b98e 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { c.Lock() defer c.Unlock() // This is protobuf, of course we copy it. - pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)} + pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)} data, err := proto.Marshal(pbr) if err != nil { return err @@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { case codec.Response, codec.Error: c.Lock() defer c.Unlock() - rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error} + rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error} data, err := proto.Marshal(rtmp) if err != nil { return err @@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - m.Endpoint = rtmp.GetServiceMethod() + m.Method = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) case codec.Response: data, err := ReadNetString(c.rwc) @@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - m.Endpoint = rtmp.GetServiceMethod() + m.Method = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) m.Error = rtmp.GetError() case codec.Publication: diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 107ab2c9..847c5ae9 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -41,6 +41,15 @@ var ( "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } + + // TODO: remove legacy codec list + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } ) func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { @@ -57,6 +66,33 @@ func (rwc *readWriteCloser) Close() error { return nil } +// setupProtocol sets up the old protocol +func setupProtocol(msg *transport.Message) codec.NewCodec { + // if the protocol exists do nothing + if len(msg.Header["X-Micro-Protocol"]) > 0 { + return nil + } + + // if 0.17 - 0.21 + if len(msg.Header["X-Micro-Service"]) > 0 { + // set method to endpoint + if len(msg.Header["X-Micro-Method"]) == 0 { + msg.Header["X-Micro-Method"] = msg.Header["X-Micro-Endpoint"] + } + + // set endpoint to method + if len(msg.Header["X-Micro-Endpoint"]) == 0 { + msg.Header["X-Micro-Endpoint"] = msg.Header["X-Micro-Method"] + } + + // done + return nil + } + + // old ways + return defaultCodecs[msg.Header["Content-Type"]] +} + func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ rbuf: bytes.NewBuffer(req.Body), @@ -109,6 +145,7 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { // set some internal things m.Target = m.Header["X-Micro-Service"] + m.Method = m.Header["X-Micro-Method"] m.Endpoint = m.Header["X-Micro-Endpoint"] m.Id = m.Header["X-Micro-Id"] @@ -141,6 +178,8 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { // create a new message m := &codec.Message{ + Target: r.Target, + Method: r.Method, Endpoint: r.Endpoint, Id: r.Id, Error: r.Error, @@ -162,6 +201,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { m.Header["X-Micro-Service"] = r.Target } + // set request method + if len(r.Method) > 0 { + m.Header["X-Micro-Method"] = r.Method + } + // set request endpoint if len(r.Endpoint) > 0 { m.Header["X-Micro-Endpoint"] = r.Endpoint diff --git a/server/rpc_request.go b/server/rpc_request.go index 17ede91b..6496bc52 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -7,6 +7,7 @@ import ( type rpcRequest struct { service string + method string endpoint string contentType string socket transport.Socket @@ -34,6 +35,10 @@ func (r *rpcRequest) Service() string { return r.service } +func (r *rpcRequest) Method() string { + return r.method +} + func (r *rpcRequest) Endpoint() string { return r.endpoint } diff --git a/server/rpc_router.go b/server/rpc_router.go index dee976e8..111b3fde 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -189,6 +189,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, r := &rpcRequest{ service: req.msg.Target, contentType: req.msg.Header["Content-Type"], + method: req.msg.Method, endpoint: req.msg.Endpoint, body: req.msg.Body, } diff --git a/server/rpc_server.go b/server/rpc_server.go index f2a7042c..757c90ad 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -97,17 +97,23 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { ct = DefaultContentType } - // TODO: needs better error handling - cf, err := s.newCodec(ct) - if err != nil { - sock.Send(&transport.Message{ - Header: map[string]string{ - "Content-Type": "text/plain", - }, - Body: []byte(err.Error()), - }) - s.wg.Done() - return + // setup old protocol + cf := setupProtocol(&msg) + + // no old codec + if cf == nil { + // TODO: needs better error handling + var err error + if cf, err = s.newCodec(ct); err != nil { + sock.Send(&transport.Message{ + Header: map[string]string{ + "Content-Type": "text/plain", + }, + Body: []byte(err.Error()), + }) + s.wg.Done() + return + } } rcodec := newRpcCodec(&msg, sock, cf) @@ -115,6 +121,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // internal request request := &rpcRequest{ service: msg.Header["X-Micro-Service"], + method: msg.Header["X-Micro-Method"], endpoint: msg.Header["X-Micro-Endpoint"], contentType: ct, codec: rcodec, diff --git a/server/rpc_stream.go b/server/rpc_stream.go index 2f74049a..dc49b18a 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -31,6 +31,8 @@ func (r *rpcStream) Send(msg interface{}) error { defer r.Unlock() resp := codec.Message{ + Target: r.request.Service(), + Method: r.request.Method(), Endpoint: r.request.Endpoint(), Id: r.id, Type: codec.Response, diff --git a/server/server.go b/server/server.go index 6f7c2712..82113843 100644 --- a/server/server.go +++ b/server/server.go @@ -45,6 +45,8 @@ type Message interface { type Request interface { // Service name requested Service() string + // The action requested + Method() string // Endpoint name requested Endpoint() string // Content type provided