diff --git a/client/rpc_client.go b/client/rpc_client.go index 16db46cf..3028d366 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "fmt" + "net" + "strconv" "sync" - "time" - "sync/atomic" + "time" "github.com/google/uuid" "github.com/micro/go-micro/broker" @@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } -func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error { +func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error { + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + msg := &transport.Message{ Header: make(map[string]string), } @@ -75,9 +81,16 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp // set the accept header msg.Header["Accept"] = req.ContentType() - cf, err := r.newCodec(req.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + // setup old protocol + cf := setupProtocol(msg, node) + + // no codec specified + if cf == nil { + var err error + cf, err = r.newCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } } var grr error @@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp } } -func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) { +func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + msg := &transport.Message{ Header: make(map[string]string), } @@ -163,9 +181,16 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt // set the accept header msg.Header["Accept"] = req.ContentType() - cf, err := r.newCodec(req.ContentType()) - if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + // set old codecs + cf := setupProtocol(msg, node) + + // no codec specified + if cf == nil { + var err error + cf, err = r.newCodec(req.ContentType()) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } } dOpts := []transport.DialOption{ @@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { // return remote address if len(opts.Address) > 0 { + address := opts.Address + port := 0 + + host, sport, err := net.SplitHostPort(opts.Address) + if err == nil { + address = host + port, _ = strconv.Atoi(sport) + } + return func() (*registry.Node, error) { return ®istry.Node{ - Address: opts.Address, + Address: address, + Port: port, }, nil }, nil } @@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } - // set the address - address := node.Address - if node.Port > 0 { - address = fmt.Sprintf("%s:%d", address, node.Port) - } - // make the call - err = rcall(ctx, address, request, response, callOpts) + err = rcall(ctx, node, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return err } @@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } - address := node.Address - if node.Port > 0 { - address = fmt.Sprintf("%s:%d", address, node.Port) - } - - stream, err := r.stream(ctx, address, request, callOpts) + stream, err := r.stream(ctx, node, request, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return stream, err } diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index 33b5a493..26fa1e77 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) { var called bool service := "test.service" endpoint := "Test.Endpoint" - address := "10.1.10.1:8080" + address := "10.1.10.1" + port := 8080 wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called = true if req.Service() != service { @@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if addr != address { - return fmt.Errorf("expected address: %s got %s", address, addr) + if node.Address != address { + return fmt.Errorf("expected address: %s got %s", address, node.Address) + } + + if node.Port != port { + return fmt.Errorf("expected address: %d got %d", port, node.Port) } // don't do the call @@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) { req := c.NewRequest(service, endpoint, nil) // test calling remote address - if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { + if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil { t.Fatal("call with address error", err) } @@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) { func TestCallRetry(t *testing.T) { service := "test.service" endpoint := "Test.Endpoint" - address := "10.1.10.1:8080" + address := "10.1.10.1" var called int wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called++ if called == 1 { return errors.InternalServerError("test.error", "retry request") @@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) { id := "test.1" service := "test.service" endpoint := "Test.Endpoint" - host := "10.1.10.1" + address := "10.1.10.1" port := 8080 - address := "10.1.10.1:8080" wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called = true if req.Service() != service { @@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if addr != address { - return fmt.Errorf("expected address: %s got %s", address, addr) + if node.Address != address { + return fmt.Errorf("expected address: %s got %s", address, node.Address) } // don't do the call @@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) { Nodes: []*registry.Node{ ®istry.Node{ Id: id, - Address: host, + Address: address, Port: port, }, }, diff --git a/client/rpc_codec.go b/client/rpc_codec.go index ece77f05..35f64e41 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -12,6 +12,7 @@ import ( "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) @@ -58,6 +59,15 @@ var ( "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } + + // TODO: remove legacy codec list + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } ) func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { @@ -74,6 +84,27 @@ func (rwc *readWriteCloser) Close() error { return nil } +// setupProtocol sets up the old protocol +func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { + protocol := node.Metadata["protocol"] + + // got protocol + if len(protocol) > 0 { + return nil + } + + // no protocol use old codecs + switch msg.Header["Content-Type"] { + case "application/json": + msg.Header["Content-Type"] = "application/json-rpc" + case "application/protobuf": + msg.Header["Content-Type"] = "application/proto-rpc" + } + + // now return codec + return defaultCodecs[msg.Header["Content-Type"]] +} + func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), diff --git a/client/rpc_request.go b/client/rpc_request.go index d23855f0..7fd5762b 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -6,7 +6,7 @@ import ( type rpcRequest struct { service string - method string + method string endpoint string contentType string codec codec.Codec @@ -28,7 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin return &rpcRequest{ service: service, - method: endpoint, + method: endpoint, endpoint: endpoint, body: request, contentType: contentType, diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 21b2d2b3..f605c11e 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -53,7 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error { req := codec.Message{ Id: r.id, Target: r.request.Service(), - Method: r.request.Method(), + Method: r.request.Method(), Endpoint: r.request.Endpoint(), Type: codec.Request, } diff --git a/client/wrapper.go b/client/wrapper.go index aacb5ae5..ab86dcd1 100644 --- a/client/wrapper.go +++ b/client/wrapper.go @@ -2,10 +2,12 @@ package client import ( "context" + + "github.com/micro/go-micro/registry" ) // CallFunc represents the individual call func -type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error +type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error // CallWrapper is a low level wrapper for the CallFunc type CallWrapper func(CallFunc) CallFunc diff --git a/server/rpc_server.go b/server/rpc_server.go index 757c90ad..1afeff6e 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -283,6 +283,7 @@ func (s *rpcServer) Register() error { node.Metadata["broker"] = config.Broker.String() node.Metadata["server"] = s.String() node.Metadata["registry"] = config.Registry.String() + node.Metadata["protocol"] = "mucp" s.RLock() // Maps are ordered randomly, sort the keys for consistency