From 422fd980e983920c14b1b59c975cf6f204890fcd Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Wed, 1 Jul 2020 17:06:59 +0100 Subject: [PATCH] Deprecate client/selector (#1767) * client/{grpc,rpc}: depricate selector (wip) * {client,cmd}: remove client/selector * deprecate client/selector * router/static: fix lookup * config/cmd: add support for legacy static selector flag * config/cmd: add support for legacy dns selector flag --- grpc.go | 134 ++++++++++++++++++++++++++++----------------------- grpc_test.go | 13 ++--- 2 files changed, 77 insertions(+), 70 deletions(-) diff --git a/grpc.go b/grpc.go index 42a31e0..b57b082 100644 --- a/grpc.go +++ b/grpc.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "fmt" + "math/rand" "net" "reflect" "strings" @@ -13,11 +14,11 @@ import ( "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/client" - "github.com/micro/go-micro/v2/client/selector" raw "github.com/micro/go-micro/v2/codec/bytes" "github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/metadata" - "github.com/micro/go-micro/v2/registry" + "github.com/micro/go-micro/v2/router" + "github.com/micro/go-micro/v2/selector" pnet "github.com/micro/go-micro/v2/util/net" "google.golang.org/grpc" @@ -72,41 +73,59 @@ func (g *grpcClient) secure(addr string) grpc.DialOption { return grpc.WithInsecure() } -func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { - service, address, _ := pnet.Proxy(request.Service(), opts.Address) - - // return remote address - if len(address) > 0 { - return func() (*registry.Node, error) { - return ®istry.Node{ - Address: address[0], - }, nil +// lookupRoute for a request using the router and then choose one using the selector +func (g *grpcClient) lookupRoute(req client.Request, opts client.CallOptions) (*router.Route, error) { + // check to see if the proxy has been set, if it has we don't need to lookup the routes; net.Proxy + // returns a slice of addresses, so we'll use a random one. Eventually we should to use the + // selector for this. + service, addresses, _ := pnet.Proxy(req.Service(), opts.Address) + if len(addresses) > 0 { + return &router.Route{ + Service: service, + Address: addresses[rand.Int()%len(addresses)], }, nil } - // if the network was set, pass it to the selector - sopts := opts.SelectOptions + // construct the router query + query := []router.QueryOption{router.QueryService(req.Service())} + + // if a custom network was requested, pass this to the router. By default the router will use it's + // own network, which is set during initialisation. if len(opts.Network) > 0 { - sopts = append(sopts, selector.WithDomain(opts.Network)) + query = append(query, router.QueryNetwork(opts.Network)) } - // get next nodes from the selector - next, err := g.opts.Selector.Select(service, sopts...) - if err != nil { - if err == selector.ErrNotFound { - return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) - } - return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) + // use the router passed as a call option, or fallback to the grpc clients router + if opts.Router == nil { + opts.Router = g.opts.Router } - return next, nil + // lookup the routes which can be used to execute the request + routes, err := opts.Router.Lookup(query...) + if err == router.ErrRouteNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error()) + } + + // use the selector passed as a call option, or fallback to the grpc clients selector + if opts.Selector == nil { + opts.Selector = g.opts.Selector + } + + // select the route to use for the request + if route, err := opts.Selector.Select(routes); err == selector.ErrNoneAvailable { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error()) + } else { + return route, nil + } } -func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string - address := node.Address - header = make(map[string]string) if md, ok := metadata.FromContext(ctx); ok { header = make(map[string]string, len(md)) @@ -137,7 +156,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R grpcDialOptions := []grpc.DialOption{ grpc.WithTimeout(opts.DialTimeout), - g.secure(address), + g.secure(route.Address), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), @@ -148,13 +167,13 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := g.pool.getConn(address, grpcDialOptions...) + cc, err := g.pool.getConn(route.Address, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } defer func() { // defer execution of release - g.pool.release(address, cc, grr) + g.pool.release(route.Address, cc, grr) }() ch := make(chan error, 1) @@ -180,11 +199,9 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R return grr } -func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string - address := node.Address - if md, ok := metadata.FromContext(ctx); ok { header = make(map[string]string, len(md)) for k, v := range md { @@ -222,14 +239,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client grpcDialOptions := []grpc.DialOption{ grpc.WithTimeout(opts.DialTimeout), - g.secure(address), + g.secure(route.Address), } if opts := g.getGrpcDialOptions(); opts != nil { grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...) + cc, err := grpc.DialContext(dialCtx, route.Address, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -396,11 +413,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface opt(&callOpts) } - next, err := g.next(req, callOpts) - if err != nil { - return err - } - // check if we already have a deadline d, ok := ctx.Deadline() if !ok { @@ -443,19 +455,19 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface time.Sleep(t) } - // select next node - node, err := next() - service := req.Service() + // lookup the route to send the reques to + route, err := g.lookupRoute(req, callOpts) if err != nil { - if err == selector.ErrNotFound { - return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) - } - return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) + return err } // make the call - err = gcall(ctx, node, req, rsp, callOpts) - g.opts.Selector.Mark(service, node, err) + err = gcall(ctx, route, req, rsp, callOpts) + + // record the result of the call to inform future routing decisions + g.opts.Selector.Record(*route, err) + + // try and transform the error to a go-micro error if verr, ok := err.(*errors.Error); ok { return verr } @@ -503,11 +515,6 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli opt(&callOpts) } - next, err := g.next(req, callOpts) - if err != nil { - return nil, err - } - // #200 - streams shouldn't have a request timeout set on the context // should we noop right here? @@ -537,20 +544,25 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli time.Sleep(t) } - node, err := next() - service := req.Service() + // lookup the route to send the reques to + route, err := g.lookupRoute(req, callOpts) if err != nil { - if err == selector.ErrNotFound { - return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) - } - return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) + return nil, err } // make the call stream := &grpcStream{} - err = g.stream(ctx, node, req, stream, callOpts) + err = g.stream(ctx, route, req, stream, callOpts) - g.opts.Selector.Mark(service, node, err) + // record the result of the call to inform future routing decisions + g.opts.Selector.Record(*route, err) + + // try and transform the error to a go-micro error + if verr, ok := err.(*errors.Error); ok { + return nil, verr + } + + g.opts.Selector.Record(*route, err) return stream, err } @@ -577,7 +589,7 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli return rsp.stream, nil } - retry, rerr := callOpts.Retry(ctx, req, i, err) + retry, rerr := callOpts.Retry(ctx, req, i, grr) if rerr != nil { return nil, rerr } diff --git a/grpc_test.go b/grpc_test.go index 1b37dad..bbec1b4 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -6,10 +6,10 @@ import ( "testing" "github.com/micro/go-micro/v2/client" - "github.com/micro/go-micro/v2/client/selector" "github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry/memory" + "github.com/micro/go-micro/v2/router" pgrpc "google.golang.org/grpc" pb "google.golang.org/grpc/examples/helloworld/helloworld" ) @@ -56,16 +56,11 @@ func TestGRPCClient(t *testing.T) { }, }) - // create selector - se := selector.NewSelector( - selector.Registry(r), - ) + // create router + rtr := router.NewRouter(router.Registry(r)) // create client - c := NewClient( - client.Registry(r), - client.Selector(se), - ) + c := NewClient(client.Router(rtr)) testMethods := []string{ "/helloworld.Greeter/SayHello",