From 92495d22dbba7715bb818cd9cd40f393f2b34f21 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 16 Jul 2019 19:00:25 +0100 Subject: [PATCH 1/3] Fixes advert dampening behaviour. This commit adds the following changes: * advert now stores a list of route events as opposed to just last one * attempt to dedup route events before appending them to advert * have max suppress threshold for long time suppressed adverts * decaying events on every advert tick Originally we werent decaying penalties on every advert tick. That was incorrect behaviour. Furthermore some events would end up being accumulated potentially causing memory leaks. We were also overriding the last received router event which was causing incorrect sequence of events to be applied when received by a receiver: Create, Delete would be "squashed" into Delete only which would be nonsensical since the Create event would never be delivered hence we would be deleting nonexistent routes. Not Decaying the events on every tick or not having the max suppression threshold could lead to DoS by growing the router memory infinitely. --- network/router/default.go | 155 +++++++++++++++++++++----------------- 1 file changed, 85 insertions(+), 70 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index 5b13b2ae..58050fdb 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -18,17 +18,24 @@ const ( // AdvertiseTableTick is time interval in which router advertises all routes found in routing table AdvertiseTableTick = 1 * time.Minute // AdvertSuppress is advert suppression threshold - AdvertSuppress = 2000 + AdvertSuppress = 2000.0 // AdvertRecover is advert recovery threshold - AdvertRecover = 750 + AdvertRecover = 750.0 // DefaultAdvertTTL is default advertisement TTL DefaultAdvertTTL = 1 * time.Minute - // PenaltyDecay is the penalty decay - PenaltyDecay = 1.15 - // Delete penalises route addition and deletion - Delete = 1000 + // DeletePenalty penalises route deletion + DeletePenalty = 1000.0 // UpdatePenalty penalises route updates - UpdatePenalty = 500 + UpdatePenalty = 500.0 + // PenaltyHalfLife is the time the advert penalty decays to half its value + PenaltyHalfLife = 2.0 + // MaxSuppressTime defines time after which the suppressed advert is deleted + MaxSuppressTime = 5 * time.Minute +) + +var ( + // PenaltyDecay is a coefficient which controls the speed the advert penalty decays + PenaltyDecay = math.Log(2) / PenaltyHalfLife ) // router provides default router implementation @@ -153,9 +160,9 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro return nil } -// watchServices watches services in given registry and updates the routing table accordingly. -// It returns error if the service registry watcher stops or if the routing table can't be updated. -func (r *router) watchServices(w registry.Watcher) error { +// watchRegistry watches sregistry and updates the routing table. +// It returns error if either the registry watcher fails with error or if the routing table update fails. +func (r *router) watchRegistry(w registry.Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -279,30 +286,17 @@ func (r *router) advertiseTable() error { return nil } -// isFlapping detects if the event is flapping based on the current and previous event status. -func isFlapping(curr, prev *table.Event) bool { - if curr.Type == table.Update && prev.Type == table.Update { - return true - } - - if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create { - return true - } - - return false -} - -// advertEvent is a table event enriched with advertisement data -type advertEvent struct { - *table.Event - // timestamp marks the time the event has been received - timestamp time.Time - // penalty is current event penalty +// routeAdvert contains a list of route events to be advertised +type routeAdvert struct { + events []*table.Event + // lastUpdate records the time of the last advert update + lastUpdate time.Time + // penalty is current advert penalty penalty float64 - // isSuppressed flags if the event should be considered for flap detection + // isSuppressed flags the advert suppression isSuppressed bool - // isFlapping marks the event as flapping event - isFlapping bool + // suppressTime records the time interval the advert has been suppressed for + suppressTime time.Time } // processEvents processes routing table events. @@ -310,22 +304,44 @@ type advertEvent struct { func (r *router) processEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseEventsTick) - // eventMap is a map of advert events - eventMap := make(map[uint64]*advertEvent) + // advertMap is a map of advert events + advertMap := make(map[uint64]*routeAdvert) for { select { case <-ticker.C: var events []*table.Event // collect all events which are not flapping - // TODO: decay the events and update suppression - for key, event := range eventMap { - if !event.isFlapping && !event.isSuppressed { - e := new(table.Event) - *e = *event.Event - events = append(events, e) - // this deletes the advertised event from the map - delete(eventMap, key) + for key, advert := range advertMap { + // decay the event penalty + delta := time.Since(advert.lastUpdate).Seconds() + advert.penalty = advert.penalty * math.Exp(-delta*PenaltyDecay) + + // suppress/recover the event based on its penalty level + switch { + case advert.penalty > AdvertSuppress && !advert.isSuppressed: + advert.isSuppressed = true + advert.suppressTime = time.Now() + case advert.penalty < AdvertRecover && advert.isSuppressed: + advert.isSuppressed = false + } + + // max suppression time threshold has been reached, delete the advert + if advert.isSuppressed { + if time.Since(advert.suppressTime) > MaxSuppressTime { + delete(advertMap, key) + continue + } + } + + if !advert.isSuppressed { + for _, event := range advert.events { + e := new(table.Event) + *e = *event + events = append(events, e) + // delete the advert from the advertMap + delete(advertMap, key) + } } } @@ -335,8 +351,6 @@ func (r *router) processEvents() error { go r.advertiseEvents(Update, events) } case e := <-r.eventChan: - // event timestamp - now := time.Now() // if event is nil, continue if e == nil { continue @@ -348,36 +362,36 @@ func (r *router) processEvents() error { case table.Update: penalty = UpdatePenalty case table.Delete: - penalty = Delete + penalty = DeletePenalty } - // we use route hash as eventMap key + + // check if we have already registered the route + // we use the route hash as advertMap key hash := e.Route.Hash() - event, ok := eventMap[hash] + advert, ok := advertMap[hash] if !ok { - event = &advertEvent{ - Event: e, - penalty: penalty, - timestamp: time.Now(), + events := []*table.Event{e} + advert = &routeAdvert{ + events: events, + penalty: penalty, + lastUpdate: time.Now(), } - eventMap[hash] = event + advertMap[hash] = advert continue } - // update penalty for existing event: decay existing and add new penalty - delta := time.Since(event.timestamp).Seconds() - event.penalty = event.penalty*math.Exp(-delta) + penalty - event.timestamp = now - // suppress or recover the event based on its current penalty - if !event.isSuppressed && event.penalty > AdvertSuppress { - event.isSuppressed = true - } else if event.penalty < AdvertRecover { - event.isSuppressed = false - } - // if not suppressed decide if if its flapping - if !event.isSuppressed { - // detect if its flapping by comparing current and previous event - event.isFlapping = isFlapping(e, event.Event) + // attempt to squash last two events if possible + lastEvent := advert.events[len(advert.events)-1] + if lastEvent.Type == e.Type { + advert.events[len(advert.events)-1] = e + } else { + advert.events = append(advert.events, e) } + + // update event penalty and recorded timestamp + advert.lastUpdate = time.Now() + advert.penalty += penalty + case <-r.exit: // first wait for the advertiser to finish r.advertWg.Wait() @@ -484,8 +498,9 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } - // service registry watcher - svcWatcher, err := r.opts.Registry.Watch() + + // registry watcher + regWatcher, err := r.opts.Registry.Watch() if err != nil { return nil, fmt.Errorf("failed creating service registry watcher: %v", err) } @@ -497,7 +512,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { go func() { defer r.wg.Done() // watch local registry and register routes in routine table - errChan <- r.watchServices(svcWatcher) + errChan <- r.watchRegistry(regWatcher) }() r.wg.Add(1) @@ -594,5 +609,5 @@ func (r *router) Stop() error { // String prints debugging information about router func (r *router) String() string { - return "router" + return "default router" } From 280314667382b9c3395db9af3e2901c7369d84e1 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 17 Jul 2019 00:06:11 +0100 Subject: [PATCH 2/3] Renaming rampage Addressing the comments in #591, router.String() now returns "default" Furthermore, a tonne of other renaming has been included in this commit as a result of running go vet ./... inside the router package. --- network/router/default.go | 10 ++-------- network/router/table/default.go | 20 ++++++++++---------- network/router/table/default_test.go | 12 ++++++------ network/router/table/table.go | 6 +++--- 4 files changed, 21 insertions(+), 27 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index 58050fdb..8af78ef3 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -160,7 +160,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro return nil } -// watchRegistry watches sregistry and updates the routing table. +// watchRegistry watches registry and updates routing table based on the received events. // It returns error if either the registry watcher fails with error or if the routing table update fails. func (r *router) watchRegistry(w registry.Watcher) error { // wait in the background for the router to stop @@ -282,8 +282,6 @@ func (r *router) advertiseTable() error { return nil } } - - return nil } // routeAdvert contains a list of route events to be advertised @@ -400,10 +398,6 @@ func (r *router) processEvents() error { return nil } } - - // we probably never reach this code path - - return nil } // watchErrors watches router errors and takes appropriate actions @@ -609,5 +603,5 @@ func (r *router) Stop() error { // String prints debugging information about router func (r *router) String() string { - return "default router" + return "default" } diff --git a/network/router/table/default.go b/network/router/table/default.go index 09aba529..bce75935 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -7,14 +7,14 @@ import ( "github.com/google/uuid" ) -// TableOptions specify routing table options +// Options specify routing table options // TODO: table options TBD in the future -type TableOptions struct{} +type Options struct{} // table is an in memory routing table type table struct { // opts are table options - opts TableOptions + opts Options // m stores routing table map m map[string]map[uint64]Route // w is a list of table watchers @@ -23,9 +23,9 @@ type table struct { } // newTable creates a new routing table and returns it -func newTable(opts ...TableOption) Table { +func newTable(opts ...Option) Table { // default options - var options TableOptions + var options Options // apply requested options for _, o := range opts { @@ -40,7 +40,7 @@ func newTable(opts ...TableOption) Table { } // Init initializes routing table with options -func (t *table) Init(opts ...TableOption) error { +func (t *table) Init(opts ...Option) error { for _, o := range opts { o(&t.opts) } @@ -48,7 +48,7 @@ func (t *table) Init(opts ...TableOption) error { } // Options returns routing table options -func (t *table) Options() TableOptions { +func (t *table) Options() Options { return t.opts } @@ -219,7 +219,7 @@ func (t *table) Size() int { defer t.RUnlock() size := 0 - for dest, _ := range t.m { + for dest := range t.m { size += len(t.m[dest]) } @@ -227,6 +227,6 @@ func (t *table) Size() int { } // String returns debug information -func (t table) String() string { - return "table" +func (t *table) String() string { + return "default" } diff --git a/network/router/table/default_test.go b/network/router/table/default_test.go index b4238336..275da8c2 100644 --- a/network/router/table/default_test.go +++ b/network/router/table/default_test.go @@ -23,7 +23,7 @@ func TestCreate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ // adds new route for the original destination route.Gateway = "dest.gw2" @@ -31,7 +31,7 @@ func TestCreate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ if table.Size() != testTableSize { t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) @@ -50,7 +50,7 @@ func TestDelete(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ // should fail to delete non-existant route prevSvc := route.Service @@ -66,7 +66,7 @@ func TestDelete(t *testing.T) { if err := table.Delete(route); err != nil { t.Errorf("error deleting route: %s", err) } - testTableSize -= 1 + testTableSize-- if table.Size() != testTableSize { t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) @@ -80,7 +80,7 @@ func TestUpdate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize += 1 + testTableSize++ // change the metric of the original route route.Metric = 200 @@ -100,7 +100,7 @@ func TestUpdate(t *testing.T) { if err := table.Update(route); err != nil { t.Errorf("error updating route: %s", err) } - testTableSize += 1 + testTableSize++ if table.Size() != testTableSize { t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) diff --git a/network/router/table/table.go b/network/router/table/table.go index c549617c..2836e3d6 100644 --- a/network/router/table/table.go +++ b/network/router/table/table.go @@ -29,10 +29,10 @@ type Table interface { Size() int } -// TableOption used by the routing table -type TableOption func(*TableOptions) +// Option used by the routing table +type Option func(*Options) // NewTable creates new routing table and returns it -func NewTable(opts ...TableOption) Table { +func NewTable(opts ...Option) Table { return newTable(opts...) } From 94b6455577b41c2de739d0f585133a51df9a8603 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 17 Jul 2019 13:02:47 +0100 Subject: [PATCH 3/3] Increment WaitGroup before launching advertiseEvents goroutine --- network/router/default.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index 8af78ef3..0d153920 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -273,10 +273,8 @@ func (r *router) advertiseTable() error { // advertise all routes as Update events to subscribers if len(events) > 0 { - go func() { - r.advertWg.Add(1) - r.advertiseEvents(Update, events) - }() + r.advertWg.Add(1) + go r.advertiseEvents(Update, events) } case <-r.exit: return nil