Merge pull request #608 from milosgajdos83/router-cleanup

Small router refactoring
This commit is contained in:
Asim Aslam 2019-07-24 09:46:48 -07:00 committed by GitHub
commit 388ac34b7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 193 additions and 155 deletions

View File

@ -17,6 +17,8 @@ const (
AdvertiseEventsTick = 5 * time.Second AdvertiseEventsTick = 5 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table // AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 1 * time.Minute AdvertiseTableTick = 1 * time.Minute
// AdvertiseFlushTick is time the yet unconsumed advertisements are flush i.e. discarded
AdvertiseFlushTick = 15 * time.Second
// AdvertSuppress is advert suppression threshold // AdvertSuppress is advert suppression threshold
AdvertSuppress = 2000.0 AdvertSuppress = 2000.0
// AdvertRecover is advert recovery threshold // AdvertRecover is advert recovery threshold
@ -38,13 +40,14 @@ var (
PenaltyDecay = math.Log(2) / PenaltyHalfLife PenaltyDecay = math.Log(2) / PenaltyHalfLife
) )
// router provides default router implementation // router implements default router
type router struct { type router struct {
// embed the table // embed the table
table.Table table.Table
opts Options opts Options
status Status status Status
exit chan struct{} exit chan struct{}
errChan chan error
eventChan chan *table.Event eventChan chan *table.Event
advertChan chan *Advert advertChan chan *Advert
advertWg *sync.WaitGroup advertWg *sync.WaitGroup
@ -52,7 +55,7 @@ type router struct {
sync.RWMutex sync.RWMutex
} }
// newRouter creates a new router and returns it // newRouter creates new router and returns it
func newRouter(opts ...Option) Router { func newRouter(opts ...Option) Router {
// get default options // get default options
options := DefaultOptions() options := DefaultOptions()
@ -62,16 +65,17 @@ func newRouter(opts ...Option) Router {
o(&options) o(&options)
} }
return &router{ r := &router{
Table: options.Table, Table: options.Table,
opts: options, opts: options,
status: Status{Error: nil, Code: Stopped}, status: Status{Code: Stopped, Error: nil},
exit: make(chan struct{}), advertWg: &sync.WaitGroup{},
eventChan: make(chan *table.Event), wg: &sync.WaitGroup{},
advertChan: make(chan *Advert),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
} }
go r.run()
return r
} }
// Init initializes router with given options // Init initializes router with given options
@ -87,7 +91,7 @@ func (r *router) Options() Options {
return r.opts return r.opts
} }
// manageRoute applies route action on the routing table // manageRoute applies action on a given route
func (r *router) manageRoute(route table.Route, action string) error { func (r *router) manageRoute(route table.Route, action string) error {
switch action { switch action {
case "create": case "create":
@ -109,8 +113,8 @@ func (r *router) manageRoute(route table.Route, action string) error {
return nil return nil
} }
// manageServiceRoutes manages routes for a given service. // manageServiceRoutes applies action to all routes of the service.
// It returns error of the routing table action fails. // It returns error of the action fails with error.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error { func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
// action is the routing table action // action is the routing table action
action = strings.ToLower(action) action = strings.ToLower(action)
@ -134,7 +138,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
return nil return nil
} }
// manageRegistryRoutes manages routes for each service found in the registry. // manageRegistryRoutes applies action to all routes of each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails. // It returns error if either the services failed to be listed or the routing table action fails.
func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error { func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error {
services, err := reg.ListServices() services, err := reg.ListServices()
@ -228,8 +232,9 @@ func (r *router) watchTable(w table.Watcher) error {
return watchErr return watchErr
} }
// advertiseEvents advertises events to event subscribers // publishAdvert publishes router advert to advert channel
func (r *router) advertiseEvents(advType AdvertType, events []*table.Event) { // NOTE: this might cease to be a dedicated method in the future
func (r *router) publishAdvert(advType AdvertType, events []*table.Event) {
defer r.advertWg.Done() defer r.advertWg.Done()
a := &Advert{ a := &Advert{
@ -274,7 +279,7 @@ func (r *router) advertiseTable() error {
// advertise all routes as Update events to subscribers // advertise all routes as Update events to subscribers
if len(events) > 0 { if len(events) > 0 {
r.advertWg.Add(1) r.advertWg.Add(1)
go r.advertiseEvents(Update, events) go r.publishAdvert(Update, events)
} }
case <-r.exit: case <-r.exit:
return nil return nil
@ -295,14 +300,29 @@ type routeAdvert struct {
suppressTime time.Time suppressTime time.Time
} }
// processEvents processes routing table events. // advertiseEvents advertises routing table events
// It suppresses unhealthy flapping events and advertises healthy events upstream. // It suppresses unhealthy flapping events and advertises healthy events upstream.
func (r *router) processEvents() error { func (r *router) advertiseEvents() error {
// ticker to periodically scan event for advertising // ticker to periodically scan event for advertising
ticker := time.NewTicker(AdvertiseEventsTick) ticker := time.NewTicker(AdvertiseEventsTick)
// advertMap is a map of advert events // advertMap is a map of advert events
advertMap := make(map[uint64]*routeAdvert) advertMap := make(map[uint64]*routeAdvert)
// routing table watcher
tableWatcher, err := r.Watch()
if err != nil {
return fmt.Errorf("failed creating routing table watcher: %v", err)
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchTable(tableWatcher):
case <-r.exit:
}
}()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@ -344,7 +364,7 @@ func (r *router) processEvents() error {
// advertise all Update events to subscribers // advertise all Update events to subscribers
if len(events) > 0 { if len(events) > 0 {
r.advertWg.Add(1) r.advertWg.Add(1)
go r.advertiseEvents(Update, events) go r.publishAdvert(Update, events)
} }
case e := <-r.eventChan: case e := <-r.eventChan:
// if event is nil, continue // if event is nil, continue
@ -399,30 +419,18 @@ func (r *router) processEvents() error {
} }
// watchErrors watches router errors and takes appropriate actions // watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors(errChan <-chan error) { func (r *router) watchErrors() {
defer r.wg.Done()
var code StatusCode
var err error var err error
select { select {
case <-r.exit: case <-r.exit:
code = Stopped case err = <-r.errChan:
case err = <-errChan:
code = Error
} }
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
status := Status{ if r.status.Code != Stopped {
Code: code, // notify all goroutines to finish
Error: err,
}
r.status = status
// stop the router if some error happened
if err != nil && code != Stopped {
// this will stop watchers which will close r.advertChan
close(r.exit) close(r.exit)
// drain the advertise channel // drain the advertise channel
for range r.advertChan { for range r.advertChan {
@ -432,20 +440,88 @@ func (r *router) watchErrors(errChan <-chan error) {
} }
} }
if err != nil {
r.status = Status{Code: Error, Error: err}
}
} }
// Advertise advertises the routes to the network. // Run runs the router.
// It returns error if any of the launched goroutines fail with error. // It returns error if the router is already running.
func (r *router) run() {
r.Lock()
defer r.Unlock()
switch r.status.Code {
case Stopped, Error:
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil {
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding registry routes: %s", err)}
return
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := table.Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Network: "*",
Metric: table.DefaultLocalMetric,
}
if err := r.Create(route); err != nil {
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)}
return
}
}
// create error and exit channels
r.errChan = make(chan error, 1)
r.exit = make(chan struct{})
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
r.status = Status{Code: Error, Error: fmt.Errorf("failed creating registry watcher: %v", err)}
return
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchRegistry(regWatcher):
case <-r.exit:
}
}()
// watch for errors and cleanup
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running and set its Error to nil
r.status = Status{Code: Running, Error: nil}
return
}
return
}
// Advertise stars advertising the routes to the network and returns the advertisements channel to consume from.
// If the router is already advertising it returns the channel to consume from.
// It returns error if either the router is not running or if the routing table fails to list the routes to advertise.
func (r *router) Advertise() (<-chan *Advert, error) { func (r *router) Advertise() (<-chan *Advert, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if r.status.Code != Running { switch r.status.Code {
// add all local service routes into the routing table case Advertising:
if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil { return r.advertChan, nil
return nil, fmt.Errorf("failed adding routes: %s", err) case Running:
}
// list routing table routes to announce // list routing table routes to announce
routes, err := r.List() routes, err := r.List()
if err != nil { if err != nil {
@ -462,85 +538,42 @@ func (r *router) Advertise() (<-chan *Advert, error) {
events[i] = event events[i] = event
} }
// add default gateway into routing table // create advertise and event channels
if r.opts.Gateway != "" { r.advertChan = make(chan *Advert)
// note, the only non-default value is the gateway r.eventChan = make(chan *table.Event)
route := table.Route{
Service: "*", // advertise your presence
Address: "*", r.advertWg.Add(1)
Gateway: r.opts.Gateway, go r.publishAdvert(Announce, events)
Network: "*",
Metric: table.DefaultLocalMetric, r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.advertiseEvents():
case <-r.exit:
} }
if err := r.Create(route); err != nil {
return nil, fmt.Errorf("failed adding default gateway route: %s", err)
}
}
// NOTE: we only need to recreate these if the router errored or was stopped
// TODO: These probably dont need to be struct members
if r.status.Code == Error || r.status.Code == Stopped {
r.exit = make(chan struct{})
r.eventChan = make(chan *table.Event)
r.advertChan = make(chan *Advert)
}
// routing table watcher
tableWatcher, err := r.Watch()
if err != nil {
return nil, fmt.Errorf("failed creating routing table watcher: %v", err)
}
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
return nil, fmt.Errorf("failed creating service registry watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 4)
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.watchRegistry(regWatcher)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routing table
errChan <- r.watchTable(tableWatcher)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch routing table events and process them
errChan <- r.processEvents()
}() }()
r.advertWg.Add(1) r.advertWg.Add(1)
go func() { go func() {
defer r.advertWg.Done() defer r.advertWg.Done()
// advertise the whole routing table // advertise the whole routing table
errChan <- r.advertiseTable() select {
case r.errChan <- r.advertiseTable():
case <-r.exit:
}
}() }()
// advertise your presence // mark router as Running and set its Error to nil
r.advertWg.Add(1) r.status = Status{Code: Advertising, Error: nil}
go r.advertiseEvents(Announce, events)
// watch for errors and cleanup return r.advertChan, nil
r.wg.Add(1) case Stopped:
go r.watchErrors(errChan) return nil, fmt.Errorf("not running")
// mark router as running and set its Error to nil
r.status = Status{Code: Running, Error: nil}
} }
return r.advertChan, nil return nil, fmt.Errorf("error: %s", r.status.Error)
} }
// Process updates the routing table using the advertised values // Process updates the routing table using the advertised values
@ -579,9 +612,9 @@ func (r *router) Status() Status {
// Stop stops the router // Stop stops the router
func (r *router) Stop() error { func (r *router) Stop() error {
r.RLock() r.Lock()
// only close the channel if the router is running // only close the channel if the router is running and/or advertising
if r.status.Code == Running { if r.status.Code == Running || r.status.Code == Advertising {
// notify all goroutines to finish // notify all goroutines to finish
close(r.exit) close(r.exit)
// drain the advertise channel // drain the advertise channel
@ -590,8 +623,11 @@ func (r *router) Stop() error {
// drain the event channel // drain the event channel
for range r.eventChan { for range r.eventChan {
} }
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
} }
r.RUnlock() r.Unlock()
// wait for all goroutines to finish // wait for all goroutines to finish
r.wg.Wait() r.wg.Wait()

View File

@ -19,9 +19,9 @@ type Options struct {
Id string Id string
// Address is router address // Address is router address
Address string Address string
// Gateway is micro network gateway // Gateway is network gateway
Gateway string Gateway string
// Network is micro network // Network is network address
Network string Network string
// Registry is the local registry // Registry is the local registry
Registry registry.Registry Registry registry.Registry
@ -57,13 +57,6 @@ func Network(n string) Option {
} }
} }
// Table sets the routing table
func Table(t table.Table) Option {
return func(o *Options) {
o.Table = t
}
}
// Registry sets the local registry // Registry sets the local registry
func Registry(r registry.Registry) Option { func Registry(r registry.Registry) Option {
return func(o *Options) { return func(o *Options) {
@ -71,6 +64,13 @@ func Registry(r registry.Registry) Option {
} }
} }
// Table sets the routing table
func Table(t table.Table) Option {
return func(o *Options) {
o.Table = t
}
}
// DefaultOptions returns router default options // DefaultOptions returns router default options
func DefaultOptions() Options { func DefaultOptions() Options {
return Options{ return Options{

View File

@ -7,20 +7,9 @@ import (
"github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/network/router/table"
) )
const ( var (
// Status codes // DefaultRouter is default network router
// Running means the router is up and running DefaultRouter = NewRouter()
Running StatusCode = iota
// Stopped means the router has been stopped
Stopped
// Error means the router has encountered error
Error
// Advert types
// Announce is advertised when the router announces itself
Announce AdvertType = iota
// Update advertises route updates
Update
) )
// Router is an interface for a routing control plane // Router is an interface for a routing control plane
@ -46,9 +35,38 @@ type Router interface {
// Option used by the router // Option used by the router
type Option func(*Options) type Option func(*Options)
// StatusCode defines router status
type StatusCode int
const (
// Running means the router is up and running
Running StatusCode = iota
// Advertising means the router is advertising
Advertising
// Stopped means the router has been stopped
Stopped
// Error means the router has encountered error
Error
)
// Status is router status
type Status struct {
// Error is router error
Error error
// Code defines router status
Code StatusCode
}
// AdvertType is route advertisement type // AdvertType is route advertisement type
type AdvertType int type AdvertType int
const (
// Announce is advertised when the router announces itself
Announce AdvertType = iota
// Update advertises route updates
Update
)
// Advert contains a list of events advertised by the router to the network // Advert contains a list of events advertised by the router to the network
type Advert struct { type Advert struct {
// Id is the router Id // Id is the router Id
@ -63,22 +81,6 @@ type Advert struct {
Events []*table.Event Events []*table.Event
} }
// StatusCode defines router status
type StatusCode int
// Status is router status
type Status struct {
// Error is router error
Error error
// Code defines router status
Code StatusCode
}
var (
// DefaultRouter is default network router
DefaultRouter = NewRouter()
)
// NewRouter creates new Router and returns it // NewRouter creates new Router and returns it
func NewRouter(opts ...Option) Router { func NewRouter(opts ...Option) Router {
return newRouter(opts...) return newRouter(opts...)