diff --git a/network/router/default.go b/network/router/default.go index 5b13b2ae..0d153920 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -18,17 +18,24 @@ const ( // AdvertiseTableTick is time interval in which router advertises all routes found in routing table AdvertiseTableTick = 1 * time.Minute // AdvertSuppress is advert suppression threshold - AdvertSuppress = 2000 + AdvertSuppress = 2000.0 // AdvertRecover is advert recovery threshold - AdvertRecover = 750 + AdvertRecover = 750.0 // DefaultAdvertTTL is default advertisement TTL DefaultAdvertTTL = 1 * time.Minute - // PenaltyDecay is the penalty decay - PenaltyDecay = 1.15 - // Delete penalises route addition and deletion - Delete = 1000 + // DeletePenalty penalises route deletion + DeletePenalty = 1000.0 // UpdatePenalty penalises route updates - UpdatePenalty = 500 + UpdatePenalty = 500.0 + // PenaltyHalfLife is the time the advert penalty decays to half its value + PenaltyHalfLife = 2.0 + // MaxSuppressTime defines time after which the suppressed advert is deleted + MaxSuppressTime = 5 * time.Minute +) + +var ( + // PenaltyDecay is a coefficient which controls the speed the advert penalty decays + PenaltyDecay = math.Log(2) / PenaltyHalfLife ) // router provides default router implementation @@ -153,9 +160,9 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro return nil } -// watchServices watches services in given registry and updates the routing table accordingly. -// It returns error if the service registry watcher stops or if the routing table can't be updated. -func (r *router) watchServices(w registry.Watcher) error { +// watchRegistry watches registry and updates routing table based on the received events. +// It returns error if either the registry watcher fails with error or if the routing table update fails. 
+func (r *router) watchRegistry(w registry.Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -266,43 +273,26 @@ func (r *router) advertiseTable() error { // advertise all routes as Update events to subscribers if len(events) > 0 { - go func() { - r.advertWg.Add(1) - r.advertiseEvents(Update, events) - }() + r.advertWg.Add(1) + go r.advertiseEvents(Update, events) } case <-r.exit: return nil } } - - return nil } -// isFlapping detects if the event is flapping based on the current and previous event status. -func isFlapping(curr, prev *table.Event) bool { - if curr.Type == table.Update && prev.Type == table.Update { - return true - } - - if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create { - return true - } - - return false -} - -// advertEvent is a table event enriched with advertisement data -type advertEvent struct { - *table.Event - // timestamp marks the time the event has been received - timestamp time.Time - // penalty is current event penalty +// routeAdvert contains a list of route events to be advertised +type routeAdvert struct { + events []*table.Event + // lastUpdate records the time of the last advert update + lastUpdate time.Time + // penalty is current advert penalty penalty float64 - // isSuppressed flags if the event should be considered for flap detection + // isSuppressed flags the advert suppression isSuppressed bool - // isFlapping marks the event as flapping event - isFlapping bool + // suppressTime records the time at which the advert suppression started + suppressTime time.Time } // processEvents processes routing table events. 
@@ -310,22 +300,44 @@ type advertEvent struct { func (r *router) processEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseEventsTick) - // eventMap is a map of advert events - eventMap := make(map[uint64]*advertEvent) + // advertMap is a map of advert events + advertMap := make(map[uint64]*routeAdvert) for { select { case <-ticker.C: var events []*table.Event // collect all events which are not flapping - // TODO: decay the events and update suppression - for key, event := range eventMap { - if !event.isFlapping && !event.isSuppressed { - e := new(table.Event) - *e = *event.Event - events = append(events, e) - // this deletes the advertised event from the map - delete(eventMap, key) + for key, advert := range advertMap { + // decay the event penalty + delta := time.Since(advert.lastUpdate).Seconds() + advert.penalty = advert.penalty * math.Exp(-delta*PenaltyDecay) + + // suppress/recover the event based on its penalty level + switch { + case advert.penalty > AdvertSuppress && !advert.isSuppressed: + advert.isSuppressed = true + advert.suppressTime = time.Now() + case advert.penalty < AdvertRecover && advert.isSuppressed: + advert.isSuppressed = false + } + + // max suppression time threshold has been reached, delete the advert + if advert.isSuppressed { + if time.Since(advert.suppressTime) > MaxSuppressTime { + delete(advertMap, key) + continue + } + } + + if !advert.isSuppressed { + for _, event := range advert.events { + e := new(table.Event) + *e = *event + events = append(events, e) + // delete the advert from the advertMap + delete(advertMap, key) + } } } @@ -335,8 +347,6 @@ func (r *router) processEvents() error { go r.advertiseEvents(Update, events) } case e := <-r.eventChan: - // event timestamp - now := time.Now() // if event is nil, continue if e == nil { continue @@ -348,36 +358,36 @@ func (r *router) processEvents() error { case table.Update: penalty = UpdatePenalty case table.Delete: - penalty = 
Delete + penalty = DeletePenalty } - // we use route hash as eventMap key + + // check if we have already registered the route + // we use the route hash as advertMap key hash := e.Route.Hash() - event, ok := eventMap[hash] + advert, ok := advertMap[hash] if !ok { - event = &advertEvent{ - Event: e, - penalty: penalty, - timestamp: time.Now(), + events := []*table.Event{e} + advert = &routeAdvert{ + events: events, + penalty: penalty, + lastUpdate: time.Now(), } - eventMap[hash] = event + advertMap[hash] = advert continue } - // update penalty for existing event: decay existing and add new penalty - delta := time.Since(event.timestamp).Seconds() - event.penalty = event.penalty*math.Exp(-delta) + penalty - event.timestamp = now - // suppress or recover the event based on its current penalty - if !event.isSuppressed && event.penalty > AdvertSuppress { - event.isSuppressed = true - } else if event.penalty < AdvertRecover { - event.isSuppressed = false - } - // if not suppressed decide if if its flapping - if !event.isSuppressed { - // detect if its flapping by comparing current and previous event - event.isFlapping = isFlapping(e, event.Event) + // attempt to squash last two events if possible + lastEvent := advert.events[len(advert.events)-1] + if lastEvent.Type == e.Type { + advert.events[len(advert.events)-1] = e + } else { + advert.events = append(advert.events, e) } + + // update event penalty and recorded timestamp + advert.lastUpdate = time.Now() + advert.penalty += penalty + case <-r.exit: // first wait for the advertiser to finish r.advertWg.Wait() @@ -386,10 +396,6 @@ func (r *router) processEvents() error { return nil } } - - // we probably never reach this code path - - return nil } // watchErrors watches router errors and takes appropriate actions @@ -484,8 +490,9 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } - // service registry watcher - svcWatcher, 
err := r.opts.Registry.Watch() + + // registry watcher + regWatcher, err := r.opts.Registry.Watch() if err != nil { return nil, fmt.Errorf("failed creating service registry watcher: %v", err) } @@ -497,7 +504,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { go func() { defer r.wg.Done() // watch local registry and register routes in routine table - errChan <- r.watchServices(svcWatcher) + errChan <- r.watchRegistry(regWatcher) }() r.wg.Add(1) @@ -594,5 +601,5 @@ func (r *router) Stop() error { // String prints debugging information about router func (r *router) String() string { - return "router" + return "default" } diff --git a/network/router/table/default.go b/network/router/table/default.go index 09aba529..bce75935 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -7,14 +7,14 @@ import ( "github.com/google/uuid" ) -// TableOptions specify routing table options +// Options specify routing table options // TODO: table options TBD in the future -type TableOptions struct{} +type Options struct{} // table is an in memory routing table type table struct { // opts are table options - opts TableOptions + opts Options // m stores routing table map m map[string]map[uint64]Route // w is a list of table watchers @@ -23,9 +23,9 @@ type table struct { } // newTable creates a new routing table and returns it -func newTable(opts ...TableOption) Table { +func newTable(opts ...Option) Table { // default options - var options TableOptions + var options Options // apply requested options for _, o := range opts { @@ -40,7 +40,7 @@ func newTable(opts ...TableOption) Table { } // Init initializes routing table with options -func (t *table) Init(opts ...TableOption) error { +func (t *table) Init(opts ...Option) error { for _, o := range opts { o(&t.opts) } @@ -48,7 +48,7 @@ func (t *table) Init(opts ...TableOption) error { } // Options returns routing table options -func (t *table) Options() TableOptions { +func (t *table) Options() 
Options { return t.opts } @@ -219,7 +219,7 @@ func (t *table) Size() int { defer t.RUnlock() size := 0 - for dest, _ := range t.m { + for dest := range t.m { size += len(t.m[dest]) } @@ -227,6 +227,6 @@ func (t *table) Size() int { } // String returns debug information -func (t table) String() string { - return "table" +func (t *table) String() string { + return "default" } diff --git a/network/router/table/default_test.go b/network/router/table/default_test.go index b4238336..275da8c2 100644 --- a/network/router/table/default_test.go +++ b/network/router/table/default_test.go @@ -23,7 +23,7 @@ func TestCreate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ // adds new route for the original destination route.Gateway = "dest.gw2" @@ -31,7 +31,7 @@ func TestCreate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ if table.Size() != testTableSize { t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) @@ -50,7 +50,7 @@ func TestDelete(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ // should fail to delete non-existant route prevSvc := route.Service @@ -66,7 +66,7 @@ func TestDelete(t *testing.T) { if err := table.Delete(route); err != nil { t.Errorf("error deleting route: %s", err) } - testTableSize -= 1 + testTableSize-- if table.Size() != testTableSize { t.Errorf("invalid number of routes. 
Expected: %d, found: %d", testTableSize, table.Size()) @@ -80,7 +80,7 @@ func TestUpdate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ // change the metric of the original route route.Metric = 200 @@ -100,7 +100,7 @@ func TestUpdate(t *testing.T) { if err := table.Update(route); err != nil { t.Errorf("error updating route: %s", err) } - testTableSize += 1 + testTableSize++ if table.Size() != testTableSize { t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) diff --git a/network/router/table/table.go b/network/router/table/table.go index c549617c..2836e3d6 100644 --- a/network/router/table/table.go +++ b/network/router/table/table.go @@ -29,10 +29,10 @@ type Table interface { Size() int } -// TableOption used by the routing table -type TableOption func(*TableOptions) +// Option used by the routing table +type Option func(*Options) // NewTable creates new routing table and returns it -func NewTable(opts ...TableOption) Table { +func NewTable(opts ...Option) Table { return newTable(opts...) }