From 50ec6c748fcde5d1a64fe4d7d0887d341103a4ff Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 17 Aug 2020 22:44:45 +0100 Subject: [PATCH] cleanup client/selector/lookup (#1937) * cleanup client/selector/lookup * add mdns router, remove registry from client * fix roundtripper * remove comment * fix compile issue * fix mucp test * fix api router --- api/router/router_test.go | 8 +- client/grpc/grpc.go | 129 ++++++++++---------- client/{util.go => lookup.go} | 26 ++--- client/mucp/mucp.go | 155 ++++++++++++------------- client/mucp/mucp_test.go | 39 ++++--- client/options.go | 22 ++-- client/wrapper.go | 4 +- proxy/http/http_test.go | 7 +- router/mdns/mdns.go | 117 +++++++++++++++++++ selector/options.go | 20 +--- selector/random.go | 56 --------- selector/random/random.go | 36 +++++- selector/roundrobin/roundrobin.go | 99 ++-------------- selector/roundrobin/roundrobin_test.go | 37 +++--- selector/selector.go | 23 +--- selector/tests.go | 30 ++--- util/http/roundtripper.go | 8 +- 17 files changed, 398 insertions(+), 418 deletions(-) rename client/{util.go => lookup.go} (60%) create mode 100644 router/mdns/mdns.go delete mode 100644 selector/random.go diff --git a/api/router/router_test.go b/api/router/router_test.go index f56c8d23..be802c33 100644 --- a/api/router/router_test.go +++ b/api/router/router_test.go @@ -18,6 +18,8 @@ import ( "github.com/micro/go-micro/v3/client" gcli "github.com/micro/go-micro/v3/client/grpc" rmemory "github.com/micro/go-micro/v3/registry/memory" + rt "github.com/micro/go-micro/v3/router" + regRouter "github.com/micro/go-micro/v3/router/registry" "github.com/micro/go-micro/v3/server" gsrv "github.com/micro/go-micro/v3/server/grpc" pb "github.com/micro/go-micro/v3/server/grpc/proto" @@ -55,9 +57,13 @@ func initial(t *testing.T) (server.Server, client.Client) { server.Registry(r), ) + rtr := regRouter.NewRouter( + rt.Registry(r), + ) + // create a new server c := gcli.NewClient( - client.Registry(r), + client.Router(rtr), ) h := &testServer{} diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index b7287b30..f230fa46 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -16,7 +16,6 @@ import ( raw "github.com/micro/go-micro/v3/codec/bytes" "github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/metadata" - "github.com/micro/go-micro/v3/registry" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -70,7 +69,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption { return grpc.WithInsecure() } -func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string header = make(map[string]string) @@ -103,7 +102,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R grpcDialOptions := []grpc.DialOption{ grpc.WithTimeout(opts.DialTimeout), - g.secure(node.Address), + g.secure(addr), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), @@ -114,13 +113,13 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := g.pool.getConn(node.Address, grpcDialOptions...) + cc, err := g.pool.getConn(addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } defer func() { // defer execution of release - g.pool.release(node.Address, cc, grr) + g.pool.release(addr, cc, grr) }() ch := make(chan error, 1) @@ -146,7 +145,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R return grr } -func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string if md, ok := metadata.FromContext(ctx); ok { @@ -186,14 +185,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client grpcDialOptions := []grpc.DialOption{ grpc.WithTimeout(opts.DialTimeout), - g.secure(node.Address), + g.secure(addr), } if opts := g.getGrpcDialOptions(); opts != nil { grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := grpc.DialContext(dialCtx, node.Address, grpcDialOptions...) + cc, err := grpc.DialContext(dialCtx, addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -389,6 +388,34 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface gcall = callOpts.CallWrappers[i-1](gcall) } + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = g.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = g.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(g.opts.Proxy) > 0 { + callOpts.Address = []string{g.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err := g.opts.Lookup(ctx, req, callOpts) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return err + } + // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay @@ -402,36 +429,14 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface time.Sleep(t) } - // use the router passed as a call option, or fallback to the rpc clients router - if callOpts.Router == nil { - callOpts.Router = g.opts.Router - } - // use the selector passed as a call option, or fallback to the rpc clients selector - if callOpts.Selector == nil { - callOpts.Selector = g.opts.Selector - } - - // inject proxy address - if len(g.opts.Proxy) > 0 { - callOpts.Address = []string{g.opts.Proxy} - } - - // lookup the route to send the reques to - route, err := client.LookupRoute(req, callOpts) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - // pass a node to enable backwards compatability as changing the - // call func would be a breaking change. - // todo v3: change the call func to accept a route - node := ®istry.Node{Address: route.Address} + // get the next node + node := next() // make the call err = gcall(ctx, node, req, rsp, callOpts) // record the result of the call to inform future routing decisions - g.opts.Selector.Record(*route, err) + g.opts.Selector.Record(node, err) // try and transform the error to a go-micro error if verr, ok := err.(*errors.Error); ok { @@ -498,6 +503,34 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli gstream = callOpts.CallWrappers[i-1](gstream) } + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = g.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = g.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(g.opts.Proxy) > 0 { + callOpts.Address = []string{g.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO: move to internal lookup func + routes, err := g.opts.Lookup(ctx, req, callOpts) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return nil, err + } + call := func(i int) (client.Stream, error) { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) @@ -510,44 +543,22 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli time.Sleep(t) } - // use the router passed as a call option, or fallback to the rpc clients router - if callOpts.Router == nil { - callOpts.Router = g.opts.Router - } - // use the selector passed as a call option, or fallback to the rpc clients selector - if callOpts.Selector == nil { - callOpts.Selector = g.opts.Selector - } - - // inject proxy address - if len(g.opts.Proxy) > 0 { - callOpts.Address = []string{g.opts.Proxy} - } - - // lookup the route to send the reques to - route, err := client.LookupRoute(req, callOpts) - if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) - } - - // pass a node to enable backwards compatability as changing the - // call func would be a breaking change. - // todo v3: change the call func to accept a route - node := ®istry.Node{Address: route.Address} + // get the next node + node := next() // make the call stream := &grpcStream{} err = g.stream(ctx, node, req, stream, callOpts) // record the result of the call to inform future routing decisions - g.opts.Selector.Record(*route, err) + g.opts.Selector.Record(node, err) // try and transform the error to a go-micro error if verr, ok := err.(*errors.Error); ok { return nil, verr } - g.opts.Selector.Record(*route, err) + g.opts.Selector.Record(node, err) return stream, err } diff --git a/client/util.go b/client/lookup.go similarity index 60% rename from client/util.go rename to client/lookup.go index 169690e3..76528bc5 100644 --- a/client/util.go +++ b/client/lookup.go @@ -1,21 +1,20 @@ package client import ( - "math/rand" + "context" "github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/router" - "github.com/micro/go-micro/v3/selector" ) +// LookupFunc is used to lookup routes for a service +type LookupFunc func(context.Context, Request, CallOptions) ([]string, error) + // LookupRoute for a request using the router and then choose one using the selector -func LookupRoute(req Request, opts CallOptions) (*router.Route, error) { +func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, error) { // check to see if an address was provided as a call option if len(opts.Address) > 0 { - return &router.Route{ - Service: req.Service(), - Address: opts.Address[rand.Int()%len(opts.Address)], - }, nil + return opts.Address, nil } // construct the router query @@ -35,12 +34,11 @@ func LookupRoute(req Request, opts CallOptions) (*router.Route, error) { return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error()) } - // select the route to use for the request - if route, err := opts.Selector.Select(routes, opts.SelectOptions...); err == selector.ErrNoneAvailable { - return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error()) - } else { - return route, nil + var addrs []string + + for _, route := range routes { + addrs = append(addrs, route.Address) } + + return addrs, nil } diff --git a/client/mucp/mucp.go b/client/mucp/mucp.go index 54214e55..b961f25c 100644 --- a/client/mucp/mucp.go +++ b/client/mucp/mucp.go @@ -14,17 +14,11 @@ import ( raw "github.com/micro/go-micro/v3/codec/bytes" "github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/metadata" - "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/util/buf" "github.com/micro/go-micro/v3/util/pool" ) -// NewClient returns a new micro client interface -func NewClient(opts ...client.Option) client.Client { - return newClient(opts...) -} - type rpcClient struct { once atomic.Value opts client.Options @@ -32,7 +26,8 @@ type rpcClient struct { seq uint64 } -func newClient(opt ...client.Option) client.Client { +// NewClient returns a new micro client interface +func NewClient(opt ...client.Option) client.Client { opts := client.NewOptions(opt...) p := pool.NewPool( @@ -68,7 +63,7 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } -func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Request, resp interface{}, opts client.CallOptions) error { +func (r *rpcClient) call(ctx context.Context, addr string, req client.Request, resp interface{}, opts client.CallOptions) error { msg := &transport.Message{ Header: make(map[string]string), } @@ -92,16 +87,9 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Re // set the accept header msg.Header["Accept"] = req.ContentType() - // setup old protocol - cf := setupProtocol(msg, node) - - // no codec specified - if cf == nil { - var err error - cf, err = r.newCodec(req.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } + cf, err := r.newCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) } dOpts := []transport.DialOption{ @@ -112,7 +100,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Re dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) } - c, err := r.pool.Get(node.Address, dOpts...) + c, err := r.pool.Get(addr, dOpts...) if err != nil { return errors.InternalServerError("go.micro.client", "connection error: %v", err) } @@ -185,7 +173,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Re return nil } -func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, opts client.CallOptions) (client.Stream, error) { +func (r *rpcClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) { msg := &transport.Message{ Header: make(map[string]string), } @@ -206,16 +194,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req client. // set the accept header msg.Header["Accept"] = req.ContentType() - // set old codecs - cf := setupProtocol(msg, node) - - // no codec specified - if cf == nil { - var err error - cf, err = r.newCodec(req.ContentType()) - if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) - } + cf, err := r.newCodec(req.ContentType()) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) } dOpts := []transport.DialOption{ @@ -226,7 +207,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req client. dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) } - c, err := r.opts.Transport.Dial(node.Address, dOpts...) + c, err := r.opts.Transport.Dial(addr, dOpts...) if err != nil { return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) } @@ -356,6 +337,34 @@ func (r *rpcClient) Call(ctx context.Context, request client.Request, response i rcall = callOpts.CallWrappers[i-1](rcall) } + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = r.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = r.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(r.opts.Proxy) > 0 { + callOpts.Address = []string{r.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err := r.opts.Lookup(ctx, request, callOpts) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return err + } + // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay @@ -369,36 +378,14 @@ func (r *rpcClient) Call(ctx context.Context, request client.Request, response i time.Sleep(t) } - // use the router passed as a call option, or fallback to the rpc clients router - if callOpts.Router == nil { - callOpts.Router = r.opts.Router - } - // use the selector passed as a call option, or fallback to the rpc clients selector - if callOpts.Selector == nil { - callOpts.Selector = r.opts.Selector - } - - // inject proxy address - if len(r.opts.Proxy) > 0 { - callOpts.Address = []string{r.opts.Proxy} - } - - // lookup the route to send the request via - route, err := client.LookupRoute(request, callOpts) - if err != nil { - return err - } - - // pass a node to enable backwards comparability as changing the - // call func would be a breaking change. - // todo v3: change the call func to accept a route - node := ®istry.Node{Address: route.Address, Metadata: route.Metadata} + // get the next node + node := next() // make the call err = rcall(ctx, node, request, response, callOpts) // record the result of the call to inform future routing decisions - r.opts.Selector.Record(*route, err) + r.opts.Selector.Record(node, err) return err } @@ -458,6 +445,34 @@ func (r *rpcClient) Stream(ctx context.Context, request client.Request, opts ... default: } + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = r.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = r.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(r.opts.Proxy) > 0 { + callOpts.Address = []string{r.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err := r.opts.Lookup(ctx, request, callOpts) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return nil, err + } + call := func(i int) (client.Stream, error) { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, request, i) @@ -470,36 +485,14 @@ func (r *rpcClient) Stream(ctx context.Context, request client.Request, opts ... time.Sleep(t) } - // use the router passed as a call option, or fallback to the rpc clients router - if callOpts.Router == nil { - callOpts.Router = r.opts.Router - } - // use the selector passed as a call option, or fallback to the rpc clients selector - if callOpts.Selector == nil { - callOpts.Selector = r.opts.Selector - } - - // inject proxy address - if len(r.opts.Proxy) > 0 { - callOpts.Address = []string{r.opts.Proxy} - } - - // lookup the route to send the request via - route, err := client.LookupRoute(request, callOpts) - if err != nil { - return nil, err - } - - // pass a node to enable backwards compatability as changing the - // call func would be a breaking change. - // todo v3: change the call func to accept a route - node := ®istry.Node{Address: route.Address, Metadata: route.Metadata} + // get the next node + node := next() // perform the call stream, err := r.stream(ctx, node, request, callOpts) // record the result of the call to inform future routing decisions - r.opts.Selector.Record(*route, err) + r.opts.Selector.Record(node, err) return stream, err } diff --git a/client/mucp/mucp_test.go b/client/mucp/mucp_test.go index 5c81c9ed..72735095 100644 --- a/client/mucp/mucp_test.go +++ b/client/mucp/mucp_test.go @@ -9,10 +9,13 @@ import ( "github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry/memory" + "github.com/micro/go-micro/v3/router" + regRouter "github.com/micro/go-micro/v3/router/registry" ) -func newTestRegistry() registry.Registry { - return memory.NewRegistry(memory.Services(testData)) +func newTestRouter() router.Router { + reg := memory.NewRegistry(memory.Services(testData)) + return regRouter.NewRouter(router.Registry(reg)) } func TestCallAddress(t *testing.T) { @@ -22,7 +25,7 @@ func TestCallAddress(t *testing.T) { address := "10.1.10.1:8080" wrap := func(cf client.CallFunc) client.CallFunc { - return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { + return func(ctx context.Context, node string, req client.Request, rsp interface{}, opts client.CallOptions) error { called = true if req.Service() != service { @@ -33,8 +36,8 @@ func TestCallAddress(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if node.Address != address { - return fmt.Errorf("expected address: %s got %s", address, node.Address) + if node != address { + return fmt.Errorf("expected address: %s got %s", address, node) } // don't do the call @@ -42,9 +45,10 @@ func TestCallAddress(t *testing.T) { } } - r := newTestRegistry() + r := newTestRouter() + c := NewClient( - client.Registry(r), + client.Router(r), client.WrapCall(wrap), ) @@ -69,7 +73,7 @@ func TestCallRetry(t *testing.T) { var called int wrap := func(cf client.CallFunc) client.CallFunc { - return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { + return func(ctx context.Context, node string, req client.Request, rsp interface{}, opts client.CallOptions) error { called++ if called == 1 { return errors.InternalServerError("test.error", "retry request") @@ -80,9 +84,9 @@ func TestCallRetry(t *testing.T) { } } - r := newTestRegistry() + r := newTestRouter() c := NewClient( - client.Registry(r), + client.Router(r), client.WrapCall(wrap), ) @@ -107,7 +111,7 @@ func TestCallWrapper(t *testing.T) { address := "10.1.10.1:8080" wrap := func(cf client.CallFunc) client.CallFunc { - return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { + return func(ctx context.Context, node string, req client.Request, rsp interface{}, opts client.CallOptions) error { called = true if req.Service() != service { @@ -118,8 +122,8 @@ func TestCallWrapper(t *testing.T) { return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } - if node.Address != address { - return fmt.Errorf("expected address: %s got %s", address, node.Address) + if node != address { + return fmt.Errorf("expected address: %s got %s", address, node) } // don't do the call @@ -127,22 +131,19 @@ func TestCallWrapper(t *testing.T) { } } - r := newTestRegistry() + r := newTestRouter() c := NewClient( - client.Registry(r), + client.Router(r), client.WrapCall(wrap), ) - r.Register(®istry.Service{ + r.Options().Registry.Register(®istry.Service{ Name: service, Version: "latest", Nodes: []*registry.Node{ { Id: id, Address: address, - Metadata: map[string]string{ - "protocol": "mucp", - }, }, }, }) diff --git a/client/options.go b/client/options.go index 26c116e3..a4adeb0a 100644 --- a/client/options.go +++ b/client/options.go @@ -7,10 +7,10 @@ import ( "github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/broker/http" "github.com/micro/go-micro/v3/codec" - "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/router" regRouter "github.com/micro/go-micro/v3/router/registry" "github.com/micro/go-micro/v3/selector" + "github.com/micro/go-micro/v3/selector/random" "github.com/micro/go-micro/v3/transport" thttp "github.com/micro/go-micro/v3/transport/http" ) @@ -28,6 +28,9 @@ type Options struct { Selector selector.Selector Transport transport.Transport + // Lookup used for looking up routes + Lookup LookupFunc + // Connection Pool PoolSize int PoolTTL time.Duration @@ -116,11 +119,12 @@ func NewOptions(options ...Option) Options { RequestTimeout: DefaultRequestTimeout, DialTimeout: transport.DefaultDialTimeout, }, + Lookup: LookupRoute, PoolSize: DefaultPoolSize, PoolTTL: DefaultPoolTTL, Broker: http.NewBroker(), Router: regRouter.NewRouter(), - Selector: selector.DefaultSelector, + Selector: random.NewSelector(), Transport: thttp.NewTransport(), } @@ -216,6 +220,13 @@ func Backoff(fn BackoffFunc) Option { } } +// Lookup sets the lookup function to use for resolving service names +func Lookup(l LookupFunc) Option { + return func(o *Options) { + o.Lookup = l + } +} + // Number of retries when making the request. // Should this be a Call Option? func Retries(i int) Option { @@ -231,13 +242,6 @@ func Retry(fn RetryFunc) Option { } } -// Registry sets the routers registry -func Registry(r registry.Registry) Option { - return func(o *Options) { - o.Router.Init(router.Registry(r)) - } -} - // The request timeout. // Should this be a Call Option? func RequestTimeout(d time.Duration) Option { diff --git a/client/wrapper.go b/client/wrapper.go index e68c31e6..79f29234 100644 --- a/client/wrapper.go +++ b/client/wrapper.go @@ -2,12 +2,10 @@ package client import ( "context" - - "github.com/micro/go-micro/v3/registry" ) // CallFunc represents the individual call func -type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error +type CallFunc func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error // CallWrapper is a low level wrapper for the CallFunc type CallWrapper func(CallFunc) CallFunc diff --git a/proxy/http/http_test.go b/proxy/http/http_test.go index de2efbd3..8821856b 100644 --- a/proxy/http/http_test.go +++ b/proxy/http/http_test.go @@ -10,6 +10,8 @@ import ( "github.com/micro/go-micro/v3/client" cmucp "github.com/micro/go-micro/v3/client/mucp" "github.com/micro/go-micro/v3/registry/memory" + "github.com/micro/go-micro/v3/router" + "github.com/micro/go-micro/v3/router/registry" "github.com/micro/go-micro/v3/server" "github.com/micro/go-micro/v3/server/mucp" ) @@ -53,6 +55,9 @@ func TestHTTPProxy(t *testing.T) { defer cancel() reg := memory.NewRegistry() + rtr := registry.NewRouter( + router.Registry(reg), + ) // new micro service service := mucp.NewServer( @@ -70,7 +75,7 @@ func TestHTTPProxy(t *testing.T) { go http.Serve(c, nil) cl := cmucp.NewClient( - client.Registry(reg), + client.Router(rtr), ) for _, test := range testCases { diff --git a/router/mdns/mdns.go b/router/mdns/mdns.go new file mode 100644 index 00000000..c5812a20 --- /dev/null +++ b/router/mdns/mdns.go @@ -0,0 +1,117 @@ +// Package mdns is an mdns router +package mdns + +import ( + "fmt" + "net" + "strconv" + "time" + + "github.com/micro/go-micro/v3/router" + "github.com/micro/go-micro/v3/util/mdns" +) + +// NewRouter returns an initialized dns router +func NewRouter(opts ...router.Option) router.Router { + options := router.DefaultOptions() + for _, o := range opts { + o(&options) + } + if len(options.Network) == 0 { + options.Network = "micro" + } + return &mdnsRouter{options} +} + +type mdnsRouter struct { + options router.Options +} + +func (m *mdnsRouter) Init(opts ...router.Option) error { + for _, o := range opts { + o(&m.options) + } + return nil +} + +func (m *mdnsRouter) Options() router.Options { + return m.options +} + +func (m *mdnsRouter) Table() router.Table { + return nil +} + +func (m *mdnsRouter) Lookup(opts ...router.QueryOption) ([]router.Route, error) { + options := router.NewQuery(opts...) + + // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 + service, port, err := net.SplitHostPort(options.Service) + if err != nil { + service = options.Service + } + + // query for the host + entries := make(chan *mdns.ServiceEntry) + + p := mdns.DefaultParams(service) + p.Timeout = time.Millisecond * 100 + p.Entries = entries + + // check if we're using our own network + if len(options.Network) > 0 { + p.Domain = options.Network + } + + // do the query + if err := mdns.Query(p); err != nil { + return nil, err + } + + var routes []router.Route + + // compose the routes based on the entries + for e := range entries { + addr := e.Host + // prefer ipv4 addrs + if len(e.AddrV4) > 0 { + addr = e.AddrV4.String() + // else use ipv6 + } else if len(e.AddrV6) > 0 { + addr = "[" + e.AddrV6.String() + "]" + } else if len(addr) == 0 { + continue + } + + pt := 443 + + if e.Port > 0 { + pt = e.Port + } + + // set the port + if len(port) > 0 { + pt, _ = strconv.Atoi(port) + } + + routes = append(routes, router.Route{ + Service: service, + Address: fmt.Sprintf("%s:%d", addr, pt), + Network: p.Domain, + }) + } + + return routes, nil +} + +func (m *mdnsRouter) Watch(opts ...router.WatchOption) (router.Watcher, error) { + return nil, nil +} + +func (m *mdnsRouter) Close() error { + return nil +} + +func (m *mdnsRouter) String() string { + return "mdns" +} diff --git a/selector/options.go b/selector/options.go index 2a27becc..99ac6bc7 100644 --- a/selector/options.go +++ b/selector/options.go @@ -1,31 +1,17 @@ package selector -import "github.com/micro/go-micro/v3/router" - // Options used to configure a selector type Options struct{} // Option updates the options type Option func(*Options) -// Filter the routes -type Filter func([]router.Route) []router.Route - // SelectOptions used to configure selection -type SelectOptions struct { - Filters []Filter -} +type SelectOptions struct{} // SelectOption updates the select options type SelectOption func(*SelectOptions) -// WithFilter adds a filter to the options -func WithFilter(f Filter) SelectOption { - return func(o *SelectOptions) { - o.Filters = append(o.Filters, f) - } -} - // NewSelectOptions parses select options func NewSelectOptions(opts ...SelectOption) SelectOptions { var options SelectOptions @@ -33,9 +19,5 @@ func NewSelectOptions(opts ...SelectOption) SelectOptions { o(&options) } - if options.Filters == nil { - options.Filters = make([]Filter, 0) - } - return options } diff --git a/selector/random.go b/selector/random.go deleted file mode 100644 index 6c40409e..00000000 --- a/selector/random.go +++ /dev/null @@ -1,56 +0,0 @@ -package selector - -import ( - "math/rand" - - "github.com/micro/go-micro/v3/router" -) - -type random struct{} - -func (r *random) Init(opts ...Option) error { - return nil -} - -func (r *random) Options() Options { - return Options{} -} - -func (r *random) Select(routes []router.Route, opts ...SelectOption) (*router.Route, error) { - // parse the options - options := NewSelectOptions(opts...) - - // apply the filters - for _, f := range options.Filters { - routes = f(routes) - } - - // we can't select from an empty pool of routes - if len(routes) == 0 { - return nil, ErrNoneAvailable - } - - // if there is only one route provided we'll select it - if len(routes) == 1 { - return &routes[0], nil - } - - // select a random route from the slice - return &routes[rand.Intn(len(routes)-1)], nil -} - -func (r *random) Record(route router.Route, err error) error { - return nil -} - -func (r *random) Close() error { - return nil -} - -func (r *random) String() string { - return "random" -} - -func newSelector(...Option) Selector { - return &random{} -} diff --git a/selector/random/random.go b/selector/random/random.go index 72486ad2..a8ab7461 100644 --- a/selector/random/random.go +++ b/selector/random/random.go @@ -1,10 +1,44 @@ package random import ( + "math/rand" + "github.com/micro/go-micro/v3/selector" ) +type random struct{} + +func (r *random) Select(routes []string, opts ...selector.SelectOption) (selector.Next, error) { + // we can't select from an empty pool of routes + if len(routes) == 0 { + return nil, selector.ErrNoneAvailable + } + + // return the next func + return func() string { + // if there is only one route provided we'll select it + if len(routes) == 1 { + return routes[0] + } + + // select a random route from the slice + return routes[rand.Intn(len(routes)-1)] + }, nil +} + +func (r *random) Record(addr string, err error) error { + return nil +} + +func (r *random) Reset() error { + return nil +} + +func (r *random) String() string { + return "random" +} + // NewSelector returns a random selector func NewSelector(opts ...selector.Option) selector.Selector { - return selector.DefaultSelector + return new(random) } diff --git a/selector/roundrobin/roundrobin.go b/selector/roundrobin/roundrobin.go index b7573e15..10d8d873 100644 --- a/selector/roundrobin/roundrobin.go +++ b/selector/roundrobin/roundrobin.go @@ -1,110 +1,35 @@ package roundrobin import ( - "sort" - "sync" - "time" - - "github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/selector" ) -var routeTTL = time.Minute * 15 - // NewSelector returns an initalised round robin selector func NewSelector(opts ...selector.Option) selector.Selector { - r := &roundrobin{ - routes: make(map[uint64]time.Time), - ticker: time.NewTicker(time.Minute), - } - go r.cleanRoutes() - return r + return new(roundrobin) } -type roundrobin struct { - ticker *time.Ticker - - // routes is a map with the key being a route's hash and the value being the last time it - // was used to perform a request - routes map[uint64]time.Time - sync.Mutex -} - -func (r *roundrobin) Init(opts ...selector.Option) error { - return nil -} - -func (r *roundrobin) Options() selector.Options { - return selector.Options{} -} - -func (r *roundrobin) Select(routes []router.Route, opts ...selector.SelectOption) (*router.Route, error) { - // parse the options - options := selector.NewSelectOptions(opts...) - - // apply the filters - for _, f := range options.Filters { - routes = f(routes) - } +type roundrobin struct{} +func (r *roundrobin) Select(routes []string, opts ...selector.SelectOption) (selector.Next, error) { if len(routes) == 0 { return nil, selector.ErrNoneAvailable } - r.Lock() - defer r.Unlock() + var i int - // setLastUsed will update the last used time for a route - setLastUsed := func(hash uint64) { - r.routes[hash] = time.Now() - } - - // if a route hasn't yet been seen, prioritise it - for _, route := range routes { - if _, ok := r.routes[route.Hash()]; !ok { - setLastUsed(route.Hash()) - return &route, nil - } - } - - // sort the services by the time they were last used - sort.SliceStable(routes, func(i, j int) bool { - iLastSeen := r.routes[routes[i].Hash()] - jLastSeen := r.routes[routes[j].Hash()] - return iLastSeen.UnixNano() < jLastSeen.UnixNano() - }) - - // return the route which was last used - setLastUsed(routes[0].Hash()) - return &routes[0], nil + return func() string { + route := routes[i%len(routes)] + // increment + i++ + return route + }, nil } -func (r *roundrobin) Record(srv router.Route, err error) error { - return nil -} +func (r *roundrobin) Record(addr string, err error) error { return nil } -func (r *roundrobin) Close() error { - r.ticker.Stop() - return nil -} +func (r *roundrobin) Reset() error { return nil } func (r *roundrobin) String() string { return "roundrobin" } - -func (r *roundrobin) cleanRoutes() { - for _ = range r.ticker.C { - r.Lock() - - // copy the slice to prevent concurrent map iteration and map write - rts := r.routes - - for hash, t := range rts { - if t.Unix() < time.Now().Add(-routeTTL).Unix() { - delete(r.routes, hash) - } - } - - r.Unlock() - } -} diff --git a/selector/roundrobin/roundrobin_test.go b/selector/roundrobin/roundrobin_test.go index 3a5d97e9..48bf8528 100644 --- a/selector/roundrobin/roundrobin_test.go +++ b/selector/roundrobin/roundrobin_test.go @@ -3,7 +3,6 @@ package roundrobin import ( "testing" - "github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/selector" "github.com/stretchr/testify/assert" ) @@ -11,39 +10,31 @@ import ( func TestRoundRobin(t *testing.T) { selector.Tests(t, NewSelector()) - r1 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8000"} - r2 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8001"} - r3 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8002"} + r1 := "127.0.0.1:8000" + r2 := "127.0.0.1:8001" + r3 := "127.0.0.1:8002" sel := NewSelector() // By passing r1 and r2 first, it forces a set sequence of (r1 => r2 => r3 => r1) - r, err := sel.Select([]router.Route{r1}) + next, err := sel.Select([]string{r1}) + r := next() assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r1, *r, "Expected route to be r1") + assert.Equal(t, r1, r, "Expected route to be r1") - r, err = sel.Select([]router.Route{r2}) + next, err = sel.Select([]string{r2}) + r = next() assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r2, *r, "Expected route to be r2") + assert.Equal(t, r2, r, "Expected route to be r2") // Because r1 and r2 have been recently called, r3 should be chosen - r, err = sel.Select([]router.Route{r1, r2, r3}) + next, err = sel.Select([]string{r1, r2, r3}) + n1, n2, n3 := next(), next(), next() assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r3, *r, "Expected route to be r3") + assert.Equal(t, r1, n1, "Expected route to be r3") + assert.Equal(t, r2, n2, "Expected route to be r3") + assert.Equal(t, r3, n3, "Expected route to be r3") - // r1 was called longest ago, so it should be prioritised - - r, err = sel.Select([]router.Route{r1, r2, r3}) - assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r1, *r, "Expected route to be r1") - - r, err = sel.Select([]router.Route{r1, r2, r3}) - assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r2, *r, "Expected route to be r2") - - r, err = sel.Select([]router.Route{r1, r2, r3}) - assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r3, *r, "Expected route to be r3") } diff --git a/selector/selector.go b/selector/selector.go index 727afc22..0169a338 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -3,35 +3,24 @@ package selector import ( "errors" - - "github.com/micro/go-micro/v3/router" ) var ( - // DefaultSelector is the default selector - DefaultSelector = NewSelector() - // ErrNoneAvailable is returned by select when no routes were provided to select from ErrNoneAvailable = errors.New("none available") ) // Selector selects a route from a pool type Selector interface { - // Init a selector with options - Init(...Option) error - // Options the selector is using - Options() Options // Select a route from the pool using the strategy - Select([]router.Route, ...SelectOption) (*router.Route, error) + Select([]string, ...SelectOption) (Next, error) // Record the error returned from a route to inform future selection - Record(router.Route, error) error - // Close the selector - Close() error + Record(string, error) error + // Reset the selector + Reset() error // String returns the name of the selector String() string } -// NewSelector creates new selector and returns it -func NewSelector(opts ...Option) Selector { - return newSelector(opts...) -} +// Next returns the next node +type Next func() string diff --git a/selector/tests.go b/selector/tests.go index ff854107..6829b0c9 100644 --- a/selector/tests.go +++ b/selector/tests.go @@ -3,47 +3,35 @@ package selector import ( "testing" - "github.com/micro/go-micro/v3/router" "github.com/stretchr/testify/assert" ) // Tests runs all the tests against a selector to ensure the implementations are consistent func Tests(t *testing.T, s Selector) { - r1 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8000"} - r2 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8001"} + r1 := "127.0.0.1:8000" + r2 := "127.0.0.1:8001" t.Run("Select", func(t *testing.T) { t.Run("NoRoutes", func(t *testing.T) { - srv, err := s.Select([]router.Route{}) - assert.Nil(t, srv, "Route should be nil") + _, err := s.Select([]string{}) assert.Equal(t, ErrNoneAvailable, err, "Expected error to be none available") }) t.Run("OneRoute", func(t *testing.T) { - srv, err := s.Select([]router.Route{r1}) + next, err := s.Select([]string{r1}) + srv := next() assert.Nil(t, err, "Error should be nil") - assert.Equal(t, r1, *srv, "Expected the route to be returned") + assert.Equal(t, r1, srv, "Expected the route to be returned") }) t.Run("MultipleRoutes", func(t *testing.T) { - srv, err := s.Select([]router.Route{r1, r2}) + next, err := s.Select([]string{r1, r2}) assert.Nil(t, err, "Error should be nil") - if srv.Address != r1.Address && srv.Address != r2.Address { + srv := next() + if srv != r1 && srv != r2 { t.Errorf("Expected the route to be one of the inputs") } }) - - t.Run("Filters", func(t *testing.T) { - var filterApplied bool - filter := func(rts []router.Route) []router.Route { - filterApplied = true - return rts - } - - _, err := s.Select([]router.Route{r1, r2}, WithFilter(filter)) - assert.Nil(t, err, "Error should be nil") - assert.True(t, filterApplied, "Filters should be applied") - }) }) t.Run("Record", func(t *testing.T) { diff --git a/util/http/roundtripper.go b/util/http/roundtripper.go index 26718e5f..a4e3988b 100644 --- a/util/http/roundtripper.go +++ b/util/http/roundtripper.go @@ -21,18 +21,12 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { } // rudimentary retry 3 times - for i := 0; i < 3; i++ { - route, err := r.st.Select(routes) - if err != nil { - continue - } - + for _, route := range routes { req.URL.Host = route.Address w, err := r.rt.RoundTrip(req) if err != nil { continue } - return w, nil }