diff --git a/network/router/default_router.go b/network/router/default_router.go index 872dfc93..a9517f9d 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -39,7 +39,7 @@ type router struct { sync.RWMutex } -// newRouter creates new router and returns it +// newRouter creates a new router and returns it func newRouter(opts ...Option) Router { // get default options options := DefaultOptions() @@ -51,7 +51,7 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, - status: Status{Error: nil, Code: Init}, + status: Status{Error: nil, Code: Stopped}, exit: make(chan struct{}), eventChan: make(chan *Event), advertChan: make(chan *Advert), @@ -92,11 +92,39 @@ func (r *router) Network() string { return r.opts.Network } -// addServiceRoutes adds all services in given registry to the routing table. -// NOTE: this is a one-off operation done when bootstrapping the router -// It returns error if either the services failed to be listed or -// if any of the the routes failed to be added to the routing table. -func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { +// manageServiceRoutes manages the routes for a given service. +// It returns error of the routing table action fails with error. +func (r *router) manageServiceRoutes(service *registry.Service, action string, metric int) error { + // action is the routing table action + action = strings.ToLower(action) + // take route action on each service node + for _, node := range service.Nodes { + route := Route{ + Destination: service.Name, + Gateway: node.Address, + Router: r.opts.Address, + Network: r.opts.Network, + Metric: metric, + } + switch action { + case "insert", "create": + if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) + } + case "delete": + if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err) + } + default: + return fmt.Errorf("failed to manage route for service %v. Unknown action: %s", service.Name, action) + } + } + return nil +} + +// manageRegistryRoutes manages routes for each service found in the registry. +// It returns error if either the services failed to be listed or if the routing table action fails wirh error +func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metric int) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed listing services: %v", err) @@ -107,27 +135,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) if err != nil { - log.Logf("r.addServiceRoutes() GetService() error: %v", err) + log.Logf("r.manageRegistryRoutes() GetService() error: %v", err) continue } - - // create a flat slide of nodes - var nodes []*registry.Node + // manage the routes for all return services for _, s := range srvs { - nodes = append(nodes, s.Nodes...) - } - - // range over the flat slice of nodes - for _, node := range nodes { - route := Route{ - Destination: service.Name, - Gateway: node.Address, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: metric, - } - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("error adding route for service %s: %s", service.Name, err) + if err := r.manageServiceRoutes(s, action, metric); err != nil { + return err } } } @@ -160,39 +174,8 @@ func (r *router) watchServices(w registry.Watcher) error { log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) - switch res.Action { - case "create": - // range over the flat slice of nodes - for _, node := range res.Service.Nodes { - gateway := node.Address - if node.Port > 0 { - gateway = fmt.Sprintf("%s:%d", node.Address, node.Port) - } - route := Route{ - Destination: res.Service.Name, - Gateway: gateway, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: DefaultLocalMetric, - } - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("error adding route for service %s: %s", res.Service.Name, err) - } - } - case "delete": - for _, node := range res.Service.Nodes { - route := Route{ - Destination: res.Service.Name, - Gateway: node.Address, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: DefaultLocalMetric, - } - // only return error if the route is not in the table, but something else has failed - if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) - } - } + if err := r.manageServiceRoutes(res.Service, res.Action, DefaultLocalMetric); err != nil { + return err } } @@ -431,8 +414,8 @@ func (r *router) Advertise() (<-chan *Advert, error) { defer r.Unlock() if r.status.Code != Running { - // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + // add all local service routes into the routing table + if err := r.manageRegistryRoutes(r.opts.Registry, "insert", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) } log.Logf("Routing table:\n%s", r.opts.Table) @@ -533,7 +516,7 @@ func (r *router) Update(a *Advert) error { Router: event.Route.Router, Network: event.Route.Network, Metric: event.Route.Metric, - Policy: AddIfNotExists, + Policy: Insert, } if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) diff --git a/network/router/default_table.go b/network/router/default_table.go index 28659cbd..9ae35e41 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -2,8 +2,6 @@ package router import ( "fmt" - "hash" - "hash/fnv" "strings" "sync" @@ -12,25 +10,22 @@ import ( "github.com/olekukonko/tablewriter" ) -// TableOptions are routing table options +// TableOptions specify routing table options // TODO: table options TBD in the future type TableOptions struct{} -// table is in memory routing table +// table is an in memory routing table type table struct { // opts are table options opts TableOptions - // TODO: we should stop key-ing on destination // m stores routing table map m map[string]map[uint64]Route - // h hashes route entries - h hash.Hash64 // w is a list of table watchers w map[string]*tableWatcher sync.RWMutex } -// newTable creates in memory routing table and returns it +// newTable creates a new routing table and returns it func newTable(opts ...TableOption) Table { // default options var options TableOptions @@ -40,14 +35,10 @@ func newTable(opts ...TableOption) Table { o(&options) } - h := fnv.New64() - h.Reset() - return &table{ opts: options, m: make(map[string]map[uint64]Route), w: make(map[string]*tableWatcher), - h: h, } } @@ -67,12 +58,12 @@ func (t *table) Options() TableOptions { // Add adds a route to the routing table func (t *table) Add(r Route) error { destAddr := r.Destination - sum := t.hash(r) + sum := r.Hash() t.Lock() defer t.Unlock() - // check if the destination has any routes in the table + // check if there are any routes in the table for the route destination if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r @@ -80,7 +71,7 @@ func (t *table) Add(r Route) error { return nil } - // add new route to the table for the given destination + // add new route to the table for the route destination if _, ok := t.m[destAddr][sum]; !ok { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) @@ -88,15 +79,15 @@ func (t *table) Add(r Route) error { } // only add the route if the route override is explicitly requested - if _, ok := t.m[destAddr][sum]; ok && r.Policy == OverrideIfExists { + if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) return nil } - // if we reached this point without already returning the route already exists + // if we reached this point the route must already exist // we return nil only if explicitly requested by the client - if r.Policy == IgnoreIfExists { + if r.Policy == Skip { return nil } @@ -105,12 +96,12 @@ func (t *table) Add(r Route) error { // Delete deletes the route from the routing table func (t *table) Delete(r Route) error { + destAddr := r.Destination + sum := r.Hash() + t.Lock() defer t.Unlock() - destAddr := r.Destination - sum := t.hash(r) - if _, ok := t.m[destAddr]; !ok { return ErrRouteNotFound } @@ -121,17 +112,17 @@ func (t *table) Delete(r Route) error { return nil } -// Update updates routing table with new route +// Update updates routing table with the new route func (t *table) Update(r Route) error { destAddr := r.Destination - sum := t.hash(r) + sum := r.Hash() t.Lock() defer t.Unlock() - // check if the destAddr has ANY routes in the table + // check if the route destination has any routes in the table if _, ok := t.m[destAddr]; !ok { - if r.Policy == AddIfNotExists { + if r.Policy == Insert { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) @@ -140,8 +131,9 @@ func (t *table) Update(r Route) error { return ErrRouteNotFound } - // check if destination has this particular router in the table - if _, ok := t.m[destAddr][sum]; !ok && r.Policy == AddIfNotExists { + // check if the route for the route destination already exists + // NOTE: We only insert the route if explicitly requested by the client + if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) return nil @@ -299,11 +291,3 @@ func (t *table) String() string { return sb.String() } - -// hash hashes the route using router gateway and network address -func (t *table) hash(r Route) uint64 { - t.h.Reset() - t.h.Write([]byte(r.Destination + r.Gateway + r.Network)) - - return t.h.Sum64() -} diff --git a/network/router/default_table_test.go b/network/router/default_table_test.go index 9098e5ad..46b8b034 100644 --- a/network/router/default_table_test.go +++ b/network/router/default_table_test.go @@ -2,7 +2,6 @@ package router import "testing" -// creates routing table and test route func testSetup() (Table, Route) { table := NewTable() @@ -35,32 +34,32 @@ func TestAdd(t *testing.T) { testTableSize += 1 // overrides an existing route - // NOTE: the size of the table should not change route.Metric = 100 - route.Policy = OverrideIfExists + route.Policy = Override if err := table.Add(route); err != nil { t.Errorf("error adding route: %s", err) } + // the size of the table should not change when Override policy is used if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } // dont add new route if it already exists - // NOTE: The size of the table should not change - route.Policy = IgnoreIfExists + route.Policy = Skip if err := table.Add(route); err != nil { t.Errorf("error adding route: %s", err) } + // the size of the table should not change if Skip policy is used if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // adding the same route under AddIfNotExists policy must error - route.Policy = AddIfNotExists + // adding the same route under Insert policy must error + route.Policy = Insert if err := table.Add(route); err != ErrDuplicateRoute { t.Errorf("error adding route. Expected error: %s, Given: %s", ErrDuplicateRoute, err) @@ -107,18 +106,17 @@ func TestUpdate(t *testing.T) { testTableSize += 1 // change the metric of the original route - // NOTE: this should NOT change the size of the table route.Metric = 200 if err := table.Update(route); err != nil { t.Errorf("error updating route: %s", err) } + // the size of the table should not change as we're only updating the metric of an existing route if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // NOTE: routing table routes on // this should add a new route route.Destination = "new.dest" @@ -127,12 +125,11 @@ func TestUpdate(t *testing.T) { } testTableSize += 1 - // NOTE: default policy is AddIfNotExists so the new route will be added here + // Default policy is Insert so the new route will be added here since the route does not exist if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // NOTE: we are hashing routes on // this should add a new route route.Gateway = "new.gw" @@ -145,9 +142,9 @@ func TestUpdate(t *testing.T) { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // this should NOT add a new route as we are setting the policy to IgnoreIfExists + // this should NOT add a new route as we are setting the policy to Skip route.Destination = "rand.dest" - route.Policy = IgnoreIfExists + route.Policy = Skip if err := table.Update(route); err != ErrRouteNotFound { t.Errorf("error updating route. Expected error: %s, given: %s", ErrRouteNotFound, err) diff --git a/network/router/options.go b/network/router/options.go index 2426fa32..f6413a53 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -9,7 +9,7 @@ var ( // DefaultAddress is default router address DefaultAddress = ":9093" // DefaultNetwork is default micro network - DefaultNetwork = "local" + DefaultNetwork = "micro.mu" ) // Options are router options diff --git a/network/router/route.go b/network/router/route.go index 5e3cd8e5..f995d9d4 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -9,33 +9,33 @@ import ( ) var ( - // DefaultLocalMetric is default route cost for local network + // DefaultLocalMetric is default route cost metric for the local network DefaultLocalMetric = 1 - // DefaultNetworkMetric is default route cost for micro network + // DefaultNetworkMetric is default route cost metric for the micro network DefaultNetworkMetric = 10 ) -// RoutePolicy defines routing table addition policy +// RoutePolicy defines routing table policy type RoutePolicy int const ( - // AddIfNotExist adds the route if it does not exist - AddIfNotExists RoutePolicy = iota - // OverrideIfExists overrides route if it already exists - OverrideIfExists - // IgnoreIfExists instructs to not modify existing route - IgnoreIfExists + // Insert inserts a new route if it does not already exist + Insert RoutePolicy = iota + // Override overrides the route if it already exists + Override + // Skip skips modifying the route if it already exists + Skip ) // String returns human reprensentation of policy func (p RoutePolicy) String() string { switch p { - case AddIfNotExists: - return "ADD_IF_NOT_EXISTS" - case OverrideIfExists: - return "OVERRIDE_IF_EXISTS" - case IgnoreIfExists: - return "IGNORE_IF_EXISTS" + case Insert: + return "INSERT" + case Override: + return "OVERRIDE" + case Skip: + return "SKIP" default: return "UNKNOWN" } @@ -47,9 +47,9 @@ type Route struct { Destination string // Gateway is route gateway Gateway string - // Router is the network router address + // Router is the router address Router string - // Network is micro network address + // Network is network address Network string // Metric is the route cost metric Metric int @@ -66,7 +66,7 @@ func (r *Route) Hash() uint64 { return h.Sum64() } -// String allows to print the route +// String returns human readable route func (r Route) String() string { // this will help us build routing table string sb := &strings.Builder{} diff --git a/network/router/router.go b/network/router/router.go index 20c05fa0..ea91538c 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -14,7 +14,7 @@ type Router interface { Init(...Option) error // Options returns the router options Options() Options - // ID returns the id of the router + // ID returns the ID of the router ID() string // Table returns the routing table Table() Table @@ -22,7 +22,7 @@ type Router interface { Address() string // Network returns the network address of the router Network() string - // Advertise starts advertising routes to the network + // Advertise advertises routes to the network Advertise() (<-chan *Advert, error) // Update updates the routing table Update(*Advert) error @@ -59,7 +59,7 @@ func (ut UpdateType) String() string { } } -// Advert is sent by the router to the network +// Advert contains a list of events advertised by the router to the network type Advert struct { // ID is the router ID ID string @@ -81,10 +81,8 @@ type Status struct { } const ( - // Init means the rotuer has just been initialized - Init StatusCode = iota // Running means the router is up and running - Running + Running StatusCode = iota // Stopped means the router has been stopped Stopped // Error means the router has encountered error @@ -94,8 +92,6 @@ const ( // String returns human readable status code func (sc StatusCode) String() string { switch sc { - case Init: - return "INITIALIZED" case Running: return "RUNNING" case Stopped: