Simplified processEvents loop; Added router Announcement.

This commit is contained in:
Milos Gajdos 2019-07-09 12:46:15 +01:00
parent b82245429e
commit 265271008e
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
2 changed files with 73 additions and 49 deletions

View File

@ -38,6 +38,7 @@ type router struct {
exit chan struct{}
eventChan chan *table.Event
advertChan chan *Advert
advertWg *sync.WaitGroup
wg *sync.WaitGroup
sync.RWMutex
}
@ -58,6 +59,7 @@ func newRouter(opts ...Option) Router {
exit: make(chan struct{}),
eventChan: make(chan *table.Event),
advertChan: make(chan *Advert),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
}
}
@ -97,7 +99,7 @@ func (r *router) Network() string {
// manageServiceRoutes manages routes for a given service.
// It returns error of the routing table action fails.
func (r *router) manageServiceRoutes(service *registry.Service, action string, metric int) error {
func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
// action is the routing table action
action = strings.ToLower(action)
// take route action on each service node
@ -107,7 +109,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m
Gateway: node.Address,
Router: r.opts.Address,
Network: r.opts.Network,
Metric: metric,
Metric: table.DefaultLocalMetric,
}
switch action {
case "insert", "create":
@ -127,7 +129,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m
// manageRegistryRoutes manages routes for each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metric int) error {
func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error {
services, err := reg.ListServices()
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
@ -143,7 +145,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metr
}
// manage the routes for all returned services
for _, s := range srvs {
if err := r.manageServiceRoutes(s, action, metric); err != nil {
if err := r.manageServiceRoutes(s, action); err != nil {
return err
}
}
@ -177,7 +179,7 @@ func (r *router) watchServices(w registry.Watcher) error {
log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service)
if err := r.manageServiceRoutes(res.Service, res.Action, table.DefaultLocalMetric); err != nil {
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
return err
}
}
@ -224,6 +226,29 @@ func (r *router) watchTable(w table.Watcher) error {
return watchErr
}
func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
defer r.advertWg.Done()
log.Logf("r.advertEvents(): start event: %s", advType)
a := &Advert{
ID: r.ID(),
Type: advType,
Timestamp: time.Now(),
Events: events,
}
select {
case r.advertChan <- a:
log.Logf("r.advertEvents(): advertised event: %s", advType)
case <-r.exit:
log.Logf("r.advertEvents(): DONE exit")
return
}
log.Logf("r.advertEvents(): REGULAR exit")
}
// isFlapping detects if the event is flapping based on the current and previous event status.
func isFlapping(curr, prev *table.Event) bool {
if curr.Type == table.Update && prev.Type == table.Update {
@ -259,18 +284,12 @@ func (r *router) processEvents() error {
ticker := time.NewTicker(AdvertiseTick)
// eventMap is a map of advert events
eventMap := make(map[uint64]*updateEvent)
// lock to protect access to eventMap
mu := &sync.RWMutex{}
// waitgroup to manage advertisement goroutines
var wg sync.WaitGroup
processLoop:
for {
select {
case <-ticker.C:
var events []*table.Event
// collect all events which are not flapping
mu.Lock()
for key, event := range eventMap {
if !event.isFlapping && !event.isSuppressed {
e := new(table.Event)
@ -280,29 +299,10 @@ processLoop:
delete(eventMap, key)
}
}
mu.Unlock()
if len(events) > 0 {
wg.Add(1)
go func(events []*table.Event) {
defer wg.Done()
log.Logf("go advertise(): start")
a := &Advert{
ID: r.ID(),
Timestamp: time.Now(),
Events: events,
}
select {
case r.advertChan <- a:
case <-r.exit:
log.Logf("go advertise(): exit")
return
}
log.Logf("go advertise(): exit")
}(events)
r.advertWg.Add(1)
go r.advertEvents(Update, events)
}
case e := <-r.eventChan:
// event timestamp
@ -348,15 +348,16 @@ processLoop:
event.isFlapping = isFlapping(e, event.Event)
}
case <-r.exit:
break processLoop
}
}
// first wait for the advertiser to finish
wg.Wait()
r.advertWg.Wait()
// close the advert channel
close(r.advertChan)
log.Logf("r.processEvents(): event processor stopped")
return nil
}
}
// we probably never reach this place
log.Logf("r.processEvents(): event processor stopped")
return nil
@ -395,9 +396,11 @@ func (r *router) watchErrors(errChan <-chan error) {
// drain the advertise channel
for range r.advertChan {
}
log.Logf("r.watchErrors(): advert channel drained")
// drain the event channel
for range r.eventChan {
}
log.Logf("r.watchErrors(): event channel drained")
}
log.Logf("r.watchErrors(): watchErrors exit")
@ -411,10 +414,15 @@ func (r *router) Advertise() (<-chan *Advert, error) {
if r.status.Code != Running {
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.opts.Registry, "insert", table.DefaultLocalMetric); err != nil {
return nil, fmt.Errorf("failed adding routes: %v", err)
if err := r.manageRegistryRoutes(r.opts.Registry, "insert"); err != nil {
return nil, fmt.Errorf("failed adding routes: %s", err)
}
log.Logf("Routing table:\n%s", r.opts.Table)
// list routing table routes to announce
routes, err := r.opts.Table.List()
if err != nil {
return nil, fmt.Errorf("failed listing routes: %s", err)
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
@ -431,14 +439,15 @@ func (r *router) Advertise() (<-chan *Advert, error) {
}
// NOTE: we only need to recreate these if the router errored or was stopped
// TODO: These probably dont need to be struct members
if r.status.Code == Error || r.status.Code == Stopped {
r.exit = make(chan struct{})
r.eventChan = make(chan *table.Event)
r.advertChan = make(chan *Advert)
}
// routing table watcher which watches all routes i.e. to every destination
tableWatcher, err := r.opts.Table.Watch(table.WatchDestination("*"))
// routing table watcher
tableWatcher, err := r.opts.Table.Watch()
if err != nil {
return nil, fmt.Errorf("failed creating routing table watcher: %v", err)
}
@ -478,11 +487,24 @@ func (r *router) Advertise() (<-chan *Advert, error) {
log.Logf("r.Advertise(): r.processEvents() exit")
}()
// watch for errors and cleanup
r.wg.Add(1)
go r.watchErrors(errChan)
// TODO: send router announcement update comes here
// the announcement update contains routes from routing table
// announce yourself with all the existing routes
events := make([]*table.Event, len(routes))
for i, route := range routes {
event := &table.Event{
Type: table.Insert,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
}
// advertise your presence
r.advertWg.Add(1)
go r.advertEvents(Announce, events)
// mark router as running and set its Error to nil
status := Status{

View File

@ -41,19 +41,19 @@ type Router interface {
// Option used by the router
type Option func(*Options)
// UpdateType is route advertisement update type
type UpdateType int
// AdvertType is route advertisement type
type AdvertType int
const (
// Announce is advertised when the router announces itself
Announce UpdateType = iota
Announce AdvertType = iota
// Update advertises route updates
Update
)
// String returns string representation of update event
func (ut UpdateType) String() string {
switch ut {
func (at AdvertType) String() string {
switch at {
case Announce:
return "ANNOUNCE"
case Update:
@ -67,6 +67,8 @@ func (ut UpdateType) String() string {
type Advert struct {
// ID is the router ID
ID string
// Type is type of advert
Type AdvertType
// Timestamp marks the time when the update is sent
Timestamp time.Time
// TTL is Advert TTL