From f146b5241899599bfd8c1add8e3bdab608a79b6e Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 21 Aug 2020 09:23:01 +0100 Subject: [PATCH] Registry router fixes (#1961) * only cache routes if told to do so * Use roundrobin selector and retry in proxy * Update lookup to require service * Fix compile * Fix compile * Update * Update * rename query to lookup * Update router.go * Update --- client/lookup.go | 6 +- client/options.go | 4 +- network/mucp/mucp.go | 32 +++-- proxy/grpc/grpc.go | 5 +- proxy/mucp/mucp.go | 2 +- resolver/registry/registry.go | 2 +- router/dns/dns.go | 60 +++------ router/mdns/mdns.go | 10 +- router/options.go | 10 +- router/query.go | 117 ++++++++++++----- router/registry/registry.go | 138 +++++++++++++------- router/registry/table.go | 136 ++------------------ router/registry/table_test.go | 235 ++-------------------------------- router/router.go | 4 +- router/static/static.go | 49 ++----- server/mucp/subscriber.go | 10 +- util/http/roundtripper.go | 3 +- util/router/router.go | 2 +- 18 files changed, 267 insertions(+), 558 deletions(-) diff --git a/client/lookup.go b/client/lookup.go index 32161b2c..0bffd480 100644 --- a/client/lookup.go +++ b/client/lookup.go @@ -19,16 +19,16 @@ func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, } // construct the router query - query := []router.QueryOption{router.QueryService(req.Service())} + query := []router.LookupOption{} // if a custom network was requested, pass this to the router. By default the router will use it's // own network, which is set during initialisation. if len(opts.Network) > 0 { - query = append(query, router.QueryNetwork(opts.Network)) + query = append(query, router.LookupNetwork(opts.Network)) } // lookup the routes which can be used to execute the request - routes, err := opts.Router.Lookup(query...) + routes, err := opts.Router.Lookup(req.Service(), query...) if err == router.ErrRouteNotFound { return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error()) } else if err != nil { diff --git a/client/options.go b/client/options.go index 3fc5a3f2..4328c8e0 100644 --- a/client/options.go +++ b/client/options.go @@ -11,7 +11,7 @@ import ( "github.com/micro/go-micro/v3/router" regRouter "github.com/micro/go-micro/v3/router/registry" "github.com/micro/go-micro/v3/selector" - "github.com/micro/go-micro/v3/selector/random" + "github.com/micro/go-micro/v3/selector/roundrobin" "github.com/micro/go-micro/v3/transport" thttp "github.com/micro/go-micro/v3/transport/http" ) @@ -125,7 +125,7 @@ func NewOptions(options ...Option) Options { PoolTTL: DefaultPoolTTL, Broker: http.NewBroker(), Router: regRouter.NewRouter(), - Selector: random.NewSelector(), + Selector: roundrobin.NewSelector(), Transport: thttp.NewTransport(), } diff --git a/network/mucp/mucp.go b/network/mucp/mucp.go index 50723ea4..7859b740 100644 --- a/network/mucp/mucp.go +++ b/network/mucp/mucp.go @@ -961,12 +961,11 @@ func (n *mucpNetwork) processNetChan(listener tunnel.Listener) { route.Metric = d } - q := []router.QueryOption{ - router.QueryService(route.Service), - router.QueryLink(route.Link), + q := []router.LookupOption{ + router.LookupLink(route.Link), } - routes, err := n.router.Table().Query(q...) + routes, err := n.router.Lookup(route.Service, q...) if err != nil && err != router.ErrRouteNotFound { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Network node %s failed listing best routes for %s: %v", n.id, route.Service, err) @@ -1079,16 +1078,15 @@ func (n *mucpNetwork) processNetChan(listener tunnel.Listener) { } // pruneRoutes prunes routes return by given query -func (n *mucpNetwork) pruneRoutes(q ...router.QueryOption) error { - routes, err := n.router.Table().Query(q...) +func (n *mucpNetwork) pruneRoutes(q ...router.LookupOption) error { + routes, err := n.router.Table().List() if err != nil && err != router.ErrRouteNotFound { return err } - for _, route := range routes { - if err := n.router.Table().Delete(route); err != nil && err != router.ErrRouteNotFound { - return err - } + // filter and delete the routes in question + for _, route := range router.Filter(routes, router.NewLookup(q...)) { + n.router.Table().Delete(route) } return nil @@ -1097,18 +1095,18 @@ func (n *mucpNetwork) pruneRoutes(q ...router.QueryOption) error { // pruneNodeRoutes prunes routes that were either originated by or routable via given node func (n *mucpNetwork) prunePeerRoutes(peer *node) error { // lookup all routes originated by router - q := []router.QueryOption{ - router.QueryRouter(peer.id), - router.QueryLink("*"), + q := []router.LookupOption{ + router.LookupRouter(peer.id), + router.LookupLink("*"), } if err := n.pruneRoutes(q...); err != nil { return err } // lookup all routes routable via gw - q = []router.QueryOption{ - router.QueryGateway(peer.address), - router.QueryLink("*"), + q = []router.LookupOption{ + router.LookupGateway(peer.address), + router.LookupLink("*"), } if err := n.pruneRoutes(q...); err != nil { return err @@ -1291,7 +1289,7 @@ func (n *mucpNetwork) manage() { } // otherwise delete all the routes originated by it - if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil { + if err := n.pruneRoutes(router.LookupRouter(route.Router)); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Network failed deleting routes by %s: %v", route.Router, err) } diff --git a/proxy/grpc/grpc.go b/proxy/grpc/grpc.go index 511b0a92..c93f536b 100644 --- a/proxy/grpc/grpc.go +++ b/proxy/grpc/grpc.go @@ -94,10 +94,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server logger.Tracef("Proxy received request for %s %s", service, endpoint) } - // no retries with the proxy - opts := []client.CallOption{ - client.WithRetries(0), - } + var opts []client.CallOption // call a specific backend if len(p.Endpoint) > 0 { diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 405c637a..35170729 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -208,7 +208,7 @@ func (p *Proxy) getRoute(ctx context.Context, service string) ([]router.Route, e func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) { // lookup the routes in the router - results, err := p.Router.Lookup(router.QueryService(service), router.QueryNetwork("*")) + results, err := p.Router.Lookup(service, router.LookupNetwork("*")) if err != nil { // assumption that we're ok with stale routes logger.Debugf("Failed to lookup route for %s: %v", service, err) diff --git a/resolver/registry/registry.go b/resolver/registry/registry.go index 4f54b072..689cb29a 100644 --- a/resolver/registry/registry.go +++ b/resolver/registry/registry.go @@ -2,9 +2,9 @@ package registry import ( - "github.com/micro/go-micro/v3/resolver" "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry/mdns" + "github.com/micro/go-micro/v3/resolver" ) // Resolver is a registry network resolver diff --git a/router/dns/dns.go b/router/dns/dns.go index dfbd7b40..005d4295 100644 --- a/router/dns/dns.go +++ b/router/dns/dns.go @@ -17,19 +17,17 @@ func NewRouter(opts ...router.Option) router.Router { if len(options.Network) == 0 { options.Network = "micro" } - return &dns{options, &table{options}} + return &dns{options} } type dns struct { options router.Options - table *table } func (d *dns) Init(opts ...router.Option) error { for _, o := range opts { o(&d.options) } - d.table.options = d.options return nil } @@ -38,50 +36,16 @@ func (d *dns) Options() router.Options { } func (d *dns) Table() router.Table { - return d.table -} - -func (d *dns) Lookup(opts ...router.QueryOption) ([]router.Route, error) { - return d.table.Query(opts...) -} - -func (d *dns) Watch(opts ...router.WatchOption) (router.Watcher, error) { - return nil, nil + return nil } func (d *dns) Close() error { return nil } -func (d *dns) String() string { - return "dns" -} - -type table struct { - options router.Options -} - -func (t *table) Create(router.Route) error { - return nil -} - -func (t *table) Delete(router.Route) error { - return nil -} - -func (t *table) Update(router.Route) error { - return nil -} - -func (t *table) List() ([]router.Route, error) { - return nil, nil -} - -func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { - options := router.NewQuery(opts...) - +func (d *dns) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) { // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 - host, port, err := net.SplitHostPort(options.Service) + host, port, err := net.SplitHostPort(service) if err == nil { // lookup the service using A records ips, err := net.LookupHost(host) @@ -95,7 +59,7 @@ func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { result := make([]router.Route, len(ips)) for i, ip := range ips { result[i] = router.Route{ - Service: options.Service, + Service: service, Address: fmt.Sprintf("%s:%d", ip, uint16(p)), } } @@ -104,7 +68,7 @@ func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { // we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the // service using the SRV record, we return the error. - _, nodes, err := net.LookupSRV(options.Service, "tcp", t.options.Network) + _, nodes, err := net.LookupSRV(service, "tcp", d.options.Network) if err != nil { return nil, err } @@ -113,10 +77,18 @@ func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { result := make([]router.Route, len(nodes)) for i, n := range nodes { result[i] = router.Route{ - Service: options.Service, + Service: service, Address: fmt.Sprintf("%s:%d", n.Target, n.Port), - Network: t.options.Network, + Network: d.options.Network, } } return result, nil } + +func (d *dns) Watch(opts ...router.WatchOption) (router.Watcher, error) { + return nil, nil +} + +func (d *dns) String() string { + return "dns" +} diff --git a/router/mdns/mdns.go b/router/mdns/mdns.go index c5812a20..ce11fd19 100644 --- a/router/mdns/mdns.go +++ b/router/mdns/mdns.go @@ -42,19 +42,19 @@ func (m *mdnsRouter) Table() router.Table { return nil } -func (m *mdnsRouter) Lookup(opts ...router.QueryOption) ([]router.Route, error) { - options := router.NewQuery(opts...) +func (m *mdnsRouter) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) { + options := router.NewLookup(opts...) // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 - service, port, err := net.SplitHostPort(options.Service) + srv, port, err := net.SplitHostPort(service) if err != nil { - service = options.Service + srv = service } // query for the host entries := make(chan *mdns.ServiceEntry) - p := mdns.DefaultParams(service) + p := mdns.DefaultParams(srv) p.Timeout = time.Millisecond * 100 p.Entries = entries diff --git a/router/options.go b/router/options.go index 8ea2f1a6..358afdfe 100644 --- a/router/options.go +++ b/router/options.go @@ -22,8 +22,8 @@ type Options struct { Registry registry.Registry // Context for additional options Context context.Context - // Precache routes - Precache bool + // Cache routes + Cache bool } // Id sets Router Id @@ -61,10 +61,10 @@ func Registry(r registry.Registry) Option { } } -// Precache the routes -func Precache() Option { +// Cache the routes +func Cache() Option { return func(o *Options) { - o.Precache = true + o.Cache = true } } diff --git a/router/query.go b/router/query.go index 39efd5c2..fee3ad52 100644 --- a/router/query.go +++ b/router/query.go @@ -1,13 +1,11 @@ package router -// QueryOption sets routing table query options -type QueryOption func(*QueryOptions) +// LookupOption sets routing table query options +type LookupOption func(*LookupOptions) -// QueryOptions are routing table query options +// LookupOptions are routing table query options // TODO replace with Filter(Route) bool -type QueryOptions struct { - // Service is destination service name - Service string +type LookupOptions struct { // Address of the service Address string // Gateway is route gateway @@ -20,53 +18,45 @@ type QueryOptions struct { Link string } -// QueryService sets service to query -func QueryService(s string) QueryOption { - return func(o *QueryOptions) { - o.Service = s - } -} - -// QueryAddress sets service to query -func QueryAddress(a string) QueryOption { - return func(o *QueryOptions) { +// LookupAddress sets service to query +func LookupAddress(a string) LookupOption { + return func(o *LookupOptions) { o.Address = a } } -// QueryGateway sets gateway address to query -func QueryGateway(g string) QueryOption { - return func(o *QueryOptions) { +// LookupGateway sets gateway address to query +func LookupGateway(g string) LookupOption { + return func(o *LookupOptions) { o.Gateway = g } } -// QueryNetwork sets network name to query -func QueryNetwork(n string) QueryOption { - return func(o *QueryOptions) { +// LookupNetwork sets network name to query +func LookupNetwork(n string) LookupOption { + return func(o *LookupOptions) { o.Network = n } } -// QueryRouter sets router id to query -func QueryRouter(r string) QueryOption { - return func(o *QueryOptions) { +// LookupRouter sets router id to query +func LookupRouter(r string) LookupOption { + return func(o *LookupOptions) { o.Router = r } } -// QueryLink sets the link to query -func QueryLink(link string) QueryOption { - return func(o *QueryOptions) { +// LookupLink sets the link to query +func LookupLink(link string) LookupOption { + return func(o *LookupOptions) { o.Link = link } } -// NewQuery creates new query and returns it -func NewQuery(opts ...QueryOption) QueryOptions { +// NewLookup creates new query and returns it +func NewLookup(opts ...LookupOption) LookupOptions { // default options - qopts := QueryOptions{ - Service: "*", + qopts := LookupOptions{ Address: "*", Gateway: "*", Network: "*", @@ -80,3 +70,66 @@ func NewQuery(opts ...QueryOption) QueryOptions { return qopts } + +// isMatch checks if the route matches given query options +func isMatch(route Route, address, gateway, network, rtr, link string) bool { + // matches the values provided + match := func(a, b string) bool { + if a == "*" || b == "*" || a == b { + return true + } + return false + } + + // a simple struct to hold our values + type compare struct { + a string + b string + } + + // compare the following values + values := []compare{ + {gateway, route.Gateway}, + {network, route.Network}, + {rtr, route.Router}, + {address, route.Address}, + {link, route.Link}, + } + + for _, v := range values { + // attempt to match each value + if !match(v.a, v.b) { + return false + } + } + + return true +} + +// filterRoutes finds all the routes for given network and router and returns them +func Filter(routes []Route, opts LookupOptions) []Route { + address := opts.Address + gateway := opts.Gateway + network := opts.Network + rtr := opts.Router + link := opts.Link + + // routeMap stores the routes we're going to advertise + routeMap := make(map[string][]Route) + + for _, route := range routes { + if isMatch(route, address, gateway, network, rtr, link) { + // add matchihg route to the routeMap + routeKey := route.Service + "@" + route.Network + routeMap[routeKey] = append(routeMap[routeKey], route) + } + } + + var results []Route + + for _, route := range routeMap { + results = append(results, route...) + } + + return results +} diff --git a/router/registry/registry.go b/router/registry/registry.go index e02bc418..644fdc9d 100644 --- a/router/registry/registry.go +++ b/router/registry/registry.go @@ -47,7 +47,7 @@ func NewRouter(opts ...router.Option) router.Router { // create the new table, passing the fetchRoute method in as a fallback if // the table doesn't contain the result for a query. - r.table = newTable(r.lookup) + r.table = newTable() // start the router r.start() @@ -241,8 +241,41 @@ func (r *rtr) loadRoutes(reg registry.Registry) error { return nil } +// Close the router +func (r *rtr) Close() error { + r.Lock() + defer r.Unlock() + + select { + case <-r.exit: + return nil + default: + if !r.running { + return nil + } + close(r.exit) + + } + + r.running = false + return nil +} + // lookup retrieves all the routes for a given service and creates them in the routing table -func (r *rtr) lookup(service string) ([]router.Route, error) { +func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) { + q := router.NewLookup(opts...) + + // if we find the routes filter and return them + routes, err := r.table.Query(service) + if err == nil { + routes = router.Filter(routes, q) + if len(routes) == 0 { + return nil, router.ErrRouteNotFound + } + return routes, nil + } + + // lookup the route logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain) services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain)) @@ -254,8 +287,6 @@ func (r *rtr) lookup(service string) ([]router.Route, error) { return nil, fmt.Errorf("failed getting services: %v", err) } - var routes []router.Route - for _, srv := range services { domain := getDomain(srv) // TODO: should we continue to send the event indicating we created a route? @@ -263,6 +294,17 @@ func (r *rtr) lookup(service string) ([]router.Route, error) { routes = append(routes, r.createRoutes(srv, domain)...) } + // if we're supposed to cache then save the routes + if r.options.Cache { + for _, route := range routes { + r.table.Create(route) + } + } + + routes = router.Filter(routes, q) + if len(routes) == 0 { + return nil, router.ErrRouteNotFound + } return routes, nil } @@ -324,13 +366,6 @@ func (r *rtr) start() error { return nil } - if r.options.Precache { - // add all local service routes into the routing table - if err := r.loadRoutes(r.options.Registry); err != nil { - return fmt.Errorf("failed loading registry routes: %s", err) - } - } - // add default gateway into routing table if r.options.Gateway != "" { // note, the only non-default value is the gateway @@ -350,25 +385,59 @@ func (r *rtr) start() error { // create error and exit channels r.exit = make(chan bool) + r.running = true - // periodically refresh all the routes + // only cache if told to do so + if !r.options.Cache { + return nil + } + + // create a refresh notify channel + refresh := make(chan bool, 1) + + // fires the refresh for loading routes + refreshRoutes := func() { + select { + case refresh <- true: + default: + } + } + + // refresh all the routes in the event of a failure watching the registry go func() { - t1 := time.NewTicker(RefreshInterval) - defer t1.Stop() + var lastRefresh time.Time - t2 := time.NewTicker(PruneInterval) - defer t2.Stop() + // load a refresh + refreshRoutes() for { select { case <-r.exit: return - case <-t2.C: - r.table.pruneRoutes(RefreshInterval) - case <-t1.C: + case <-refresh: + // don't refresh if we've done so in the past minute + if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute { + continue + } + + // load new routes if err := r.loadRoutes(r.options.Registry); err != nil { logger.Debugf("failed refreshing registry routes: %s", err) + // in this don't prune + continue } + + // first time so nothing to prune + if !lastRefresh.IsZero() { + // prune any routes since last refresh since we've + // updated basically everything we care about + r.table.pruneRoutes(time.Since(lastRefresh)) + } + + // update the refresh time + lastRefresh = time.Now() + case <-time.After(RefreshInterval): + refreshRoutes() } } }() @@ -386,6 +455,8 @@ func (r *rtr) start() error { logger.Debugf("failed creating registry watcher: %v", err) } time.Sleep(time.Second) + // in the event of an error reload routes + refreshRoutes() continue } @@ -395,46 +466,21 @@ func (r *rtr) start() error { logger.Debugf("Error watching the registry: %v", err) } time.Sleep(time.Second) + // in the event of an error reload routes + refreshRoutes() } } } }() - r.running = true - return nil } -// Lookup routes in the routing table -func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) { - return r.Table().Query(q...) -} - // Watch routes func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) { return r.table.Watch(opts...) } -// Close the router -func (r *rtr) Close() error { - r.Lock() - defer r.Unlock() - - select { - case <-r.exit: - return nil - default: - if !r.running { - return nil - } - close(r.exit) - - } - - r.running = false - return nil -} - // String prints debugging information about router func (r *rtr) String() string { return "registry" diff --git a/router/registry/table.go b/router/registry/table.go index cefd723c..c2682554 100644 --- a/router/registry/table.go +++ b/router/registry/table.go @@ -12,8 +12,6 @@ import ( // table is an in-memory routing table type table struct { sync.RWMutex - // lookup for a service - lookup func(string) ([]router.Route, error) // routes stores service routes routes map[string]map[uint64]*route // watchers stores table watchers @@ -26,9 +24,8 @@ type route struct { } // newtable creates a new routing table and returns it -func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table { +func newTable() *table { return &table{ - lookup: lookup, routes: make(map[string]map[uint64]*route), watchers: make(map[string]*tableWatcher), } @@ -216,136 +213,23 @@ func (t *table) List() ([]router.Route, error) { return routes, nil } -// isMatch checks if the route matches given query options -func isMatch(route router.Route, address, gateway, network, rtr, link string) bool { - // matches the values provided - match := func(a, b string) bool { - if a == "*" || b == "*" || a == b { - return true - } - return false - } - - // a simple struct to hold our values - type compare struct { - a string - b string - } - - // compare the following values - values := []compare{ - {gateway, route.Gateway}, - {network, route.Network}, - {rtr, route.Router}, - {address, route.Address}, - {link, route.Link}, - } - - for _, v := range values { - // attempt to match each value - if !match(v.a, v.b) { - return false - } - } - - return true -} - -// filterRoutes finds all the routes for given network and router and returns them -func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route { - address := opts.Address - gateway := opts.Gateway - network := opts.Network - rtr := opts.Router - link := opts.Link - - // routeMap stores the routes we're going to advertise - routeMap := make(map[string][]router.Route) - - for _, rt := range routes { - // get the actual route - route := rt.route - - if isMatch(route, address, gateway, network, rtr, link) { - // add matchihg route to the routeMap - routeKey := route.Service + "@" + route.Network - routeMap[routeKey] = append(routeMap[routeKey], route) - } - } - - var results []router.Route - - for _, route := range routeMap { - results = append(results, route...) - } - - return results -} - // Lookup queries routing table and returns all routes that match the lookup query -func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) { - // create new query options - opts := router.NewQuery(q...) - - // create a cwslicelist of query results - results := make([]router.Route, 0, len(t.routes)) - - // readAndFilter routes for this service under read lock. - readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) { - t.RLock() - defer t.RUnlock() - - routes, ok := t.routes[q.Service] - if !ok || len(routes) == 0 { - return nil, false - } - - return filterRoutes(routes, q), true - } - - if opts.Service != "*" { - // try and load services from the cache - if routes, ok := readAndFilter(opts); ok { - return routes, nil - } - - // lookup the route and try again - // TODO: move this logic out of the hot path - // being hammered on queries will require multiple lookups - routes, err := t.lookup(opts.Service) - if err != nil { - return nil, err - } - - // cache the routes - for _, rt := range routes { - t.Create(rt) - } - - // try again - if routes, ok := readAndFilter(opts); ok { - return routes, nil - } +func (t *table) Query(service string) ([]router.Route, error) { + t.RLock() + defer t.RUnlock() + routeMap, ok := t.routes[service] + if !ok { return nil, router.ErrRouteNotFound } - // search through all destinations - t.RLock() + var routes []router.Route - for _, routes := range t.routes { - // filter the routes - found := filterRoutes(routes, opts) - // ensure we don't append zero length routes - if len(found) == 0 { - continue - } - results = append(results, found...) + for _, rt := range routeMap { + routes = append(routes, rt.route) } - t.RUnlock() - - return results, nil + return routes, nil } // Watch returns routing table entry watcher diff --git a/router/registry/table_test.go b/router/registry/table_test.go index 1cb5fc5e..2aacd812 100644 --- a/router/registry/table_test.go +++ b/router/registry/table_test.go @@ -1,15 +1,13 @@ package registry import ( - "fmt" "testing" "github.com/micro/go-micro/v3/router" ) func testSetup() (*table, router.Route) { - routr := NewRouter().(*rtr) - table := newTable(routr.lookup) + table := newTable() route := router.Route{ Service: "dest.svc", @@ -114,235 +112,20 @@ func TestList(t *testing.T) { func TestQuery(t *testing.T) { table, route := testSetup() - svc := []string{"svc1", "svc2", "svc3", "svc1"} - net := []string{"net1", "net2", "net1", "net3"} - gw := []string{"gw1", "gw2", "gw3", "gw3"} - rtr := []string{"rtr1", "rt2", "rt3", "rtr3"} - - for i := 0; i < len(svc); i++ { - route.Service = svc[i] - route.Network = net[i] - route.Gateway = gw[i] - route.Router = rtr[i] - route.Link = router.DefaultLink - - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } + if err := table.Create(route); err != nil { + t.Fatalf("error adding route: %s", err) } - // return all routes - routes, err := table.Query() + rt, err := table.Query(route.Service) if err != nil { - t.Fatalf("error looking up routes: %s", err) - } else if len(routes) == 0 { - t.Fatalf("error looking up routes: not found") + t.Fatal("Expected a route got err", err) } - // query routes particular network - network := "net1" - - routes, err = table.Query(router.QueryNetwork(network)) - if err != nil { - t.Fatalf("error looking up routes: %s", err) + if len(rt) != 1 { + t.Fatalf("Expected one route got %d", len(rt)) } - if len(routes) != 2 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) - } - - for _, route := range routes { - if route.Network != network { - t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network) - } - } - - // query routes for particular gateway - gateway := "gw1" - - routes, err = table.Query(router.QueryGateway(gateway)) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 1 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } - - if routes[0].Gateway != gateway { - t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) - } - - // query routes for particular router - rt := "rtr1" - - routes, err = table.Query(router.QueryRouter(rt)) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 1 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } - - if routes[0].Router != rt { - t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router) - } - - // query particular gateway and network - query := []router.QueryOption{ - router.QueryGateway(gateway), - router.QueryNetwork(network), - router.QueryRouter(rt), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 1 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } - - if routes[0].Gateway != gateway { - t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) - } - - if routes[0].Network != network { - t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network) - } - - if routes[0].Router != rt { - t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router) - } - - // non-existen route query - routes, err = table.Query(router.QueryService("foobar")) - if err != router.ErrRouteNotFound { - t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err) - } - - if len(routes) != 0 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes)) - } - - // query NO routes - query = []router.QueryOption{ - router.QueryGateway(gateway), - router.QueryNetwork(network), - router.QueryLink("network"), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) > 0 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes)) - } - - // insert local routes to query - for i := 0; i < 2; i++ { - route.Link = "foobar" - route.Address = fmt.Sprintf("local.route.address-%d", i) - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - } - - // query local routes - query = []router.QueryOption{ - router.QueryGateway("*"), - router.QueryNetwork("*"), - router.QueryLink("foobar"), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 2 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) - } - - // add two different routes for svcX with different metric - for i := 0; i < 2; i++ { - route.Service = "svcX" - route.Address = fmt.Sprintf("svcX.route.address-%d", i) - route.Metric = int64(100 + i) - route.Link = router.DefaultLink - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - } - - query = []router.QueryOption{ - router.QueryService("svcX"), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 2 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) + if rt[0].Hash() != route.Hash() { + t.Fatal("Mismatched routes received") } } - -func TestFallback(t *testing.T) { - - r := &rtr{ - options: router.DefaultOptions(), - } - route := router.Route{ - Service: "go.micro.service.foo", - Router: r.options.Id, - Link: router.DefaultLink, - Metric: router.DefaultLocalMetric, - } - r.table = newTable(func(s string) ([]router.Route, error) { - return []router.Route{route}, nil - }) - r.start() - - rts, err := r.Lookup(router.QueryService("go.micro.service.foo")) - if err != nil { - t.Fatalf("error looking up service %s", err) - } - if len(rts) != 1 { - t.Fatalf("incorrect number of routes returned %d", len(rts)) - } - - // deleting from the table but the next query should invoke the fallback that we passed during new table creation - if err := r.table.Delete(route); err != nil { - t.Fatalf("error deleting route %s", err) - } - - rts, err = r.Lookup(router.QueryService("go.micro.service.foo")) - if err != nil { - t.Fatalf("error looking up service %s", err) - } - if len(rts) != 1 { - t.Fatalf("incorrect number of routes returned %d", len(rts)) - } - -} - -func TestFallbackError(t *testing.T) { - r := &rtr{ - options: router.DefaultOptions(), - } - r.table = newTable(func(s string) ([]router.Route, error) { - return nil, fmt.Errorf("ERROR") - }) - r.start() - _, err := r.Lookup(router.QueryService("go.micro.service.foo")) - if err == nil { - t.Fatalf("expected error looking up service but none returned") - } - -} diff --git a/router/router.go b/router/router.go index e6be9258..9d6af88b 100644 --- a/router/router.go +++ b/router/router.go @@ -23,7 +23,7 @@ type Router interface { // The routing table Table() Table // Lookup queries routes in the routing table - Lookup(...QueryOption) ([]Route, error) + Lookup(service string, opts ...LookupOption) ([]Route, error) // Watch returns a watcher which tracks updates to the routing table Watch(opts ...WatchOption) (Watcher, error) // Close the router @@ -43,7 +43,7 @@ type Table interface { // List all routes in the table List() ([]Route, error) // Query routes in the routing table - Query(...QueryOption) ([]Route, error) + Query(service string) ([]Route, error) } // Option used by the router diff --git a/router/static/static.go b/router/static/static.go index f7822b19..a25a0e5b 100644 --- a/router/static/static.go +++ b/router/static/static.go @@ -10,12 +10,11 @@ func NewRouter(opts ...router.Option) router.Router { for _, o := range opts { o(&options) } - return &static{options, new(table)} + return &static{options} } type static struct { options router.Options - table router.Table } func (s *static) Init(opts ...router.Option) error { @@ -33,8 +32,18 @@ func (s *static) Table() router.Table { return nil } -func (s *static) Lookup(opts ...router.QueryOption) ([]router.Route, error) { - return s.table.Query(opts...) +func (s *static) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) { + options := router.NewLookup(opts...) + + return []router.Route{ + router.Route{ + Address: service, + Service: options.Address, + Gateway: options.Gateway, + Network: options.Network, + Router: options.Router, + }, + }, nil } func (s *static) Watch(opts ...router.WatchOption) (router.Watcher, error) { @@ -48,35 +57,3 @@ func (s *static) Close() error { func (s *static) String() string { return "static" } - -type table struct{} - -func (t *table) Create(router.Route) error { - return nil -} - -func (t *table) Delete(router.Route) error { - return nil -} - -func (t *table) Update(router.Route) error { - return nil -} - -func (t *table) List() ([]router.Route, error) { - return nil, nil -} - -func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { - options := router.NewQuery(opts...) - - return []router.Route{ - router.Route{ - Address: options.Service, - Service: options.Address, - Gateway: options.Gateway, - Network: options.Network, - Router: options.Router, - }, - }, nil -} diff --git a/server/mucp/subscriber.go b/server/mucp/subscriber.go index 149226be..903f1ffd 100644 --- a/server/mucp/subscriber.go +++ b/server/mucp/subscriber.go @@ -4,9 +4,9 @@ import ( "fmt" "reflect" + "github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/server" - "github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/transport" ) @@ -31,10 +31,10 @@ type subscriber struct { } func newMessage(msg transport.Message) *broker.Message { - return &broker.Message{ - Header: msg.Header, - Body: msg.Body, - } + return &broker.Message{ + Header: msg.Header, + Body: msg.Body, + } } func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { diff --git a/util/http/roundtripper.go b/util/http/roundtripper.go index a4e3988b..6e387ca7 100644 --- a/util/http/roundtripper.go +++ b/util/http/roundtripper.go @@ -4,7 +4,6 @@ import ( "errors" "net/http" - "github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/selector" ) @@ -15,7 +14,7 @@ type roundTripper struct { } func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - routes, err := r.opts.Router.Lookup(router.QueryService(req.URL.Host)) + routes, err := r.opts.Router.Lookup(req.URL.Host) if err != nil { return nil, err } diff --git a/util/router/router.go b/util/router/router.go index b9b5a517..e5c0a971 100644 --- a/util/router/router.go +++ b/util/router/router.go @@ -10,7 +10,7 @@ type apiRouter struct { router.Router } -func (r *apiRouter) Lookup(...router.QueryOption) ([]router.Route, error) { +func (r *apiRouter) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) { return r.routes, nil }