diff --git a/network/router/default_router.go b/network/router/default_router.go index 7c682150..ae55ee9f 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -11,16 +11,15 @@ import ( // router provides default router implementation type router struct { - opts Options - exit chan struct{} - wg *sync.WaitGroup + opts Options + started bool + advertChan chan *Advertisement + exit chan struct{} + wg *sync.WaitGroup } // newRouter creates new router and returns it func newRouter(opts ...Option) Router { - // TODO: we need to add default GW entry here - // Should default GW be part of router options? - // get default options options := DefaultOptions() @@ -30,9 +29,11 @@ func newRouter(opts ...Option) Router { } return &router{ - opts: options, - exit: make(chan struct{}), - wg: &sync.WaitGroup{}, + opts: options, + started: false, + advertChan: make(chan *Advertisement), + exit: make(chan struct{}), + wg: &sync.WaitGroup{}, } } @@ -69,43 +70,106 @@ func (r *router) Network() string { return r.opts.Network } -// Advertise advertises the routes to the network. It is a blocking function. +// Advertise advertises the routes to the network. // It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() error { - // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { - return fmt.Errorf("failed adding routes: %v", err) +func (r *router) Advertise() (<-chan *Advertisement, error) { + if !r.started { + // add local service routes into the routing table + if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + return nil, fmt.Errorf("failed adding routes: %v", err) + } + + // add default gateway into routing table + if r.opts.Gateway != "" { + // note, the only non-default value is the gateway + route := Route{ + Destination: "*", + Gateway: r.opts.Gateway, + Router: "*", + Network: "*", + Metric: DefaultLocalMetric, + } + if err := r.opts.Table.Add(route); err != nil { + return nil, fmt.Errorf("error adding default gateway route: %s", err) + } + } + + // routing table watcher that watches all routes being added + tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) + if err != nil { + return nil, fmt.Errorf("failed to create routing table watcher: %v", err) + } + + // registry watcher + regWatcher, err := r.opts.Registry.Watch() + if err != nil { + return nil, fmt.Errorf("failed to create registry watcher: %v", err) + } + + // error channel collecting goroutine errors + errChan := make(chan error, 2) + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) + }() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + // watch local registry and register routes in routine table + errChan <- r.watchTable(tableWatcher) + }() + + go func() { + select { + // wait for exit chan + case <-r.exit: + // wait for error + case <-errChan: + // TODO: we're missing the error context here + // might have to log it here as we don't send it down + } + + // close the advertise channel + close(r.advertChan) + }() + + // mark the router as started + r.started = true } - localWatcher, err := r.opts.Registry.Watch() - if err != nil { - return fmt.Errorf("failed to create registry watcher: %v", err) + return r.advertChan, nil +} + +// Update updates the routing table using the advertised values +func (r *router) Update(a *Advertisement) error { + // we extract the route from advertisement and update the routing table + route := Route{ + Destination: a.Event.Route.Destination, + Gateway: a.Event.Route.Gateway, + Router: a.Event.Route.Router, + Network: a.Event.Route.Network, + Metric: a.Event.Route.Metric, + Policy: AddIfNotExists, } - // error channel collecting goroutine errors - errChan := make(chan error, 1) - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routine table - errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric) - }() - - return <-errChan + return r.opts.Table.Update(route) } // addServiceRoutes adds all services in given registry to the routing table. // NOTE: this is a one-off operation done when bootstrapping the routing table // It returns error if either the services failed to be listed or -// if the routes could not be added to the routing table. +// if any of the the routes could not be added to the routing table. func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed to list services: %v", err) } - // add each service node as a separate route; + // add each service node as a separate route for _, service := range services { // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) @@ -190,6 +254,40 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { return watchErr } +// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry +// It returns error if the locally registered services either fails to be added/deleted to/from network registry. +func (r *router) watchTable(w Watcher) error { + // wait in the background for the router to stop + // when the router stops, stop the watcher and exit + r.wg.Add(1) + go func() { + defer r.wg.Done() + <-r.exit + w.Stop() + }() + + var watchErr error + + for { + event, err := w.Next() + if err == ErrWatcherStopped { + break + } + + if err != nil { + watchErr = err + break + } + + r.advertChan <- &Advertisement{ + ID: r.ID(), + Event: event, + } + } + + return watchErr +} + // Stop stops the router func (r *router) Stop() error { // notify all goroutines to finish diff --git a/network/router/default_table.go b/network/router/default_table.go index 196ca4bf..d68398ad 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -129,6 +129,12 @@ func (t *table) Update(r Route) error { // check if the destAddr has ANY routes in the table if _, ok := t.m[destAddr]; !ok { + if r.Policy == AddIfNotExists { + t.m[destAddr] = make(map[uint64]Route) + t.m[destAddr][sum] = r + go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + return nil + } return ErrRouteNotFound } @@ -279,7 +285,7 @@ func (t *table) String() string { // hash hashes the route using router gateway and network address func (t *table) hash(r Route) uint64 { t.h.Reset() - t.h.Write([]byte(r.Destination + r.Gateway + r.Router + r.Network)) + t.h.Write([]byte(r.Destination + r.Gateway + r.Network)) return t.h.Sum64() } diff --git a/network/router/options.go b/network/router/options.go index 0d525f76..2426fa32 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -8,6 +8,8 @@ import ( var ( // DefaultAddress is default router address DefaultAddress = ":9093" + // DefaultNetwork is default micro network + DefaultNetwork = "local" ) // Options are router options @@ -18,6 +20,8 @@ type Options struct { Address string // Network is micro network Network string + // Gateway is micro network gateway + Gateway string // Registry is the local registry Registry registry.Registry // Table is routing table @@ -45,6 +49,13 @@ func Network(n string) Option { } } +// Gateway sets network gateway +func Gateway(g string) Option { + return func(o *Options) { + o.Gateway = g + } +} + // RoutingTable sets the routing table func RoutingTable(t Table) Option { return func(o *Options) { @@ -61,12 +72,11 @@ func Registry(r registry.Registry) Option { // DefaultOptions returns router default options func DefaultOptions() Options { - // NOTE: by default both local and network registies use default registry i.e. mdns return Options{ ID: uuid.New().String(), Address: DefaultAddress, + Network: DefaultNetwork, Registry: registry.DefaultRegistry, Table: NewTable(), - Network: "local", } } diff --git a/network/router/route.go b/network/router/route.go index d52c2688..1f45b688 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -20,6 +20,8 @@ type RoutePolicy int const ( // OverrideIfExists overrides route if it already exists OverrideIfExists RoutePolicy = iota + // AddIfNotExist adds the route if it does not exist + AddIfNotExists // IgnoreIfExists instructs to not modify existing route IgnoreIfExists ) @@ -28,9 +30,11 @@ const ( func (p RoutePolicy) String() string { switch p { case OverrideIfExists: - return "OVERRIDE" + return "OVERRIDE_IF_EXISTS" + case AddIfNotExists: + return "ADD_IF_NOT_EXISTS" case IgnoreIfExists: - return "IGNORE" + return "IGNORE_IF_EXISTS" default: return "UNKNOWN" } diff --git a/network/router/router.go b/network/router/router.go index 366c2fbf..a8772768 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,6 +1,11 @@ // Package router provides a network routing control plane package router +var ( + // DefaultRouter is default network router + DefaultRouter = NewRouter() +) + // Router is an interface for a routing control plane type Router interface { // Init initializes the router with options @@ -15,21 +20,27 @@ type Router interface { Address() string // Network returns the network address of the router Network() string - // Advertise starts advertising the routes to the network - Advertise() error + // Advertise starts advertising routes to the network + Advertise() (<-chan *Advertisement, error) + // Update updates the routing table + Update(*Advertisement) error // Stop stops the router Stop() error // String returns debug info String() string } +// Advertisement is sent by the router to the network +type Advertisement struct { + // ID is the source router ID + ID string + // Event defines advertisement even + Event *Event +} + // Option used by the router type Option func(*Options) -var ( - DefaultRouter = NewRouter() -) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 97d1f1b7..4e9d9a9e 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -81,7 +81,7 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly; we only allow watching particular route destination +// TODO: this needs to be thought through properly; we only allow watching particular route destination for now func (w *tableWatcher) Next() (*Event, error) { for { select { @@ -93,6 +93,7 @@ func (w *tableWatcher) Next() (*Event, error) { if w.opts.Destination == res.Route.Destination { return res, nil } + continue } case <-w.done: return nil, ErrWatcherStopped