From 9d7420658d9ea223e3564b7cdd09dbc8213d122e Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 27 Jun 2019 22:52:51 +0100 Subject: [PATCH 1/5] Changed router interface. Added table watcher. Advertise routes * Changed router interface to return Advertisement channel * Added default gateway route to the routing table if supplied * Watch table for updates and advertise to the network * We hash the routes on 3-tuple (Destination, Gateway, Network) --- network/router/default_router.go | 158 +++++++++++++++++++++++++------ network/router/default_table.go | 8 +- network/router/options.go | 14 ++- network/router/route.go | 8 +- network/router/router.go | 23 +++-- network/router/table_watcher.go | 3 +- 6 files changed, 172 insertions(+), 42 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 7c682150..ae55ee9f 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -11,16 +11,15 @@ import ( // router provides default router implementation type router struct { - opts Options - exit chan struct{} - wg *sync.WaitGroup + opts Options + started bool + advertChan chan *Advertisement + exit chan struct{} + wg *sync.WaitGroup } // newRouter creates new router and returns it func newRouter(opts ...Option) Router { - // TODO: we need to add default GW entry here - // Should default GW be part of router options? - // get default options options := DefaultOptions() @@ -30,9 +29,11 @@ func newRouter(opts ...Option) Router { } return &router{ - opts: options, - exit: make(chan struct{}), - wg: &sync.WaitGroup{}, + opts: options, + started: false, + advertChan: make(chan *Advertisement), + exit: make(chan struct{}), + wg: &sync.WaitGroup{}, } } @@ -69,43 +70,106 @@ func (r *router) Network() string { return r.opts.Network } -// Advertise advertises the routes to the network. It is a blocking function. +// Advertise advertises the routes to the network. // It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() error { - // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { - return fmt.Errorf("failed adding routes: %v", err) +func (r *router) Advertise() (<-chan *Advertisement, error) { + if !r.started { + // add local service routes into the routing table + if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + return nil, fmt.Errorf("failed adding routes: %v", err) + } + + // add default gateway into routing table + if r.opts.Gateway != "" { + // note, the only non-default value is the gateway + route := Route{ + Destination: "*", + Gateway: r.opts.Gateway, + Router: "*", + Network: "*", + Metric: DefaultLocalMetric, + } + if err := r.opts.Table.Add(route); err != nil { + return nil, fmt.Errorf("error adding default gateway route: %s", err) + } + } + + // routing table watcher that watches all routes being added + tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) + if err != nil { + return nil, fmt.Errorf("failed to create routing table watcher: %v", err) + } + + // registry watcher + regWatcher, err := r.opts.Registry.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create registry watcher: %v", err) + } + + // error channel collecting goroutine errors + errChan := make(chan error, 2) + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) + }() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.watchTable(tableWatcher) + }() + + go func() { + select { + // wait for exit chan + case <-r.exit: + // wait for error + case <-errChan: + // TODO: we're missing the error context here + // might have to log it here as we don't send it down + } + + // close the advertise channel + close(r.advertChan) + }() + + // mark the router as started + r.started = true } - localWatcher, err := r.opts.Registry.Watch() - if err != nil { - return fmt.Errorf("failed to create registry watcher: %v", err) + return r.advertChan, nil +} + +// Update updates the routing table using the advertised values +func (r *router) Update(a *Advertisement) error { + // we extract the route from advertisement and update the routing table + route := Route{ + Destination: a.Event.Route.Destination, + Gateway: a.Event.Route.Gateway, + Router: a.Event.Route.Router, + Network: a.Event.Route.Network, + Metric: a.Event.Route.Metric, + Policy: AddIfNotExists, } - // error channel collecting goroutine errors - errChan := make(chan error, 1) - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routine table - errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric) - }() - - return <-errChan + return r.opts.Table.Update(route) } // addServiceRoutes adds all services in given registry to the routing table. // NOTE: this is a one-off operation done when bootstrapping the routing table // It returns error if either the services failed to be listed or -// if the routes could not be added to the routing table. +// if any of the the routes could not be added to the routing table. func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed to list services: %v", err) } - // add each service node as a separate route; + // add each service node as a separate route for _, service := range services { // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) @@ -190,6 +254,40 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { return watchErr } +// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry +// It returns error if the locally registered services either fails to be added/deleted to/from network registry. +func (r *router) watchTable(w Watcher) error { + // wait in the background for the router to stop + // when the router stops, stop the watcher and exit + r.wg.Add(1) + go func() { + defer r.wg.Done() + <-r.exit + w.Stop() + }() + + var watchErr error + + for { + event, err := w.Next() + if err == ErrWatcherStopped { + break + } + + if err != nil { + watchErr = err + break + } + + r.advertChan <- &Advertisement{ + ID: r.ID(), + Event: event, + } + } + + return watchErr +} + // Stop stops the router func (r *router) Stop() error { // notify all goroutines to finish diff --git a/network/router/default_table.go b/network/router/default_table.go index 196ca4bf..d68398ad 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -129,6 +129,12 @@ func (t *table) Update(r Route) error { // check if the destAddr has ANY routes in the table if _, ok := t.m[destAddr]; !ok { + if r.Policy == AddIfNotExists { + t.m[destAddr] = make(map[uint64]Route) + t.m[destAddr][sum] = r + go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + return nil + } return ErrRouteNotFound } @@ -279,7 +285,7 @@ func (t *table) String() string { // hash hashes the route using router gateway and network address func (t *table) hash(r Route) uint64 { t.h.Reset() - t.h.Write([]byte(r.Destination + r.Gateway + r.Router + r.Network)) + t.h.Write([]byte(r.Destination + r.Gateway + r.Network)) return t.h.Sum64() } diff --git a/network/router/options.go b/network/router/options.go index 0d525f76..2426fa32 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -8,6 +8,8 @@ import ( var ( // DefaultAddress is default router address DefaultAddress = ":9093" + // DefaultNetwork is default micro network + DefaultNetwork = "local" ) // Options are router options @@ -18,6 +20,8 @@ type Options struct { Address string // Network is micro network Network string + // Gateway is micro network gateway + Gateway string // Registry is the local registry Registry registry.Registry // Table is routing table @@ -45,6 +49,13 @@ func Network(n string) Option { } } +// Gateway sets network gateway +func Gateway(g string) Option { + return func(o *Options) { + o.Gateway = g + } +} + // RoutingTable sets the routing table func RoutingTable(t Table) Option { return func(o *Options) { @@ -61,12 +72,11 @@ func Registry(r registry.Registry) Option { // DefaultOptions returns router default options func DefaultOptions() Options { - // NOTE: by default both local and network registies use default registry i.e. mdns return Options{ ID: uuid.New().String(), Address: DefaultAddress, + Network: DefaultNetwork, Registry: registry.DefaultRegistry, Table: NewTable(), - Network: "local", } } diff --git a/network/router/route.go b/network/router/route.go index d52c2688..1f45b688 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -20,6 +20,8 @@ type RoutePolicy int const ( // OverrideIfExists overrides route if it already exists OverrideIfExists RoutePolicy = iota + // AddIfNotExist adds the route if it does not exist + AddIfNotExists // IgnoreIfExists instructs to not modify existing route IgnoreIfExists ) @@ -28,9 +30,11 @@ const ( func (p RoutePolicy) String() string { switch p { case OverrideIfExists: - return "OVERRIDE" + return "OVERRIDE_IF_EXISTS" + case AddIfNotExists: + return "ADD_IF_NOT_EXISTS" case IgnoreIfExists: - return "IGNORE" + return "IGNORE_IF_EXISTS" default: return "UNKNOWN" } diff --git a/network/router/router.go b/network/router/router.go index 366c2fbf..a8772768 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,6 +1,11 @@ // Package router provides a network routing control plane package router +var ( + // DefaultRouter is default network router + DefaultRouter = NewRouter() +) + // Router is an interface for a routing control plane type Router interface { // Init initializes the router with options @@ -15,21 +20,27 @@ type Router interface { Address() string // Network returns the network address of the router Network() string - // Advertise starts advertising the routes to the network - Advertise() error + // Advertise starts advertising routes to the network + Advertise() (<-chan *Advertisement, error) + // Update updates the routing table + Update(*Advertisement) error // Stop stops the router Stop() error // String returns debug info String() string } +// Advertisement is sent by the router to the network +type Advertisement struct { + // ID is the source router ID + ID string + // Event defines advertisement even + Event *Event +} + // Option used by the router type Option func(*Options) -var ( - DefaultRouter = NewRouter() -) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 97d1f1b7..4e9d9a9e 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -81,7 +81,7 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly; we only allow watching particular route destination +// TODO: this needs to be thought through properly; we only allow watching particular route destination for now func (w *tableWatcher) Next() (*Event, error) { for { select { @@ -93,6 +93,7 @@ func (w *tableWatcher) Next() (*Event, error) { if w.opts.Destination == res.Route.Destination { return res, nil } + continue } case <-w.done: return nil, ErrWatcherStopped From 8ad2f73ad6b94e991b35b1b0fa3c2fae6e84e660 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 28 Jun 2019 11:53:55 +0100 Subject: [PATCH 2/5] Advertisement is now Update; started bit is now running. --- network/router/default_router.go | 34 +++++++++++++++++++++----------- network/router/router.go | 12 +++++++---- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index ae55ee9f..18544b6e 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" @@ -12,8 +13,8 @@ import ( // router provides default router implementation type router struct { opts Options - started bool - advertChan chan *Advertisement + running bool + advertChan chan *Update exit chan struct{} wg *sync.WaitGroup } @@ -30,8 +31,8 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, - started: false, - advertChan: make(chan *Advertisement), + running: false, + advertChan: make(chan *Update), exit: make(chan struct{}), wg: &sync.WaitGroup{}, } @@ -72,8 +73,8 @@ func (r *router) Network() string { // Advertise advertises the routes to the network. // It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() (<-chan *Advertisement, error) { - if !r.started { +func (r *router) Advertise() (<-chan *Update, error) { + if !r.running { // add local service routes into the routing table if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) @@ -135,17 +136,19 @@ func (r *router) Advertise() (<-chan *Advertisement, error) { // close the advertise channel close(r.advertChan) + // mark the router as stopped + r.running = false }() - // mark the router as started - r.started = true + // mark the router as running + r.running = true } return r.advertChan, nil } // Update updates the routing table using the advertised values -func (r *router) Update(a *Advertisement) error { +func (r *router) Update(a *Update) error { // we extract the route from advertisement and update the routing table route := Route{ Destination: a.Event.Route.Destination, @@ -279,9 +282,16 @@ func (r *router) watchTable(w Watcher) error { break } - r.advertChan <- &Advertisement{ - ID: r.ID(), - Event: event, + u := &Update{ + ID: r.ID(), + Timestamp: time.Now(), + Event: event, + } + + select { + case <-r.exit: + return nil + case r.advertChan <- u: } } diff --git a/network/router/router.go b/network/router/router.go index a8772768..9aa8789a 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,6 +1,8 @@ // Package router provides a network routing control plane package router +import "time" + var ( // DefaultRouter is default network router DefaultRouter = NewRouter() @@ -21,19 +23,21 @@ type Router interface { // Network returns the network address of the router Network() string // Advertise starts advertising routes to the network - Advertise() (<-chan *Advertisement, error) + Advertise() (<-chan *Update, error) // Update updates the routing table - Update(*Advertisement) error + Update(*Update) error // Stop stops the router Stop() error // String returns debug info String() string } -// Advertisement is sent by the router to the network -type Advertisement struct { +// Update is sent by the router to the network +type Update struct { // ID is the source router ID ID string + // Timestamp marks the time when update is sent + Timestamp time.Time // Event defines advertisement even Event *Event } From 32300eadc1a90cb1fff09f8f072d622414c738f9 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 28 Jun 2019 18:35:53 +0100 Subject: [PATCH 3/5] Added Router Status which allows to track router status --- network/router/default_router.go | 45 ++++++++++++++++++++------------ network/router/router.go | 38 ++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 18 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 18544b6e..cddd1169 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -13,10 +13,11 @@ import ( // router provides default router implementation type router struct { opts Options - running bool + status Status advertChan chan *Update exit chan struct{} wg *sync.WaitGroup + sync.RWMutex } // newRouter creates new router and returns it @@ -31,7 +32,7 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, - running: false, + status: Status{Error: nil, Code: Stopped}, advertChan: make(chan *Update), exit: make(chan struct{}), wg: &sync.WaitGroup{}, @@ -74,7 +75,10 @@ func (r *router) Network() string { // Advertise advertises the routes to the network. // It returns error if any of the launched goroutines fail with error. func (r *router) Advertise() (<-chan *Update, error) { - if !r.running { + r.Lock() + defer r.Unlock() + + if r.status.Code != Running { // add local service routes into the routing table if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) @@ -91,11 +95,11 @@ func (r *router) Advertise() (<-chan *Update, error) { Metric: DefaultLocalMetric, } if err := r.opts.Table.Add(route); err != nil { - return nil, fmt.Errorf("error adding default gateway route: %s", err) + return nil, fmt.Errorf("error to add default gateway route: %s", err) } } - // routing table watcher that watches all routes being added + // routing table watcher which watches all routes i.e. to every destination tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) if err != nil { return nil, fmt.Errorf("failed to create routing table watcher: %v", err) @@ -120,28 +124,27 @@ func (r *router) Advertise() (<-chan *Update, error) { r.wg.Add(1) go func() { defer r.wg.Done() - // watch local registry and register routes in routine table + // watch local registry and register routes in routing table errChan <- r.watchTable(tableWatcher) }() + r.wg.Add(1) go func() { + defer r.wg.Done() select { // wait for exit chan case <-r.exit: - // wait for error - case <-errChan: - // TODO: we're missing the error context here - // might have to log it here as we don't send it down + r.status.Code = Stopped + case err := <-errChan: + r.status.Code = Error + r.status.Error = err } - // close the advertise channel close(r.advertChan) - // mark the router as stopped - r.running = false }() // mark the router as running - r.running = true + r.status.Code = Running } return r.advertChan, nil @@ -188,13 +191,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric // range over the flat slice of nodes for _, node := range nodes { - gw := node.Address + gateway := node.Address if node.Port > 0 { - gw = fmt.Sprintf("%s:%d", node.Address, node.Port) + gateway = fmt.Sprintf("%s:%d", node.Address, node.Port) } route := Route{ Destination: service.Name, - Gateway: gw, + Gateway: gateway, Router: r.opts.Address, Network: r.opts.Network, Metric: metric, @@ -298,6 +301,14 @@ func (r *router) watchTable(w Watcher) error { return watchErr } +// Status returns router status +func (r *router) Status() Status { + r.RLock() + defer r.RUnlock() + + return r.status +} + // Stop stops the router func (r *router) Stop() error { // notify all goroutines to finish diff --git a/network/router/router.go b/network/router/router.go index 9aa8789a..23da4b06 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -26,6 +26,8 @@ type Router interface { Advertise() (<-chan *Update, error) // Update updates the routing table Update(*Update) error + // Status returns router status + Status() Status // Stop stops the router Stop() error // String returns debug info @@ -34,7 +36,7 @@ type Router interface { // Update is sent by the router to the network type Update struct { - // ID is the source router ID + // ID is the router ID ID string // Timestamp marks the time when update is sent Timestamp time.Time @@ -42,6 +44,40 @@ type Update struct { Event *Event } +// StatusCode defines router status +type StatusCode int + +// Status is router status +type Status struct { + // Error is router error + Error error + // Code defines router status + Code StatusCode +} + +const ( + // Running means the rotuer is running + Running StatusCode = iota + // Error means the router has crashed with error + Error + // Stopped means the router has stopped + Stopped +) + +// String returns human readable status code +func (sc StatusCode) String() string { + switch sc { + case Running: + return "RUNNING" + case Error: + return "ERROR" + case Stopped: + return "STOPPED" + default: + return "UNKNOWN" + } +} + // Option used by the router type Option func(*Options) From cff46c3fd8edecf37883005db57d4338139b5911 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Sat, 29 Jun 2019 00:46:22 +0100 Subject: [PATCH 4/5] Added Init state. Recreate exit and advertise channels when recovering In order to differentiate between intialized and other states we introduced a new state: Init. The router is in this state only when it's created. We have cleaned up router status management which is now handled by manageStatus function only. --- network/router/default_router.go | 232 ++++++++++++++++++------------- network/router/router.go | 8 +- 2 files changed, 138 insertions(+), 102 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index cddd1169..91f9e2a2 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -32,7 +32,7 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, - status: Status{Error: nil, Code: Stopped}, + status: Status{Error: nil, Code: Init}, advertChan: make(chan *Update), exit: make(chan struct{}), wg: &sync.WaitGroup{}, @@ -72,99 +72,6 @@ func (r *router) Network() string { return r.opts.Network } -// Advertise advertises the routes to the network. -// It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() (<-chan *Update, error) { - r.Lock() - defer r.Unlock() - - if r.status.Code != Running { - // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { - return nil, fmt.Errorf("failed adding routes: %v", err) - } - - // add default gateway into routing table - if r.opts.Gateway != "" { - // note, the only non-default value is the gateway - route := Route{ - Destination: "*", - Gateway: r.opts.Gateway, - Router: "*", - Network: "*", - Metric: DefaultLocalMetric, - } - if err := r.opts.Table.Add(route); err != nil { - return nil, fmt.Errorf("error to add default gateway route: %s", err) - } - } - - // routing table watcher which watches all routes i.e. to every destination - tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) - if err != nil { - return nil, fmt.Errorf("failed to create routing table watcher: %v", err) - } - - // registry watcher - regWatcher, err := r.opts.Registry.Watch() - if err != nil { - return nil, fmt.Errorf("failed to create registry watcher: %v", err) - } - - // error channel collecting goroutine errors - errChan := make(chan error, 2) - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routine table - errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) - }() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routing table - errChan <- r.watchTable(tableWatcher) - }() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - select { - // wait for exit chan - case <-r.exit: - r.status.Code = Stopped - case err := <-errChan: - r.status.Code = Error - r.status.Error = err - } - // close the advertise channel - close(r.advertChan) - }() - - // mark the router as running - r.status.Code = Running - } - - return r.advertChan, nil -} - -// Update updates the routing table using the advertised values -func (r *router) Update(a *Update) error { - // we extract the route from advertisement and update the routing table - route := Route{ - Destination: a.Event.Route.Destination, - Gateway: a.Event.Route.Gateway, - Router: a.Event.Route.Router, - Network: a.Event.Route.Network, - Metric: a.Event.Route.Metric, - Policy: AddIfNotExists, - } - - return r.opts.Table.Update(route) -} - // addServiceRoutes adds all services in given registry to the routing table. // NOTE: this is a one-off operation done when bootstrapping the routing table // It returns error if either the services failed to be listed or @@ -301,21 +208,145 @@ func (r *router) watchTable(w Watcher) error { return watchErr } +// manageStatus manages router status +func (r *router) manageStatus(errChan <-chan error) { + defer r.wg.Done() + + r.Lock() + r.status.Code = Running + r.status.Error = nil + r.Unlock() + + var code StatusCode + var err error + + select { + case <-r.exit: + code = Stopped + case err = <-errChan: + code = Error + } + + r.Lock() + defer r.Unlock() + r.status.Code = code + r.status.Error = err + + // close the advertise channel + close(r.advertChan) + + // stop the router if some error happened + // this will notify all watcher goroutines to stop + if err != nil && code != Stopped { + close(r.exit) + } +} + +// Advertise advertises the routes to the network. +// It returns error if any of the launched goroutines fail with error. +func (r *router) Advertise() (<-chan *Update, error) { + r.Lock() + defer r.Unlock() + + if r.status.Code != Running { + // add local service routes into the routing table + if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + return nil, fmt.Errorf("failed adding routes: %v", err) + } + // add default gateway into routing table + if r.opts.Gateway != "" { + // note, the only non-default value is the gateway + route := Route{ + Destination: "*", + Gateway: r.opts.Gateway, + Router: "*", + Network: "*", + Metric: DefaultLocalMetric, + } + if err := r.opts.Table.Add(route); err != nil { + return nil, fmt.Errorf("error to add default gateway route: %s", err) + } + } + + // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped + if r.status.Code == Error || r.status.Code == Stopped { + r.exit = make(chan struct{}) + r.advertChan = make(chan *Update) + } + + // routing table watcher which watches all routes i.e. to every destination + tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) + if err != nil { + return nil, fmt.Errorf("failed to create routing table watcher: %v", err) + } + // registry watcher + regWatcher, err := r.opts.Registry.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create registry watcher: %v", err) + } + + // error channel collecting goroutine errors + errChan := make(chan error, 2) + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) + }() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routing table + errChan <- r.watchTable(tableWatcher) + }() + + r.wg.Add(1) + go r.manageStatus(errChan) + } + + return r.advertChan, nil +} + +// Update updates the routing table using the advertised values +func (r *router) Update(a *Update) error { + // we extract the route from advertisement and update the routing table + route := Route{ + Destination: a.Event.Route.Destination, + Gateway: a.Event.Route.Gateway, + Router: a.Event.Route.Router, + Network: a.Event.Route.Network, + Metric: a.Event.Route.Metric, + Policy: AddIfNotExists, + } + + return r.opts.Table.Update(route) +} + // Status returns router status func (r *router) Status() Status { r.RLock() defer r.RUnlock() - return r.status + // make a copy of the status + status := r.status + + return status } // Stop stops the router func (r *router) Stop() error { - // notify all goroutines to finish - close(r.exit) + r.RLock() + defer r.RUnlock() - // wait for all goroutines to finish - r.wg.Wait() + // only close the channel if the router is running + if r.status.Code == Running { + // notify all goroutines to finish + close(r.exit) + // wait for all goroutines to finish + r.wg.Wait() + } return nil } @@ -325,13 +356,14 @@ func (r *router) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"ID", "Address", "Network", "Table"}) + table.SetHeader([]string{"ID", "Address", "Network", "Table", "Status"}) data := []string{ r.opts.ID, r.opts.Address, r.opts.Network, fmt.Sprintf("%d", r.opts.Table.Size()), + r.status.Code.String(), } table.Append(data) diff --git a/network/router/router.go b/network/router/router.go index 23da4b06..e45c0baa 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -56,8 +56,10 @@ type Status struct { } const ( - // Running means the rotuer is running - Running StatusCode = iota + // Init means the rotuer has just been initialized + Init StatusCode = iota + // Running means the router is running + Running // Error means the router has crashed with error Error // Stopped means the router has stopped @@ -67,6 +69,8 @@ const ( // String returns human readable status code func (sc StatusCode) String() string { switch sc { + case Init: + return "INITIALIZED" case Running: return "RUNNING" case Error: From f6e064cdbd1d363b4ff018a81a5b85ebce1bc981 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 1 Jul 2019 15:43:50 +0100 Subject: [PATCH 5/5] Fixed router idempotency. Return registry.ErrWatchStopped from mdns reg --- network/router/default_router.go | 63 +++++++++++++++++--------------- network/router/table_watcher.go | 2 +- registry/mdns_watcher.go | 3 +- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 91f9e2a2..5c7b73ae 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -134,12 +134,10 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { for { res, err := w.Next() - if err == registry.ErrWatcherStopped { - break - } - if err != nil { - watchErr = err + if err != registry.ErrWatcherStopped { + watchErr = err + } break } @@ -181,14 +179,13 @@ func (r *router) watchTable(w Watcher) error { var watchErr error +exit: for { event, err := w.Next() - if err == ErrWatcherStopped { - break - } - if err != nil { - watchErr = err + if err != ErrWatcherStopped { + watchErr = err + } break } @@ -200,23 +197,21 @@ func (r *router) watchTable(w Watcher) error { select { case <-r.exit: - return nil + break exit case r.advertChan <- u: } } + // close the advertisement channel + close(r.advertChan) + return watchErr } -// manageStatus manages router status -func (r *router) manageStatus(errChan <-chan error) { +// watchError watches router errors +func (r *router) watchError(errChan <-chan error) { defer r.wg.Done() - r.Lock() - r.status.Code = Running - r.status.Error = nil - r.Unlock() - var code StatusCode var err error @@ -229,14 +224,13 @@ func (r *router) manageStatus(errChan <-chan error) { r.Lock() defer r.Unlock() - r.status.Code = code - r.status.Error = err - - // close the advertise channel - close(r.advertChan) + status := Status{ + Code: code, + Error: err, + } + r.status = status // stop the router if some error happened - // this will notify all watcher goroutines to stop if err != nil && code != Stopped { close(r.exit) } @@ -303,7 +297,14 @@ func (r *router) Advertise() (<-chan *Update, error) { }() r.wg.Add(1) - go r.manageStatus(errChan) + go r.watchError(errChan) + + // mark router as running and set its Error to nil + status := Status{ + Code: Running, + Error: nil, + } + r.status = status } return r.advertChan, nil @@ -338,15 +339,19 @@ func (r *router) Status() Status { // Stop stops the router func (r *router) Stop() error { r.RLock() - defer r.RUnlock() - // only close the channel if the router is running if r.status.Code == Running { // notify all goroutines to finish close(r.exit) - // wait for all goroutines to finish - r.wg.Wait() } + r.RUnlock() + + // drain the advertise channel + for range r.advertChan { + } + + // wait for all goroutines to finish + r.wg.Wait() return nil } diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 4e9d9a9e..91411247 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -9,7 +9,7 @@ import ( var ( // ErrWatcherStopped is returned when routing table watcher has been stopped - ErrWatcherStopped = errors.New("routing table watcher stopped") + ErrWatcherStopped = errors.New("watcher stopped") ) // EventType defines routing table event diff --git a/registry/mdns_watcher.go b/registry/mdns_watcher.go index 7ccb6e80..bbcf90ea 100644 --- a/registry/mdns_watcher.go +++ b/registry/mdns_watcher.go @@ -1,7 +1,6 @@ package registry import ( - "errors" "strings" "github.com/micro/mdns" @@ -63,7 +62,7 @@ func (m *mdnsWatcher) Next() (*Result, error) { Service: service, }, nil case <-m.exit: - return nil, errors.New("watcher stopped") + return nil, ErrWatcherStopped } } }