View File

View File

View File

@ -1,13 +1,14 @@
package register // import ""
package registry
import (
var (
@ -20,26 +21,33 @@ var (
// rtr implements router interface
type rtr struct {
running bool
table *table
options router.Options
exit chan bool
initChan chan bool
opts router.Options
running bool
// NewRouter creates new router and returns it
func NewRouter(opts ...router.Option) router.Router {
options := router.NewOptions(opts...)
// get default options
options := router.DefaultOptions()
// apply requested options
for _, o := range opts {
// construct the router
r := &rtr{
opts: options,
options: options,
initChan: make(chan bool),
// create the new table, passing the fetchRoute method in as a fallback if
// the table doesn't contain the result for a query.
r.table = newTable(r.lookup)
r.table = newTable()
// start the router
@ -50,16 +58,12 @@ func NewRouter(opts ...router.Option) router.Router {
func (r *rtr) Init(opts ...router.Option) error {
for _, o := range opts {
if r.opts.Register == nil {
return fmt.Errorf("register not set")
// push a message to the init chan so the watchers
// can reset in the case the register was changed
// can reset in the case the registry was changed
go func() {
r.initChan <- true
@ -72,7 +76,7 @@ func (r *rtr) Options() router.Options {
defer r.RUnlock()
options := r.opts
options := r.options
return options
@ -84,9 +88,9 @@ func (r *rtr) Table() router.Table {
return r.table
func getDomain(srv *register.Service) string {
func getDomain(srv *registry.Service) string {
// check the service metadata for domain
// TODO: domain as Domain field in register?
// TODO: domain as Domain field in registry?
if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 {
return srv.Metadata["domain"]
} else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil {
@ -95,7 +99,7 @@ func getDomain(srv *register.Service) string {
// otherwise return wildcard
// TODO: return GlobalDomain or PublicDomain
return register.DefaultDomain
return registry.DefaultDomain
// manageRoute applies action on a given route
@ -121,8 +125,8 @@ func (r *rtr) manageRoute(route router.Route, action string) error {
// createRoutes turns a service into a list routes basically converting nodes to routes
func (r *rtr) createRoutes(service *register.Service, network string) []router.Route {
routes := make([]router.Route, 0, len(service.Nodes))
func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route {
var routes []router.Route
for _, node := range service.Nodes {
routes = append(routes, router.Route{
@ -130,9 +134,9 @@ func (r *rtr) createRoutes(service *register.Service, network string) []router.R
Address: node.Address,
Gateway: "",
Network: network,
Router: r.opts.ID,
Router: r.options.Id,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
Metric: router.DefaultMetric,
Metadata: node.Metadata,
@ -142,7 +146,10 @@ func (r *rtr) createRoutes(service *register.Service, network string) []router.R
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *rtr) manageRoutes(service *register.Service, action, network string) error {
func (r *rtr) manageRoutes(service *registry.Service, action, network string) error {
// action is the routing table action
action = strings.ToLower(action)
// create a set of routes from the service
routes := r.createRoutes(service, network)
@ -157,9 +164,7 @@ func (r *rtr) manageRoutes(service *register.Service, action, network string) er
// create the routes in the table
for _, route := range routes {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Creating route %v domain: %v", route, network)
logger.Tracef("Creating route %v domain: %v", route, network)
if err := r.manageRoute(route, action); err != nil {
return err
@ -168,10 +173,10 @@ func (r *rtr) manageRoutes(service *register.Service, action, network string) er
return nil
// manageRegisterRoutes applies action to all routes of each service found in the register.
// manageRegistryRoutes applies action to all routes of each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *rtr) loadRoutes(reg register.Register) error {
services, err := reg.ListServices(r.opts.Context, register.ListDomain(register.WildcardDomain))
func (r *rtr) loadRoutes(reg registry.Registry) error {
services, err := reg.ListServices(registry.ListDomain(registry.WildcardDomain))
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
@ -186,9 +191,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
// if the routes exist save them
if len(routes) > 0 {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Creating routes for service %v domain: %v", service, domain)
logger.Tracef("Creating routes for service %v domain: %v", service, domain)
for _, rt := range routes {
err := r.table.Create(rt)
@ -198,9 +201,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
if err != nil {
if r.opts.Logger.V(logger.ErrorLevel) {
r.opts.Logger.Errorf(r.opts.Context, "Error creating route for service %v in domain %v: %v", service, domain, err)
logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
@ -209,11 +210,9 @@ func (r *rtr) loadRoutes(reg register.Register) error {
// otherwise get all the service info
// get the service to retrieve all its info
srvs, err := reg.LookupService(r.opts.Context, service.Name, register.LookupDomain(domain))
srvs, err := reg.GetService(service.Name, registry.GetDomain(domain))
if err != nil {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to get service %s domain: %s", service.Name, domain)
logger.Tracef("Failed to get service %s domain: %s", service.Name, domain)
@ -222,9 +221,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
routes := r.createRoutes(srv, domain)
if len(routes) > 0 {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Creating routes for service %v domain: %v", srv, domain)
logger.Tracef("Creating routes for service %v domain: %v", srv, domain)
for _, rt := range routes {
err := r.table.Create(rt)
@ -234,9 +231,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
if err != nil {
if r.opts.Logger.V(logger.ErrorLevel) {
r.opts.Logger.Errorf(r.opts.Context, "Error creating route for service %v in domain %v: %v", service, domain, err)
logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
@ -246,27 +241,52 @@ func (r *rtr) loadRoutes(reg register.Register) error {
return nil
// lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) lookup(service string) ([]router.Route, error) {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Fetching route for %s domain: %v", service, register.WildcardDomain)
// Close the router
func (r *rtr) Close() error {
defer r.Unlock()
select {
case <-r.exit:
return nil
if !r.running {
return nil
services, err := r.opts.Register.LookupService(r.opts.Context, service, register.LookupDomain(register.WildcardDomain))
if err == register.ErrNotFound {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s", service)
r.running = false
return nil
// lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
q := router.NewLookup(opts...)
// if we find the routes filter and return them
routes, err := r.table.Read(router.ReadService(service))
if err == nil {
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
return routes, nil
// lookup the route
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
if err == registry.ErrNotFound {
logger.Tracef("Failed to find route for %s", service)
return nil, router.ErrRouteNotFound
} else if err != nil {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s: %v", service, err)
logger.Tracef("Failed to find route for %s: %v", service, err)
return nil, fmt.Errorf("failed getting services: %v", err)
var routes []router.Route
for _, srv := range services {
domain := getDomain(srv)
// TODO: should we continue to send the event indicating we created a route?
@ -274,12 +294,23 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
routes = append(routes, r.createRoutes(srv, domain)...)
// if we're supposed to cache then save the routes
if r.options.Cache {
for _, route := range routes {
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
return routes, nil
// watchRegister watches register and updates routing table based on the received events.
// It returns error if either the register watcher fails with error or if the routing table update fails.
func (r *rtr) watchRegister(w register.Watcher) error {
// watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *rtr) watchRegistry(w registry.Watcher) error {
exit := make(chan bool)
defer func() {
@ -303,7 +334,7 @@ func (r *rtr) watchRegister(w register.Watcher) error {
// get the next service
res, err := w.Next()
if err != nil {
if err != register.ErrWatcherStopped {
if err != registry.ErrWatcherStopped {
return err
@ -311,15 +342,11 @@ func (r *rtr) watchRegister(w register.Watcher) error {
// don't process nil entries
if res.Service == nil {
if logger.V(logger.TraceLevel) {
logger.Trace(r.opts.Context, "Received a nil service")
logger.Trace("Received a nil service")
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Router dealing with next route %s %+v\n", res.Action, res.Service)
logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service)
// get the services domain from metadata. Fallback to wildcard.
domain := getDomain(res.Service)
@ -339,24 +366,17 @@ func (r *rtr) start() error {
return nil
if r.opts.Precache {
// add all local service routes into the routing table
if err := r.loadRoutes(r.opts.Register); err != nil {
return fmt.Errorf("failed loading register routes: %s", err)
// add default gateway into routing table
if r.opts.Gateway != "" {
if r.options.Gateway != "" {
// note, the only non-default value is the gateway
route := router.Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Gateway: r.options.Gateway,
Network: "*",
Router: r.opts.ID,
Router: r.options.Id,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
Metric: router.DefaultMetric,
if err := r.table.Create(route); err != nil {
return fmt.Errorf("failed adding default gateway route: %s", err)
@ -365,27 +385,59 @@ func (r *rtr) start() error {
// create error and exit channels
r.exit = make(chan bool)
r.running = true
// periodically refresh all the routes
// only cache if told to do so
if !r.options.Cache {
return nil
// create a refresh notify channel
refresh := make(chan bool, 1)
// fires the refresh for loading routes
refreshRoutes := func() {
select {
case refresh <- true:
// refresh all the routes in the event of a failure watching the registry
go func() {
t1 := time.NewTicker(RefreshInterval)
defer t1.Stop()
var lastRefresh time.Time
t2 := time.NewTicker(PruneInterval)
defer t2.Stop()
// load a refresh
for {
select {
case <-r.exit:
case <-t2.C:
case <-t1.C:
if err := r.loadRoutes(r.opts.Register); err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "failed refreshing register routes: %s", err)
case <-refresh:
// don't refresh if we've done so in the past minute
if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute {
// load new routes
if err := r.loadRoutes(r.options.Registry); err != nil {
logger.Debugf("failed refreshing registry routes: %s", err)
// in this don't prune
// first time so nothing to prune
if !lastRefresh.IsZero() {
// prune any routes since last refresh since we've
// updated basically everything we care about
// update the refresh time
lastRefresh = time.Now()
case <-time.After(RefreshInterval):
@ -396,69 +448,40 @@ func (r *rtr) start() error {
case <-r.exit:
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Router starting register watch")
w, err := r.opts.Register.Watch(r.opts.Context, register.WatchDomain(register.WildcardDomain))
logger.Tracef("Router starting registry watch")
w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
if err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debug(r.opts.Context, "failed creating register watcher: %v", err)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("failed creating registry watcher: %v", err)
// in the event of an error reload routes
// watchRegister calls stop when it's done
if err := r.watchRegister(w); err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "Error watching the register: %v", err)
// watchRegistry calls stop when it's done
if err := r.watchRegistry(w); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Error watching the registry: %v", err)
// in the event of an error reload routes
r.running = true
return nil
// Lookup routes in the routing table
func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) {
return r.Table().Query(q...)
// Watch routes
func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return r.table.Watch(opts...)
// Close the router
func (r *rtr) Close() error {
defer r.Unlock()
select {
case <-r.exit:
return nil
if !r.running {
return nil
r.running = false
return nil
// String prints debugging information about router
func (r *rtr) String() string {
return "register"
func (r *rtr) Name() string {
return r.opts.Name
return "registry"

View File

@ -1,19 +1,16 @@
//go:build ignore
// +build ignore
package register
package registry
import (
func routerTestSetup() router.Router {
r := memory.NewRegister()
return NewRouter(router.Register(r))
r := memory.NewRegistry()
return NewRouter(router.Registry(r))
func TestRouterClose(t *testing.T) {
@ -22,7 +19,7 @@ func TestRouterClose(t *testing.T) {
if err := r.Close(); err != nil {
t.Errorf("failed to stop router: %v", err)
if len(os.Getenv("INTEGRATION_TESTS")) == 0 {
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("TestRouterStartStop STOPPED")

View File

@ -1,38 +1,33 @@
package register
package registry
import (
// table is an in-memory routing table
type table struct {
// lookup for a service
lookup func(string) ([]router.Route, error)
// routes stores service routes
routes map[string]map[uint64]*route
// watchers stores table watchers
watchers map[string]*tableWatcher
opts router.Options
type route struct {
updated time.Time
route router.Route
updated time.Time
// newtable creates a new routing table and returns it
func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table {
func newTable() *table {
return &table{
lookup: lookup,
routes: make(map[string]map[uint64]*route),
watchers: make(map[string]*tableWatcher),
opts: router.NewOptions(opts...),
@ -96,8 +91,8 @@ func (t *table) sendEvent(e *router.Event) {
defer t.RUnlock()
if len(e.ID) == 0 {
e.ID, _ = id.New()
if len(e.Id) == 0 {
e.Id = uuid.New().String()
for _, w := range t.watchers {
@ -129,10 +124,10 @@ func (t *table) Create(r router.Route) error {
// create the route
t.routes[service][sum] = &route{updated: time.Now(), route: r}
t.routes[service][sum] = &route{r, time.Now()}
if t.opts.Logger.V(logger.DebugLevel) {
t.opts.Logger.Debugf(t.opts.Context, "Router emitting %s for route: %s", router.Create, r.Address)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router emitting %s for route: %s", router.Create, r.Address)
// send a route created event
@ -165,8 +160,8 @@ func (t *table) Delete(r router.Route) error {
delete(t.routes, service)
if t.opts.Logger.V(logger.DebugLevel) {
t.opts.Logger.Debugf(t.opts.Context, "Router emitting %s for route: %s", router.Delete, r.Address)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router emitting %s for route: %s", router.Delete, r.Address)
go t.sendEvent(&router.Event{Type: router.Delete, Timestamp: time.Now(), Route: r})
@ -188,168 +183,53 @@ func (t *table) Update(r router.Route) error {
if _, ok := t.routes[service][sum]; !ok {
// update the route
t.routes[service][sum] = &route{updated: time.Now(), route: r}
t.routes[service][sum] = &route{r, time.Now()}
if t.opts.Logger.V(logger.DebugLevel) {
t.opts.Logger.Debugf(t.opts.Context, "Router emitting %s for route: %s", router.Update, r.Address)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router emitting %s for route: %s", router.Update, r.Address)
go t.sendEvent(&router.Event{Type: router.Update, Timestamp: time.Now(), Route: r})
return nil
// just update the route, but dont emit Update event
t.routes[service][sum] = &route{updated: time.Now(), route: r}
t.routes[service][sum] = &route{r, time.Now()}
return nil
// List returns a list of all routes in the table
func (t *table) List() ([]router.Route, error) {
// Read entries from the table
func (t *table) Read(opts ...router.ReadOption) ([]router.Route, error) {
var options router.ReadOptions
for _, o := range opts {
defer t.RUnlock()
var routes []router.Route
for _, rmap := range t.routes {
for _, route := range rmap {
routes = append(routes, route.route)
return routes, nil
// isMatch checks if the route matches given query options
func isMatch(route router.Route, address, gateway, network, rtr, link string) bool {
// matches the values provided
match := func(a, b string) bool {
if a == "*" || b == "*" || a == b {
return true
return false
// a simple struct to hold our values
type compare struct {
a string
b string
// compare the following values
values := []compare{
{gateway, route.Gateway},
{network, route.Network},
{rtr, route.Router},
{address, route.Address},
{link, route.Link},
for _, v := range values {
// attempt to match each value
if !match(v.a, v.b) {
return false
return true
// filterRoutes finds all the routes for given network and router and returns them
func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route {
address := opts.Address
gateway := opts.Gateway
network := opts.Network
rtr := opts.Router
link := opts.Link
// routeMap stores the routes we're going to advertise
routeMap := make(map[string][]router.Route)
var routeCnt int
for _, rt := range routes {
// get the actual route
route := rt.route
if isMatch(route, address, gateway, network, rtr, link) {
// add matchihg route to the routeMap
routeKey := route.Service + "@" + route.Network
routeMap[routeKey] = append(routeMap[routeKey], route)
results := make([]router.Route, 0, routeCnt)
for _, route := range routeMap {
results = append(results, route...)
return results
// Lookup queries routing table and returns all routes that match the lookup query
func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) {
// create new query options
opts := router.NewQuery(q...)
// create a cwslicelist of query results
results := make([]router.Route, 0, len(t.routes))
// readAndFilter routes for this service under read lock.
readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) {
defer t.RUnlock()
routes, ok := t.routes[q.Service]
if !ok || len(routes) == 0 {
return nil, false
return filterRoutes(routes, q), true
if opts.Service != "*" {
// try and load services from the cache
if routes, ok := readAndFilter(opts); ok {
return routes, nil
// lookup the route and try again
// TODO: move this logic out of the hot path
// being hammered on queries will require multiple lookups
routes, err := t.lookup(opts.Service)
if err != nil {
return nil, err
// cache the routes
for _, rt := range routes {
// try again
if routes, ok := readAndFilter(opts); ok {
return routes, nil
// get the routes based on options passed
if len(options.Service) > 0 {
routeMap, ok := t.routes[options.Service]
if !ok {
return nil, router.ErrRouteNotFound
// search through all destinations
for _, routes := range t.routes {
// filter the routes
found := filterRoutes(routes, opts)
// ensure we don't append zero length routes
if len(found) == 0 {
for _, rt := range routeMap {
routes = append(routes, rt.route)
results = append(results, found...)
return routes, nil
// otherwise get all routes
for _, serviceRoutes := range t.routes {
for _, rt := range serviceRoutes {
routes = append(routes, rt.route)
return results, nil
return routes, nil
// Watch returns routing table entry watcher
@ -364,11 +244,11 @@ func (t *table) Watch(opts ...router.WatchOption) (router.Watcher, error) {
w := &tableWatcher{
id: uuid.New().String(),
opts: wopts,
resChan: make(chan *router.Event, 10),
done: make(chan struct{}),
}, _ = id.New()
// when the watcher is stopped delete it
go func() {

View File

@ -1,23 +1,13 @@
//go:build ignore
// +build ignore
package register
package registry
import (
func testSetup(t *testing.T) (*table, router.Route) {
r, err := NewRouter()
if err != nil {
routr := r.(*rtr)
table := newTable(routr.lookup)
func testSetup() (*table, router.Route) {
table := newTable()
route := router.Route{
Service: "dest.svc",
@ -33,7 +23,7 @@ func testSetup(t *testing.T) (*table, router.Route) {
func TestCreate(t *testing.T) {
table, route := testSetup(t)
table, route := testSetup()
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
@ -53,13 +43,13 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {
table, route := testSetup(t)
table, route := testSetup()
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
// should fail to delete non-existent route
// should fail to delete non-existant route
prevSvc := route.Service
route.Service = "randDest"
@ -76,7 +66,7 @@ func TestDelete(t *testing.T) {
func TestUpdate(t *testing.T) {
table, route := testSetup(t)
table, route := testSetup()
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
@ -98,7 +88,7 @@ func TestUpdate(t *testing.T) {
func TestList(t *testing.T) {
table, route := testSetup(t)
table, route := testSetup()
svc := []string{"one.svc", "two.svc", "three.svc"}
@ -109,7 +99,7 @@ func TestList(t *testing.T) {
routes, err := table.List()
routes, err := table.Read()
if err != nil {
t.Fatalf("error listing routes: %s", err)
@ -120,234 +110,22 @@ func TestList(t *testing.T) {
func TestQuery(t *testing.T) {
table, route := testSetup(t)
svc := []string{"svc1", "svc2", "svc3", "svc1"}
net := []string{"net1", "net2", "net1", "net3"}
gw := []string{"gw1", "gw2", "gw3", "gw3"}
rtr := []string{"rtr1", "rt2", "rt3", "rtr3"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
route.Router = rtr[i]
route.Link = router.DefaultLink
table, route := testSetup()
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
// return all routes
routes, err := table.Query()
rt, err := table.Read(router.ReadService(route.Service))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
} else if len(routes) == 0 {
t.Fatalf("error looking up routes: not found")
t.Fatal("Expected a route got err", err)
// query routes particular network
network := "net1"
routes, err = table.Query(router.QueryNetwork(network))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(rt) != 1 {
t.Fatalf("Expected one route got %d", len(rt))
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
for _, route := range routes {
if route.Network != network {
t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
// query routes for particular gateway
gateway := "gw1"
routes, err = table.Query(router.QueryGateway(gateway))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
// query routes for particular router
rt := "rtr1"
routes, err = table.Query(router.QueryRouter(rt))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
// query particular gateway and network
query := []router.QueryOption{
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
if routes[0].Network != network {
t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
// non-existen route query
routes, err = table.Query(router.QueryService("foobar"))
if err != router.ErrRouteNotFound {
t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err)
if len(routes) != 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
// query NO routes
query = []router.QueryOption{
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(routes) > 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
// insert local routes to query
for i := 0; i < 2; i++ {
route.Link = "foobar"
route.Address = fmt.Sprintf("local.route.address-%d", i)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
// query local routes
query = []router.QueryOption{
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
// add two different routes for svcX with different metric
for i := 0; i < 2; i++ {
route.Service = "svcX"
route.Address = fmt.Sprintf("svcX.route.address-%d", i)
route.Metric = int64(100 + i)
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
query = []router.QueryOption{
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
func TestFallback(t *testing.T) {
r := &rtr{
options: router.NewOptions(),
route := router.Route{
Service: "",
Router: r.options.ID,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
r.table = newTable(func(s string) ([]router.Route, error) {
return []router.Route{route}, nil
rts, err := r.Lookup(router.QueryService(""))
if err != nil {
t.Fatalf("error looking up service %s", err)
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
// deleting from the table but the next query should invoke the fallback that we passed during new table creation
if err := r.table.Delete(route); err != nil {
t.Fatalf("error deleting route %s", err)
rts, err = r.Lookup(router.QueryService(""))
if err != nil {
t.Fatalf("error looking up service %s", err)
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
func TestFallbackError(t *testing.T) {
r := &rtr{
options: router.NewOptions(),
r.table = newTable(func(s string) ([]router.Route, error) {
return nil, fmt.Errorf("ERROR")
_, err := r.Lookup(router.QueryService(""))
if err == nil {
t.Fatalf("expected error looking up service but none returned")
if rt[0].Hash() != route.Hash() {
t.Fatal("Mismatched routes received")

View File

@ -1,18 +1,18 @@
package register
package registry
import (
// tableWatcher implements routing table Watcher
type tableWatcher struct {
resChan chan *router.Event
done chan struct{}
id string
opts router.WatchOptions
resChan chan *router.Event
done chan struct{}
// Next returns the next noticed action taken on table