Merge pull request #580 from milosgajdos83/advertise-table

Advertise full table every minute.
This commit is contained in:
Asim Aslam 2019-07-11 12:49:02 +01:00 committed by GitHub
commit dac8a13a77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 43 deletions

View File

@ -13,14 +13,16 @@ import (
) )
const ( const (
// AdvertiseTick is time interval in which we advertise route updates // AdvertiseEventsTick is time interval in which the router advertises route updates
AdvertiseTick = 5 * time.Second AdvertiseEventsTick = 5 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 1 * time.Minute
// AdvertSuppress is advert suppression threshold // AdvertSuppress is advert suppression threshold
AdvertSuppress = 2000 AdvertSuppress = 2000
// AdvertRecover is advert recovery threshold // AdvertRecover is advert recovery threshold
AdvertRecover = 750 AdvertRecover = 750
// DefaultAdvertTTL is default advertisement TTL // DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = time.Minute DefaultAdvertTTL = 1 * time.Minute
// PenaltyDecay is the penalty decay // PenaltyDecay is the penalty decay
PenaltyDecay = 1.15 PenaltyDecay = 1.15
// Delete penalises route addition and deletion // Delete penalises route addition and deletion
@ -78,6 +80,28 @@ func (r *router) Options() Options {
return r.opts return r.opts
} }
// manageRoute applies route action on the routing table
func (r *router) manageRoute(route table.Route, action string) error {
switch action {
case "create":
if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
}
case "update":
if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
}
case "delete":
if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
}
default:
return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action)
}
return nil
}
// manageServiceRoutes manages routes for a given service. // manageServiceRoutes manages routes for a given service.
// It returns error of the routing table action fails. // It returns error of the routing table action fails.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error { func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
@ -95,21 +119,8 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
Metric: table.DefaultLocalMetric, Metric: table.DefaultLocalMetric,
} }
switch action { if err := r.manageRoute(route, action); err != nil {
case "create": return err
if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", service.Name, err)
}
case "update":
if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed updating route for service %s: %s", service.Name, err)
}
case "delete":
if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", service.Name, err)
}
default:
return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", service.Name, action)
} }
} }
@ -132,8 +143,8 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
continue continue
} }
// manage the routes for all returned services // manage the routes for all returned services
for _, s := range srvs { for _, srv := range srvs {
if err := r.manageServiceRoutes(s, action); err != nil { if err := r.manageServiceRoutes(srv, action); err != nil {
return err return err
} }
} }
@ -210,12 +221,14 @@ func (r *router) watchTable(w table.Watcher) error {
return watchErr return watchErr
} }
func (r *router) advertEvents(advType AdvertType, events []*table.Event) { // advertiseEvents advertises events to event subscribers
func (r *router) advertiseEvents(advType AdvertType, events []*table.Event) {
defer r.advertWg.Done() defer r.advertWg.Done()
a := &Advert{ a := &Advert{
Id: r.opts.Id, Id: r.opts.Id,
Type: advType, Type: advType,
TTL: DefaultAdvertTTL,
Timestamp: time.Now(), Timestamp: time.Now(),
Events: events, Events: events,
} }
@ -225,7 +238,45 @@ func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
case <-r.exit: case <-r.exit:
return return
} }
}
// advertiseTable advertises the whole routing table to the network
func (r *router) advertiseTable() error {
// create table advertisement ticker
ticker := time.NewTicker(AdvertiseTableTick)
for {
select {
case <-ticker.C:
// list routing table routes to announce
routes, err := r.List()
if err != nil {
return fmt.Errorf("failed listing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*table.Event, len(routes))
for i, route := range routes {
event := &table.Event{
Type: table.Update,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
}
// advertise all routes as Update events to subscribers
if len(events) > 0 {
go func() {
r.advertWg.Add(1)
r.advertiseEvents(Update, events)
}()
}
case <-r.exit:
return nil
}
}
return nil
} }
// isFlapping detects if the event is flapping based on the current and previous event status. // isFlapping detects if the event is flapping based on the current and previous event status.
@ -241,8 +292,8 @@ func isFlapping(curr, prev *table.Event) bool {
return false return false
} }
// updateEvent is a table event enriched with advertisement data // advertEvent is a table event enriched with advertisement data
type updateEvent struct { type advertEvent struct {
*table.Event *table.Event
// timestamp marks the time the event has been received // timestamp marks the time the event has been received
timestamp time.Time timestamp time.Time
@ -258,15 +309,16 @@ type updateEvent struct {
// It suppresses unhealthy flapping events and advertises healthy events upstream. // It suppresses unhealthy flapping events and advertises healthy events upstream.
func (r *router) processEvents() error { func (r *router) processEvents() error {
// ticker to periodically scan event for advertising // ticker to periodically scan event for advertising
ticker := time.NewTicker(AdvertiseTick) ticker := time.NewTicker(AdvertiseEventsTick)
// eventMap is a map of advert events // eventMap is a map of advert events
eventMap := make(map[uint64]*updateEvent) eventMap := make(map[uint64]*advertEvent)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
var events []*table.Event var events []*table.Event
// collect all events which are not flapping // collect all events which are not flapping
// TODO: decay the events and update suppression
for key, event := range eventMap { for key, event := range eventMap {
if !event.isFlapping && !event.isSuppressed { if !event.isFlapping && !event.isSuppressed {
e := new(table.Event) e := new(table.Event)
@ -277,9 +329,10 @@ func (r *router) processEvents() error {
} }
} }
// advertise all Update events to subscribers
if len(events) > 0 { if len(events) > 0 {
r.advertWg.Add(1) r.advertWg.Add(1)
go r.advertEvents(Update, events) go r.advertiseEvents(Update, events)
} }
case e := <-r.eventChan: case e := <-r.eventChan:
// event timestamp // event timestamp
@ -301,7 +354,7 @@ func (r *router) processEvents() error {
hash := e.Route.Hash() hash := e.Route.Hash()
event, ok := eventMap[hash] event, ok := eventMap[hash]
if !ok { if !ok {
event = &updateEvent{ event = &advertEvent{
Event: e, Event: e,
penalty: penalty, penalty: penalty,
timestamp: time.Now(), timestamp: time.Now(),
@ -334,7 +387,7 @@ func (r *router) processEvents() error {
} }
} }
// we probably never reach this place // we probably never reach this code path
return nil return nil
} }
@ -438,7 +491,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
} }
// error channel collecting goroutine errors // error channel collecting goroutine errors
errChan := make(chan error, 3) errChan := make(chan error, 4)
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
@ -457,24 +510,27 @@ func (r *router) Advertise() (<-chan *Advert, error) {
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
defer r.wg.Done() defer r.wg.Done()
// listen to routing table events and process them // watch routing table events and process them
errChan <- r.processEvents() errChan <- r.processEvents()
}() }()
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
// advertise the whole routing table
errChan <- r.advertiseTable()
}()
// advertise your presence
r.advertWg.Add(1)
go r.advertiseEvents(Announce, events)
// watch for errors and cleanup // watch for errors and cleanup
r.wg.Add(1) r.wg.Add(1)
go r.watchErrors(errChan) go r.watchErrors(errChan)
// advertise your presence
r.advertWg.Add(1)
go r.advertEvents(Announce, events)
// mark router as running and set its Error to nil // mark router as running and set its Error to nil
status := Status{ r.status = Status{Code: Running, Error: nil}
Code: Running,
Error: nil,
}
r.status = status
} }
return r.advertChan, nil return r.advertChan, nil
@ -494,8 +550,9 @@ func (r *router) Process(a *Advert) error {
for _, event := range events { for _, event := range events {
// create a copy of the route // create a copy of the route
route := event.Route route := event.Route
if err := r.Update(route); err != nil { action := event.Type
return fmt.Errorf("failed updating routing table: %v", err) if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil {
return fmt.Errorf("failed applying action %s to routing table: %s", action, err)
} }
} }

View File

@ -58,8 +58,7 @@ type Advert struct {
// Timestamp marks the time when the update is sent // Timestamp marks the time when the update is sent
Timestamp time.Time Timestamp time.Time
// TTL is Advert TTL // TTL is Advert TTL
// TODO: not used TTL time.Duration
TTL time.Time
// Events is a list of routing table events to advertise // Events is a list of routing table events to advertise
Events []*table.Event Events []*table.Event
} }

View File

@ -22,6 +22,22 @@ const (
Update Update
) )
// String implements fmt.Stringer
// NOTE: we need this as this makes converting the numeric codes
// into miro style string actions very simple
func (et EventType) String() string {
switch et {
case Create:
return "create"
case Delete:
return "delete"
case Update:
return "update"
default:
return "unknown"
}
}
// Event is returned by a call to Next on the watcher. // Event is returned by a call to Next on the watcher.
type Event struct { type Event struct {
// Type defines type of event // Type defines type of event