diff --git a/network/router/default_router.go b/network/router/default_router.go index 7c682150..5c7b73ae 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" @@ -11,16 +12,16 @@ import ( // router provides default router implementation type router struct { - opts Options - exit chan struct{} - wg *sync.WaitGroup + opts Options + status Status + advertChan chan *Update + exit chan struct{} + wg *sync.WaitGroup + sync.RWMutex } // newRouter creates new router and returns it func newRouter(opts ...Option) Router { - // TODO: we need to add default GW entry here - // Should default GW be part of router options? - // get default options options := DefaultOptions() @@ -30,9 +31,11 @@ func newRouter(opts ...Option) Router { } return &router{ - opts: options, - exit: make(chan struct{}), - wg: &sync.WaitGroup{}, + opts: options, + status: Status{Error: nil, Code: Init}, + advertChan: make(chan *Update), + exit: make(chan struct{}), + wg: &sync.WaitGroup{}, } } @@ -69,43 +72,17 @@ func (r *router) Network() string { return r.opts.Network } -// Advertise advertises the routes to the network. It is a blocking function. -// It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() error { - // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { - return fmt.Errorf("failed adding routes: %v", err) - } - - localWatcher, err := r.opts.Registry.Watch() - if err != nil { - return fmt.Errorf("failed to create registry watcher: %v", err) - } - - // error channel collecting goroutine errors - errChan := make(chan error, 1) - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routine table - errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric) - }() - - return <-errChan -} - // addServiceRoutes adds all services in given registry to the routing table. // NOTE: this is a one-off operation done when bootstrapping the routing table // It returns error if either the services failed to be listed or -// if the routes could not be added to the routing table. +// if any of the the routes could not be added to the routing table. func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed to list services: %v", err) } - // add each service node as a separate route; + // add each service node as a separate route for _, service := range services { // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) @@ -121,13 +98,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric // range over the flat slice of nodes for _, node := range nodes { - gw := node.Address + gateway := node.Address if node.Port > 0 { - gw = fmt.Sprintf("%s:%d", node.Address, node.Port) + gateway = fmt.Sprintf("%s:%d", node.Address, node.Port) } route := Route{ Destination: service.Name, - Gateway: gw, + Gateway: gateway, Router: r.opts.Address, Network: r.opts.Network, Metric: metric, @@ -157,12 +134,10 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { for { res, err := w.Next() - if err == registry.ErrWatcherStopped { - break - } - if err != nil { - watchErr = err + if err != registry.ErrWatcherStopped { + watchErr = err + } break } @@ -190,10 +165,190 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { return watchErr } +// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry +// It returns error if the locally registered services either fails to be added/deleted to/from network registry. +func (r *router) watchTable(w Watcher) error { + // wait in the background for the router to stop + // when the router stops, stop the watcher and exit + r.wg.Add(1) + go func() { + defer r.wg.Done() + <-r.exit + w.Stop() + }() + + var watchErr error + +exit: + for { + event, err := w.Next() + if err != nil { + if err != ErrWatcherStopped { + watchErr = err + } + break + } + + u := &Update{ + ID: r.ID(), + Timestamp: time.Now(), + Event: event, + } + + select { + case <-r.exit: + break exit + case r.advertChan <- u: + } + } + + // close the advertisement channel + close(r.advertChan) + + return watchErr +} + +// watchError watches router errors +func (r *router) watchError(errChan <-chan error) { + defer r.wg.Done() + + var code StatusCode + var err error + + select { + case <-r.exit: + code = Stopped + case err = <-errChan: + code = Error + } + + r.Lock() + defer r.Unlock() + status := Status{ + Code: code, + Error: err, + } + r.status = status + + // stop the router if some error happened + if err != nil && code != Stopped { + close(r.exit) + } +} + +// Advertise advertises the routes to the network. +// It returns error if any of the launched goroutines fail with error. +func (r *router) Advertise() (<-chan *Update, error) { + r.Lock() + defer r.Unlock() + + if r.status.Code != Running { + // add local service routes into the routing table + if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + return nil, fmt.Errorf("failed adding routes: %v", err) + } + // add default gateway into routing table + if r.opts.Gateway != "" { + // note, the only non-default value is the gateway + route := Route{ + Destination: "*", + Gateway: r.opts.Gateway, + Router: "*", + Network: "*", + Metric: DefaultLocalMetric, + } + if err := r.opts.Table.Add(route); err != nil { + return nil, fmt.Errorf("error to add default gateway route: %s", err) + } + } + + // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped + if r.status.Code == Error || r.status.Code == Stopped { + r.exit = make(chan struct{}) + r.advertChan = make(chan *Update) + } + + // routing table watcher which watches all routes i.e. to every destination + tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) + if err != nil { + return nil, fmt.Errorf("failed to create routing table watcher: %v", err) + } + // registry watcher + regWatcher, err := r.opts.Registry.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create registry watcher: %v", err) + } + + // error channel collecting goroutine errors + errChan := make(chan error, 2) + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) + }() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routing table + errChan <- r.watchTable(tableWatcher) + }() + + r.wg.Add(1) + go r.watchError(errChan) + + // mark router as running and set its Error to nil + status := Status{ + Code: Running, + Error: nil, + } + r.status = status + } + + return r.advertChan, nil +} + +// Update updates the routing table using the advertised values +func (r *router) Update(a *Update) error { + // we extract the route from advertisement and update the routing table + route := Route{ + Destination: a.Event.Route.Destination, + Gateway: a.Event.Route.Gateway, + Router: a.Event.Route.Router, + Network: a.Event.Route.Network, + Metric: a.Event.Route.Metric, + Policy: AddIfNotExists, + } + + return r.opts.Table.Update(route) +} + +// Status returns router status +func (r *router) Status() Status { + r.RLock() + defer r.RUnlock() + + // make a copy of the status + status := r.status + + return status +} + // Stop stops the router func (r *router) Stop() error { - // notify all goroutines to finish - close(r.exit) + r.RLock() + // only close the channel if the router is running + if r.status.Code == Running { + // notify all goroutines to finish + close(r.exit) + } + r.RUnlock() + + // drain the advertise channel + for range r.advertChan { + } // wait for all goroutines to finish r.wg.Wait() @@ -206,13 +361,14 @@ func (r *router) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"ID", "Address", "Network", "Table"}) + table.SetHeader([]string{"ID", "Address", "Network", "Table", "Status"}) data := []string{ r.opts.ID, r.opts.Address, r.opts.Network, fmt.Sprintf("%d", r.opts.Table.Size()), + r.status.Code.String(), } table.Append(data) diff --git a/network/router/default_table.go b/network/router/default_table.go index 196ca4bf..d68398ad 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -129,6 +129,12 @@ func (t *table) Update(r Route) error { // check if the destAddr has ANY routes in the table if _, ok := t.m[destAddr]; !ok { + if r.Policy == AddIfNotExists { + t.m[destAddr] = make(map[uint64]Route) + t.m[destAddr][sum] = r + go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + return nil + } return ErrRouteNotFound } @@ -279,7 +285,7 @@ func (t *table) String() string { // hash hashes the route using router gateway and network address func (t *table) hash(r Route) uint64 { t.h.Reset() - t.h.Write([]byte(r.Destination + r.Gateway + r.Router + r.Network)) + t.h.Write([]byte(r.Destination + r.Gateway + r.Network)) return t.h.Sum64() } diff --git a/network/router/options.go b/network/router/options.go index 0d525f76..2426fa32 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -8,6 +8,8 @@ import ( var ( // DefaultAddress is default router address DefaultAddress = ":9093" + // DefaultNetwork is default micro network + DefaultNetwork = "local" ) // Options are router options @@ -18,6 +20,8 @@ type Options struct { Address string // Network is micro network Network string + // Gateway is micro network gateway + Gateway string // Registry is the local registry Registry registry.Registry // Table is routing table @@ -45,6 +49,13 @@ func Network(n string) Option { } } +// Gateway sets network gateway +func Gateway(g string) Option { + return func(o *Options) { + o.Gateway = g + } +} + // RoutingTable sets the routing table func RoutingTable(t Table) Option { return func(o *Options) { @@ -61,12 +72,11 @@ func Registry(r registry.Registry) Option { // DefaultOptions returns router default options func DefaultOptions() Options { - // NOTE: by default both local and network registies use default registry i.e. mdns return Options{ ID: uuid.New().String(), Address: DefaultAddress, + Network: DefaultNetwork, Registry: registry.DefaultRegistry, Table: NewTable(), - Network: "local", } } diff --git a/network/router/route.go b/network/router/route.go index d52c2688..1f45b688 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -20,6 +20,8 @@ type RoutePolicy int const ( // OverrideIfExists overrides route if it already exists OverrideIfExists RoutePolicy = iota + // AddIfNotExist adds the route if it does not exist + AddIfNotExists // IgnoreIfExists instructs to not modify existing route IgnoreIfExists ) @@ -28,9 +30,11 @@ const ( func (p RoutePolicy) String() string { switch p { case OverrideIfExists: - return "OVERRIDE" + return "OVERRIDE_IF_EXISTS" + case AddIfNotExists: + return "ADD_IF_NOT_EXISTS" case IgnoreIfExists: - return "IGNORE" + return "IGNORE_IF_EXISTS" default: return "UNKNOWN" } diff --git a/network/router/router.go b/network/router/router.go index 366c2fbf..e45c0baa 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,6 +1,13 @@ // Package router provides a network routing control plane package router +import "time" + +var ( + // DefaultRouter is default network router + DefaultRouter = NewRouter() +) + // Router is an interface for a routing control plane type Router interface { // Init initializes the router with options @@ -15,21 +22,69 @@ type Router interface { Address() string // Network returns the network address of the router Network() string - // Advertise starts advertising the routes to the network - Advertise() error + // Advertise starts advertising routes to the network + Advertise() (<-chan *Update, error) + // Update updates the routing table + Update(*Update) error + // Status returns router status + Status() Status // Stop stops the router Stop() error // String returns debug info String() string } +// Update is sent by the router to the network +type Update struct { + // ID is the router ID + ID string + // Timestamp marks the time when update is sent + Timestamp time.Time + // Event defines advertisement even + Event *Event +} + +// StatusCode defines router status +type StatusCode int + +// Status is router status +type Status struct { + // Error is router error + Error error + // Code defines router status + Code StatusCode +} + +const ( + // Init means the rotuer has just been initialized + Init StatusCode = iota + // Running means the router is running + Running + // Error means the router has crashed with error + Error + // Stopped means the router has stopped + Stopped +) + +// String returns human readable status code +func (sc StatusCode) String() string { + switch sc { + case Init: + return "INITIALIZED" + case Running: + return "RUNNING" + case Error: + return "ERROR" + case Stopped: + return "STOPPED" + default: + return "UNKNOWN" + } +} + // Option used by the router type Option func(*Options) -var ( - DefaultRouter = NewRouter() -) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 97d1f1b7..91411247 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -9,7 +9,7 @@ import ( var ( // ErrWatcherStopped is returned when routing table watcher has been stopped - ErrWatcherStopped = errors.New("routing table watcher stopped") + ErrWatcherStopped = errors.New("watcher stopped") ) // EventType defines routing table event @@ -81,7 +81,7 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly; we only allow watching particular route destination +// TODO: this needs to be thought through properly; we only allow watching particular route destination for now func (w *tableWatcher) Next() (*Event, error) { for { select { @@ -93,6 +93,7 @@ func (w *tableWatcher) Next() (*Event, error) { if w.opts.Destination == res.Route.Destination { return res, nil } + continue } case <-w.done: return nil, ErrWatcherStopped diff --git a/registry/mdns_watcher.go b/registry/mdns_watcher.go index 7ccb6e80..bbcf90ea 100644 --- a/registry/mdns_watcher.go +++ b/registry/mdns_watcher.go @@ -1,7 +1,6 @@ package registry import ( - "errors" "strings" "github.com/micro/mdns" @@ -63,7 +62,7 @@ func (m *mdnsWatcher) Next() (*Result, error) { Service: service, }, nil case <-m.exit: - return nil, errors.New("watcher stopped") + return nil, ErrWatcherStopped } } }