From 72ef0321626606d53291a6190004bcb263d96d55 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 5 Jul 2019 19:15:32 +0100 Subject: [PATCH] First shot at flapping detection and event advertising. This commit also adds Route hash function, lots of debug messages for now and String() methods for various API objects. --- network/router/default_router.go | 241 +++++++++++++++++++++++++++---- network/router/default_table.go | 6 + network/router/route.go | 12 +- network/router/table_watcher.go | 10 +- 4 files changed, 239 insertions(+), 30 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index e1c97ccf..7ad1197c 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -2,11 +2,13 @@ package router import ( "fmt" + "math" "sort" "strings" "sync" "time" + "github.com/micro/go-log" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" ) @@ -18,14 +20,21 @@ const ( DeleteRoutePenalty = 1000 // AdvertiseTick is time interval in which we advertise route updates AdvertiseTick = 5 * time.Second + // AdvertSuppress is advert suppression threshold + AdvertSuppress = 2000 + // AdvertRecover is advert suppression recovery threshold + AdvertRecover = 750 + // PenaltyDecay is the "half-life" of the penalty + PenaltyDecay = 1.15 ) // router provides default router implementation type router struct { opts Options status Status - advertChan chan *Advert exit chan struct{} + eventChan chan *Event + advertChan chan *Advert wg *sync.WaitGroup sync.RWMutex } @@ -43,8 +52,9 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, status: Status{Error: nil, Code: Init}, - advertChan: make(chan *Advert), exit: make(chan struct{}), + eventChan: make(chan *Event), + advertChan: make(chan *Advert), wg: &sync.WaitGroup{}, } } @@ -83,9 +93,9 @@ func (r *router) Network() string { } // addServiceRoutes adds all services in given registry to the routing table. 
-// NOTE: this is a one-off operation done when bootstrapping the routing table +// NOTE: this is a one-off operation done when bootstrapping the router // It returns error if either the services failed to be listed or -// if any of the the routes could not be added to the routing table. +// if any of the routes failed to be added to the routing table. func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { @@ -124,9 +134,9 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric return nil } -// manageServiceRoutes watches services in given registry and updates the routing table accordingly. -// It returns error if the service registry watcher has stopped or if the routing table failed to be updated. -func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { +// watchServices watches services in given registry and updates the routing table accordingly. +// It returns error if the service registry watcher stops or if the routing table can't be updated. 
+func (r *router) watchServices(w registry.Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -151,7 +161,7 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { Destination: res.Service.Name, Router: r.opts.Address, Network: r.opts.Network, - Metric: metric, + Metric: DefaultLocalMetric, } switch res.Action { @@ -193,31 +203,173 @@ func (r *router) watchTable(w Watcher) error { } break } - - u := &Advert{ - ID: r.ID(), - Timestamp: time.Now(), - Events: []*Event{event}, - } - select { case <-r.exit: - close(r.advertChan) - return watchErr - case r.advertChan <- u: + close(r.eventChan) + return nil + case r.eventChan <- event: } } - // close the advertisement channel - close(r.advertChan) + // close event channel on error + close(r.eventChan) return watchErr } -// watchError watches router errors -func (r *router) watchError(errChan <-chan error) { +func eventFlap(curr, prev *Event) bool { + if curr.Type == UpdateEvent && prev.Type == UpdateEvent { + // update flap: this can be either metric or whatnot + log.Logf("eventFlap(): Update flap") + return true + } + + if curr.Type == CreateEvent && prev.Type == DeleteEvent || curr.Type == DeleteEvent && prev.Type == CreateEvent { + log.Logf("eventFlap(): Create/Delete flap") + return true + } + + return false +} + +// processEvents processes routing table events. +// It suppresses unhealthy flapping events and advertises healthy events upstream. 
+func (r *router) processEvents() error { + // ticker to periodically scan events for advertising + ticker := time.NewTicker(AdvertiseTick) + + // TODO: Need to flag already advertised events otherwise we'll keep on advertising them + // as they keep getting advertised unless deleted and are only deleted when received by upstream + + // advertEvent is a table event enriched with advert data + type advertEvent struct { + *Event + timestamp time.Time + penalty float64 + isSuppressed bool + isFlapping bool + } + + // eventMap is a map of advert events that might end up being advertised + eventMap := make(map[uint64]*advertEvent) + // lock to protect access to eventMap + mu := &sync.RWMutex{} + // waitgroup to manage advertisement goroutines + var wg sync.WaitGroup + +process: + for { + select { + case <-ticker.C: + var events []*Event + // decay the penalties of existing events + mu.RLock() + for _, event := range eventMap { + delta := time.Since(event.timestamp).Seconds() + event.penalty = event.penalty * math.Exp(delta) + // suppress or recover the event based on its current penalty + if !event.isSuppressed && event.penalty > AdvertSuppress { + event.isSuppressed = true + } else if event.penalty < AdvertRecover { + event.isSuppressed = false + event.isFlapping = false + } + if !event.isFlapping { + e := new(Event) + *e = *event.Event + events = append(events, e) + } + } + mu.RUnlock() + + if len(events) > 0 { + wg.Add(1) + go func(events []*Event) { + defer wg.Done() + + log.Logf("go advertise(): start") + + a := &Advert{ + ID: r.ID(), + Timestamp: time.Now(), + Events: events, + } + + select { + case r.advertChan <- a: + mu.Lock() + // once we've advertised the events, we need to delete them + for _, event := range a.Events { + delete(eventMap, event.Route.Hash()) + } + mu.Unlock() + case <-r.exit: + log.Logf("go advertise(): exit") + return + } + log.Logf("go advertise(): exit") + }(events) + } + case e := <-r.eventChan: + // if event is nil, skip it + if e == nil { 
+ continue + } + log.Logf("r.processEvents(): event received:\n%s", e) + // determine the event penalty + var penalty float64 + switch e.Type { + case UpdateEvent: + penalty = UpdateRoutePenalty + case CreateEvent, DeleteEvent: + penalty = DeleteRoutePenalty + } + // we use route hash as eventMap key + hash := e.Route.Hash() + event, ok := eventMap[hash] + if !ok { + event = &advertEvent{ + Event: e, + penalty: penalty, + timestamp: time.Now(), + } + eventMap[hash] = event + continue + } + // update penalty for existing event: decay existing and add new penalty + delta := time.Since(event.timestamp).Seconds() + event.penalty = event.penalty*math.Exp(delta) + penalty + event.timestamp = time.Now() + // suppress or recover the event based on its current penalty + if !event.isSuppressed && event.penalty > AdvertSuppress { + event.isSuppressed = true + } else if event.penalty < AdvertRecover { + event.isSuppressed = false + } + // if not suppressed, decide whether it's flapping + if !event.isSuppressed { + // detect if it's flapping + event.isFlapping = eventFlap(e, event.Event) + } + case <-r.exit: + break process + } + } + + wg.Wait() + close(r.advertChan) + + log.Logf("r.processEvents(): event processor stopped") + + return nil +} + +// manage watches router errors and takes appropriate actions +func (r *router) manage(errChan <-chan error) { defer r.wg.Done() + log.Logf("r.manage(): manage start") + var code StatusCode var err error @@ -228,6 +380,8 @@ func (r *router) watchError(errChan <-chan error) { code = Error } + log.Logf("r.manage(): manage exiting") + r.Lock() defer r.Unlock() status := Status{ @@ -236,6 +390,8 @@ func (r *router) watchError(errChan <-chan error) { } r.status = status + log.Logf("r.manage(): router status: %v", r.status) + // stop the router if some error happened if err != nil && code != Stopped { // this will stop watchers which will close r.advertChan @@ -243,7 +399,12 @@ func (r *router) watchError(errChan <-chan error) { // drain the 
advertise channel for range r.advertChan { } + // drain the event channel + for range r.eventChan { + } } + + log.Logf("r.manage(): manage exit") } // Advertise advertises the routes to the network. @@ -257,6 +418,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) } + log.Logf("Routing table:\n%s", r.opts.Table) // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway @@ -273,8 +435,10 @@ func (r *router) Advertise() (<-chan *Advert, error) { } // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped + // TODO: these channels most likely won't have to be the struct fields if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) + r.eventChan = make(chan *Event) r.advertChan = make(chan *Advert) } @@ -283,31 +447,44 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } - // registry watcher - regWatcher, err := r.opts.Registry.Watch() + // service registry watcher + svcWatcher, err := r.opts.Registry.Watch() if err != nil { - return nil, fmt.Errorf("failed creating registry watcher: %v", err) + return nil, fmt.Errorf("failed creating service registry watcher: %v", err) } // error channel collecting goroutine errors - errChan := make(chan error, 2) + errChan := make(chan error, 3) r.wg.Add(1) go func() { defer r.wg.Done() + log.Logf("r.Advertise(): r.watchServices() start") // watch local registry and register routes in routine table - errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) + errChan <- r.watchServices(svcWatcher) + log.Logf("r.Advertise(): r.watchServices() exit") }() r.wg.Add(1) go func() { defer r.wg.Done() + log.Logf("r.Advertise(): r.watchTable() start") // watch local 
registry and register routes in routing table errChan <- r.watchTable(tableWatcher) + log.Logf("r.Advertise(): r.watchTable() exit") }() r.wg.Add(1) - go r.watchError(errChan) + go func() { + defer r.wg.Done() + log.Logf("r.Advertise(): r.processEvents() start") + // listen to routing table events and process them + errChan <- r.processEvents() + log.Logf("r.Advertise(): r.processEvents() exit") + }() + + r.wg.Add(1) + go r.manage(errChan) // mark router as running and set its Error to nil status := Status{ @@ -362,20 +539,28 @@ func (r *router) Status() Status { // Stop stops the router func (r *router) Stop() error { + log.Logf("r.Stop(): Stopping router") r.RLock() // only close the channel if the router is running if r.status.Code == Running { // notify all goroutines to finish close(r.exit) + log.Logf("r.Stop(): exit closed") // drain the advertise channel for range r.advertChan { } + log.Logf("r.Stop(): advert channel drained") + // drain the event channel + for range r.eventChan { + } + log.Logf("r.Stop(): event channel drained") } r.RUnlock() // wait for all goroutines to finish r.wg.Wait() + log.Logf("r.Stop(): Router stopped") return nil } diff --git a/network/router/default_table.go b/network/router/default_table.go index bf4e9646..28659cbd 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/google/uuid" + "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) @@ -19,6 +20,7 @@ type TableOptions struct{} type table struct { // opts are table options opts TableOptions + // TODO: we should stop key-ing on destination // m stores routing table map m map[string]map[uint64]Route // h hashes route entries @@ -242,12 +244,16 @@ func (t *table) sendEvent(r *Event) { t.RLock() defer t.RUnlock() + log.Logf("sending event to %d registered table watchers", len(t.w)) + for _, w := range t.w { select { case w.resChan <- r: case <-w.done: } } + + log.Logf("sending event done") } // Size 
returns the size of the routing table diff --git a/network/router/route.go b/network/router/route.go index 7ee3559c..5e3cd8e5 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -2,6 +2,7 @@ package router import ( "fmt" + "hash/fnv" "strings" "github.com/olekukonko/tablewriter" @@ -56,8 +57,17 @@ type Route struct { Policy RoutePolicy } +// Hash returns route hash sum. +func (r *Route) Hash() uint64 { + h := fnv.New64() + h.Reset() + h.Write([]byte(r.Destination + r.Gateway + r.Network)) + + return h.Sum64() +} + // String allows to print the route -func (r *Route) String() string { +func (r Route) String() string { // this will help us build routing table string sb := &strings.Builder{} diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 976fa8af..2c5d8989 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -2,9 +2,11 @@ package router import ( "errors" + "fmt" "strings" "time" + "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) @@ -45,10 +47,15 @@ type Event struct { Type EventType // Timestamp is event timestamp Timestamp time.Time - // Route is table rout + // Route is table route Route Route } +// String prints human readable Event +func (e Event) String() string { + return fmt.Sprintf("[EVENT] Type: %s\nRoute:\n%s", e.Type, e.Route) +} + // WatchOption is used to define what routes to watch in the table type WatchOption func(*WatchOptions) @@ -94,6 +101,7 @@ func (w *tableWatcher) Next() (*Event, error) { case res.Route.Destination, "*": return res, nil default: + log.Logf("no table watcher available to receive the event") continue } case <-w.done: