diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 3dc91523..d1be62ba 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -17,6 +17,7 @@ import ( raw "github.com/micro/go-micro/v2/codec/bytes" "github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/metadata" + "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/router" "github.com/micro/go-micro/v2/selector" pnet "github.com/micro/go-micro/v2/util/net" @@ -123,7 +124,7 @@ func (g *grpcClient) lookupRoute(req client.Request, opts client.CallOptions) (* } } -func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string header = make(map[string]string) @@ -156,7 +157,7 @@ func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.R grpcDialOptions := []grpc.DialOption{ grpc.WithTimeout(opts.DialTimeout), - g.secure(route.Address), + g.secure(node.Address), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), @@ -167,13 +168,13 @@ func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.R grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := g.pool.getConn(route.Address, grpcDialOptions...) + cc, err := g.pool.getConn(node.Address, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } defer func() { // defer execution of release - g.pool.release(route.Address, cc, grr) + g.pool.release(node.Address, cc, grr) }() ch := make(chan error, 1) @@ -199,7 +200,7 @@ func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.R return grr } -func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string if md, ok := metadata.FromContext(ctx); ok { @@ -239,14 +240,14 @@ func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client grpcDialOptions := []grpc.DialOption{ grpc.WithTimeout(opts.DialTimeout), - g.secure(route.Address), + g.secure(node.Address), } if opts := g.getGrpcDialOptions(); opts != nil { grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := grpc.DialContext(dialCtx, route.Address, grpcDialOptions...) + cc, err := grpc.DialContext(dialCtx, node.Address, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -461,8 +462,13 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface return err } + // pass a node to enable backwards compatability as changing the + // call func would be a breaking change. + // todo v3: change the call func to accept a route + node := ®istry.Node{Address: route.Address} + // make the call - err = gcall(ctx, route, req, rsp, callOpts) + err = gcall(ctx, node, req, rsp, callOpts) // record the result of the call to inform future routing decisions g.opts.Selector.Record(*route, err) @@ -550,9 +556,14 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli return nil, err } + // pass a node to enable backwards compatability as changing the + // call func would be a breaking change. + // todo v3: change the call func to accept a route + node := ®istry.Node{Address: route.Address} + // make the call stream := &grpcStream{} - err = g.stream(ctx, route, req, stream, callOpts) + err = g.stream(ctx, node, req, stream, callOpts) // record the result of the call to inform future routing decisions g.opts.Selector.Record(*route, err) diff --git a/client/rpc_client.go b/client/rpc_client.go index 2a3f9dd2..4e9682eb 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -13,6 +13,7 @@ import ( raw "github.com/micro/go-micro/v2/codec/bytes" "github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/metadata" + "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/router" "github.com/micro/go-micro/v2/selector" "github.com/micro/go-micro/v2/transport" @@ -113,7 +114,7 @@ func (r *rpcClient) lookupRoute(req Request, opts CallOptions) (*router.Route, e } } -func (r *rpcClient) call(ctx context.Context, route *router.Route, req Request, resp interface{}, opts CallOptions) error { +func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error { msg := &transport.Message{ Header: make(map[string]string), } @@ -138,7 +139,7 @@ func (r *rpcClient) call(ctx context.Context, route *router.Route, req Request, msg.Header["Accept"] = req.ContentType() // setup old protocol - cf := setupProtocol(msg, route) + cf := setupProtocol(msg, node) // no codec specified if cf == nil { @@ -157,7 +158,7 @@ func (r *rpcClient) call(ctx context.Context, route *router.Route, req Request, dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) } - c, err := r.pool.Get(route.Address, dOpts...) + c, err := r.pool.Get(node.Address, dOpts...) if err != nil { return errors.InternalServerError("go.micro.client", "connection error: %v", err) } @@ -230,7 +231,7 @@ func (r *rpcClient) call(ctx context.Context, route *router.Route, req Request, return nil } -func (r *rpcClient) stream(ctx context.Context, route *router.Route, req Request, opts CallOptions) (Stream, error) { +func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { msg := &transport.Message{ Header: make(map[string]string), } @@ -252,7 +253,7 @@ func (r *rpcClient) stream(ctx context.Context, route *router.Route, req Request msg.Header["Accept"] = req.ContentType() // set old codecs - cf := setupProtocol(msg, route) + cf := setupProtocol(msg, node) // no codec specified if cf == nil { @@ -271,7 +272,7 @@ func (r *rpcClient) stream(ctx context.Context, route *router.Route, req Request dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) } - c, err := r.opts.Transport.Dial(route.Address, dOpts...) + c, err := r.opts.Transport.Dial(node.Address, dOpts...) if err != nil { return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) } @@ -420,8 +421,13 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return err } + // pass a node to enable backwards compatability as changing the + // call func would be a breaking change. + // todo v3: change the call func to accept a route + node := ®istry.Node{Address: route.Address, Metadata: route.Metadata} + // make the call - err = rcall(ctx, route, request, response, callOpts) + err = rcall(ctx, node, request, response, callOpts) // record the result of the call to inform future routing decisions r.opts.Selector.Record(*route, err) @@ -502,8 +508,13 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, err } + // pass a node to enable backwards compatability as changing the + // call func would be a breaking change. + // todo v3: change the call func to accept a route + node := ®istry.Node{Address: route.Address, Metadata: route.Metadata} + // perform the call - stream, err := r.stream(ctx, route, request, callOpts) + stream, err := r.stream(ctx, node, request, callOpts) // record the result of the call to inform future routing decisions r.opts.Selector.Record(*route, err) diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index ebe1128a..73b671b3 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -8,7 +8,6 @@ import ( "github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry/memory" - "github.com/micro/go-micro/v2/router" ) func newTestRegistry() registry.Registry { @@ -22,7 +21,7 @@ func TestCallAddress(t *testing.T) { address := "10.1.10.1:8080" wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called = true if req.Service() != service { @@ -33,8 +32,8 @@ func TestCallAddress(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if route.Address != address { - return fmt.Errorf("expected address: %s got %s", address, route.Address) + if node.Address != address { + return fmt.Errorf("expected address: %s got %s", address, node.Address) } // don't do the call @@ -69,7 +68,7 @@ func TestCallRetry(t *testing.T) { var called int wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called++ if called == 1 { return errors.InternalServerError("test.error", "retry request") @@ -107,7 +106,7 @@ func TestCallWrapper(t *testing.T) { address := "10.1.10.1:8080" wrap := func(cf CallFunc) CallFunc { - return func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error { + return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { called = true if req.Service() != service { @@ -118,8 +117,8 @@ func TestCallWrapper(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if route.Address != address { - return fmt.Errorf("expected address: %s got %s", address, route.Address) + if node.Address != address { + return fmt.Errorf("expected address: %s got %s", address, node.Address) } // don't do the call diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 5c62530a..35a392b5 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -12,7 +12,7 @@ import ( "github.com/micro/go-micro/v2/codec/proto" "github.com/micro/go-micro/v2/codec/protorpc" "github.com/micro/go-micro/v2/errors" - "github.com/micro/go-micro/v2/router" + "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/transport" ) @@ -128,9 +128,9 @@ func setHeaders(m *codec.Message, stream string) { } // setupProtocol sets up the old protocol -func setupProtocol(msg *transport.Message, route *router.Route) codec.NewCodec { - // get the protocol from route metadata - if protocol := route.Metadata["protocol"]; len(protocol) > 0 { +func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { + // get the protocol from node metadata + if protocol := node.Metadata["protocol"]; len(protocol) > 0 { return nil } diff --git a/client/wrapper.go b/client/wrapper.go index 2c1f6de8..d5138bcc 100644 --- a/client/wrapper.go +++ b/client/wrapper.go @@ -3,11 +3,11 @@ package client import ( "context" - "github.com/micro/go-micro/v2/router" + "github.com/micro/go-micro/v2/registry" ) // CallFunc represents the individual call func -type CallFunc func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error +type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error // CallWrapper is a low level wrapper for the CallFunc type CallWrapper func(CallFunc) CallFunc