Added Router Status which allows to track router status
This commit is contained in:
parent
8ad2f73ad6
commit
32300eadc1
@ -13,10 +13,11 @@ import (
|
|||||||
// router provides default router implementation
|
// router provides default router implementation
|
||||||
type router struct {
|
type router struct {
|
||||||
opts Options
|
opts Options
|
||||||
running bool
|
status Status
|
||||||
advertChan chan *Update
|
advertChan chan *Update
|
||||||
exit chan struct{}
|
exit chan struct{}
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// newRouter creates new router and returns it
|
// newRouter creates new router and returns it
|
||||||
@ -31,7 +32,7 @@ func newRouter(opts ...Option) Router {
|
|||||||
|
|
||||||
return &router{
|
return &router{
|
||||||
opts: options,
|
opts: options,
|
||||||
running: false,
|
status: Status{Error: nil, Code: Stopped},
|
||||||
advertChan: make(chan *Update),
|
advertChan: make(chan *Update),
|
||||||
exit: make(chan struct{}),
|
exit: make(chan struct{}),
|
||||||
wg: &sync.WaitGroup{},
|
wg: &sync.WaitGroup{},
|
||||||
@ -74,7 +75,10 @@ func (r *router) Network() string {
|
|||||||
// Advertise advertises the routes to the network.
|
// Advertise advertises the routes to the network.
|
||||||
// It returns error if any of the launched goroutines fail with error.
|
// It returns error if any of the launched goroutines fail with error.
|
||||||
func (r *router) Advertise() (<-chan *Update, error) {
|
func (r *router) Advertise() (<-chan *Update, error) {
|
||||||
if !r.running {
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
|
if r.status.Code != Running {
|
||||||
// add local service routes into the routing table
|
// add local service routes into the routing table
|
||||||
if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil {
|
if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil {
|
||||||
return nil, fmt.Errorf("failed adding routes: %v", err)
|
return nil, fmt.Errorf("failed adding routes: %v", err)
|
||||||
@ -91,11 +95,11 @@ func (r *router) Advertise() (<-chan *Update, error) {
|
|||||||
Metric: DefaultLocalMetric,
|
Metric: DefaultLocalMetric,
|
||||||
}
|
}
|
||||||
if err := r.opts.Table.Add(route); err != nil {
|
if err := r.opts.Table.Add(route); err != nil {
|
||||||
return nil, fmt.Errorf("error adding default gateway route: %s", err)
|
return nil, fmt.Errorf("error to add default gateway route: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// routing table watcher that watches all routes being added
|
// routing table watcher which watches all routes i.e. to every destination
|
||||||
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*"))
|
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create routing table watcher: %v", err)
|
return nil, fmt.Errorf("failed to create routing table watcher: %v", err)
|
||||||
@ -120,28 +124,27 @@ func (r *router) Advertise() (<-chan *Update, error) {
|
|||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
// watch local registry and register routes in routine table
|
// watch local registry and register routes in routing table
|
||||||
errChan <- r.watchTable(tableWatcher)
|
errChan <- r.watchTable(tableWatcher)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
r.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer r.wg.Done()
|
||||||
select {
|
select {
|
||||||
// wait for exit chan
|
// wait for exit chan
|
||||||
case <-r.exit:
|
case <-r.exit:
|
||||||
// wait for error
|
r.status.Code = Stopped
|
||||||
case <-errChan:
|
case err := <-errChan:
|
||||||
// TODO: we're missing the error context here
|
r.status.Code = Error
|
||||||
// might have to log it here as we don't send it down
|
r.status.Error = err
|
||||||
}
|
}
|
||||||
|
|
||||||
// close the advertise channel
|
// close the advertise channel
|
||||||
close(r.advertChan)
|
close(r.advertChan)
|
||||||
// mark the router as stopped
|
|
||||||
r.running = false
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// mark the router as running
|
// mark the router as running
|
||||||
r.running = true
|
r.status.Code = Running
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.advertChan, nil
|
return r.advertChan, nil
|
||||||
@ -188,13 +191,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric
|
|||||||
|
|
||||||
// range over the flat slice of nodes
|
// range over the flat slice of nodes
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
gw := node.Address
|
gateway := node.Address
|
||||||
if node.Port > 0 {
|
if node.Port > 0 {
|
||||||
gw = fmt.Sprintf("%s:%d", node.Address, node.Port)
|
gateway = fmt.Sprintf("%s:%d", node.Address, node.Port)
|
||||||
}
|
}
|
||||||
route := Route{
|
route := Route{
|
||||||
Destination: service.Name,
|
Destination: service.Name,
|
||||||
Gateway: gw,
|
Gateway: gateway,
|
||||||
Router: r.opts.Address,
|
Router: r.opts.Address,
|
||||||
Network: r.opts.Network,
|
Network: r.opts.Network,
|
||||||
Metric: metric,
|
Metric: metric,
|
||||||
@ -298,6 +301,14 @@ func (r *router) watchTable(w Watcher) error {
|
|||||||
return watchErr
|
return watchErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Status returns router status
|
||||||
|
func (r *router) Status() Status {
|
||||||
|
r.RLock()
|
||||||
|
defer r.RUnlock()
|
||||||
|
|
||||||
|
return r.status
|
||||||
|
}
|
||||||
|
|
||||||
// Stop stops the router
|
// Stop stops the router
|
||||||
func (r *router) Stop() error {
|
func (r *router) Stop() error {
|
||||||
// notify all goroutines to finish
|
// notify all goroutines to finish
|
||||||
|
@ -26,6 +26,8 @@ type Router interface {
|
|||||||
Advertise() (<-chan *Update, error)
|
Advertise() (<-chan *Update, error)
|
||||||
// Update updates the routing table
|
// Update updates the routing table
|
||||||
Update(*Update) error
|
Update(*Update) error
|
||||||
|
// Status returns router status
|
||||||
|
Status() Status
|
||||||
// Stop stops the router
|
// Stop stops the router
|
||||||
Stop() error
|
Stop() error
|
||||||
// String returns debug info
|
// String returns debug info
|
||||||
@ -34,7 +36,7 @@ type Router interface {
|
|||||||
|
|
||||||
// Update is sent by the router to the network
|
// Update is sent by the router to the network
|
||||||
type Update struct {
|
type Update struct {
|
||||||
// ID is the source router ID
|
// ID is the router ID
|
||||||
ID string
|
ID string
|
||||||
// Timestamp marks the time when update is sent
|
// Timestamp marks the time when update is sent
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
@ -42,6 +44,40 @@ type Update struct {
|
|||||||
Event *Event
|
Event *Event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatusCode defines router status
|
||||||
|
type StatusCode int
|
||||||
|
|
||||||
|
// Status is router status
|
||||||
|
type Status struct {
|
||||||
|
// Error is router error
|
||||||
|
Error error
|
||||||
|
// Code defines router status
|
||||||
|
Code StatusCode
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Running means the rotuer is running
|
||||||
|
Running StatusCode = iota
|
||||||
|
// Error means the router has crashed with error
|
||||||
|
Error
|
||||||
|
// Stopped means the router has stopped
|
||||||
|
Stopped
|
||||||
|
)
|
||||||
|
|
||||||
|
// String returns human readable status code
|
||||||
|
func (sc StatusCode) String() string {
|
||||||
|
switch sc {
|
||||||
|
case Running:
|
||||||
|
return "RUNNING"
|
||||||
|
case Error:
|
||||||
|
return "ERROR"
|
||||||
|
case Stopped:
|
||||||
|
return "STOPPED"
|
||||||
|
default:
|
||||||
|
return "UNKNOWN"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Option used by the router
|
// Option used by the router
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user