refactor and cleanup some router code

This commit is contained in:
Asim Aslam
2020-01-22 16:33:31 +00:00
parent 8b306780ee
commit 29c1076950
7 changed files with 219 additions and 536 deletions

View File

@@ -1,6 +1,7 @@
package router
import (
"errors"
"fmt"
"math"
"sort"
@@ -16,8 +17,6 @@ import (
var (
// AdvertiseEventsTick is time interval in which the router advertises route updates
AdvertiseEventsTick = 10 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 2 * time.Minute
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 2 * time.Minute
// AdvertSuppress is advert suppression threshold
@@ -37,14 +36,12 @@ var (
// router implements default router
type router struct {
sync.RWMutex
options Options
status Status
running bool
table *table
exit chan struct{}
errChan chan error
options Options
exit chan bool
eventChan chan *Event
advertWg *sync.WaitGroup
wg *sync.WaitGroup
// advert subscribers
sub sync.RWMutex
@@ -61,15 +58,9 @@ func newRouter(opts ...Option) Router {
o(&options)
}
// set initial status to Stopped
status := Status{Code: Stopped, Error: nil}
return &router{
options: options,
status: status,
table: newTable(),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
subscribers: make(map[string]chan *Advert),
}
}
@@ -125,7 +116,7 @@ func (r *router) manageRoute(route Route, action string) error {
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
func (r *router) manageRoutes(service *registry.Service, action string) error {
// action is the routing table action
action = strings.ToLower(action)
@@ -166,7 +157,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
}
// manage the routes for all returned services
for _, srv := range srvs {
if err := r.manageServiceRoutes(srv, action); err != nil {
if err := r.manageRoutes(srv, action); err != nil {
return err
}
}
@@ -181,42 +172,35 @@ func (r *router) watchRegistry(w registry.Watcher) error {
exit := make(chan bool)
defer func() {
// close the exit channel when the go routine finishes
close(exit)
}()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer w.Stop()
defer r.wg.Done()
select {
case <-r.exit:
return
case <-exit:
return
case <-r.exit:
return
}
}()
var watchErr error
for {
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
watchErr = err
return err
}
break
}
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
if err := r.manageRoutes(res.Service, res.Action); err != nil {
return err
}
}
return watchErr
return nil
}
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
@@ -225,16 +209,13 @@ func (r *router) watchTable(w Watcher) error {
exit := make(chan bool)
defer func() {
// close the exit channel when the go routine finishes
close(exit)
}()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer w.Stop()
defer r.wg.Done()
select {
case <-r.exit:
@@ -244,13 +225,11 @@ func (r *router) watchTable(w Watcher) error {
}
}()
var watchErr error
for {
event, err := w.Next()
if err != nil {
if err != ErrWatcherStopped {
watchErr = err
return err
}
break
}
@@ -260,13 +239,11 @@ func (r *router) watchTable(w Watcher) error {
close(r.eventChan)
return nil
case r.eventChan <- event:
// process event
}
}
// close event channel on error
close(r.eventChan)
return watchErr
return nil
}
// publishAdvert publishes router advert to advert channel
@@ -292,36 +269,6 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
r.sub.RUnlock()
}
// advertiseTable advertises the whole routing table to the network
func (r *router) advertiseTable() error {
// create table advertisement ticker
ticker := time.NewTicker(AdvertiseTableTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// do full table flush
events, err := r.flushRouteEvents(Update)
if err != nil {
return fmt.Errorf("failed flushing routes: %s", err)
}
// advertise routes to subscribers
if len(events) > 0 {
log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id)
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
}
case <-r.exit:
return nil
}
}
}
// advert contains a route event to be advertised
type advert struct {
// event received from routing table
@@ -392,17 +339,39 @@ func (r *router) advertiseEvents() error {
adverts := make(adverts)
// routing table watcher
tableWatcher, err := r.Watch()
w, err := r.Watch()
if err != nil {
return fmt.Errorf("failed creating routing table watcher: %v", err)
return err
}
defer w.Stop()
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchTable(tableWatcher):
case <-r.exit:
var err error
for {
select {
case <-r.exit:
return
default:
if w == nil {
// routing table watcher
w, err = r.Watch()
if err != nil {
log.Logf("Error creating watcher: %v", err)
time.Sleep(time.Second)
continue
}
}
if err := r.watchTable(w); err != nil {
log.Logf("Error watching table: %v", err)
time.Sleep(time.Second)
}
// reset
w.Stop()
w = nil
}
}
}()
@@ -446,11 +415,7 @@ func (r *router) advertiseEvents() error {
// advertise events to subscribers
if len(events) > 0 {
log.Debugf("Router publishing %d events", len(events))
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
go r.publishAdvert(RouteUpdate, events)
}
case e := <-r.eventChan:
// if event is nil, continue
@@ -502,65 +467,19 @@ func (r *router) advertiseEvents() error {
a.penalty += Penalty
log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty)
case <-r.exit:
// first wait for the advertiser to finish
r.advertWg.Wait()
w.Stop()
return nil
}
}
}
// close closes exit channels
func (r *router) close() {
log.Debugf("Router closing remaining channels")
// drain the advertise channel only if advertising
if r.status.Code == Advertising {
// drain the event channel
for range r.eventChan {
}
// close advert subscribers
for id, sub := range r.subscribers {
select {
case <-sub:
default:
}
// close the channel
close(sub)
// delete the subscriber
r.sub.Lock()
delete(r.subscribers, id)
r.sub.Unlock()
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
// watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors() {
var err error
select {
case <-r.exit:
return
case err = <-r.errChan:
}
r.Lock()
defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped {
// notify all goroutines to finish
close(r.exit)
// close all the channels
r.close()
// set the status error
if err != nil {
r.status.Error = err
// drain all the events, only called on Stop
func (r *router) drain() {
for {
select {
case <-r.eventChan:
default:
return
}
}
}
@@ -570,16 +489,13 @@ func (r *router) Start() error {
r.Lock()
defer r.Unlock()
// only start if we're stopped
if r.status.Code != Stopped {
if r.running {
return nil
}
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil {
e := fmt.Errorf("failed adding registry routes: %s", err)
r.status = Status{Code: Error, Error: e}
return e
return fmt.Errorf("failed adding registry routes: %s", err)
}
// add default gateway into routing table
@@ -595,42 +511,49 @@ func (r *router) Start() error {
Metric: DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
e := fmt.Errorf("failed adding default gateway route: %s", err)
r.status = Status{Code: Error, Error: e}
return e
return fmt.Errorf("failed adding default gateway route: %s", err)
}
}
// create error and exit channels
r.errChan = make(chan error, 1)
r.exit = make(chan struct{})
r.exit = make(chan bool)
// registry watcher
regWatcher, err := r.options.Registry.Watch()
w, err := r.options.Registry.Watch()
if err != nil {
e := fmt.Errorf("failed creating registry watcher: %v", err)
r.status = Status{Code: Error, Error: e}
return e
return fmt.Errorf("failed creating registry watcher: %v", err)
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchRegistry(regWatcher):
case <-r.exit:
var err error
for {
select {
case <-r.exit:
w.Stop()
return
default:
if w == nil {
w, err = r.options.Registry.Watch()
if err != nil {
log.Logf("failed creating registry watcher: %v", err)
time.Sleep(time.Second)
continue
}
}
if err := r.watchRegistry(w); err != nil {
log.Logf("Error watching the registry: %v", err)
time.Sleep(time.Second)
}
w.Stop()
w = nil
}
}
}()
// watch for errors and cleanup
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running
r.status = Status{Code: Running, Error: nil}
r.running = true
return nil
}
@@ -642,61 +565,46 @@ func (r *router) Advertise() (<-chan *Advert, error) {
r.Lock()
defer r.Unlock()
switch r.status.Code {
case Advertising:
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
case Running:
// list all the routes and pack them into even slice to advertise
events, err := r.flushRouteEvents(Create)
if err != nil {
return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// create event channels
r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(Announce, events)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.advertiseEvents():
case <-r.exit:
}
}()
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
// advertise the whole routing table
select {
case r.errChan <- r.advertiseTable():
case <-r.exit:
}
}()
// mark router as Running and set its Error to nil
r.status = Status{Code: Advertising, Error: nil}
log.Debugf("Router starting to advertise")
return advertChan, nil
case Stopped:
return nil, fmt.Errorf("not running")
if !r.running {
return nil, errors.New("not running")
}
return nil, fmt.Errorf("error: %s", r.status.Error)
// already advertising
if r.eventChan != nil {
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
}
// list all the routes and pack them into even slice to advertise
events, err := r.flushRouteEvents(Create)
if err != nil {
return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// create event channels
r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence
go r.publishAdvert(Announce, events)
go func() {
select {
case <-r.exit:
return
default:
if err := r.advertiseEvents(); err != nil {
log.Logf("Error adveritising events: %v", err)
}
}
}()
return advertChan, nil
}
// Process updates the routing table using the advertised values
@@ -774,48 +682,39 @@ func (r *router) Watch(opts ...WatchOption) (Watcher, error) {
return r.table.Watch(opts...)
}
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
// make a copy of the status
status := r.status
return status
}
// Stop stops the router
func (r *router) Stop() error {
r.Lock()
defer r.Unlock()
log.Debugf("Router shutting down")
switch r.status.Code {
case Stopped, Error:
r.Unlock()
return r.status.Error
case Running, Advertising:
// notify all goroutines to finish
select {
case <-r.exit:
return nil
default:
close(r.exit)
// close all the channels
// NOTE: close marks the router status as Stopped
r.close()
// extract the events
r.drain()
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
r.sub.Lock()
delete(r.subscribers, id)
r.sub.Unlock()
}
}
r.Unlock()
log.Tracef("Router waiting for all goroutines to finish")
// wait for all goroutines to finish
r.wg.Wait()
log.Debugf("Router successfully stopped")
// remove event chan
r.eventChan = nil
return nil
}
// String prints debugging information about router
func (r *router) String() string {
return "memory"
return "registry"
}