Lots of refactoring. We now have basic routing table watcher.
This commit is contained in:
		| @@ -3,7 +3,9 @@ package router | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/micro/go-log" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/registry/gossip" | ||||
| 	"github.com/olekukonko/tablewriter" | ||||
| @@ -12,6 +14,8 @@ import ( | ||||
| type router struct { | ||||
| 	opts Options | ||||
| 	goss registry.Registry | ||||
| 	exit chan struct{} | ||||
| 	wg   *sync.WaitGroup | ||||
| } | ||||
|  | ||||
| func newRouter(opts ...Option) Router { | ||||
| @@ -20,22 +24,22 @@ func newRouter(opts ...Option) Router { | ||||
| 		Table: NewTable(), | ||||
| 	} | ||||
|  | ||||
| 	// apply requested options | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// bind to gossip address to join gossip registry | ||||
| 	goss := gossip.NewRegistry( | ||||
| 		gossip.Address(options.GossipAddr), | ||||
| 	) | ||||
|  | ||||
| 	r := &router{ | ||||
| 	return &router{ | ||||
| 		opts: options, | ||||
| 		goss: goss, | ||||
| 		exit: make(chan struct{}), | ||||
| 		wg:   &sync.WaitGroup{}, | ||||
| 	} | ||||
|  | ||||
| 	// TODO: start gossip.Registry watch here | ||||
|  | ||||
| 	return r | ||||
| } | ||||
|  | ||||
| // Init initializes router with given options | ||||
| @@ -66,6 +70,118 @@ func (r *router) Network() string { | ||||
| 	return r.opts.NetworkAddr | ||||
| } | ||||
|  | ||||
| // Start starts the router | ||||
| func (r *router) Start() error { | ||||
| 	// TODO: | ||||
| 	// - list all remote services and populate routing table | ||||
| 	// - list all local services and populate remote registry | ||||
|  | ||||
| 	gWatcher, err := r.goss.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create router gossip registry watcher: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	tWatcher, err := r.opts.Table.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create routing table watcher: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go r.watchGossip(gWatcher) | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go r.watchTable(tWatcher) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // watch gossip registry | ||||
| func (r *router) watchGossip(w registry.Watcher) error { | ||||
| 	defer r.wg.Done() | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		// stop gossip registry watcher | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| 	var watchErr error | ||||
|  | ||||
| 	// watch for changes to services | ||||
| 	for { | ||||
| 		res, err := w.Next() | ||||
| 		if err == registry.ErrWatcherStopped { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		if err != nil { | ||||
| 			watchErr = err | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		switch res.Action { | ||||
| 		case "create": | ||||
| 			if len(res.Service.Nodes) > 0 { | ||||
| 				log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) | ||||
| 			} | ||||
| 		case "delete": | ||||
| 			log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // watch gossip registry | ||||
| func (r *router) watchTable(w Watcher) error { | ||||
| 	defer r.wg.Done() | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		// stop gossip registry watcher | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| 	var watchErr error | ||||
|  | ||||
| 	// watch for changes to services | ||||
| 	for { | ||||
| 		res, err := w.Next() | ||||
| 		if err == ErrWatcherStopped { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		if err != nil { | ||||
| 			watchErr = err | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		switch res.Action { | ||||
| 		case "add": | ||||
| 			log.Logf("Action: %s, Route: %v", res.Action, res.Route) | ||||
| 		case "remove": | ||||
| 			log.Logf("Action: %s, Route: %v", res.Action, res.Route) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // Stop stops the router | ||||
| func (r *router) Stop() error { | ||||
| 	// notify all goroutines to finish | ||||
| 	close(r.exit) | ||||
|  | ||||
| 	// wait for all goroutines to finish | ||||
| 	r.wg.Wait() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // String prints debugging information about router | ||||
| func (r *router) String() string { | ||||
| 	sb := &strings.Builder{} | ||||
|   | ||||
| @@ -14,8 +14,8 @@ const ( | ||||
| type RouteOptions struct { | ||||
| 	// DestAddr is destination address | ||||
| 	DestAddr string | ||||
| 	// Hop is the next route hop | ||||
| 	Hop Router | ||||
| 	// Gateway is the next route hop | ||||
| 	Gateway Router | ||||
| 	// Network defines micro network | ||||
| 	Network string | ||||
| 	// Metric is route cost metric | ||||
| @@ -31,10 +31,10 @@ func DestAddr(a string) RouteOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Hop allows to set the route route options | ||||
| func Hop(r Router) RouteOption { | ||||
| // Gateway sets the route gateway | ||||
| func Gateway(r Router) RouteOption { | ||||
| 	return func(o *RouteOptions) { | ||||
| 		o.Hop = r | ||||
| 		o.Gateway = r | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -29,21 +29,21 @@ func ID(id string) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Address allows to set router address | ||||
| // Address sets router address | ||||
| func Address(a string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Address = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // GossipAddress allows to set router address | ||||
| func GossipAddress(a string) Option { | ||||
| // GossipAddr sets router gossip address | ||||
| func GossipAddr(a string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.GossipAddr = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // NetworkAddr allows to set router network | ||||
| // NetworkAddr sets router network address | ||||
| func NetworkAddr(n string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.NetworkAddr = n | ||||
|   | ||||
| @@ -13,26 +13,17 @@ const ( | ||||
| // QueryOptions allow to define routing table query options | ||||
| type QueryOptions struct { | ||||
| 	// Route allows to set route options | ||||
| 	Route *RouteOptions | ||||
| 	// Service is micro service name | ||||
| 	Service string | ||||
| 	RouteOptions *RouteOptions | ||||
| 	// Policy defines query lookup policy | ||||
| 	Policy LookupPolicy | ||||
| 	// Count defines max number of results to return | ||||
| 	Count int | ||||
| } | ||||
|  | ||||
| // RouteOpts allows to set the route query options | ||||
| func RouteOpts(r *RouteOptions) QueryOption { | ||||
| // QueryRouteOpts allows to set the route query options | ||||
| func QueryRouteOptons(r *RouteOptions) QueryOption { | ||||
| 	return func(o *QueryOptions) { | ||||
| 		o.Route = r | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Service allows to set the service name in routing query | ||||
| func Service(s string) QueryOption { | ||||
| 	return func(o *QueryOptions) { | ||||
| 		o.Service = s | ||||
| 		o.RouteOptions = r | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -43,8 +34,8 @@ func QueryPolicy(p LookupPolicy) QueryOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ResultCount allows to set max results to return | ||||
| func ResultCount(c int) QueryOption { | ||||
| // QueryCount allows to set max results to return | ||||
| func QueryCount(c int) QueryOption { | ||||
| 	return func(o *QueryOptions) { | ||||
| 		o.Count = c | ||||
| 	} | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| // Package router provides an interface for micro network routers | ||||
| // Package router provides an interface for micro network router | ||||
| package router | ||||
|  | ||||
| // Router is micro network router | ||||
| @@ -9,11 +9,15 @@ type Router interface { | ||||
| 	Options() Options | ||||
| 	// Table returns routing table | ||||
| 	Table() Table | ||||
| 	// Address returns router gossip adddress | ||||
| 	// Address returns router adddress | ||||
| 	Address() string | ||||
| 	// Network returns micro network address | ||||
| 	// Network returns router network address | ||||
| 	Network() string | ||||
| 	// String implemens fmt.Stringer interface | ||||
| 	// Start starts router | ||||
| 	Start() error | ||||
| 	// Stop stops router | ||||
| 	Stop() error | ||||
| 	// String returns router debug info | ||||
| 	String() string | ||||
| } | ||||
|  | ||||
| @@ -32,6 +36,9 @@ type RouteOption func(*RouteOptions) | ||||
| // QueryOption is used to define query options | ||||
| type QueryOption func(*QueryOptions) | ||||
|  | ||||
| // WatchOption is used to define what routes to watch in the table | ||||
| type WatchOption func(*WatchOptions) | ||||
|  | ||||
| // NewRouter creates new Router and returns it | ||||
| func NewRouter(opts ...Option) Router { | ||||
| 	return newRouter(opts...) | ||||
|   | ||||
							
								
								
									
										115
									
								
								router/table.go
									
									
									
									
									
								
							
							
						
						
									
										115
									
								
								router/table.go
									
									
									
									
									
								
							| @@ -8,6 +8,7 @@ import ( | ||||
| 	"strings" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/olekukonko/tablewriter" | ||||
| ) | ||||
|  | ||||
| @@ -24,12 +25,14 @@ var ( | ||||
| type Table interface { | ||||
| 	// Add adds new route to the table | ||||
| 	Add(Route) error | ||||
| 	// Remove removes route from the table | ||||
| 	// Remove removes existing route from the table | ||||
| 	Remove(Route) error | ||||
| 	// Update updates route in the table | ||||
| 	Update(...RouteOption) error | ||||
| 	// Lookup looks up routes in the table | ||||
| 	Lookup(Query) ([]Route, error) | ||||
| 	// Watch returns a watcher which allows you to track updates to the table | ||||
| 	Watch(opts ...WatchOption) (Watcher, error) | ||||
| 	// Size returns the size of the table | ||||
| 	Size() int | ||||
| 	// String prints the routing table | ||||
| @@ -37,12 +40,13 @@ type Table interface { | ||||
| } | ||||
|  | ||||
| // table is routing table | ||||
| // It maps service name to routes | ||||
| type table struct { | ||||
| 	// m stores routing table map | ||||
| 	m map[uint64]Route | ||||
| 	// h is a hasher hashes route entries | ||||
| 	m map[string]map[uint64]Route | ||||
| 	// h hashes route entries | ||||
| 	h hash.Hash64 | ||||
| 	// w is a list of table watchers | ||||
| 	w map[string]*tableWatcher | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| @@ -52,73 +56,120 @@ func NewTable() Table { | ||||
| 	h.Reset() | ||||
|  | ||||
| 	return &table{ | ||||
| 		m: make(map[uint64]Route), | ||||
| 		m: make(map[string]map[uint64]Route), | ||||
| 		w: make(map[string]*tableWatcher), | ||||
| 		h: h, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Add adds new routing entry | ||||
| // Add adds a route to the routing table | ||||
| func (t *table) Add(r Route) error { | ||||
| 	t.Lock() | ||||
| 	defer t.Unlock() | ||||
|  | ||||
| 	destAddr := r.Options().DestAddr | ||||
| 	sum := t.hash(r) | ||||
|  | ||||
| 	if _, ok := t.m[sum]; !ok { | ||||
| 		t.m[sum] = r | ||||
| 	if _, ok := t.m[destAddr]; !ok { | ||||
| 		t.m[destAddr] = make(map[uint64]Route) | ||||
| 		t.m[destAddr][sum] = r | ||||
| 		go t.sendResult(&Result{Action: "add", Route: r}) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if _, ok := t.m[sum]; ok && r.Options().Policy == OverrideIfExists { | ||||
| 		t.m[sum] = r | ||||
| 	if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists { | ||||
| 		t.m[destAddr][sum] = r | ||||
| 		go t.sendResult(&Result{Action: "update", Route: r}) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return ErrDuplicateRoute | ||||
| } | ||||
|  | ||||
| // Remove removes entry from the routing table | ||||
| // Remove removes the route from the routing table | ||||
| func (t *table) Remove(r Route) error { | ||||
| 	t.Lock() | ||||
| 	defer t.Unlock() | ||||
|  | ||||
| 	destAddr := r.Options().DestAddr | ||||
| 	sum := t.hash(r) | ||||
|  | ||||
| 	if _, ok := t.m[sum]; !ok { | ||||
| 	if _, ok := t.m[destAddr]; !ok { | ||||
| 		return ErrRouteNotFound | ||||
| 	} | ||||
|  | ||||
| 	delete(t.m, sum) | ||||
| 	delete(t.m[destAddr], sum) | ||||
| 	go t.sendResult(&Result{Action: "remove", Route: r}) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Update updates routing entry | ||||
| // Update updates routing table using propvided options | ||||
| func (t *table) Update(opts ...RouteOption) error { | ||||
| 	t.Lock() | ||||
| 	defer t.Unlock() | ||||
|  | ||||
| 	r := NewRoute(opts...) | ||||
|  | ||||
| 	destAddr := r.Options().DestAddr | ||||
| 	sum := t.hash(r) | ||||
|  | ||||
| 	if _, ok := t.m[sum]; !ok { | ||||
| 	if _, ok := t.m[destAddr]; !ok { | ||||
| 		return ErrRouteNotFound | ||||
| 	} | ||||
|  | ||||
| 	if _, ok := t.m[sum]; ok { | ||||
| 		t.m[sum] = r | ||||
| 	if _, ok := t.m[destAddr][sum]; ok { | ||||
| 		t.m[destAddr][sum] = r | ||||
| 		go t.sendResult(&Result{Action: "update", Route: r}) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return ErrRouteNotFound | ||||
| } | ||||
|  | ||||
| // Lookup looks up entry in the routing table | ||||
| // Lookup queries routing table and returns all routes that match it | ||||
| func (t *table) Lookup(q Query) ([]Route, error) { | ||||
| 	return nil, ErrNotImplemented | ||||
| } | ||||
|  | ||||
| // Watch returns routing table entry watcher | ||||
| func (t *table) Watch(opts ...WatchOption) (Watcher, error) { | ||||
| 	// by default watch everything | ||||
| 	wopts := WatchOptions{ | ||||
| 		DestAddr: "*", | ||||
| 		Network:  "*", | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&wopts) | ||||
| 	} | ||||
|  | ||||
| 	watcher := &tableWatcher{ | ||||
| 		opts:    wopts, | ||||
| 		resChan: make(chan *Result, 10), | ||||
| 		done:    make(chan struct{}), | ||||
| 	} | ||||
|  | ||||
| 	t.Lock() | ||||
| 	t.w[uuid.New().String()] = watcher | ||||
| 	t.Unlock() | ||||
|  | ||||
| 	return watcher, nil | ||||
| } | ||||
|  | ||||
| // sendResult sends rules to all subscribe watchers | ||||
| func (t *table) sendResult(r *Result) { | ||||
| 	t.RLock() | ||||
| 	defer t.RUnlock() | ||||
|  | ||||
| 	for _, w := range t.w { | ||||
| 		select { | ||||
| 		case w.resChan <- r: | ||||
| 		case <-w.done: | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Size returns the size of the routing table | ||||
| func (t *table) Size() int { | ||||
| 	t.RLock() | ||||
| @@ -127,7 +178,7 @@ func (t *table) Size() int { | ||||
| 	return len(t.m) | ||||
| } | ||||
|  | ||||
| // String returns text representation of routing table | ||||
| // String returns debug information | ||||
| func (t *table) String() string { | ||||
| 	t.RLock() | ||||
| 	defer t.RUnlock() | ||||
| @@ -137,16 +188,18 @@ func (t *table) String() string { | ||||
|  | ||||
| 	// create nice table printing structure | ||||
| 	table := tablewriter.NewWriter(sb) | ||||
| 	table.SetHeader([]string{"Service", "Gateway", "Network", "Metric"}) | ||||
| 	table.SetHeader([]string{"Destination", "Gateway", "Network", "Metric"}) | ||||
|  | ||||
| 	for _, route := range t.m { | ||||
| 		strRoute := []string{ | ||||
| 			route.Options().DestAddr, | ||||
| 			route.Options().Hop.Address(), | ||||
| 			route.Options().Network, | ||||
| 			fmt.Sprintf("%d", route.Options().Metric), | ||||
| 	for _, destRoute := range t.m { | ||||
| 		for _, route := range destRoute { | ||||
| 			strRoute := []string{ | ||||
| 				route.Options().DestAddr, | ||||
| 				route.Options().Gateway.Address(), | ||||
| 				route.Options().Gateway.Network(), | ||||
| 				fmt.Sprintf("%d", route.Options().Metric), | ||||
| 			} | ||||
| 			table.Append(strRoute) | ||||
| 		} | ||||
| 		table.Append(strRoute) | ||||
| 	} | ||||
|  | ||||
| 	// render table into sb | ||||
| @@ -155,13 +208,13 @@ func (t *table) String() string { | ||||
| 	return sb.String() | ||||
| } | ||||
|  | ||||
| // hash hashes the route using router gateway and network address | ||||
| func (t *table) hash(r Route) uint64 { | ||||
| 	destAddr := r.Options().DestAddr | ||||
| 	routerAddr := r.Options().Hop.Address() | ||||
| 	network := r.Options().Network | ||||
| 	gwAddr := r.Options().Gateway.Address() | ||||
| 	netAddr := r.Options().Network | ||||
|  | ||||
| 	t.h.Reset() | ||||
| 	t.h.Write([]byte(destAddr + routerAddr + network)) | ||||
| 	t.h.Write([]byte(gwAddr + netAddr)) | ||||
|  | ||||
| 	return t.h.Sum64() | ||||
| } | ||||
|   | ||||
							
								
								
									
										89
									
								
								router/table_watcher.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								router/table_watcher.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,89 @@ | ||||
| package router | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	// ErrWatcherStopped is returned when routing table watcher has been stopped | ||||
| 	ErrWatcherStopped = errors.New("routing table watcher stopped") | ||||
| ) | ||||
|  | ||||
| // Watcher is an interface that returns updates to the routing table | ||||
| type Watcher interface { | ||||
| 	// Next is a blocking call that returns watch result | ||||
| 	Next() (*Result, error) | ||||
| 	// Stop stops watcher | ||||
| 	Stop() | ||||
| } | ||||
|  | ||||
| // Result is returned by a call to Next on the watcher. | ||||
| type Result struct { | ||||
| 	// Action is routing table action which is either of add, remove or update | ||||
| 	Action string | ||||
| 	// Route is table rout | ||||
| 	Route Route | ||||
| } | ||||
|  | ||||
| // Watcher options | ||||
| type WatchOptions struct { | ||||
| 	// Specify destination address to watch | ||||
| 	DestAddr string | ||||
| 	// Specify network to watch | ||||
| 	Network string | ||||
| } | ||||
|  | ||||
| // WatchDestAddr sets what destination to watch | ||||
| // Destination is usually microservice name | ||||
| func WatchDestAddr(a string) WatchOption { | ||||
| 	return func(o *WatchOptions) { | ||||
| 		o.DestAddr = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WatchNetwork sets what network to watch | ||||
| func WatchNetwork(n string) WatchOption { | ||||
| 	return func(o *WatchOptions) { | ||||
| 		o.Network = n | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type tableWatcher struct { | ||||
| 	opts    WatchOptions | ||||
| 	resChan chan *Result | ||||
| 	done    chan struct{} | ||||
| } | ||||
|  | ||||
| // TODO: We might simply use Query here once QueryLookup is figured out | ||||
| // Next returns the next noticed action taken on table | ||||
| func (w *tableWatcher) Next() (*Result, error) { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case res := <-w.resChan: | ||||
| 			switch w.opts.DestAddr { | ||||
| 			case "*": | ||||
| 				if w.opts.Network == "*" || w.opts.Network == res.Route.Options().Network { | ||||
| 					return res, nil | ||||
| 				} | ||||
| 			case res.Route.Options().DestAddr: | ||||
| 				if w.opts.Network == "*" || w.opts.Network == res.Route.Options().Network { | ||||
| 					return res, nil | ||||
| 				} | ||||
| 			} | ||||
| 			// ignore if no match is found | ||||
| 			continue | ||||
| 		case <-w.done: | ||||
| 			return nil, ErrWatcherStopped | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Stop stops routing table watcher | ||||
| func (w *tableWatcher) Stop() { | ||||
| 	select { | ||||
| 	case <-w.done: | ||||
| 		return | ||||
| 	default: | ||||
| 		close(w.done) | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user