Big refactor. New Registry watchers. New options. New names.
This commit is contained in:
		| @@ -2,41 +2,33 @@ package router | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-log" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/registry/gossip" | ||||
| 	"github.com/olekukonko/tablewriter" | ||||
| ) | ||||
|  | ||||
| type router struct { | ||||
| 	opts Options | ||||
| 	goss registry.Registry | ||||
| 	exit chan struct{} | ||||
| 	wg   *sync.WaitGroup | ||||
| } | ||||
|  | ||||
| func newRouter(opts ...Option) Router { | ||||
| 	// set default options | ||||
| 	options := Options{ | ||||
| 		Table: NewTable(), | ||||
| 	} | ||||
| 	// get default options | ||||
| 	options := DefaultOptions() | ||||
|  | ||||
| 	// apply requested options | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// bind to gossip address to join gossip registry | ||||
| 	goss := gossip.NewRegistry( | ||||
| 		gossip.Address(options.GossipAddr), | ||||
| 	) | ||||
|  | ||||
| 	return &router{ | ||||
| 		opts: options, | ||||
| 		goss: goss, | ||||
| 		exit: make(chan struct{}), | ||||
| 		wg:   &sync.WaitGroup{}, | ||||
| 	} | ||||
| @@ -65,29 +57,78 @@ func (r *router) Address() string { | ||||
| 	return r.opts.Address | ||||
| } | ||||
|  | ||||
| // Gossip returns gossip bind address | ||||
| func (r *router) Gossip() string { | ||||
| 	return r.opts.GossipAddress | ||||
| } | ||||
|  | ||||
| // Network returns router's micro network | ||||
| func (r *router) Network() string { | ||||
| 	return r.opts.NetworkAddr | ||||
| 	return r.opts.NetworkAddress | ||||
| } | ||||
|  | ||||
| // Start starts the router | ||||
| func (r *router) Start() error { | ||||
| 	// TODO: | ||||
| 	// - list all remote services and populate routing table | ||||
| 	// - list all local services and populate remote registry | ||||
|  | ||||
| 	gWatcher, err := r.goss.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create router gossip registry watcher: %v", err) | ||||
| 	// add local service routes into routing table | ||||
| 	if err := r.addServiceRoutes(r.opts.LocalRegistry, "local", 1); err != nil { | ||||
| 		return fmt.Errorf("failed to add service routes for local services: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	tWatcher, err := r.opts.Table.Watch() | ||||
| 	// add network service routes into routing table | ||||
| 	if err := r.addServiceRoutes(r.opts.NetworkRegistry, r.opts.NetworkAddress, 10); err != nil { | ||||
| 		return fmt.Errorf("failed to add service routes for network services: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// lookup local service routes and advertise them in network registry | ||||
| 	query := NewQuery(QueryNetwork("local")) | ||||
| 	localRoutes, err := r.opts.Table.Lookup(query) | ||||
| 	if err != nil && err != ErrRouteNotFound { | ||||
| 		return fmt.Errorf("failed to lookup local service routes: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	addr := strings.Split(r.opts.Address, ":") | ||||
| 	port, err := strconv.Atoi(addr[1]) | ||||
| 	if err != nil { | ||||
| 		fmt.Errorf("could not parse router address from %s: %v", r.opts.Address, err) | ||||
| 	} | ||||
|  | ||||
| 	for _, route := range localRoutes { | ||||
| 		node := ®istry.Node{ | ||||
| 			Id:      r.opts.ID, | ||||
| 			Address: addr[0], | ||||
| 			Port:    port, | ||||
| 		} | ||||
|  | ||||
| 		service := ®istry.Service{ | ||||
| 			Name:  route.Options().DestAddr, | ||||
| 			Nodes: []*registry.Node{node}, | ||||
| 		} | ||||
| 		if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { | ||||
| 			return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	lWatcher, err := r.opts.LocalRegistry.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create local registry watcher: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	rWatcher, err := r.opts.NetworkRegistry.Watch() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create network registry watcher: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// we only watch local entries which we resend to network registry | ||||
| 	tWatcher, err := r.opts.Table.Watch(WatchNetwork("local")) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create routing table watcher: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go r.watchGossip(gWatcher) | ||||
| 	go r.watchLocal(lWatcher) | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go r.watchRemote(rWatcher) | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go r.watchTable(tWatcher) | ||||
| @@ -95,15 +136,39 @@ func (r *router) Start() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // watch gossip registry | ||||
| func (r *router) watchGossip(w registry.Watcher) error { | ||||
| func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { | ||||
| 	// list all local services | ||||
| 	services, err := reg.ListServices() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to list services: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// add services to routing table | ||||
| 	for _, service := range services { | ||||
| 		// create new micro network route | ||||
| 		route := NewRoute( | ||||
| 			DestAddr(service.Name), | ||||
| 			Gateway(r), | ||||
| 			Network(network), | ||||
| 			Metric(metric), | ||||
| 		) | ||||
| 		// add new route to routing table | ||||
| 		if err := r.opts.Table.Add(route); err != nil { | ||||
| 			return fmt.Errorf("failed to add route for service: %s", service.Name) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // watch local registry | ||||
| func (r *router) watchLocal(w registry.Watcher) error { | ||||
| 	defer r.wg.Done() | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		// stop gossip registry watcher | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| @@ -121,20 +186,83 @@ func (r *router) watchGossip(w registry.Watcher) error { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		// create new route | ||||
| 		route := NewRoute( | ||||
| 			DestAddr(res.Service.Name), | ||||
| 			Gateway(r), | ||||
| 			Network("local"), | ||||
| 			Metric(1), | ||||
| 		) | ||||
|  | ||||
| 		switch res.Action { | ||||
| 		case "create": | ||||
| 			if len(res.Service.Nodes) > 0 { | ||||
| 				log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) | ||||
| 				if err := r.opts.Table.Add(route); err != nil { | ||||
| 					log.Logf("[router] failed to add route for local service: %v", res.Service.Name) | ||||
| 				} | ||||
| 			} | ||||
| 		case "delete": | ||||
| 			log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) | ||||
| 			if err := r.opts.Table.Remove(route); err != nil { | ||||
| 				log.Logf("[router] failed to remove route for local service: %v", res.Service.Name) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // watch gossip registry | ||||
| // watch remote registry | ||||
| func (r *router) watchRemote(w registry.Watcher) error { | ||||
| 	defer r.wg.Done() | ||||
|  | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| 	var watchErr error | ||||
|  | ||||
| 	// watch for changes to services | ||||
| 	for { | ||||
| 		res, err := w.Next() | ||||
| 		if err == registry.ErrWatcherStopped { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		if err != nil { | ||||
| 			watchErr = err | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		// create new route | ||||
| 		route := NewRoute( | ||||
| 			DestAddr(res.Service.Name), | ||||
| 			Gateway(r), | ||||
| 			Network(r.opts.NetworkAddress), | ||||
| 			Metric(10), | ||||
| 			RoutePolicy(IgnoreIfExists), | ||||
| 		) | ||||
|  | ||||
| 		switch res.Action { | ||||
| 		case "create": | ||||
| 			if len(res.Service.Nodes) > 0 { | ||||
| 				if err := r.opts.Table.Add(route); err != nil { | ||||
| 					log.Logf("[router] failed to add route for network service: %v", res.Service.Name) | ||||
| 				} | ||||
| 			} | ||||
| 		case "delete": | ||||
| 			if err := r.opts.Table.Remove(route); err != nil { | ||||
| 				log.Logf("[router] failed to remove route for network service: %v", res.Service.Name) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return watchErr | ||||
| } | ||||
|  | ||||
| // watch routing table changes | ||||
| func (r *router) watchTable(w Watcher) error { | ||||
| 	defer r.wg.Done() | ||||
|  | ||||
| @@ -142,7 +270,6 @@ func (r *router) watchTable(w Watcher) error { | ||||
| 	go func() { | ||||
| 		defer r.wg.Done() | ||||
| 		<-r.exit | ||||
| 		// stop gossip registry watcher | ||||
| 		w.Stop() | ||||
| 	}() | ||||
|  | ||||
| @@ -160,11 +287,35 @@ func (r *router) watchTable(w Watcher) error { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		addr := strings.Split(r.opts.Address, ":") | ||||
| 		port, err := strconv.Atoi(addr[1]) | ||||
| 		if err != nil { | ||||
| 			log.Logf("[router] could not parse router address from %s: %v", r.opts.Address, err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		node := ®istry.Node{ | ||||
| 			Id:      r.opts.ID, | ||||
| 			Address: addr[0], | ||||
| 			Port:    port, | ||||
| 		} | ||||
|  | ||||
| 		service := ®istry.Service{ | ||||
| 			Name:  res.Route.Options().DestAddr, | ||||
| 			Nodes: []*registry.Node{node}, | ||||
| 		} | ||||
|  | ||||
| 		switch res.Action { | ||||
| 		case "add": | ||||
| 			log.Logf("Action: %s, Route: %v", res.Action, res.Route) | ||||
| 			log.Logf("[router] routing table action: %s, route: %v", res.Action, res.Route) | ||||
| 			if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { | ||||
| 				log.Logf("[router] failed to register service %s in network registry: %v", service.Name, err) | ||||
| 			} | ||||
| 		case "remove": | ||||
| 			log.Logf("Action: %s, Route: %v", res.Action, res.Route) | ||||
| 			log.Logf("[router] routing table action: %s, route: %v", res.Action, res.Route) | ||||
| 			if err := r.opts.NetworkRegistry.Register(service); err != nil { | ||||
| 				log.Logf("[router] failed to deregister service %s from network registry: %v", service.Name, err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -187,13 +338,12 @@ func (r *router) String() string { | ||||
| 	sb := &strings.Builder{} | ||||
|  | ||||
| 	table := tablewriter.NewWriter(sb) | ||||
| 	table.SetHeader([]string{"ID", "Address", "Gossip", "Network", "Table"}) | ||||
| 	table.SetHeader([]string{"ID", "Address", "Network", "Table"}) | ||||
|  | ||||
| 	data := []string{ | ||||
| 		r.opts.ID, | ||||
| 		r.opts.Address, | ||||
| 		r.opts.GossipAddr, | ||||
| 		r.opts.NetworkAddr, | ||||
| 		r.opts.NetworkAddress, | ||||
| 		fmt.Sprintf("%d", r.opts.Table.Size()), | ||||
| 	} | ||||
| 	table.Append(data) | ||||
|   | ||||
| @@ -1,11 +1,15 @@ | ||||
| package router | ||||
|  | ||||
| import "context" | ||||
|  | ||||
| // AddPolicy defines routing table addition policy | ||||
| type AddPolicy int | ||||
|  | ||||
| const ( | ||||
| 	// Override overrides existing routing table route | ||||
| 	OverrideIfExists AddPolicy = iota | ||||
| 	// IgnoreIfExists does not add new route | ||||
| 	IgnoreIfExists | ||||
| 	// ErrIfExists returns error if the route already exists | ||||
| 	ErrIfExists | ||||
| ) | ||||
| @@ -22,6 +26,8 @@ type RouteOptions struct { | ||||
| 	Metric int | ||||
| 	// Policy defines route addition policy | ||||
| 	Policy AddPolicy | ||||
| 	// Context stores other arbitrary options | ||||
| 	Context context.Context | ||||
| } | ||||
|  | ||||
| // DestAddr sets destination address | ||||
|   | ||||
| @@ -1,25 +1,35 @@ | ||||
| package router | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| ) | ||||
|  | ||||
| // Options allows to set Router options | ||||
| var ( | ||||
| 	// DefaultAddress is default router bind address | ||||
| 	DefaultAddress = ":9093" | ||||
| 	// DefaultNetworkAddress is default micro network bind address | ||||
| 	DefaultNetworkAddress = ":9094" | ||||
| ) | ||||
|  | ||||
| // Options allows to set router options | ||||
| type Options struct { | ||||
| 	// ID is router ID | ||||
| 	ID string | ||||
| 	// Address is router address | ||||
| 	Address string | ||||
| 	// GossipAddr is router gossip address | ||||
| 	GossipAddr string | ||||
| 	// NetworkAddr defines micro network address | ||||
| 	NetworkAddr string | ||||
| 	// RIB is Routing Information Base | ||||
| 	RIB RIB | ||||
| 	// GossipAddress is router gossip address | ||||
| 	GossipAddress string | ||||
| 	// NetworkAddress is micro network address | ||||
| 	NetworkAddress string | ||||
| 	// LocalRegistry is router local registry | ||||
| 	LocalRegistry registry.Registry | ||||
| 	// NetworkRegistry is router remote registry | ||||
| 	NetworkRegistry registry.Registry | ||||
| 	// Table is routing table | ||||
| 	Table Table | ||||
| 	// Context stores arbitrary options | ||||
| 	Context context.Context | ||||
| 	// RIB is Routing Information Base | ||||
| 	RIB RIB | ||||
| } | ||||
|  | ||||
| // ID sets Router ID | ||||
| @@ -36,24 +46,17 @@ func Address(a string) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // GossipAddr sets router gossip address | ||||
| func GossipAddr(a string) Option { | ||||
| // GossipAddress sets router gossip address | ||||
| func GossipAddress(a string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.GossipAddr = a | ||||
| 		o.GossipAddress = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // NetworkAddr sets router network address | ||||
| func NetworkAddr(n string) Option { | ||||
| // NetworkAddress sets router network address | ||||
| func NetworkAddress(n string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.NetworkAddr = n | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RIBase allows to configure RIB | ||||
| func RIBase(r RIB) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.RIB = r | ||||
| 		o.NetworkAddress = n | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -63,3 +66,38 @@ func RoutingTable(t Table) Option { | ||||
| 		o.Table = t | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // LocalRegistry allows to specify local registry | ||||
| func LocalRegistry(r registry.Registry) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.LocalRegistry = r | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // NetworkRegistry allows to specify remote registry | ||||
| func NetworkRegistry(r registry.Registry) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.NetworkRegistry = r | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RouterIB allows to configure RIB | ||||
| func RouterIB(r RIB) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.RIB = r | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // DefaultOptions returns router default options | ||||
| func DefaultOptions() Options { | ||||
| 	// NOTE: by default both local and network registies use default registry i.e. mdns | ||||
| 	// TODO: DefaultRIB needs to be added once it's properly figured out | ||||
| 	return Options{ | ||||
| 		ID:              uuid.New().String(), | ||||
| 		Address:         DefaultAddress, | ||||
| 		NetworkAddress:  DefaultNetworkAddress, | ||||
| 		LocalRegistry:   registry.DefaultRegistry, | ||||
| 		NetworkRegistry: registry.DefaultRegistry, | ||||
| 		Table:           NewTable(), | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -12,18 +12,25 @@ const ( | ||||
|  | ||||
| // QueryOptions allow to define routing table query options | ||||
| type QueryOptions struct { | ||||
| 	// Route allows to set route options | ||||
| 	RouteOptions *RouteOptions | ||||
| 	// DestAddr defines destination address | ||||
| 	DestAddr string | ||||
| 	// NetworkAddress defines network address | ||||
| 	Network string | ||||
| 	// Policy defines query lookup policy | ||||
| 	Policy LookupPolicy | ||||
| 	// Count defines max number of results to return | ||||
| 	Count int | ||||
| } | ||||
|  | ||||
| // QueryRouteOpts allows to set the route query options | ||||
| func QueryRouteOptons(r *RouteOptions) QueryOption { | ||||
| // QueryDestAddr sets query destination address | ||||
| func QueryDestAddr(a string) QueryOption { | ||||
| 	return func(o *QueryOptions) { | ||||
| 		o.RouteOptions = r | ||||
| 		o.DestAddr = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // QueryNetwork sets query network address | ||||
| func QueryNetwork(a string) QueryOption { | ||||
| 	return func(o *QueryOptions) { | ||||
| 		o.Network = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -34,13 +41,6 @@ func QueryPolicy(p LookupPolicy) QueryOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // QueryCount allows to set max results to return | ||||
| func QueryCount(c int) QueryOption { | ||||
| 	return func(o *QueryOptions) { | ||||
| 		o.Count = c | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Query defines routing table query | ||||
| type Query interface { | ||||
| 	// Options returns query options | ||||
| @@ -53,7 +53,12 @@ type query struct { | ||||
|  | ||||
| // NewQuery creates new query and returns it | ||||
| func NewQuery(opts ...QueryOption) Query { | ||||
| 	qopts := QueryOptions{} | ||||
| 	// default options | ||||
| 	qopts := QueryOptions{ | ||||
| 		DestAddr: "*", | ||||
| 		Network:  "*", | ||||
| 		Policy:   DiscardNoRoute, | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&qopts) | ||||
|   | ||||
							
								
								
									
										26
									
								
								router/rib.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										26
									
								
								router/rib.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,26 @@ | ||||
| package router | ||||
|  | ||||
| // RIB is Routing Information Base | ||||
| type RIB interface { | ||||
| 	// Initi initializes RIB | ||||
| 	Init(...RIBOption) error | ||||
| 	// Options returns RIB options | ||||
| 	Options() RIBOptions | ||||
| 	// Routes returns routes in RIB | ||||
| 	Routes() []Route | ||||
| 	// String returns debug info | ||||
| 	String() string | ||||
| } | ||||
|  | ||||
| // RIBOptions allow to set RIB sources. | ||||
| type RIBOptions struct { | ||||
| 	// Source defines RIB source URL | ||||
| 	Source string | ||||
| } | ||||
|  | ||||
| // Source sets RIB source | ||||
| func Source(s string) RIBOption { | ||||
| 	return func(o *RIBOptions) { | ||||
| 		o.Source = s | ||||
| 	} | ||||
| } | ||||
| @@ -3,26 +3,22 @@ package router | ||||
|  | ||||
| // Router is micro network router | ||||
| type Router interface { | ||||
| 	// Initi initializes Router with options | ||||
| 	// Init initializes the router with options | ||||
| 	Init(...Option) error | ||||
| 	// Options returns Router options | ||||
| 	// Options returns the router options | ||||
| 	Options() Options | ||||
| 	// Table returns routing table | ||||
| 	Table() Table | ||||
| 	// Address returns router adddress | ||||
| 	Address() string | ||||
| 	// Gossip returns router gossip address | ||||
| 	Gossip() string | ||||
| 	// Network returns router network address | ||||
| 	Network() string | ||||
| 	// Start starts router | ||||
| 	// Start starts the router | ||||
| 	Start() error | ||||
| 	// Stop stops router | ||||
| 	// Stop stops the router | ||||
| 	Stop() error | ||||
| 	// String returns router debug info | ||||
| 	String() string | ||||
| } | ||||
|  | ||||
| // RIB is Routing Information Base | ||||
| type RIB interface { | ||||
| 	// String returns debug info | ||||
| 	String() string | ||||
| } | ||||
| @@ -30,6 +26,9 @@ type RIB interface { | ||||
| // Option used by the router | ||||
| type Option func(*Options) | ||||
|  | ||||
| // RIBOptopn is used to configure RIB | ||||
| type RIBOption func(*RIBOptions) | ||||
|  | ||||
| // RouteOption is used to define routing table entry options | ||||
| type RouteOption func(*RouteOptions) | ||||
|  | ||||
|   | ||||
| @@ -28,7 +28,7 @@ type Table interface { | ||||
| 	// Remove removes existing route from the table | ||||
| 	Remove(Route) error | ||||
| 	// Update updates route in the table | ||||
| 	Update(...RouteOption) error | ||||
| 	Update(Route) error | ||||
| 	// Lookup looks up routes in the table | ||||
| 	Lookup(Query) ([]Route, error) | ||||
| 	// Watch returns a watcher which allows you to track updates to the table | ||||
| @@ -83,6 +83,10 @@ func (t *table) Add(r Route) error { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if r.Options().Policy == IgnoreIfExists { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return ErrDuplicateRoute | ||||
| } | ||||
|  | ||||
| @@ -104,13 +108,11 @@ func (t *table) Remove(r Route) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Update updates routing table using propvided options | ||||
| func (t *table) Update(opts ...RouteOption) error { | ||||
| // Update updates routing table with new route | ||||
| func (t *table) Update(r Route) error { | ||||
| 	t.Lock() | ||||
| 	defer t.Unlock() | ||||
|  | ||||
| 	r := NewRoute(opts...) | ||||
|  | ||||
| 	destAddr := r.Options().DestAddr | ||||
| 	sum := t.hash(r) | ||||
|  | ||||
| @@ -129,7 +131,37 @@ func (t *table) Update(opts ...RouteOption) error { | ||||
|  | ||||
| // Lookup queries routing table and returns all routes that match it | ||||
| func (t *table) Lookup(q Query) ([]Route, error) { | ||||
| 	return nil, ErrNotImplemented | ||||
| 	t.RLock() | ||||
| 	defer t.RUnlock() | ||||
|  | ||||
| 	var results []Route | ||||
|  | ||||
| 	for destAddr, routes := range t.m { | ||||
| 		if q.Options().DestAddr != "*" { | ||||
| 			if q.Options().DestAddr != destAddr { | ||||
| 				continue | ||||
| 			} | ||||
| 			for _, route := range routes { | ||||
| 				if q.Options().Network == "*" || q.Options().Network == route.Options().Network { | ||||
| 					results = append(results, route) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if q.Options().DestAddr == "*" { | ||||
| 			for _, route := range routes { | ||||
| 				if q.Options().Network == "*" || q.Options().Network == route.Options().Network { | ||||
| 					results = append(results, route) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if len(results) == 0 && q.Options().Policy != DiscardNoRoute { | ||||
| 		return nil, ErrRouteNotFound | ||||
| 	} | ||||
|  | ||||
| 	return results, nil | ||||
| } | ||||
|  | ||||
| // Watch returns routing table entry watcher | ||||
|   | ||||
| @@ -54,14 +54,14 @@ type tableWatcher struct { | ||||
| 	done    chan struct{} | ||||
| } | ||||
|  | ||||
| // TODO: We might simply use Query here once QueryLookup is figured out | ||||
| // TODO: this needs to be thought through properly | ||||
| // Next returns the next noticed action taken on table | ||||
| func (w *tableWatcher) Next() (*Result, error) { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case res := <-w.resChan: | ||||
| 			switch w.opts.DestAddr { | ||||
| 			case "*": | ||||
| 			case "*", "": | ||||
| 				if w.opts.Network == "*" || w.opts.Network == res.Route.Options().Network { | ||||
| 					return res, nil | ||||
| 				} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user