From 9c695ac343dca7d433cefe4e116ba6186af82bc5 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 28 Aug 2020 12:19:47 +0300 Subject: [PATCH] split router implementations Signed-off-by: Vasiliy Tolstov --- router/dns/dns.go | 122 --------- router/mdns/mdns.go | 117 -------- router/registry/registry.go | 445 ------------------------------- router/registry/registry_test.go | 27 -- router/registry/table.go | 385 -------------------------- router/registry/table_test.go | 355 ------------------------ router/registry/watcher.go | 52 ---- router/static/static.go | 82 ------ 8 files changed, 1585 deletions(-) delete mode 100644 router/dns/dns.go delete mode 100644 router/mdns/mdns.go delete mode 100644 router/registry/registry.go delete mode 100644 router/registry/registry_test.go delete mode 100644 router/registry/table.go delete mode 100644 router/registry/table_test.go delete mode 100644 router/registry/watcher.go delete mode 100644 router/static/static.go diff --git a/router/dns/dns.go b/router/dns/dns.go deleted file mode 100644 index 21a8ecac..00000000 --- a/router/dns/dns.go +++ /dev/null @@ -1,122 +0,0 @@ -package dns - -import ( - "fmt" - "net" - "strconv" - - "github.com/unistack-org/micro/v3/router" -) - -// NewRouter returns an initialized dns router -func NewRouter(opts ...router.Option) router.Router { - options := router.DefaultOptions() - for _, o := range opts { - o(&options) - } - if len(options.Network) == 0 { - options.Network = "micro" - } - return &dns{options, &table{options}} -} - -type dns struct { - options router.Options - table *table -} - -func (d *dns) Init(opts ...router.Option) error { - for _, o := range opts { - o(&d.options) - } - d.table.options = d.options - return nil -} - -func (d *dns) Options() router.Options { - return d.options -} - -func (d *dns) Table() router.Table { - return d.table -} - -func (d *dns) Lookup(opts ...router.QueryOption) ([]router.Route, error) { - return d.table.Query(opts...) -} - -func (d *dns) Watch(opts ...router.WatchOption) (router.Watcher, error) { - return nil, nil -} - -func (d *dns) Close() error { - return nil -} - -func (d *dns) String() string { - return "dns" -} - -type table struct { - options router.Options -} - -func (t *table) Create(router.Route) error { - return nil -} - -func (t *table) Delete(router.Route) error { - return nil -} - -func (t *table) Update(router.Route) error { - return nil -} - -func (t *table) List() ([]router.Route, error) { - return nil, nil -} - -func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { - options := router.NewQuery(opts...) - - // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 - host, port, err := net.SplitHostPort(options.Service) - if err == nil { - // lookup the service using A records - ips, err := net.LookupHost(host) - if err != nil { - return nil, err - } - - p, _ := strconv.Atoi(port) - - // convert the ip addresses to routes - result := make([]router.Route, len(ips)) - for i, ip := range ips { - result[i] = router.Route{ - Service: options.Service, - Address: fmt.Sprintf("%s:%d", ip, uint16(p)), - } - } - return result, nil - } - - // we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the - // service using the SRV record, we return the error. - _, nodes, err := net.LookupSRV(options.Service, "tcp", t.options.Network) - if err != nil { - return nil, err - } - - // convert the nodes (net services) to routes - result := make([]router.Route, len(nodes)) - for i, n := range nodes { - result[i] = router.Route{ - Service: options.Service, - Address: fmt.Sprintf("%s:%d", n.Target, n.Port), - Network: t.options.Network, - } - } - return result, nil -} diff --git a/router/mdns/mdns.go b/router/mdns/mdns.go deleted file mode 100644 index 19335e5b..00000000 --- a/router/mdns/mdns.go +++ /dev/null @@ -1,117 +0,0 @@ -// Package mdns is an mdns router -package mdns - -import ( - "fmt" - "net" - "strconv" - "time" - - "github.com/unistack-org/micro/v3/router" - "github.com/unistack-org/micro/v3/util/mdns" -) - -// NewRouter returns an initialized dns router -func NewRouter(opts ...router.Option) router.Router { - options := router.DefaultOptions() - for _, o := range opts { - o(&options) - } - if len(options.Network) == 0 { - options.Network = "micro" - } - return &mdnsRouter{options} -} - -type mdnsRouter struct { - options router.Options -} - -func (m *mdnsRouter) Init(opts ...router.Option) error { - for _, o := range opts { - o(&m.options) - } - return nil -} - -func (m *mdnsRouter) Options() router.Options { - return m.options -} - -func (m *mdnsRouter) Table() router.Table { - return nil -} - -func (m *mdnsRouter) Lookup(opts ...router.QueryOption) ([]router.Route, error) { - options := router.NewQuery(opts...) - - // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 - service, port, err := net.SplitHostPort(options.Service) - if err != nil { - service = options.Service - } - - // query for the host - entries := make(chan *mdns.ServiceEntry) - - p := mdns.DefaultParams(service) - p.Timeout = time.Millisecond * 100 - p.Entries = entries - - // check if we're using our own network - if len(options.Network) > 0 { - p.Domain = options.Network - } - - // do the query - if err := mdns.Query(p); err != nil { - return nil, err - } - - var routes []router.Route - - // compose the routes based on the entries - for e := range entries { - addr := e.Host - // prefer ipv4 addrs - if len(e.AddrV4) > 0 { - addr = e.AddrV4.String() - // else use ipv6 - } else if len(e.AddrV6) > 0 { - addr = "[" + e.AddrV6.String() + "]" - } else if len(addr) == 0 { - continue - } - - pt := 443 - - if e.Port > 0 { - pt = e.Port - } - - // set the port - if len(port) > 0 { - pt, _ = strconv.Atoi(port) - } - - routes = append(routes, router.Route{ - Service: service, - Address: fmt.Sprintf("%s:%d", addr, pt), - Network: p.Domain, - }) - } - - return routes, nil -} - -func (m *mdnsRouter) Watch(opts ...router.WatchOption) (router.Watcher, error) { - return nil, nil -} - -func (m *mdnsRouter) Close() error { - return nil -} - -func (m *mdnsRouter) String() string { - return "mdns" -} diff --git a/router/registry/registry.go b/router/registry/registry.go deleted file mode 100644 index 7f5d7378..00000000 --- a/router/registry/registry.go +++ /dev/null @@ -1,445 +0,0 @@ -package registry - -import ( - "fmt" - "strings" - "sync" - "time" - - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/registry" - "github.com/unistack-org/micro/v3/router" -) - -var ( - // RefreshInterval is the time at which we completely refresh the table - RefreshInterval = time.Second * 120 - // PruneInterval is how often we prune the routing table - PruneInterval = time.Second * 10 -) - -// rtr implements router interface -type rtr struct { - sync.RWMutex - - running bool - table *table - options router.Options - exit chan bool - initChan chan bool -} - -// NewRouter creates new router and returns it -func NewRouter(opts ...router.Option) (router.Router, error) { - // get default options - options := router.DefaultOptions() - - // apply requested options - for _, o := range opts { - o(&options) - } - - // construct the router - r := &rtr{ - options: options, - initChan: make(chan bool), - } - - if options.Registry == nil { - return nil, fmt.Errorf("registry not set") - } - - // create the new table, passing the fetchRoute method in as a fallback if - // the table doesn't contain the result for a query. - r.table = newTable(r.lookup) - - // start the router - r.start() - return r, nil -} - -// Init initializes router with given options -func (r *rtr) Init(opts ...router.Option) error { - r.Lock() - for _, o := range opts { - o(&r.options) - } - r.Unlock() - - // push a message to the init chan so the watchers - // can reset in the case the registry was changed - go func() { - r.initChan <- true - }() - - return nil -} - -// Options returns router options -func (r *rtr) Options() router.Options { - r.RLock() - defer r.RUnlock() - - options := r.options - - return options -} - -// Table returns routing table -func (r *rtr) Table() router.Table { - r.Lock() - defer r.Unlock() - return r.table -} - -func getDomain(srv *registry.Service) string { - // check the service metadata for domain - // TODO: domain as Domain field in registry? - if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 { - return srv.Metadata["domain"] - } else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil { - return srv.Nodes[0].Metadata["domain"] - } - - // otherwise return wildcard - // TODO: return GlobalDomain or PublicDomain - return registry.DefaultDomain -} - -// manageRoute applies action on a given route -func (r *rtr) manageRoute(route router.Route, action string) error { - switch action { - case "create": - if err := r.table.Create(route); err != nil && err != router.ErrDuplicateRoute { - return fmt.Errorf("failed adding route for service %s: %s", route.Service, err) - } - case "delete": - if err := r.table.Delete(route); err != nil && err != router.ErrRouteNotFound { - return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) - } - case "update": - if err := r.table.Update(route); err != nil { - return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) - } - default: - return fmt.Errorf("failed to manage route for service %s: unknown action %s", route.Service, action) - } - - return nil -} - -// createRoutes turns a service into a list routes basically converting nodes to routes -func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route { - routes := make([]router.Route, 0, len(service.Nodes)) - - for _, node := range service.Nodes { - routes = append(routes, router.Route{ - Service: service.Name, - Address: node.Address, - Gateway: "", - Network: network, - Router: r.options.Id, - Link: router.DefaultLink, - Metric: router.DefaultLocalMetric, - Metadata: node.Metadata, - }) - } - - return routes -} - -// manageServiceRoutes applies action to all routes of the service. -// It returns error of the action fails with error. -func (r *rtr) manageRoutes(service *registry.Service, action, network string) error { - // action is the routing table action - action = strings.ToLower(action) - - // create a set of routes from the service - routes := r.createRoutes(service, network) - - // if its a delete action and there's no nodes - // it means we need to wipe out all the routes - // for that service - if action == "delete" && len(routes) == 0 { - // delete the service entirely - r.table.deleteService(service.Name, network) - return nil - } - - // create the routes in the table - for _, route := range routes { - logger.Tracef("Creating route %v domain: %v", route, network) - if err := r.manageRoute(route, action); err != nil { - return err - } - } - - return nil -} - -// manageRegistryRoutes applies action to all routes of each service found in the registry. -// It returns error if either the services failed to be listed or the routing table action fails. -func (r *rtr) loadRoutes(reg registry.Registry) error { - services, err := reg.ListServices(registry.ListDomain(registry.WildcardDomain)) - if err != nil { - return fmt.Errorf("failed listing services: %v", err) - } - - // add each service node as a separate route - for _, service := range services { - // get the services domain from metadata. Fallback to wildcard. - domain := getDomain(service) - - // create the routes - routes := r.createRoutes(service, domain) - - // if the routes exist save them - if len(routes) > 0 { - logger.Tracef("Creating routes for service %v domain: %v", service, domain) - for _, rt := range routes { - err := r.table.Create(rt) - - // update the route to prevent it from expiring - if err == router.ErrDuplicateRoute { - err = r.table.Update(rt) - } - - if err != nil { - logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err) - } - } - continue - } - - // otherwise get all the service info - - // get the service to retrieve all its info - srvs, err := reg.GetService(service.Name, registry.GetDomain(domain)) - if err != nil { - logger.Tracef("Failed to get service %s domain: %s", service.Name, domain) - continue - } - - // manage the routes for all returned services - for _, srv := range srvs { - routes := r.createRoutes(srv, domain) - - if len(routes) > 0 { - logger.Tracef("Creating routes for service %v domain: %v", srv, domain) - for _, rt := range routes { - err := r.table.Create(rt) - - // update the route to prevent it from expiring - if err == router.ErrDuplicateRoute { - err = r.table.Update(rt) - } - - if err != nil { - logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err) - } - } - } - } - } - - return nil -} - -// lookup retrieves all the routes for a given service and creates them in the routing table -func (r *rtr) lookup(service string) ([]router.Route, error) { - logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain) - - services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain)) - if err == registry.ErrNotFound { - logger.Tracef("Failed to find route for %s", service) - return nil, router.ErrRouteNotFound - } else if err != nil { - logger.Tracef("Failed to find route for %s: %v", service, err) - return nil, fmt.Errorf("failed getting services: %v", err) - } - - var routes []router.Route - - for _, srv := range services { - domain := getDomain(srv) - // TODO: should we continue to send the event indicating we created a route? - // lookup is only called in the query path so probably not - routes = append(routes, r.createRoutes(srv, domain)...) - } - - return routes, nil -} - -// watchRegistry watches registry and updates routing table based on the received events. -// It returns error if either the registry watcher fails with error or if the routing table update fails. -func (r *rtr) watchRegistry(w registry.Watcher) error { - exit := make(chan bool) - - defer func() { - close(exit) - }() - - go func() { - defer w.Stop() - - select { - case <-exit: - return - case <-r.initChan: - return - case <-r.exit: - return - } - }() - - for { - // get the next service - res, err := w.Next() - if err != nil { - if err != registry.ErrWatcherStopped { - return err - } - break - } - - // don't process nil entries - if res.Service == nil { - logger.Trace("Received a nil service") - continue - } - - logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service) - - // get the services domain from metadata. Fallback to wildcard. - domain := getDomain(res.Service) - - // create/update or delete the route - if err := r.manageRoutes(res.Service, res.Action, domain); err != nil { - return err - } - } - - return nil -} - -// start the router. Should be called under lock. -func (r *rtr) start() error { - if r.running { - return nil - } - - if r.options.Precache { - // add all local service routes into the routing table - if err := r.loadRoutes(r.options.Registry); err != nil { - return fmt.Errorf("failed loading registry routes: %s", err) - } - } - - // add default gateway into routing table - if r.options.Gateway != "" { - // note, the only non-default value is the gateway - route := router.Route{ - Service: "*", - Address: "*", - Gateway: r.options.Gateway, - Network: "*", - Router: r.options.Id, - Link: router.DefaultLink, - Metric: router.DefaultLocalMetric, - } - if err := r.table.Create(route); err != nil { - return fmt.Errorf("failed adding default gateway route: %s", err) - } - } - - // create error and exit channels - r.exit = make(chan bool) - - // periodically refresh all the routes - go func() { - t1 := time.NewTicker(RefreshInterval) - defer t1.Stop() - - t2 := time.NewTicker(PruneInterval) - defer t2.Stop() - - for { - select { - case <-r.exit: - return - case <-t2.C: - r.table.pruneRoutes(RefreshInterval) - case <-t1.C: - if err := r.loadRoutes(r.options.Registry); err != nil { - logger.Debugf("failed refreshing registry routes: %s", err) - } - } - } - }() - - go func() { - for { - select { - case <-r.exit: - return - default: - logger.Tracef("Router starting registry watch") - w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain)) - if err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("failed creating registry watcher: %v", err) - } - time.Sleep(time.Second) - continue - } - - // watchRegistry calls stop when it's done - if err := r.watchRegistry(w); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Error watching the registry: %v", err) - } - time.Sleep(time.Second) - } - } - } - }() - - r.running = true - - return nil -} - -// Lookup routes in the routing table -func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) { - return r.Table().Query(q...) -} - -// Watch routes -func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) { - return r.table.Watch(opts...) -} - -// Close the router -func (r *rtr) Close() error { - r.Lock() - defer r.Unlock() - - select { - case <-r.exit: - return nil - default: - if !r.running { - return nil - } - close(r.exit) - - } - - r.running = false - return nil -} - -// String prints debugging information about router -func (r *rtr) String() string { - return "registry" -} diff --git a/router/registry/registry_test.go b/router/registry/registry_test.go deleted file mode 100644 index 01a729ad..00000000 --- a/router/registry/registry_test.go +++ /dev/null @@ -1,27 +0,0 @@ -// +build ignore - -package registry - -import ( - "os" - "testing" - - "github.com/unistack-org/micro/v3/registry/memory" - "github.com/unistack-org/micro/v3/router" -) - -func routerTestSetup() router.Router { - r := memory.NewRegistry() - return NewRouter(router.Registry(r)) -} - -func TestRouterClose(t *testing.T) { - r := routerTestSetup() - - if err := r.Close(); err != nil { - t.Errorf("failed to stop router: %v", err) - } - if len(os.Getenv("INTEGRATION_TESTS")) == 0 { - t.Logf("TestRouterStartStop STOPPED") - } -} diff --git a/router/registry/table.go b/router/registry/table.go deleted file mode 100644 index cb4ee8b9..00000000 --- a/router/registry/table.go +++ /dev/null @@ -1,385 +0,0 @@ -package registry - -import ( - "sync" - "time" - - "github.com/google/uuid" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/router" -) - -// table is an in-memory routing table -type table struct { - sync.RWMutex - // lookup for a service - lookup func(string) ([]router.Route, error) - // routes stores service routes - routes map[string]map[uint64]*route - // watchers stores table watchers - watchers map[string]*tableWatcher -} - -type route struct { - route router.Route - updated time.Time -} - -// newtable creates a new routing table and returns it -func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table { - return &table{ - lookup: lookup, - routes: make(map[string]map[uint64]*route), - watchers: make(map[string]*tableWatcher), - } -} - -// pruneRoutes will prune routes older than the time specified -func (t *table) pruneRoutes(olderThan time.Duration) { - var routes []router.Route - - t.Lock() - - // search for all the routes - for _, routeList := range t.routes { - for _, r := range routeList { - // if any route is older than - if time.Since(r.updated).Seconds() > olderThan.Seconds() { - routes = append(routes, r.route) - } - } - } - - t.Unlock() - - // delete the routes we've found - for _, route := range routes { - t.Delete(route) - } -} - -// deleteService removes the entire service -func (t *table) deleteService(service, network string) { - t.Lock() - defer t.Unlock() - - routes, ok := t.routes[service] - if !ok { - return - } - - // delete the routes for the service - for hash, rt := range routes { - // TODO: check if this causes a problem - // with * in the network if that is a thing - // or blank strings - if rt.route.Network != network { - continue - } - delete(routes, hash) - } - - // delete the map for the service if its empty - if len(routes) == 0 { - delete(t.routes, service) - return - } - - // save the routes - t.routes[service] = routes -} - -// sendEvent sends events to all subscribed watchers -func (t *table) sendEvent(e *router.Event) { - t.RLock() - defer t.RUnlock() - - if len(e.Id) == 0 { - e.Id = uuid.New().String() - } - - for _, w := range t.watchers { - select { - case w.resChan <- e: - case <-w.done: - // don't block forever - case <-time.After(time.Second): - } - } -} - -// Create creates new route in the routing table -func (t *table) Create(r router.Route) error { - service := r.Service - sum := r.Hash() - - t.Lock() - defer t.Unlock() - - // check if there are any routes in the table for the route destination - if _, ok := t.routes[service]; !ok { - t.routes[service] = make(map[uint64]*route) - } - - // add new route to the table for the route destination - if _, ok := t.routes[service][sum]; ok { - return router.ErrDuplicateRoute - } - - // create the route - t.routes[service][sum] = &route{r, time.Now()} - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Router emitting %s for route: %s", router.Create, r.Address) - } - - // send a route created event - go t.sendEvent(&router.Event{Type: router.Create, Timestamp: time.Now(), Route: r}) - - return nil -} - -// Delete deletes the route from the routing table -func (t *table) Delete(r router.Route) error { - service := r.Service - sum := r.Hash() - - t.Lock() - defer t.Unlock() - - if _, ok := t.routes[service]; !ok { - return router.ErrRouteNotFound - } - - if _, ok := t.routes[service][sum]; !ok { - return router.ErrRouteNotFound - } - - // delete the route from the service - delete(t.routes[service], sum) - - // delete the whole map if there are no routes left - if len(t.routes[service]) == 0 { - delete(t.routes, service) - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Router emitting %s for route: %s", router.Delete, r.Address) - } - go t.sendEvent(&router.Event{Type: router.Delete, Timestamp: time.Now(), Route: r}) - - return nil -} - -// Update updates routing table with the new route -func (t *table) Update(r router.Route) error { - service := r.Service - sum := r.Hash() - - t.Lock() - defer t.Unlock() - - // check if the route destination has any routes in the table - if _, ok := t.routes[service]; !ok { - t.routes[service] = make(map[uint64]*route) - } - - if _, ok := t.routes[service][sum]; !ok { - // update the route - t.routes[service][sum] = &route{r, time.Now()} - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Router emitting %s for route: %s", router.Update, r.Address) - } - go t.sendEvent(&router.Event{Type: router.Update, Timestamp: time.Now(), Route: r}) - return nil - } - - // just update the route, but dont emit Update event - t.routes[service][sum] = &route{r, time.Now()} - - return nil -} - -// List returns a list of all routes in the table -func (t *table) List() ([]router.Route, error) { - t.RLock() - defer t.RUnlock() - - var routes []router.Route - for _, rmap := range t.routes { - for _, route := range rmap { - routes = append(routes, route.route) - } - } - - return routes, nil -} - -// isMatch checks if the route matches given query options -func isMatch(route router.Route, address, gateway, network, rtr, link string) bool { - // matches the values provided - match := func(a, b string) bool { - if a == "*" || b == "*" || a == b { - return true - } - return false - } - - // a simple struct to hold our values - type compare struct { - a string - b string - } - - // compare the following values - values := []compare{ - {gateway, route.Gateway}, - {network, route.Network}, - {rtr, route.Router}, - {address, route.Address}, - {link, route.Link}, - } - - for _, v := range values { - // attempt to match each value - if !match(v.a, v.b) { - return false - } - } - - return true -} - -// filterRoutes finds all the routes for given network and router and returns them -func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route { - address := opts.Address - gateway := opts.Gateway - network := opts.Network - rtr := opts.Router - link := opts.Link - - // routeMap stores the routes we're going to advertise - routeMap := make(map[string][]router.Route) - - var routeCnt int - - for _, rt := range routes { - // get the actual route - route := rt.route - - if isMatch(route, address, gateway, network, rtr, link) { - // add matchihg route to the routeMap - routeKey := route.Service + "@" + route.Network - routeMap[routeKey] = append(routeMap[routeKey], route) - routeCnt++ - } - } - - results := make([]router.Route, 0, routeCnt) - for _, route := range routeMap { - results = append(results, route...) - } - - return results -} - -// Lookup queries routing table and returns all routes that match the lookup query -func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) { - // create new query options - opts := router.NewQuery(q...) - - // create a cwslicelist of query results - results := make([]router.Route, 0, len(t.routes)) - - // readAndFilter routes for this service under read lock. - readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) { - t.RLock() - defer t.RUnlock() - - routes, ok := t.routes[q.Service] - if !ok || len(routes) == 0 { - return nil, false - } - - return filterRoutes(routes, q), true - } - - if opts.Service != "*" { - // try and load services from the cache - if routes, ok := readAndFilter(opts); ok { - return routes, nil - } - - // lookup the route and try again - // TODO: move this logic out of the hot path - // being hammered on queries will require multiple lookups - routes, err := t.lookup(opts.Service) - if err != nil { - return nil, err - } - - // cache the routes - for _, rt := range routes { - t.Create(rt) - } - - // try again - if routes, ok := readAndFilter(opts); ok { - return routes, nil - } - - return nil, router.ErrRouteNotFound - } - - // search through all destinations - t.RLock() - - for _, routes := range t.routes { - // filter the routes - found := filterRoutes(routes, opts) - // ensure we don't append zero length routes - if len(found) == 0 { - continue - } - results = append(results, found...) - } - - t.RUnlock() - - return results, nil -} - -// Watch returns routing table entry watcher -func (t *table) Watch(opts ...router.WatchOption) (router.Watcher, error) { - // by default watch everything - wopts := router.WatchOptions{ - Service: "*", - } - - for _, o := range opts { - o(&wopts) - } - - w := &tableWatcher{ - id: uuid.New().String(), - opts: wopts, - resChan: make(chan *router.Event, 10), - done: make(chan struct{}), - } - - // when the watcher is stopped delete it - go func() { - <-w.done - t.Lock() - delete(t.watchers, w.id) - t.Unlock() - }() - - // save the watcher - t.Lock() - t.watchers[w.id] = w - t.Unlock() - - return w, nil -} diff --git a/router/registry/table_test.go b/router/registry/table_test.go deleted file mode 100644 index db4492fb..00000000 --- a/router/registry/table_test.go +++ /dev/null @@ -1,355 +0,0 @@ -// +build ignore - -package registry - -import ( - "fmt" - "testing" - - "github.com/unistack-org/micro/v3/router" -) - -func testSetup(t *testing.T) (*table, router.Route) { - r, err := NewRouter() - if err != nil { - t.Fatal(err) - } - routr := r.(*rtr) - - table := newTable(routr.lookup) - - route := router.Route{ - Service: "dest.svc", - Address: "dest.addr", - Gateway: "dest.gw", - Network: "dest.network", - Router: "src.router", - Link: "det.link", - Metric: 10, - } - - return table, route -} - -func TestCreate(t *testing.T) { - table, route := testSetup(t) - - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - - // adds new route for the original destination - route.Gateway = "dest.gw2" - - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - - // adding the same route under Insert policy must error - if err := table.Create(route); err != router.ErrDuplicateRoute { - t.Fatalf("error adding route. Expected error: %s, found: %s", router.ErrDuplicateRoute, err) - } -} - -func TestDelete(t *testing.T) { - table, route := testSetup(t) - - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - - // should fail to delete non-existent route - prevSvc := route.Service - route.Service = "randDest" - - if err := table.Delete(route); err != router.ErrRouteNotFound { - t.Fatalf("error deleting route. Expected: %s, found: %s", router.ErrRouteNotFound, err) - } - - // we should be able to delete the existing route - route.Service = prevSvc - - if err := table.Delete(route); err != nil { - t.Fatalf("error deleting route: %s", err) - } -} - -func TestUpdate(t *testing.T) { - table, route := testSetup(t) - - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - - // change the metric of the original route - route.Metric = 200 - - if err := table.Update(route); err != nil { - t.Fatalf("error updating route: %s", err) - } - - // this should add a new route - route.Service = "rand.dest" - - if err := table.Update(route); err != nil { - t.Fatalf("error updating route: %s", err) - } -} - -func TestList(t *testing.T) { - table, route := testSetup(t) - - svc := []string{"one.svc", "two.svc", "three.svc"} - - for i := 0; i < len(svc); i++ { - route.Service = svc[i] - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - } - - routes, err := table.List() - if err != nil { - t.Fatalf("error listing routes: %s", err) - } - - if len(routes) != len(svc) { - t.Fatalf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes)) - } -} - -func TestQuery(t *testing.T) { - table, route := testSetup(t) - - svc := []string{"svc1", "svc2", "svc3", "svc1"} - net := []string{"net1", "net2", "net1", "net3"} - gw := []string{"gw1", "gw2", "gw3", "gw3"} - rtr := []string{"rtr1", "rt2", "rt3", "rtr3"} - - for i := 0; i < len(svc); i++ { - route.Service = svc[i] - route.Network = net[i] - route.Gateway = gw[i] - route.Router = rtr[i] - route.Link = router.DefaultLink - - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - } - - // return all routes - routes, err := table.Query() - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } else if len(routes) == 0 { - t.Fatalf("error looking up routes: not found") - } - - // query routes particular network - network := "net1" - - routes, err = table.Query(router.QueryNetwork(network)) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 2 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) - } - - for _, route := range routes { - if route.Network != network { - t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network) - } - } - - // query routes for particular gateway - gateway := "gw1" - - routes, err = table.Query(router.QueryGateway(gateway)) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 1 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } - - if routes[0].Gateway != gateway { - t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) - } - - // query routes for particular router - rt := "rtr1" - - routes, err = table.Query(router.QueryRouter(rt)) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 1 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } - - if routes[0].Router != rt { - t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router) - } - - // query particular gateway and network - query := []router.QueryOption{ - router.QueryGateway(gateway), - router.QueryNetwork(network), - router.QueryRouter(rt), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 1 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } - - if routes[0].Gateway != gateway { - t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) - } - - if routes[0].Network != network { - t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network) - } - - if routes[0].Router != rt { - t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router) - } - - // non-existen route query - routes, err = table.Query(router.QueryService("foobar")) - if err != router.ErrRouteNotFound { - t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err) - } - - if len(routes) != 0 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes)) - } - - // query NO routes - query = []router.QueryOption{ - router.QueryGateway(gateway), - router.QueryNetwork(network), - router.QueryLink("network"), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) > 0 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes)) - } - - // insert local routes to query - for i := 0; i < 2; i++ { - route.Link = "foobar" - route.Address = fmt.Sprintf("local.route.address-%d", i) - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - } - - // query local routes - query = []router.QueryOption{ - router.QueryGateway("*"), - router.QueryNetwork("*"), - router.QueryLink("foobar"), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 2 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) - } - - // add two different routes for svcX with different metric - for i := 0; i < 2; i++ { - route.Service = "svcX" - route.Address = fmt.Sprintf("svcX.route.address-%d", i) - route.Metric = int64(100 + i) - route.Link = router.DefaultLink - if err := table.Create(route); err != nil { - t.Fatalf("error adding route: %s", err) - } - } - - query = []router.QueryOption{ - router.QueryService("svcX"), - } - - routes, err = table.Query(query...) - if err != nil { - t.Fatalf("error looking up routes: %s", err) - } - - if len(routes) != 2 { - t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) - } -} - -func TestFallback(t *testing.T) { - - r := &rtr{ - options: router.DefaultOptions(), - } - route := router.Route{ - Service: "go.micro.service.foo", - Router: r.options.Id, - Link: router.DefaultLink, - Metric: router.DefaultLocalMetric, - } - r.table = newTable(func(s string) ([]router.Route, error) { - return []router.Route{route}, nil - }) - r.start() - - rts, err := r.Lookup(router.QueryService("go.micro.service.foo")) - if err != nil { - t.Fatalf("error looking up service %s", err) - } - if len(rts) != 1 { - t.Fatalf("incorrect number of routes returned %d", len(rts)) - } - - // deleting from the table but the next query should invoke the fallback that we passed during new table creation - if err := r.table.Delete(route); err != nil { - t.Fatalf("error deleting route %s", err) - } - - rts, err = r.Lookup(router.QueryService("go.micro.service.foo")) - if err != nil { - t.Fatalf("error looking up service %s", err) - } - if len(rts) != 1 { - t.Fatalf("incorrect number of routes returned %d", len(rts)) - } - -} - -func TestFallbackError(t *testing.T) { - r := &rtr{ - options: router.DefaultOptions(), - } - r.table = newTable(func(s string) ([]router.Route, error) { - return nil, fmt.Errorf("ERROR") - }) - r.start() - _, err := r.Lookup(router.QueryService("go.micro.service.foo")) - if err == nil { - t.Fatalf("expected error looking up service but none returned") - } - -} diff --git a/router/registry/watcher.go b/router/registry/watcher.go deleted file mode 100644 index 63e0f43c..00000000 --- a/router/registry/watcher.go +++ /dev/null @@ -1,52 +0,0 @@ -package registry - -import ( - "sync" - - "github.com/unistack-org/micro/v3/router" -) - -// tableWatcher implements routing table Watcher -type tableWatcher struct { - sync.RWMutex - id string - opts router.WatchOptions - resChan chan *router.Event - done chan struct{} -} - -// Next returns the next noticed action taken on table -// TODO: right now we only allow to watch particular service -func (w *tableWatcher) Next() (*router.Event, error) { - for { - select { - case res := <-w.resChan: - switch w.opts.Service { - case res.Route.Service, "*": - return res, nil - default: - continue - } - case <-w.done: - return nil, router.ErrWatcherStopped - } - } -} - -// Chan returns watcher events channel -func (w *tableWatcher) Chan() (<-chan *router.Event, error) { - return w.resChan, nil -} - -// Stop stops routing table watcher -func (w *tableWatcher) Stop() { - w.Lock() - defer w.Unlock() - - select { - case <-w.done: - return - default: - close(w.done) - } -} diff --git a/router/static/static.go b/router/static/static.go deleted file mode 100644 index 18f0cfdd..00000000 --- a/router/static/static.go +++ /dev/null @@ -1,82 +0,0 @@ -package static - -import ( - "github.com/unistack-org/micro/v3/router" -) - -// NewRouter returns an initialized static router -func NewRouter(opts ...router.Option) router.Router { - options := router.DefaultOptions() - for _, o := range opts { - o(&options) - } - return &static{options, new(table)} -} - -type static struct { - options router.Options - table router.Table -} - -func (s *static) Init(opts ...router.Option) error { - for _, o := range opts { - o(&s.options) - } - return nil -} - -func (s *static) Options() router.Options { - return s.options -} - -func (s *static) Table() router.Table { - return nil -} - -func (s *static) Lookup(opts ...router.QueryOption) ([]router.Route, error) { - return s.table.Query(opts...) -} - -func (s *static) Watch(opts ...router.WatchOption) (router.Watcher, error) { - return nil, nil -} - -func (s *static) Close() error { - return nil -} - -func (s *static) String() string { - return "static" -} - -type table struct{} - -func (t *table) Create(router.Route) error { - return nil -} - -func (t *table) Delete(router.Route) error { - return nil -} - -func (t *table) Update(router.Route) error { - return nil -} - -func (t *table) List() ([]router.Route, error) { - return nil, nil -} - -func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) { - options := router.NewQuery(opts...) - - return []router.Route{ - { - Address: options.Service, - Service: options.Address, - Gateway: options.Gateway, - Network: options.Network, - Router: options.Router, - }, - }, nil -}