From 59035ab80121cbf8c30105a8301a666bb6815298 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 19 Jun 2019 18:01:48 +0100 Subject: [PATCH] Removed debug logs. advertiseToNetwork() replaced watchTable(). Debug logs that were helpful when squashing bugs have been removed. advertiseToNetwork replaced the watchTable which originally watched the routing table entries. We now take a different approach to propagating the local registry services into the network registry. --- router/default_router.go | 173 +++++++++++++++------------------------ router/default_table.go | 31 +++---- router/table.go | 2 + router/table_watcher.go | 24 +++++- 4 files changed, 109 insertions(+), 121 deletions(-) diff --git a/router/default_router.go b/router/default_router.go index 6952bc87..0923dffe 100644 --- a/router/default_router.go +++ b/router/default_router.go @@ -7,11 +7,19 @@ import ( "sync" "time" - "github.com/micro/go-log" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" ) +var ( + // AdvertiseToNetworkTick defines how often in seconds do we scal the local registry + // to advertise the local services to the network registry + AdvertiseToNetworkTick = 5 * time.Second + // AdvertiseNetworkTTL defines network registry TTL in seconds + // NOTE: this is a rather arbitrary picked value subject to change + AdvertiseNetworkTTL = 120 * time.Second +) + type router struct { opts Options exit chan struct{} @@ -85,56 +93,46 @@ func (r *router) Start() error { return fmt.Errorf("failed adding routes for network services: %v", err) } - // routing table has been bootstrapped; - // NOTE: we only need to advertise local services upstream - // lookup local service routes and advertise them upstream - query := NewQuery(QueryNetwork("local")) - localRoutes, err := r.opts.Table.Lookup(query) - if err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed to lookup local service routes: %v", err) - } - node, err := r.parseToNode() if err != nil { return fmt.Errorf("failed to parse router into service node: %v", err) } - for _, route := range localRoutes { - service := ®istry.Service{ - Name: route.Options().DestAddr, - Nodes: []*registry.Node{node}, - } - if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil { - return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) - } - } - - localWatcher, err := r.opts.LocalRegistry.Watch() + localRegWatcher, err := r.opts.LocalRegistry.Watch() if err != nil { return fmt.Errorf("failed to create local registry watcher: %v", err) } - networkWatcher, err := r.opts.NetworkRegistry.Watch() + networkRegWatcher, err := r.opts.NetworkRegistry.Watch() if err != nil { return fmt.Errorf("failed to create network registry watcher: %v", err) } - // NOTE: we only watch local netwrork entries which we then propagate upstream to network - tableWatcher, err := r.opts.Table.Watch(WatchNetwork("local")) - if err != nil { - return fmt.Errorf("failed to create routing table watcher: %v", err) - } + // error channel collecting goroutine errors + errChan := make(chan error, 3) r.wg.Add(1) - go r.manageServiceRoutes(localWatcher, "local", DefaultLocalMetric) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.manageServiceRoutes(localRegWatcher, "local", DefaultLocalMetric) + }() r.wg.Add(1) - go r.manageServiceRoutes(networkWatcher, r.opts.NetworkAddress, DefaultNetworkMetric) + go func() { + defer r.wg.Done() + // watch network registry and register routes in routine table + errChan <- r.manageServiceRoutes(networkRegWatcher, r.opts.NetworkAddress, DefaultNetworkMetric) + }() r.wg.Add(1) - go r.watchTable(tableWatcher) + go func() { + defer r.wg.Done() + // watch local registry and advertise local service to the network + errChan <- r.advertiseToNetwork(node) + }() - return nil + return <-errChan } // addServiceRouteslists all available services in given registry and adds them to the routing table. @@ -182,11 +180,40 @@ func (r *router) parseToNode() (*registry.Node, error) { return node, nil } +// advertiseToNetwork periodically scans local registry and registers (i.e. advertises) all the local services in the network registry. +// It returns error if either the local services failed to be listed or if it fails to register local service in network registry. +func (r *router) advertiseToNetwork(node *registry.Node) error { + // ticker to periodically scan the local registry + ticker := time.NewTicker(AdvertiseToNetworkTick) + + for { + select { + case <-r.exit: + return nil + case <-ticker.C: + // list all local services + services, err := r.opts.LocalRegistry.ListServices() + if err != nil { + return fmt.Errorf("failed to list local services: %v", err) + } + // loop through all registered local services and register them in the network registry + for _, service := range services { + svc := ®istry.Service{ + Name: service.Name, + Nodes: []*registry.Node{node}, + } + // register the local service in the network registry + if err := r.opts.NetworkRegistry.Register(svc, registry.RegisterTTL(AdvertiseNetworkTTL)); err != nil { + return fmt.Errorf("failed to register service %s in network registry: %v", svc.Name, err) + } + } + } + } +} + // manageServiceRoutes watches services in given registry and updates the routing table accordingly. // It returns error if the service registry watcher has stopped or if the routing table failed to be updated. func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric int) error { - defer r.wg.Done() - // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -198,7 +225,6 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric var watchErr error - // watch for changes to services for { res, err := w.Next() if err == registry.ErrWatcherStopped { @@ -207,7 +233,6 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric if err != nil { watchErr = err - log.Logf("[router] registry error: %s", err) break } @@ -220,25 +245,18 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric switch res.Action { case "create": - log.Logf("[router] received <%s> create event for service %s", network, res.Service.Name) if len(res.Service.Nodes) > 0 { - log.Logf("[router] adding <%s> service %s to routing table", network, res.Service.Name) /// only return error if the route is not duplicate, but something else has failed if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { return fmt.Errorf("failed to add route for service: %v", res.Service.Name) } - log.Logf("[router] route successfully added; routing table: \n%s", r.opts.Table) } case "delete": - log.Logf("[router] received <%s> delete event for service %s", network, res.Service.Name) - //log.Logf("[router] <%s> service nodes: %v", network, res.Service.Nodes) if len(res.Service.Nodes) < 1 { - log.Logf("[router] removing <%s> service %s from routing table", network, res.Service.Name) // only return error if the route is present in the table, but something else has failed if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { return fmt.Errorf("failed to delete route for service: %v", res.Service.Name) } - log.Logf("[router] route successfully deleted; routing table: \n%s", r.opts.Table) } } } @@ -246,66 +264,14 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric return watchErr } -// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry -// It returns error if the locally registered services either fails to be added/deleted to/from network registry. -func (r *router) watchTable(w Watcher) error { - defer r.wg.Done() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - <-r.exit - w.Stop() - }() - - var watchErr error - - // watch for changes to services - for { - event, err := w.Next() - if err == ErrWatcherStopped { - break - } - - if err != nil { - watchErr = err - log.Logf("[router] routing table error: %s", err) - break - } - - node, err := r.parseToNode() - if err != nil { - return fmt.Errorf("failed to parse router into node: %v", err) - } - - // we know that .DestAddr contains the registered service name - service := ®istry.Service{ - Name: event.Route.Options().DestAddr, - Nodes: []*registry.Node{node}, - } - - switch event.Type { - case CreateEvent: - log.Logf("[router] adding service %s to network registry", event.Route.Options().DestAddr) - //if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil { - if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(5*time.Second)); err != nil { - return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) - } - log.Logf("[router] successfully added service %s to network registry", event.Route.Options().DestAddr) - case DeleteEvent: - log.Logf("[router] deleting service %s from network registry", event.Route.Options().DestAddr) - if err := r.opts.NetworkRegistry.Deregister(service); err != nil { - return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err) - } - log.Logf("[router] successfully deleted service %s from network registry", event.Route.Options().DestAddr) - } - } - - return watchErr -} - // Stop stops the router func (r *router) Stop() error { + // notify all goroutines to finish + close(r.exit) + + // wait for all goroutines to finish + r.wg.Wait() + // NOTE: we need a more efficient way of doing this e.g. network routes // should ideally be autodeleted when the router stops gossiping // deregister all services advertised by this router from remote registry @@ -315,6 +281,7 @@ func (r *router) Stop() error { return fmt.Errorf("failed to lookup routes for router %s: %v", r.opts.ID, err) } + // parse router to registry.Node node, err := r.parseToNode() if err != nil { return fmt.Errorf("failed to parse router into service node: %v", err) @@ -330,12 +297,6 @@ func (r *router) Stop() error { } } - // notify all goroutines to finish - close(r.exit) - - // wait for all goroutines to finish - r.wg.Wait() - return nil } diff --git a/router/default_table.go b/router/default_table.go index 648abd8f..3bed626f 100644 --- a/router/default_table.go +++ b/router/default_table.go @@ -8,12 +8,11 @@ import ( "sync" "github.com/google/uuid" - "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) -// TODO: table options TBD in the future // TableOptions are routing table options +// TODO: table options TBD in the future type TableOptions struct{} // table is in memory routing table @@ -71,11 +70,8 @@ func (t *table) Add(r Route) error { t.Lock() defer t.Unlock() - log.Logf("[table] AddRoute request %d %s: \n%s", sum, r.Options().Policy, r) - // check if the destination has any routes in the table if _, ok := t.m[destAddr]; !ok { - log.Logf("[table] destination does NOT exist ADDING: \n%s", r) t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) @@ -84,15 +80,13 @@ func (t *table) Add(r Route) error { // add new route to the table for the given destination if _, ok := t.m[destAddr][sum]; !ok { - log.Logf("[table] route does NOT exist ADDING: \n%s", r) t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) return nil } - // only add the route if it exists and if override is requested + // only add the route if the route override is explicitly requested if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists { - log.Logf("[table] route does exist OVERRIDING: \n%s", r) t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) return nil @@ -101,12 +95,9 @@ func (t *table) Add(r Route) error { // if we reached this point without already returning the route already exists // we return nil only if explicitly requested by the client if r.Options().Policy == IgnoreIfExists { - log.Logf("[table] route does exist IGNORING: \n%s", r) return nil } - log.Logf("[table] AddRoute request: DUPPLICATE ROUTE") - return ErrDuplicateRoute } @@ -118,10 +109,7 @@ func (t *table) Delete(r Route) error { destAddr := r.Options().DestAddr sum := t.hash(r) - log.Logf("[table] DeleteRoute request %d: \n%s", sum, r) - if _, ok := t.m[destAddr]; !ok { - log.Logf("[table] DeleteRoute Route NOT found: %s", r) return ErrRouteNotFound } @@ -154,6 +142,21 @@ func (t *table) Update(r Route) error { return ErrRouteNotFound } +// List returns a list of all routes in the table +func (t *table) List() ([]Route, error) { + t.RLock() + defer t.RUnlock() + + var routes []Route + for _, rmap := range t.m { + for _, route := range rmap { + routes = append(routes, route) + } + } + + return routes, nil +} + // Lookup queries routing table and returns all routes that match it func (t *table) Lookup(q Query) ([]Route, error) { t.RLock() diff --git a/router/table.go b/router/table.go index 53a6f8cd..c00484cd 100644 --- a/router/table.go +++ b/router/table.go @@ -23,6 +23,8 @@ type Table interface { Delete(Route) error // Update updates route in the routing table Update(Route) error + // List returns the list of all routes in the table + List() ([]Route, error) // Lookup looks up routes in the routing table and returns them Lookup(Query) ([]Route, error) // Watch returns a watcher which allows to track updates to the routing table diff --git a/router/table_watcher.go b/router/table_watcher.go index e769906c..bf18e8c0 100644 --- a/router/table_watcher.go +++ b/router/table_watcher.go @@ -2,6 +2,9 @@ package router import ( "errors" + "strings" + + "github.com/olekukonko/tablewriter" ) var ( @@ -86,7 +89,7 @@ type tableWatcher struct { // Next returns the next noticed action taken on table // TODO: this needs to be thought through properly -// we are aiming to provide the same watch options Query() provides +// we are aiming to provide the same options Query provides func (w *tableWatcher) Next() (*Event, error) { for { select { @@ -116,3 +119,22 @@ func (w *tableWatcher) Stop() { close(w.done) } } + +// String prints debug information +func (w *tableWatcher) String() string { + sb := &strings.Builder{} + + table := tablewriter.NewWriter(sb) + table.SetHeader([]string{"DestAddr", "Network"}) + + data := []string{ + w.opts.DestAddr, + w.opts.Network, + } + table.Append(data) + + // render table into sb + table.Render() + + return sb.String() +}