Merge pull request #553 from milosgajdos83/router-network
Changed router interface. Added table watcher. Advertise routes
This commit is contained in:
		| @@ -4,6 +4,7 @@ import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/olekukonko/tablewriter" | ||||
| @@ -12,15 +13,15 @@ import ( | ||||
| // router provides default router implementation | ||||
| type router struct { | ||||
| 	opts       Options | ||||
| 	status     Status | ||||
| 	advertChan chan *Update | ||||
| 	exit       chan struct{} | ||||
| 	wg         *sync.WaitGroup | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| // newRouter creates new router and returns it | ||||
| func newRouter(opts ...Option) Router { | ||||
| 	// TODO: we need to add default GW entry here | ||||
| 	// Should default GW be part of router options? | ||||
|  | ||||
| 	// get default options | ||||
| 	options := DefaultOptions() | ||||
|  | ||||
| @@ -31,6 +32,8 @@ func newRouter(opts ...Option) Router { | ||||
|  | ||||
| 	return &router{ | ||||
| 		opts:       options, | ||||
| 		status:     Status{Error: nil, Code: Init}, | ||||
| 		advertChan: make(chan *Update), | ||||
| 		exit:       make(chan struct{}), | ||||
| 		wg:         &sync.WaitGroup{}, | ||||
| 	} | ||||
| @@ -69,43 +72,17 @@ func (r *router) Network() string { | ||||
| 	return r.opts.Network | ||||
| } | ||||
|  | ||||
| // Advertise advertises the routes to the network. It is a blocking function. | ||||
| // It returns error if any of the launched goroutines fail with error. | ||||
| func (r *router) Advertise() error { | ||||
| 	// add local service routes into the routing table | ||||
| 	if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { | ||||
| 		return fmt.Errorf("failed adding routes: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	localWatcher, err := r.opts.Registry.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create registry watcher: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// error channel collecting goroutine errors | ||||
| 	errChan := make(chan error, 1) | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		// watch local registry and register routes in routine table | ||||
| 		errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric) | ||||
| 	}() | ||||
|  | ||||
| 	return <-errChan | ||||
| } | ||||
|  | ||||
| // addServiceRoutes adds all services in given registry to the routing table. | ||||
| // NOTE: this is a one-off operation done when bootstrapping the routing table | ||||
| // It returns error if either the services failed to be listed or | ||||
| // if the routes could not be added to the routing table. | ||||
| // if any of the the routes could not be added to the routing table. | ||||
| func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { | ||||
| 	services, err := reg.ListServices() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to list services: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// add each service node as a separate route; | ||||
| 	// add each service node as a separate route | ||||
| 	for _, service := range services { | ||||
| 		// get the service to retrieve all its info | ||||
| 		srvs, err := reg.GetService(service.Name) | ||||
| @@ -121,13 +98,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric | ||||
|  | ||||
| 		// range over the flat slice of nodes | ||||
| 		for _, node := range nodes { | ||||
| 			gw := node.Address | ||||
| 			gateway := node.Address | ||||
| 			if node.Port > 0 { | ||||
| 				gw = fmt.Sprintf("%s:%d", node.Address, node.Port) | ||||
| 				gateway = fmt.Sprintf("%s:%d", node.Address, node.Port) | ||||
| 			} | ||||
| 			route := Route{ | ||||
| 				Destination: service.Name, | ||||
| 				Gateway:     gw, | ||||
| 				Gateway:     gateway, | ||||
| 				Router:      r.opts.Address, | ||||
| 				Network:     r.opts.Network, | ||||
| 				Metric:      metric, | ||||
| @@ -157,12 +134,10 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { | ||||
|  | ||||
| 	for { | ||||
| 		res, err := w.Next() | ||||
| 		if err == registry.ErrWatcherStopped { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		if err != nil { | ||||
| 			if err != registry.ErrWatcherStopped { | ||||
| 				watchErr = err | ||||
| 			} | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| @@ -190,10 +165,190 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry | ||||
| // It returns error if the locally registered services either fails to be added/deleted to/from network registry. | ||||
| func (r *router) watchTable(w Watcher) error { | ||||
| 	// wait in the background for the router to stop | ||||
| 	// when the router stops, stop the watcher and exit | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| 	var watchErr error | ||||
|  | ||||
| exit: | ||||
| 	for { | ||||
| 		event, err := w.Next() | ||||
| 		if err != nil { | ||||
| 			if err != ErrWatcherStopped { | ||||
| 				watchErr = err | ||||
| 			} | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		u := &Update{ | ||||
| 			ID:        r.ID(), | ||||
| 			Timestamp: time.Now(), | ||||
| 			Event:     event, | ||||
| 		} | ||||
|  | ||||
| 		select { | ||||
| 		case <-r.exit: | ||||
| 			break exit | ||||
| 		case r.advertChan <- u: | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// close the advertisement channel | ||||
| 	close(r.advertChan) | ||||
|  | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // watchError watches router errors | ||||
| func (r *router) watchError(errChan <-chan error) { | ||||
| 	defer r.wg.Done() | ||||
|  | ||||
| 	var code StatusCode | ||||
| 	var err error | ||||
|  | ||||
| 	select { | ||||
| 	case <-r.exit: | ||||
| 		code = Stopped | ||||
| 	case err = <-errChan: | ||||
| 		code = Error | ||||
| 	} | ||||
|  | ||||
| 	r.Lock() | ||||
| 	defer r.Unlock() | ||||
| 	status := Status{ | ||||
| 		Code:  code, | ||||
| 		Error: err, | ||||
| 	} | ||||
| 	r.status = status | ||||
|  | ||||
| 	// stop the router if some error happened | ||||
| 	if err != nil && code != Stopped { | ||||
| 		close(r.exit) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Advertise advertises the routes to the network. | ||||
| // It returns error if any of the launched goroutines fail with error. | ||||
| func (r *router) Advertise() (<-chan *Update, error) { | ||||
| 	r.Lock() | ||||
| 	defer r.Unlock() | ||||
|  | ||||
| 	if r.status.Code != Running { | ||||
| 		// add local service routes into the routing table | ||||
| 		if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { | ||||
| 			return nil, fmt.Errorf("failed adding routes: %v", err) | ||||
| 		} | ||||
| 		// add default gateway into routing table | ||||
| 		if r.opts.Gateway != "" { | ||||
| 			// note, the only non-default value is the gateway | ||||
| 			route := Route{ | ||||
| 				Destination: "*", | ||||
| 				Gateway:     r.opts.Gateway, | ||||
| 				Router:      "*", | ||||
| 				Network:     "*", | ||||
| 				Metric:      DefaultLocalMetric, | ||||
| 			} | ||||
| 			if err := r.opts.Table.Add(route); err != nil { | ||||
| 				return nil, fmt.Errorf("error to add default gateway route: %s", err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped | ||||
| 		if r.status.Code == Error || r.status.Code == Stopped { | ||||
| 			r.exit = make(chan struct{}) | ||||
| 			r.advertChan = make(chan *Update) | ||||
| 		} | ||||
|  | ||||
| 		// routing table watcher which watches all routes i.e. to every destination | ||||
| 		tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to create routing table watcher: %v", err) | ||||
| 		} | ||||
| 		// registry watcher | ||||
| 		regWatcher, err := r.opts.Registry.Watch() | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to create registry watcher: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// error channel collecting goroutine errors | ||||
| 		errChan := make(chan error, 2) | ||||
|  | ||||
| 		r.wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer r.wg.Done() | ||||
| 			// watch local registry and register routes in routine table | ||||
| 			errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) | ||||
| 		}() | ||||
|  | ||||
| 		r.wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer r.wg.Done() | ||||
| 			// watch local registry and register routes in routing table | ||||
| 			errChan <- r.watchTable(tableWatcher) | ||||
| 		}() | ||||
|  | ||||
| 		r.wg.Add(1) | ||||
| 		go r.watchError(errChan) | ||||
|  | ||||
| 		// mark router as running and set its Error to nil | ||||
| 		status := Status{ | ||||
| 			Code:  Running, | ||||
| 			Error: nil, | ||||
| 		} | ||||
| 		r.status = status | ||||
| 	} | ||||
|  | ||||
| 	return r.advertChan, nil | ||||
| } | ||||
|  | ||||
| // Update updates the routing table using the advertised values | ||||
| func (r *router) Update(a *Update) error { | ||||
| 	// we extract the route from advertisement and update the routing table | ||||
| 	route := Route{ | ||||
| 		Destination: a.Event.Route.Destination, | ||||
| 		Gateway:     a.Event.Route.Gateway, | ||||
| 		Router:      a.Event.Route.Router, | ||||
| 		Network:     a.Event.Route.Network, | ||||
| 		Metric:      a.Event.Route.Metric, | ||||
| 		Policy:      AddIfNotExists, | ||||
| 	} | ||||
|  | ||||
| 	return r.opts.Table.Update(route) | ||||
| } | ||||
|  | ||||
| // Status returns router status | ||||
| func (r *router) Status() Status { | ||||
| 	r.RLock() | ||||
| 	defer r.RUnlock() | ||||
|  | ||||
| 	// make a copy of the status | ||||
| 	status := r.status | ||||
|  | ||||
| 	return status | ||||
| } | ||||
|  | ||||
| // Stop stops the router | ||||
| func (r *router) Stop() error { | ||||
| 	r.RLock() | ||||
| 	// only close the channel if the router is running | ||||
| 	if r.status.Code == Running { | ||||
| 		// notify all goroutines to finish | ||||
| 		close(r.exit) | ||||
| 	} | ||||
| 	r.RUnlock() | ||||
|  | ||||
| 	// drain the advertise channel | ||||
| 	for range r.advertChan { | ||||
| 	} | ||||
|  | ||||
| 	// wait for all goroutines to finish | ||||
| 	r.wg.Wait() | ||||
| @@ -206,13 +361,14 @@ func (r *router) String() string { | ||||
| 	sb := &strings.Builder{} | ||||
|  | ||||
| 	table := tablewriter.NewWriter(sb) | ||||
| 	table.SetHeader([]string{"ID", "Address", "Network", "Table"}) | ||||
| 	table.SetHeader([]string{"ID", "Address", "Network", "Table", "Status"}) | ||||
|  | ||||
| 	data := []string{ | ||||
| 		r.opts.ID, | ||||
| 		r.opts.Address, | ||||
| 		r.opts.Network, | ||||
| 		fmt.Sprintf("%d", r.opts.Table.Size()), | ||||
| 		r.status.Code.String(), | ||||
| 	} | ||||
| 	table.Append(data) | ||||
|  | ||||
|   | ||||
| @@ -129,6 +129,12 @@ func (t *table) Update(r Route) error { | ||||
|  | ||||
| 	// check if the destAddr has ANY routes in the table | ||||
| 	if _, ok := t.m[destAddr]; !ok { | ||||
| 		if r.Policy == AddIfNotExists { | ||||
| 			t.m[destAddr] = make(map[uint64]Route) | ||||
| 			t.m[destAddr][sum] = r | ||||
| 			go t.sendEvent(&Event{Type: CreateEvent, Route: r}) | ||||
| 			return nil | ||||
| 		} | ||||
| 		return ErrRouteNotFound | ||||
| 	} | ||||
|  | ||||
| @@ -279,7 +285,7 @@ func (t *table) String() string { | ||||
| // hash hashes the route using router gateway and network address | ||||
| func (t *table) hash(r Route) uint64 { | ||||
| 	t.h.Reset() | ||||
| 	t.h.Write([]byte(r.Destination + r.Gateway + r.Router + r.Network)) | ||||
| 	t.h.Write([]byte(r.Destination + r.Gateway + r.Network)) | ||||
|  | ||||
| 	return t.h.Sum64() | ||||
| } | ||||
|   | ||||
| @@ -8,6 +8,8 @@ import ( | ||||
| var ( | ||||
| 	// DefaultAddress is default router address | ||||
| 	DefaultAddress = ":9093" | ||||
| 	// DefaultNetwork is default micro network | ||||
| 	DefaultNetwork = "local" | ||||
| ) | ||||
|  | ||||
| // Options are router options | ||||
| @@ -18,6 +20,8 @@ type Options struct { | ||||
| 	Address string | ||||
| 	// Network is micro network | ||||
| 	Network string | ||||
| 	// Gateway is micro network gateway | ||||
| 	Gateway string | ||||
| 	// Registry is the local registry | ||||
| 	Registry registry.Registry | ||||
| 	// Table is routing table | ||||
| @@ -45,6 +49,13 @@ func Network(n string) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Gateway sets network gateway | ||||
| func Gateway(g string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Gateway = g | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RoutingTable sets the routing table | ||||
| func RoutingTable(t Table) Option { | ||||
| 	return func(o *Options) { | ||||
| @@ -61,12 +72,11 @@ func Registry(r registry.Registry) Option { | ||||
|  | ||||
| // DefaultOptions returns router default options | ||||
| func DefaultOptions() Options { | ||||
| 	// NOTE: by default both local and network registies use default registry i.e. mdns | ||||
| 	return Options{ | ||||
| 		ID:       uuid.New().String(), | ||||
| 		Address:  DefaultAddress, | ||||
| 		Network:  DefaultNetwork, | ||||
| 		Registry: registry.DefaultRegistry, | ||||
| 		Table:    NewTable(), | ||||
| 		Network:  "local", | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -20,6 +20,8 @@ type RoutePolicy int | ||||
| const ( | ||||
| 	// OverrideIfExists overrides route if it already exists | ||||
| 	OverrideIfExists RoutePolicy = iota | ||||
| 	// AddIfNotExist adds the route if it does not exist | ||||
| 	AddIfNotExists | ||||
| 	// IgnoreIfExists instructs to not modify existing route | ||||
| 	IgnoreIfExists | ||||
| ) | ||||
| @@ -28,9 +30,11 @@ const ( | ||||
| func (p RoutePolicy) String() string { | ||||
| 	switch p { | ||||
| 	case OverrideIfExists: | ||||
| 		return "OVERRIDE" | ||||
| 		return "OVERRIDE_IF_EXISTS" | ||||
| 	case AddIfNotExists: | ||||
| 		return "ADD_IF_NOT_EXISTS" | ||||
| 	case IgnoreIfExists: | ||||
| 		return "IGNORE" | ||||
| 		return "IGNORE_IF_EXISTS" | ||||
| 	default: | ||||
| 		return "UNKNOWN" | ||||
| 	} | ||||
|   | ||||
| @@ -1,6 +1,13 @@ | ||||
| // Package router provides a network routing control plane | ||||
| package router | ||||
|  | ||||
| import "time" | ||||
|  | ||||
| var ( | ||||
| 	// DefaultRouter is default network router | ||||
| 	DefaultRouter = NewRouter() | ||||
| ) | ||||
|  | ||||
| // Router is an interface for a routing control plane | ||||
| type Router interface { | ||||
| 	// Init initializes the router with options | ||||
| @@ -15,21 +22,69 @@ type Router interface { | ||||
| 	Address() string | ||||
| 	// Network returns the network address of the router | ||||
| 	Network() string | ||||
| 	// Advertise starts advertising the routes to the network | ||||
| 	Advertise() error | ||||
| 	// Advertise starts advertising routes to the network | ||||
| 	Advertise() (<-chan *Update, error) | ||||
| 	// Update updates the routing table | ||||
| 	Update(*Update) error | ||||
| 	// Status returns router status | ||||
| 	Status() Status | ||||
| 	// Stop stops the router | ||||
| 	Stop() error | ||||
| 	// String returns debug info | ||||
| 	String() string | ||||
| } | ||||
|  | ||||
| // Update is sent by the router to the network | ||||
| type Update struct { | ||||
| 	// ID is the router ID | ||||
| 	ID string | ||||
| 	// Timestamp marks the time when update is sent | ||||
| 	Timestamp time.Time | ||||
| 	// Event defines advertisement even | ||||
| 	Event *Event | ||||
| } | ||||
|  | ||||
| // StatusCode defines router status | ||||
| type StatusCode int | ||||
|  | ||||
| // Status is router status | ||||
| type Status struct { | ||||
| 	// Error is router error | ||||
| 	Error error | ||||
| 	// Code defines router status | ||||
| 	Code StatusCode | ||||
| } | ||||
|  | ||||
| const ( | ||||
| 	// Init means the rotuer has just been initialized | ||||
| 	Init StatusCode = iota | ||||
| 	// Running means the router is running | ||||
| 	Running | ||||
| 	// Error means the router has crashed with error | ||||
| 	Error | ||||
| 	// Stopped means the router has stopped | ||||
| 	Stopped | ||||
| ) | ||||
|  | ||||
| // String returns human readable status code | ||||
| func (sc StatusCode) String() string { | ||||
| 	switch sc { | ||||
| 	case Init: | ||||
| 		return "INITIALIZED" | ||||
| 	case Running: | ||||
| 		return "RUNNING" | ||||
| 	case Error: | ||||
| 		return "ERROR" | ||||
| 	case Stopped: | ||||
| 		return "STOPPED" | ||||
| 	default: | ||||
| 		return "UNKNOWN" | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Option used by the router | ||||
| type Option func(*Options) | ||||
|  | ||||
| var ( | ||||
| 	DefaultRouter = NewRouter() | ||||
| ) | ||||
|  | ||||
| // NewRouter creates new Router and returns it | ||||
| func NewRouter(opts ...Option) Router { | ||||
| 	return newRouter(opts...) | ||||
|   | ||||
| @@ -9,7 +9,7 @@ import ( | ||||
|  | ||||
| var ( | ||||
| 	// ErrWatcherStopped is returned when routing table watcher has been stopped | ||||
| 	ErrWatcherStopped = errors.New("routing table watcher stopped") | ||||
| 	ErrWatcherStopped = errors.New("watcher stopped") | ||||
| ) | ||||
|  | ||||
| // EventType defines routing table event | ||||
| @@ -81,7 +81,7 @@ type tableWatcher struct { | ||||
| } | ||||
|  | ||||
| // Next returns the next noticed action taken on table | ||||
| // TODO: this needs to be thought through properly; we only allow watching particular route destination | ||||
| // TODO: this needs to be thought through properly; we only allow watching particular route destination for now | ||||
| func (w *tableWatcher) Next() (*Event, error) { | ||||
| 	for { | ||||
| 		select { | ||||
| @@ -93,6 +93,7 @@ func (w *tableWatcher) Next() (*Event, error) { | ||||
| 				if w.opts.Destination == res.Route.Destination { | ||||
| 					return res, nil | ||||
| 				} | ||||
| 				continue | ||||
| 			} | ||||
| 		case <-w.done: | ||||
| 			return nil, ErrWatcherStopped | ||||
|   | ||||
| @@ -1,7 +1,6 @@ | ||||
| package registry | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/micro/mdns" | ||||
| @@ -63,7 +62,7 @@ func (m *mdnsWatcher) Next() (*Result, error) { | ||||
| 				Service: service, | ||||
| 			}, nil | ||||
| 		case <-m.exit: | ||||
| 			return nil, errors.New("watcher stopped") | ||||
| 			return nil, ErrWatcherStopped | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user