Changed router interface. Added table watcher. Advertise routes
* Changed router interface to return Advertisement channel * Added default gateway route to the routing table if supplied * Watch table for updates and advertise to the network * We hash the routes on 3-tuple (Destination, Gateway, Network)
This commit is contained in:
		| @@ -11,16 +11,15 @@ import ( | ||||
|  | ||||
| // router provides default router implementation | ||||
| type router struct { | ||||
| 	opts Options | ||||
| 	exit chan struct{} | ||||
| 	wg   *sync.WaitGroup | ||||
| 	opts       Options | ||||
| 	started    bool | ||||
| 	advertChan chan *Advertisement | ||||
| 	exit       chan struct{} | ||||
| 	wg         *sync.WaitGroup | ||||
| } | ||||
|  | ||||
| // newRouter creates new router and returns it | ||||
| func newRouter(opts ...Option) Router { | ||||
| 	// TODO: we need to add default GW entry here | ||||
| 	// Should default GW be part of router options? | ||||
|  | ||||
| 	// get default options | ||||
| 	options := DefaultOptions() | ||||
|  | ||||
| @@ -30,9 +29,11 @@ func newRouter(opts ...Option) Router { | ||||
| 	} | ||||
|  | ||||
| 	return &router{ | ||||
| 		opts: options, | ||||
| 		exit: make(chan struct{}), | ||||
| 		wg:   &sync.WaitGroup{}, | ||||
| 		opts:       options, | ||||
| 		started:    false, | ||||
| 		advertChan: make(chan *Advertisement), | ||||
| 		exit:       make(chan struct{}), | ||||
| 		wg:         &sync.WaitGroup{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -69,43 +70,106 @@ func (r *router) Network() string { | ||||
| 	return r.opts.Network | ||||
| } | ||||
|  | ||||
| // Advertise advertises the routes to the network. It is a blocking function. | ||||
| // Advertise advertises the routes to the network. | ||||
| // It returns error if any of the launched goroutines fail with error. | ||||
| func (r *router) Advertise() error { | ||||
| 	// add local service routes into the routing table | ||||
| 	if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { | ||||
| 		return fmt.Errorf("failed adding routes: %v", err) | ||||
| func (r *router) Advertise() (<-chan *Advertisement, error) { | ||||
| 	if !r.started { | ||||
| 		// add local service routes into the routing table | ||||
| 		if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { | ||||
| 			return nil, fmt.Errorf("failed adding routes: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// add default gateway into routing table | ||||
| 		if r.opts.Gateway != "" { | ||||
| 			// note, the only non-default value is the gateway | ||||
| 			route := Route{ | ||||
| 				Destination: "*", | ||||
| 				Gateway:     r.opts.Gateway, | ||||
| 				Router:      "*", | ||||
| 				Network:     "*", | ||||
| 				Metric:      DefaultLocalMetric, | ||||
| 			} | ||||
| 			if err := r.opts.Table.Add(route); err != nil { | ||||
| 				return nil, fmt.Errorf("error adding default gateway route: %s", err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// routing table watcher that watches all routes being added | ||||
| 		tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to create routing table watcher: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// registry watcher | ||||
| 		regWatcher, err := r.opts.Registry.Watch() | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to create registry watcher: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// error channel collecting goroutine errors | ||||
| 		errChan := make(chan error, 2) | ||||
|  | ||||
| 		r.wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer r.wg.Done() | ||||
| 			// watch local registry and register routes in routine table | ||||
| 			errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) | ||||
| 		}() | ||||
|  | ||||
| 		r.wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer r.wg.Done() | ||||
| 			// watch local registry and register routes in routine table | ||||
| 			errChan <- r.watchTable(tableWatcher) | ||||
| 		}() | ||||
|  | ||||
| 		go func() { | ||||
| 			select { | ||||
| 			// wait for exit chan | ||||
| 			case <-r.exit: | ||||
| 			// wait for error | ||||
| 			case <-errChan: | ||||
| 				// TODO: we're missing the error context here | ||||
| 				// might have to log it here as we don't send it down | ||||
| 			} | ||||
|  | ||||
| 			// close the advertise channel | ||||
| 			close(r.advertChan) | ||||
| 		}() | ||||
|  | ||||
| 		// mark the router as started | ||||
| 		r.started = true | ||||
| 	} | ||||
|  | ||||
| 	localWatcher, err := r.opts.Registry.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create registry watcher: %v", err) | ||||
| 	return r.advertChan, nil | ||||
| } | ||||
|  | ||||
| // Update updates the routing table using the advertised values | ||||
| func (r *router) Update(a *Advertisement) error { | ||||
| 	// we extract the route from advertisement and update the routing table | ||||
| 	route := Route{ | ||||
| 		Destination: a.Event.Route.Destination, | ||||
| 		Gateway:     a.Event.Route.Gateway, | ||||
| 		Router:      a.Event.Route.Router, | ||||
| 		Network:     a.Event.Route.Network, | ||||
| 		Metric:      a.Event.Route.Metric, | ||||
| 		Policy:      AddIfNotExists, | ||||
| 	} | ||||
|  | ||||
| 	// error channel collecting goroutine errors | ||||
| 	errChan := make(chan error, 1) | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		// watch local registry and register routes in routine table | ||||
| 		errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric) | ||||
| 	}() | ||||
|  | ||||
| 	return <-errChan | ||||
| 	return r.opts.Table.Update(route) | ||||
| } | ||||
|  | ||||
| // addServiceRoutes adds all services in given registry to the routing table. | ||||
| // NOTE: this is a one-off operation done when bootstrapping the routing table | ||||
| // It returns error if either the services failed to be listed or | ||||
| // if the routes could not be added to the routing table. | ||||
| // if any of the the routes could not be added to the routing table. | ||||
| func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { | ||||
| 	services, err := reg.ListServices() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to list services: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// add each service node as a separate route; | ||||
| 	// add each service node as a separate route | ||||
| 	for _, service := range services { | ||||
| 		// get the service to retrieve all its info | ||||
| 		srvs, err := reg.GetService(service.Name) | ||||
| @@ -190,6 +254,40 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry | ||||
| // It returns error if the locally registered services either fails to be added/deleted to/from network registry. | ||||
| func (r *router) watchTable(w Watcher) error { | ||||
| 	// wait in the background for the router to stop | ||||
| 	// when the router stops, stop the watcher and exit | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| 	var watchErr error | ||||
|  | ||||
| 	for { | ||||
| 		event, err := w.Next() | ||||
| 		if err == ErrWatcherStopped { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		if err != nil { | ||||
| 			watchErr = err | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		r.advertChan <- &Advertisement{ | ||||
| 			ID:    r.ID(), | ||||
| 			Event: event, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // Stop stops the router | ||||
| func (r *router) Stop() error { | ||||
| 	// notify all goroutines to finish | ||||
|   | ||||
		Reference in New Issue
	
	Block a user