Flap detection vol2 (#915)

* We now purge flapping routes before regular tick processes them

* Updated comments

* Record the timestamp as soon as you receive the event

* Set route Address to routing table test

* Fixed a bunch of deadlocks. Added basic Router tests.
This commit is contained in:
Milos Gajdos 2019-11-05 17:44:24 +00:00 committed by Asim Aslam
parent f67c5e779f
commit b84134581c
3 changed files with 254 additions and 81 deletions

View File

@ -13,28 +13,25 @@ import (
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
) )
const ( var (
// AdvertiseEventsTick is time interval in which the router advertises route updates // AdvertiseEventsTick is time interval in which the router advertises route updates
AdvertiseEventsTick = 5 * time.Second AdvertiseEventsTick = 5 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table // AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 1 * time.Minute AdvertiseTableTick = 1 * time.Minute
// AdvertiseFlushTick is time the yet unconsumed advertisements are flush i.e. discarded // AdvertiseFlushTick is time the yet unconsumed advertisements are flush i.e. discarded
AdvertiseFlushTick = 15 * time.Second AdvertiseFlushTick = 15 * time.Second
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 1 * time.Minute
// AdvertSuppress is advert suppression threshold // AdvertSuppress is advert suppression threshold
AdvertSuppress = 200.0 AdvertSuppress = 200.0
// AdvertRecover is advert recovery threshold // AdvertRecover is advert recovery threshold
AdvertRecover = 120.0 AdvertRecover = 120.0
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 1 * time.Minute
// Penalty for routes processed multiple times // Penalty for routes processed multiple times
Penalty = 100.0 Penalty = 100.0
// PenaltyHalfLife is the time the advert penalty decays to half its value // PenaltyHalfLife is the time the advert penalty decays to half its value
PenaltyHalfLife = 30.0 PenaltyHalfLife = 30.0
// MaxSuppressTime defines time after which the suppressed advert is deleted // MaxSuppressTime defines time after which the suppressed advert is deleted
MaxSuppressTime = 5 * time.Minute MaxSuppressTime = 5 * time.Minute
)
var (
// PenaltyDecay is a coefficient which controls the speed the advert penalty decays // PenaltyDecay is a coefficient which controls the speed the advert penalty decays
PenaltyDecay = math.Log(2) / PenaltyHalfLife PenaltyDecay = math.Log(2) / PenaltyHalfLife
) )
@ -52,6 +49,7 @@ type router struct {
wg *sync.WaitGroup wg *sync.WaitGroup
// advert subscribers // advert subscribers
sub sync.RWMutex
subscribers map[string]chan *Advert subscribers map[string]chan *Advert
} }
@ -189,7 +187,6 @@ func (r *router) watchRegistry(w registry.Watcher) error {
defer func() { defer func() {
// close the exit channel when the go routine finishes // close the exit channel when the go routine finishes
close(exit) close(exit)
r.wg.Done()
}() }()
// wait in the background for the router to stop // wait in the background for the router to stop
@ -197,6 +194,7 @@ func (r *router) watchRegistry(w registry.Watcher) error {
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
defer r.wg.Done()
select { select {
case <-r.exit: case <-r.exit:
@ -233,7 +231,6 @@ func (r *router) watchTable(w Watcher) error {
defer func() { defer func() {
// close the exit channel when the go routine finishes // close the exit channel when the go routine finishes
close(exit) close(exit)
r.wg.Done()
}() }()
// wait in the background for the router to stop // wait in the background for the router to stop
@ -241,6 +238,7 @@ func (r *router) watchTable(w Watcher) error {
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
defer r.wg.Done()
select { select {
case <-r.exit: case <-r.exit:
@ -276,10 +274,7 @@ func (r *router) watchTable(w Watcher) error {
} }
// publishAdvert publishes router advert to advert channel // publishAdvert publishes router advert to advert channel
// NOTE: this might cease to be a dedicated method in the future
func (r *router) publishAdvert(advType AdvertType, events []*Event) { func (r *router) publishAdvert(advType AdvertType, events []*Event) {
defer r.advertWg.Done()
a := &Advert{ a := &Advert{
Id: r.options.Id, Id: r.options.Id,
Type: advType, Type: advType,
@ -288,24 +283,17 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
Events: events, Events: events,
} }
log.Debugf("Router publishing advert; %+v", a) r.sub.RLock()
r.RLock()
for _, sub := range r.subscribers { for _, sub := range r.subscribers {
// check the exit chan first
select {
case <-r.exit:
r.RUnlock()
return
default:
}
// now send the message // now send the message
select { select {
case sub <- a: case sub <- a:
default: case <-r.exit:
r.sub.RUnlock()
return
} }
} }
r.RUnlock() r.sub.RUnlock()
} }
// advertiseTable advertises the whole routing table to the network // advertiseTable advertises the whole routing table to the network
@ -327,7 +315,10 @@ func (r *router) advertiseTable() error {
if len(events) > 0 { if len(events) > 0 {
log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id) log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id)
r.advertWg.Add(1) r.advertWg.Add(1)
go r.publishAdvert(RouteUpdate, events) go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
} }
case <-r.exit: case <-r.exit:
return nil return nil
@ -335,12 +326,12 @@ func (r *router) advertiseTable() error {
} }
} }
// routeAdvert contains a route event to be advertised // advert contains a route event to be advertised
type routeAdvert struct { type advert struct {
// event received from routing table // event received from routing table
event *Event event *Event
// lastUpdate records the time of the last advert update // lastSeen records the time of the last advert update
lastUpdate time.Time lastSeen time.Time
// penalty is current advert penalty // penalty is current advert penalty
penalty float64 penalty float64
// isSuppressed flags the advert suppression // isSuppressed flags the advert suppression
@ -349,6 +340,51 @@ type routeAdvert struct {
suppressTime time.Time suppressTime time.Time
} }
// adverts maintains a map of router adverts
type adverts map[uint64]*advert
// process processes advert
// It updates advert timestamp, increments its penalty and
// marks upresses or recovers it if it reaches configured thresholds
func (m adverts) process(a *advert) error {
// lookup advert in adverts
hash := a.event.Route.Hash()
a, ok := m[hash]
if !ok {
return fmt.Errorf("advert not found")
}
// decay the event penalty
delta := time.Since(a.lastSeen).Seconds()
// decay advert penalty
a.penalty = a.penalty * math.Exp(-delta*PenaltyDecay)
service := a.event.Route.Service
address := a.event.Route.Address
// suppress/recover the event based on its penalty level
switch {
case a.penalty > AdvertSuppress && !a.isSuppressed:
log.Debugf("Router suppressing advert %d %.2f for route %s %s", hash, a.penalty, service, address)
a.isSuppressed = true
a.suppressTime = time.Now()
case a.penalty < AdvertRecover && a.isSuppressed:
log.Debugf("Router recovering advert %d %.2f for route %s %s", hash, a.penalty, service, address)
a.isSuppressed = false
}
// if suppressed, checked how long has it been suppressed for
if a.isSuppressed {
// max suppression time threshold has been reached, delete the advert
if time.Since(a.suppressTime) > MaxSuppressTime {
delete(m, hash)
return nil
}
}
return nil
}
// advertiseEvents advertises routing table events // advertiseEvents advertises routing table events
// It suppresses unhealthy flapping events and advertises healthy events upstream. // It suppresses unhealthy flapping events and advertises healthy events upstream.
func (r *router) advertiseEvents() error { func (r *router) advertiseEvents() error {
@ -356,8 +392,8 @@ func (r *router) advertiseEvents() error {
ticker := time.NewTicker(AdvertiseEventsTick) ticker := time.NewTicker(AdvertiseEventsTick)
defer ticker.Stop() defer ticker.Stop()
// advertMap is a map of advert events // adverts is a map of advert events
advertMap := make(map[uint64]*routeAdvert) adverts := make(adverts)
// routing table watcher // routing table watcher
tableWatcher, err := r.Watch() tableWatcher, err := r.Watch()
@ -379,82 +415,74 @@ func (r *router) advertiseEvents() error {
case <-ticker.C: case <-ticker.C:
var events []*Event var events []*Event
// collect all events which are not flapping // collect all events which are not flapping
for key, advert := range advertMap { for key, advert := range adverts {
// decay the event penalty // process the advert
delta := time.Since(advert.lastUpdate).Seconds() if err := adverts.process(advert); err != nil {
advert.penalty = advert.penalty * math.Exp(-delta*PenaltyDecay) log.Debugf("Router failed processing advert %d: %v", key, err)
service := advert.event.Route.Service continue
address := advert.event.Route.Address
// suppress/recover the event based on its penalty level
switch {
case advert.penalty > AdvertSuppress && !advert.isSuppressed:
log.Debugf("Router suppressing advert %d %.2f for route %s %s", key, advert.penalty, service, address)
advert.isSuppressed = true
advert.suppressTime = time.Now()
case advert.penalty < AdvertRecover && advert.isSuppressed:
log.Debugf("Router recovering advert %d %.2f for route %s %s", key, advert.penalty, service, address)
advert.isSuppressed = false
} }
// if suppressed go to the next advert
if advert.isSuppressed { if advert.isSuppressed {
// max suppression time threshold has been reached, delete the advert
if time.Since(advert.suppressTime) > MaxSuppressTime {
delete(advertMap, key)
continue
}
// process next advert
continue continue
} }
// copy the event and append // copy the event and append
e := new(Event) e := new(Event)
// this is ok, because router.Event only contains builtin types // this is ok, because router.Event only contains builtin types
// and no references so this creates a deep struct copy of Event // and no references so this creates a deep copy of struct Event
*e = *(advert.event) *e = *(advert.event)
events = append(events, e) events = append(events, e)
// delete the advert from the advertMap // delete the advert from adverts
delete(advertMap, key) delete(adverts, key)
} }
// advertise all Update events to subscribers // advertise events to subscribers
if len(events) > 0 { if len(events) > 0 {
r.advertWg.Add(1)
log.Debugf("Router publishing %d events", len(events)) log.Debugf("Router publishing %d events", len(events))
go r.publishAdvert(RouteUpdate, events) r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
} }
case e := <-r.eventChan: case e := <-r.eventChan:
// if event is nil, continue // if event is nil, continue
if e == nil { if e == nil {
continue continue
} }
log.Debugf("Router processing table event %s for service %s %s", e.Type, e.Route.Service, e.Route.Address) now := time.Now()
// determine the event penalty log.Debugf("Router processing table event %s for service %s %s", e.Type, e.Route.Service, e.Route.Address)
// TODO: should there be any difference in penalty for different event types
penalty := Penalty
// check if we have already registered the route // check if we have already registered the route
hash := e.Route.Hash() hash := e.Route.Hash()
advert, ok := advertMap[hash] a, ok := adverts[hash]
if !ok { if !ok {
advert = &routeAdvert{ a = &advert{
event: e, event: e,
penalty: penalty, penalty: Penalty,
lastUpdate: time.Now(), lastSeen: now,
} }
advertMap[hash] = advert adverts[hash] = a
continue continue
} }
// override the route event only if the last event was different // override the route event only if the previous event was different
if advert.event.Type != e.Type { if a.event.Type != e.Type {
advert.event = e a.event = e
}
// process the advert
if err := adverts.process(a); err != nil {
log.Debugf("Router error processing advert %d: %v", hash, err)
continue
} }
// update event penalty and timestamp // update event penalty and timestamp
advert.lastUpdate = time.Now() a.lastSeen = now
advert.penalty += penalty // increment the penalty
log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, advert.event.Route.Service, advert.event.Route.Address, advert.penalty) a.penalty += Penalty
log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty)
case <-r.exit: case <-r.exit:
// first wait for the advertiser to finish // first wait for the advertiser to finish
r.advertWg.Wait() r.advertWg.Wait()
@ -474,6 +502,7 @@ func (r *router) close() {
for range r.eventChan { for range r.eventChan {
} }
r.sub.RLock()
// close advert subscribers // close advert subscribers
for id, sub := range r.subscribers { for id, sub := range r.subscribers {
// close the channel // close the channel
@ -482,6 +511,7 @@ func (r *router) close() {
// delete the subscriber // delete the subscriber
delete(r.subscribers, id) delete(r.subscribers, id)
} }
r.sub.RUnlock()
} }
// mark the router as Stopped and set its Error to nil // mark the router as Stopped and set its Error to nil
@ -603,9 +633,16 @@ func (r *router) Advertise() (<-chan *Advert, error) {
// create event channels // create event channels
r.eventChan = make(chan *Event) r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence // advertise your presence
r.advertWg.Add(1) r.advertWg.Add(1)
go r.publishAdvert(Announce, events) go func() {
defer r.advertWg.Done()
r.publishAdvert(Announce, events)
}()
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
@ -629,10 +666,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
// mark router as Running and set its Error to nil // mark router as Running and set its Error to nil
r.status = Status{Code: Advertising, Error: nil} r.status = Status{Code: Advertising, Error: nil}
// create advert channel log.Debugf("Router starting to advertise")
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil return advertChan, nil
case Stopped: case Stopped:
return nil, fmt.Errorf("not running") return nil, fmt.Errorf("not running")
@ -747,7 +781,10 @@ func (r *router) Solicit() error {
// advertise the routes // advertise the routes
r.advertWg.Add(1) r.advertWg.Add(1)
go r.publishAdvert(RouteUpdate, events) go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
return nil return nil
} }
@ -776,19 +813,27 @@ func (r *router) Status() Status {
// Stop stops the router // Stop stops the router
func (r *router) Stop() error { func (r *router) Stop() error {
r.Lock() r.Lock()
defer r.Unlock()
log.Debugf("Router shutting down")
switch r.status.Code { switch r.status.Code {
case Stopped, Error: case Stopped, Error:
r.Unlock()
return r.status.Error return r.status.Error
case Running, Advertising: case Running, Advertising:
// close all the channels // close all the channels
// NOTE: close marks the router status as Stopped
r.close() r.close()
} }
r.Unlock()
log.Debugf("Router waiting for all goroutines to finish")
// wait for all goroutines to finish // wait for all goroutines to finish
r.wg.Wait() r.wg.Wait()
log.Debugf("Router successfully stopped")
return nil return nil
} }

127
router/default_test.go Normal file
View File

@ -0,0 +1,127 @@
package router
import (
"fmt"
"sync"
"testing"
"time"
"github.com/micro/go-micro/registry/memory"
"github.com/micro/go-micro/util/log"
)
func routerTestSetup() Router {
r := memory.NewRegistry()
return newRouter(Registry(r))
}
func TestRouterStartStop(t *testing.T) {
r := routerTestSetup()
log.Debugf("TestRouterStartStop STARTING")
if err := r.Start(); err != nil {
t.Errorf("failed to start router: %v", err)
}
_, err := r.Advertise()
if err != nil {
t.Errorf("failed to start advertising: %v", err)
}
if err := r.Stop(); err != nil {
t.Errorf("failed to stop router: %v", err)
}
log.Debugf("TestRouterStartStop STOPPED")
}
func TestRouterAdvertise(t *testing.T) {
r := routerTestSetup()
// lower the advertise interval
AdvertiseEventsTick = 500 * time.Millisecond
AdvertiseTableTick = 1 * time.Second
if err := r.Start(); err != nil {
t.Errorf("failed to start router: %v", err)
}
ch, err := r.Advertise()
if err != nil {
t.Errorf("failed to start advertising: %v", err)
}
// receive announce event
ann := <-ch
log.Debugf("received announce advert: %v", ann)
// Generate random unique routes
nrRoutes := 5
routes := make([]Route, nrRoutes)
route := Route{
Service: "dest.svc",
Address: "dest.addr",
Gateway: "dest.gw",
Network: "dest.network",
Router: "src.router",
Link: "det.link",
Metric: 10,
}
for i := 0; i < nrRoutes; i++ {
testRoute := route
testRoute.Service = fmt.Sprintf("%s-%d", route.Service, i)
routes[i] = testRoute
}
var advertErr error
errChan := make(chan error)
var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
for _, route := range routes {
log.Debugf("Creating route %v", route)
if err := r.Table().Create(route); err != nil {
log.Debugf("Failed to create route: %v", err)
errChan <- err
return
}
}
}()
var adverts int
doneChan := make(chan bool)
wg.Add(1)
go func() {
defer func() {
wg.Done()
doneChan <- true
}()
for advert := range ch {
select {
case advertErr = <-errChan:
t.Errorf("failed advertising events: %v", advertErr)
default:
// do nothing for now
log.Debugf("Router advert received: %v", advert)
adverts += len(advert.Events)
}
return
}
}()
<-doneChan
if adverts != nrRoutes {
t.Errorf("Expected %d adverts, received: %d", nrRoutes, adverts)
}
wg.Wait()
if err := r.Stop(); err != nil {
t.Errorf("failed to stop router: %v", err)
}
}

View File

@ -7,6 +7,7 @@ func testSetup() (*table, Route) {
route := Route{ route := Route{
Service: "dest.svc", Service: "dest.svc",
Address: "dest.addr",
Gateway: "dest.gw", Gateway: "dest.gw",
Network: "dest.network", Network: "dest.network",
Router: "src.router", Router: "src.router",