From 2cd2258731b7b0a7ea9d3d150677e101487db020 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 18 Jan 2019 10:12:57 +0000 Subject: [PATCH 1/4] For the legacy --- client/client.go | 4 +++- client/rpc_codec.go | 9 +++++++- client/rpc_request.go | 6 ++++++ client/rpc_stream.go | 1 + codec/codec.go | 1 + codec/jsonrpc/client.go | 6 +++--- codec/jsonrpc/server.go | 2 +- codec/protorpc/protorpc.go | 8 +++---- server/rpc_codec.go | 44 ++++++++++++++++++++++++++++++++++++++ server/rpc_request.go | 5 +++++ server/rpc_router.go | 1 + server/rpc_server.go | 29 +++++++++++++++---------- server/rpc_stream.go | 2 ++ server/server.go | 2 ++ 14 files changed, 99 insertions(+), 21 deletions(-) diff --git a/client/client.go b/client/client.go index 494d579d..80d7f49b 100644 --- a/client/client.go +++ b/client/client.go @@ -38,7 +38,9 @@ type Message interface { type Request interface { // The service to call Service() string - // The endpoint to call + // The action to take + Method() string + // The endpoint to invoke Endpoint() string // The content type ContentType() string diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 3853479d..ece77f05 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -104,6 +104,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { // set the mucp headers m.Header["X-Micro-Id"] = m.Id m.Header["X-Micro-Service"] = m.Target + m.Header["X-Micro-Method"] = m.Method m.Header["X-Micro-Endpoint"] = m.Endpoint // if body is bytes Frame don't encode @@ -154,6 +155,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { // read header err := c.codec.ReadHeader(&me, r) wm.Endpoint = me.Endpoint + wm.Method = me.Method wm.Id = me.Id wm.Error = me.Error @@ -162,11 +164,16 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { wm.Error = me.Header["X-Micro-Error"] } - // check method in header + // check endpoint in header if len(me.Endpoint) == 0 { wm.Endpoint = me.Header["X-Micro-Endpoint"] } + // check method in header + if len(me.Method) == 0 { + wm.Method = me.Header["X-Micro-Method"] + } + if len(me.Id) == 0 { wm.Id = me.Header["X-Micro-Id"] } diff --git a/client/rpc_request.go b/client/rpc_request.go index 746b4be5..d23855f0 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -6,6 +6,7 @@ import ( type rpcRequest struct { service string + method string endpoint string contentType string codec codec.Codec @@ -27,6 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin return &rpcRequest{ service: service, + method: endpoint, endpoint: endpoint, body: request, contentType: contentType, @@ -42,6 +44,10 @@ func (r *rpcRequest) Service() string { return r.service } +func (r *rpcRequest) Method() string { + return r.method +} + func (r *rpcRequest) Endpoint() string { return r.endpoint } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 39e82fe4..21b2d2b3 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -53,6 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error { req := codec.Message{ Id: r.id, Target: r.request.Service(), + Method: r.request.Method(), Endpoint: r.request.Endpoint(), Type: codec.Request, } diff --git a/codec/codec.go b/codec/codec.go index 868d2dd1..092caa05 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -53,6 +53,7 @@ type Message struct { Id string Type MessageType Target string + Method string Endpoint string Error string diff --git a/codec/jsonrpc/client.go b/codec/jsonrpc/client.go index 768e297d..f5ec5636 100644 --- a/codec/jsonrpc/client.go +++ b/codec/jsonrpc/client.go @@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec { func (c *clientCodec) Write(m *codec.Message, b interface{}) error { c.Lock() - c.pending[m.Id] = m.Endpoint + c.pending[m.Id] = m.Method c.Unlock() - c.req.Method = m.Endpoint + c.req.Method = m.Method c.req.Params[0] = b c.req.ID = m.Id return c.enc.Encode(&c.req) @@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error { } c.Lock() - m.Endpoint = c.pending[c.resp.ID] + m.Method = c.pending[c.resp.ID] delete(c.pending, c.resp.ID) c.Unlock() diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go index a56b3468..53f681ef 100644 --- a/codec/jsonrpc/server.go +++ b/codec/jsonrpc/server.go @@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error { if err := c.dec.Decode(&c.req); err != nil { return err } - m.Endpoint = c.req.Method + m.Method = c.req.Method m.Id = fmt.Sprintf("%v", c.req.ID) c.req.ID = nil return nil diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index f207e5ae..4732b98e 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { c.Lock() defer c.Unlock() // This is protobuf, of course we copy it. - pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)} + pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)} data, err := proto.Marshal(pbr) if err != nil { return err @@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { case codec.Response, codec.Error: c.Lock() defer c.Unlock() - rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error} + rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error} data, err := proto.Marshal(rtmp) if err != nil { return err @@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - m.Endpoint = rtmp.GetServiceMethod() + m.Method = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) case codec.Response: data, err := ReadNetString(c.rwc) @@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - m.Endpoint = rtmp.GetServiceMethod() + m.Method = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) m.Error = rtmp.GetError() case codec.Publication: diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 107ab2c9..847c5ae9 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -41,6 +41,15 @@ var ( "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } + + // TODO: remove legacy codec list + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } ) func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { @@ -57,6 +66,33 @@ func (rwc *readWriteCloser) Close() error { return nil } +// setupProtocol sets up the old protocol +func setupProtocol(msg *transport.Message) codec.NewCodec { + // if the protocol exists do nothing + if len(msg.Header["X-Micro-Protocol"]) > 0 { + return nil + } + + // if 0.17 - 0.21 + if len(msg.Header["X-Micro-Service"]) > 0 { + // set method to endpoint + if len(msg.Header["X-Micro-Method"]) == 0 { + msg.Header["X-Micro-Method"] = msg.Header["X-Micro-Endpoint"] + } + + // set endpoint to method + if len(msg.Header["X-Micro-Endpoint"]) == 0 { + msg.Header["X-Micro-Endpoint"] = msg.Header["X-Micro-Method"] + } + + // done + return nil + } + + // old ways + return defaultCodecs[msg.Header["Content-Type"]] +} + func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ rbuf: bytes.NewBuffer(req.Body), @@ -109,6 +145,7 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { // set some internal things m.Target = m.Header["X-Micro-Service"] + m.Method = m.Header["X-Micro-Method"] m.Endpoint = m.Header["X-Micro-Endpoint"] m.Id = m.Header["X-Micro-Id"] @@ -141,6 +178,8 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { // create a new message m := &codec.Message{ + Target: r.Target, + Method: r.Method, Endpoint: r.Endpoint, Id: r.Id, Error: r.Error, @@ -162,6 +201,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { m.Header["X-Micro-Service"] = r.Target } + // set request method + if len(r.Method) > 0 { + m.Header["X-Micro-Method"] = r.Method + } + // set request endpoint if len(r.Endpoint) > 0 { m.Header["X-Micro-Endpoint"] = r.Endpoint diff --git a/server/rpc_request.go b/server/rpc_request.go index 17ede91b..6496bc52 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -7,6 +7,7 @@ import ( type rpcRequest struct { service string + method string endpoint string contentType string socket transport.Socket @@ -34,6 +35,10 @@ func (r *rpcRequest) Service() string { return r.service } +func (r *rpcRequest) Method() string { + return r.method +} + func (r *rpcRequest) Endpoint() string { return r.endpoint } diff --git a/server/rpc_router.go b/server/rpc_router.go index dee976e8..111b3fde 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -189,6 +189,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, r := &rpcRequest{ service: req.msg.Target, contentType: req.msg.Header["Content-Type"], + method: req.msg.Method, endpoint: req.msg.Endpoint, body: req.msg.Body, } diff --git a/server/rpc_server.go b/server/rpc_server.go index f2a7042c..757c90ad 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -97,17 +97,23 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { ct = DefaultContentType } - // TODO: needs better error handling - cf, err := s.newCodec(ct) - if err != nil { - sock.Send(&transport.Message{ - Header: map[string]string{ - "Content-Type": "text/plain", - }, - Body: []byte(err.Error()), - }) - s.wg.Done() - return + // setup old protocol + cf := setupProtocol(&msg) + + // no old codec + if cf == nil { + // TODO: needs better error handling + var err error + if cf, err = s.newCodec(ct); err != nil { + sock.Send(&transport.Message{ + Header: map[string]string{ + "Content-Type": "text/plain", + }, + Body: []byte(err.Error()), + }) + s.wg.Done() + return + } } rcodec := newRpcCodec(&msg, sock, cf) @@ -115,6 +121,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // internal request request := &rpcRequest{ service: msg.Header["X-Micro-Service"], + method: msg.Header["X-Micro-Method"], endpoint: msg.Header["X-Micro-Endpoint"], contentType: ct, codec: rcodec, diff --git a/server/rpc_stream.go b/server/rpc_stream.go index 2f74049a..dc49b18a 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -31,6 +31,8 @@ func (r *rpcStream) Send(msg interface{}) error { defer r.Unlock() resp := codec.Message{ + Target: r.request.Service(), + Method: r.request.Method(), Endpoint: r.request.Endpoint(), Id: r.id, Type: codec.Response, diff --git a/server/server.go b/server/server.go index 6f7c2712..82113843 100644 --- a/server/server.go +++ b/server/server.go @@ -45,6 +45,8 @@ type Message interface { type Request interface { // Service name requested Service() string + // The action requested + Method() string // Endpoint name requested Endpoint() string // Content type provided From f41be53ff8f51295eddec13d1a71bcfa27165cc1 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 18 Jan 2019 10:23:36 +0000 Subject: [PATCH 2/4] Add ability to process legacy requests --- server/rpc_codec.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 847c5ae9..a06c3c3c 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -153,9 +153,15 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { err := c.codec.ReadHeader(&m, codec.Request) // set the method/id + r.Method = m.Method r.Endpoint = m.Endpoint r.Id = m.Id + // TODO: remove the old legacy cruft + if len(r.Endpoint) == 0 { + r.Endpoint = r.Method + } + return err } From 9bd32645be7090d20c68c941b9f7891f525513d3 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 18 Jan 2019 10:43:41 +0000 Subject: [PATCH 3/4] Account for old target --- server/rpc_codec.go | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/server/rpc_codec.go b/server/rpc_codec.go index a06c3c3c..261b5fd1 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -68,29 +68,38 @@ func (rwc *readWriteCloser) Close() error { // setupProtocol sets up the old protocol func setupProtocol(msg *transport.Message) codec.NewCodec { - // if the protocol exists do nothing - if len(msg.Header["X-Micro-Protocol"]) > 0 { + service := msg.Header["X-Micro-Service"] + method := msg.Header["X-Micro-Method"] + endpoint := msg.Header["X-Micro-Endpoint"] + protocol := msg.Header["X-Micro-Protocol"] + target := msg.Header["X-Micro-Target"] + + // if the protocol exists (mucp) do nothing + if len(protocol) > 0 { return nil } - // if 0.17 - 0.21 - if len(msg.Header["X-Micro-Service"]) > 0 { - // set method to endpoint - if len(msg.Header["X-Micro-Method"]) == 0 { - msg.Header["X-Micro-Method"] = msg.Header["X-Micro-Endpoint"] - } - - // set endpoint to method - if len(msg.Header["X-Micro-Endpoint"]) == 0 { - msg.Header["X-Micro-Endpoint"] = msg.Header["X-Micro-Method"] - } - - // done - return nil + // if no service/method/endpoint then it's the old protocol + if len(service) == 0 && len(method) == 0 && len(endpoint) == 0 { + return defaultCodecs[msg.Header["Content-Type"]] } - // old ways - return defaultCodecs[msg.Header["Content-Type"]] + // old target method specified + if len(target) > 0 { + return defaultCodecs[msg.Header["Content-Type"]] + } + + // no method then set to endpoint + if len(method) == 0 { + msg.Header["X-Micro-Method"] = method + } + + // no endpoint then set to method + if len(endpoint) == 0 { + msg.Header["X-Micro-Endpoint"] = method + } + + return nil } func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { From 6468733d98f1084c188e4c523dd8e63b7f8eb076 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 18 Jan 2019 12:30:39 +0000 Subject: [PATCH 4/4] Use protocol from node metadata --- client/rpc_client.go | 72 ++++++++++++++++++++++++++------------- client/rpc_client_test.go | 30 +++++++++------- client/rpc_codec.go | 31 +++++++++++++++++ client/rpc_request.go | 4 +-- client/rpc_stream.go | 2 +- client/wrapper.go | 4 ++- server/rpc_server.go | 1 + 7 files changed, 103 insertions(+), 41 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 16db46cf..3028d366 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "fmt" + "net" + "strconv" "sync" - "time" - "sync/atomic" + "time" "github.com/google/uuid" "github.com/micro/go-micro/broker" @@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } -func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error { +func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error { + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + msg := &transport.Message{ Header: make(map[string]string), } @@ -75,9 +81,16 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp // set the accept header msg.Header["Accept"] = req.ContentType() - cf, err := r.newCodec(req.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + // setup old protocol + cf := setupProtocol(msg, node) + + // no codec specified + if cf == nil { + var err error + cf, err = r.newCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } } var grr error @@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp } } -func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) { +func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + msg := &transport.Message{ Header: make(map[string]string), } @@ -163,9 +181,16 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt // set the accept header msg.Header["Accept"] = req.ContentType() - cf, err := r.newCodec(req.ContentType()) - if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + // set old codecs + cf := setupProtocol(msg, node) + + // no codec specified + if cf == nil { + var err error + cf, err = r.newCodec(req.ContentType()) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } } dOpts := []transport.DialOption{ @@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { // return remote address if len(opts.Address) > 0 { + address := opts.Address + port := 0 + + host, sport, err := net.SplitHostPort(opts.Address) + if err == nil { + address = host + port, _ = strconv.Atoi(sport) + } + return func() (*registry.Node, error) { return ®istry.Node{ - Address: opts.Address, + Address: address, + Port: port, }, nil }, nil } @@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } - // set the address - address := node.Address - if node.Port > 0 { - address = fmt.Sprintf("%s:%d", address, node.Port) - } - // make the call - err = rcall(ctx, address, request, response, callOpts) + err = rcall(ctx, node, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return err } @@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } - address := node.Address - if node.Port > 0 { - address = fmt.Sprintf("%s:%d", address, node.Port) - } - - stream, err := r.stream(ctx, address, request, callOpts) + stream, err := r.stream(ctx, node, request, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return stream, err } diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index 33b5a493..26fa1e77 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) { var called bool service := "test.service" endpoint := "Test.Endpoint" - address := "10.1.10.1:8080" + address := "10.1.10.1" + port := 8080 wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called = true if req.Service() != service { @@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if addr != address { - return fmt.Errorf("expected address: %s got %s", address, addr) + if node.Address != address { + return fmt.Errorf("expected address: %s got %s", address, node.Address) + } + + if node.Port != port { + return fmt.Errorf("expected address: %d got %d", port, node.Port) } // don't do the call @@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) { req := c.NewRequest(service, endpoint, nil) // test calling remote address - if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { + if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil { t.Fatal("call with address error", err) } @@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) { func TestCallRetry(t *testing.T) { service := "test.service" endpoint := "Test.Endpoint" - address := "10.1.10.1:8080" + address := "10.1.10.1" var called int wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called++ if called == 1 { return errors.InternalServerError("test.error", "retry request") @@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) { id := "test.1" service := "test.service" endpoint := "Test.Endpoint" - host := "10.1.10.1" + address := "10.1.10.1" port := 8080 - address := "10.1.10.1:8080" wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called = true if req.Service() != service { @@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if addr != address { - return fmt.Errorf("expected address: %s got %s", address, addr) + if node.Address != address { + return fmt.Errorf("expected address: %s got %s", address, node.Address) } // don't do the call @@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) { Nodes: []*registry.Node{ ®istry.Node{ Id: id, - Address: host, + Address: address, Port: port, }, }, diff --git a/client/rpc_codec.go b/client/rpc_codec.go index ece77f05..35f64e41 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -12,6 +12,7 @@ import ( "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) @@ -58,6 +59,15 @@ var ( "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } + + // TODO: remove legacy codec list + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } ) func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { @@ -74,6 +84,27 @@ func (rwc *readWriteCloser) Close() error { return nil } +// setupProtocol sets up the old protocol +func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { + protocol := node.Metadata["protocol"] + + // got protocol + if len(protocol) > 0 { + return nil + } + + // no protocol use old codecs + switch msg.Header["Content-Type"] { + case "application/json": + msg.Header["Content-Type"] = "application/json-rpc" + case "application/protobuf": + msg.Header["Content-Type"] = "application/proto-rpc" + } + + // now return codec + return defaultCodecs[msg.Header["Content-Type"]] +} + func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), diff --git a/client/rpc_request.go b/client/rpc_request.go index d23855f0..7fd5762b 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -6,7 +6,7 @@ import ( type rpcRequest struct { service string - method string + method string endpoint string contentType string codec codec.Codec @@ -28,7 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin return &rpcRequest{ service: service, - method: endpoint, + method: endpoint, endpoint: endpoint, body: request, contentType: contentType, diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 21b2d2b3..f605c11e 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -53,7 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error { req := codec.Message{ Id: r.id, Target: r.request.Service(), - Method: r.request.Method(), + Method: r.request.Method(), Endpoint: r.request.Endpoint(), Type: codec.Request, } diff --git a/client/wrapper.go b/client/wrapper.go index aacb5ae5..ab86dcd1 100644 --- a/client/wrapper.go +++ b/client/wrapper.go @@ -2,10 +2,12 @@ package client import ( "context" + + "github.com/micro/go-micro/registry" ) // CallFunc represents the individual call func -type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error +type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error // CallWrapper is a low level wrapper for the CallFunc type CallWrapper func(CallFunc) CallFunc diff --git a/server/rpc_server.go b/server/rpc_server.go index 757c90ad..1afeff6e 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -283,6 +283,7 @@ func (s *rpcServer) Register() error { node.Metadata["broker"] = config.Broker.String() node.Metadata["server"] = s.String() node.Metadata["registry"] = config.Registry.String() + node.Metadata["protocol"] = "mucp" s.RLock() // Maps are ordered randomly, sort the keys for consistency