diff --git a/network/router/default.go b/network/router/default.go index 6250608a..5b13b2ae 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -13,14 +13,16 @@ import ( ) const ( - // AdvertiseTick is time interval in which we advertise route updates - AdvertiseTick = 5 * time.Second + // AdvertiseEventsTick is time interval in which the router advertises route updates + AdvertiseEventsTick = 5 * time.Second + // AdvertiseTableTick is time interval in which router advertises all routes found in routing table + AdvertiseTableTick = 1 * time.Minute // AdvertSuppress is advert suppression threshold AdvertSuppress = 2000 // AdvertRecover is advert recovery threshold AdvertRecover = 750 // DefaultAdvertTTL is default advertisement TTL - DefaultAdvertTTL = time.Minute + DefaultAdvertTTL = 1 * time.Minute // PenaltyDecay is the penalty decay PenaltyDecay = 1.15 // Delete penalises route addition and deletion @@ -78,6 +80,28 @@ func (r *router) Options() Options { return r.opts } +// manageRoute applies route action on the routing table +func (r *router) manageRoute(route table.Route, action string) error { + switch action { + case "create": + if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute { + return fmt.Errorf("failed adding route for service %s: %s", route.Service, err) + } + case "update": + if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute { + return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) + } + case "delete": + if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound { + return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) + } + default: + return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action) + } + + return nil +} + // manageServiceRoutes manages routes for a given service. // It returns error of the routing table action fails. func (r *router) manageServiceRoutes(service *registry.Service, action string) error { @@ -95,21 +119,8 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e Metric: table.DefaultLocalMetric, } - switch action { - case "create": - if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute { - return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) - } - case "update": - if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute { - return fmt.Errorf("failed updating route for service %s: %s", service.Name, err) - } - case "delete": - if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound { - return fmt.Errorf("failed deleting route for service %s: %s", service.Name, err) - } - default: - return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", service.Name, action) + if err := r.manageRoute(route, action); err != nil { + return err } } @@ -132,8 +143,8 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro continue } // manage the routes for all returned services - for _, s := range srvs { - if err := r.manageServiceRoutes(s, action); err != nil { + for _, srv := range srvs { + if err := r.manageServiceRoutes(srv, action); err != nil { return err } } @@ -210,12 +221,14 @@ func (r *router) watchTable(w table.Watcher) error { return watchErr } -func (r *router) advertEvents(advType AdvertType, events []*table.Event) { +// advertiseEvents advertises events to event subscribers +func (r *router) advertiseEvents(advType AdvertType, events []*table.Event) { defer r.advertWg.Done() a := &Advert{ Id: r.opts.Id, Type: advType, + TTL: DefaultAdvertTTL, Timestamp: time.Now(), Events: events, } @@ -225,7 +238,45 @@ func (r *router) advertEvents(advType AdvertType, events []*table.Event) { case <-r.exit: return } +} +// advertiseTable advertises the whole routing table to the network +func (r *router) advertiseTable() error { + // create table advertisement ticker + ticker := time.NewTicker(AdvertiseTableTick) + + for { + select { + case <-ticker.C: + // list routing table routes to announce + routes, err := r.List() + if err != nil { + return fmt.Errorf("failed listing routes: %s", err) + } + // collect all the added routes before we attempt to add default gateway + events := make([]*table.Event, len(routes)) + for i, route := range routes { + event := &table.Event{ + Type: table.Update, + Timestamp: time.Now(), + Route: route, + } + events[i] = event + } + + // advertise all routes as Update events to subscribers + if len(events) > 0 { + go func() { + r.advertWg.Add(1) + r.advertiseEvents(Update, events) + }() + } + case <-r.exit: + return nil + } + } + + return nil } // isFlapping detects if the event is flapping based on the current and previous event status. @@ -241,8 +292,8 @@ func isFlapping(curr, prev *table.Event) bool { return false } -// updateEvent is a table event enriched with advertisement data -type updateEvent struct { +// advertEvent is a table event enriched with advertisement data +type advertEvent struct { *table.Event // timestamp marks the time the event has been received timestamp time.Time @@ -258,15 +309,16 @@ type updateEvent struct { // It suppresses unhealthy flapping events and advertises healthy events upstream. func (r *router) processEvents() error { // ticker to periodically scan event for advertising - ticker := time.NewTicker(AdvertiseTick) + ticker := time.NewTicker(AdvertiseEventsTick) // eventMap is a map of advert events - eventMap := make(map[uint64]*updateEvent) + eventMap := make(map[uint64]*advertEvent) for { select { case <-ticker.C: var events []*table.Event // collect all events which are not flapping + // TODO: decay the events and update suppression for key, event := range eventMap { if !event.isFlapping && !event.isSuppressed { e := new(table.Event) @@ -277,9 +329,10 @@ func (r *router) processEvents() error { } } + // advertise all Update events to subscribers if len(events) > 0 { r.advertWg.Add(1) - go r.advertEvents(Update, events) + go r.advertiseEvents(Update, events) } case e := <-r.eventChan: // event timestamp @@ -301,7 +354,7 @@ func (r *router) processEvents() error { hash := e.Route.Hash() event, ok := eventMap[hash] if !ok { - event = &updateEvent{ + event = &advertEvent{ Event: e, penalty: penalty, timestamp: time.Now(), @@ -334,7 +387,7 @@ func (r *router) processEvents() error { } } - // we probably never reach this place + // we probably never reach this code path return nil } @@ -438,7 +491,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { } // error channel collecting goroutine errors - errChan := make(chan error, 3) + errChan := make(chan error, 4) r.wg.Add(1) go func() { @@ -457,24 +510,27 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.wg.Add(1) go func() { defer r.wg.Done() - // listen to routing table events and process them + // watch routing table events and process them errChan <- r.processEvents() }() + r.advertWg.Add(1) + go func() { + defer r.advertWg.Done() + // advertise the whole routing table + errChan <- r.advertiseTable() + }() + + // advertise your presence + r.advertWg.Add(1) + go r.advertiseEvents(Announce, events) + // watch for errors and cleanup r.wg.Add(1) go r.watchErrors(errChan) - // advertise your presence - r.advertWg.Add(1) - go r.advertEvents(Announce, events) - // mark router as running and set its Error to nil - status := Status{ - Code: Running, - Error: nil, - } - r.status = status + r.status = Status{Code: Running, Error: nil} } return r.advertChan, nil @@ -494,8 +550,9 @@ func (r *router) Process(a *Advert) error { for _, event := range events { // create a copy of the route route := event.Route - if err := r.Update(route); err != nil { - return fmt.Errorf("failed updating routing table: %v", err) + action := event.Type + if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil { + return fmt.Errorf("failed applying action %s to routing table: %s", action, err) } } diff --git a/network/router/router.go b/network/router/router.go index 39e8d94a..724f38be 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -58,8 +58,7 @@ type Advert struct { // Timestamp marks the time when the update is sent Timestamp time.Time // TTL is Advert TTL - // TODO: not used - TTL time.Time + TTL time.Duration // Events is a list of routing table events to advertise Events []*table.Event } diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go index 0f8fab7f..503993ff 100644 --- a/network/router/table/watcher.go +++ b/network/router/table/watcher.go @@ -22,6 +22,22 @@ const ( Update ) +// String implements fmt.Stringer +// NOTE: we need this as this makes converting the numeric codes +// into miro style string actions very simple +func (et EventType) String() string { + switch et { + case Create: + return "create" + case Delete: + return "delete" + case Update: + return "update" + default: + return "unknown" + } +} + // Event is returned by a call to Next on the watcher. type Event struct { // Type defines type of event