From b82245429eb158eee541223bc32f257d5d714e0f Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 8 Jul 2019 21:03:54 +0100 Subject: [PATCH] Simplified table logic. Lookup tests. mucp/cient update --- client/selector/router/router.go | 11 +- network/proxy/mucp/mucp.go | 11 +- network/router/default.go | 111 +++++++++--------- network/router/router.go | 7 +- network/router/table/default.go | 31 +---- network/router/table/default_test.go | 163 +++++++++++++++++---------- network/router/table/query.go | 1 + network/router/table/route.go | 32 +----- network/router/table/watcher.go | 10 +- 9 files changed, 183 insertions(+), 194 deletions(-) diff --git a/client/selector/router/router.go b/client/selector/router/router.go index 4bcdecd1..65e69702 100644 --- a/client/selector/router/router.go +++ b/client/selector/router/router.go @@ -11,6 +11,7 @@ import ( "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/network/router" pb "github.com/micro/go-micro/network/router/proto" + "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" ) @@ -40,11 +41,11 @@ type clientKey struct{} type routerKey struct{} // getRoutes returns the routes whether they are remote or local -func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { +func (r *routerSelector) getRoutes(service string) ([]table.Route, error) { if !r.remote { // lookup router for routes for the service - return r.r.Table().Lookup(router.NewQuery( - router.QueryDestination(service), + return r.r.Table().Lookup(table.NewQuery( + table.QueryDestination(service), )) } @@ -101,11 +102,11 @@ func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { return nil, selector.ErrNoneAvailable } - var routes []router.Route + var routes []table.Route // convert from pb to []*router.Route for _, r := range pbRoutes.Routes { - routes = append(routes, router.Route{ + routes = append(routes, table.Route{ Destination: r.Destination, Gateway: r.Gateway, Router: r.Router, diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index 1abe976b..d0ecb81f 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -18,6 +18,7 @@ import ( "github.com/micro/go-micro/server" pb "github.com/micro/go-micro/network/router/proto" + "github.com/micro/go-micro/network/router/table" ) // Proxy will transparently proxy requests to an endpoint. @@ -40,7 +41,7 @@ type Proxy struct { // A fib of routes service:address sync.RWMutex - Routes map[string][]router.Route + Routes map[string][]table.Route } // read client request and write to server @@ -80,7 +81,7 @@ func readLoop(r server.Request, s client.Stream) error { func (p *Proxy) getRoute(service string) ([]string, error) { // converts routes to just addresses - toNodes := func(routes []router.Route) []string { + toNodes := func(routes []table.Route) []string { var nodes []string for _, node := range routes { nodes = append(nodes, node.Gateway) @@ -106,7 +107,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { if p.Router != nil { // lookup the router routes, err := p.Router.Table().Lookup( - router.NewQuery(router.QueryDestination(service)), + table.NewQuery(table.QueryDestination(service)), ) if err != nil { return nil, err @@ -114,7 +115,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { p.Lock() if p.Routes == nil { - p.Routes = make(map[string][]router.Route) + p.Routes = make(map[string][]table.Route) } p.Routes[service] = routes p.Unlock() @@ -203,7 +204,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { // convert from pb to []*router.Route for _, r := range pbRoutes.Routes { - routes = append(routes, router.Route{ + routes = append(routes, table.Route{ Destination: r.Destination, Gateway: r.Gateway, Router: r.Router, diff --git a/network/router/default.go b/network/router/default.go index 44e07108..b77b40fe 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -15,18 +15,20 @@ import ( ) const ( - // UpdateRoutePenalty penalises route updates - UpdateRoutePenalty = 500 - // DeleteRoutePenalty penalises route deletes - DeleteRoutePenalty = 1000 // AdvertiseTick is time interval in which we advertise route updates AdvertiseTick = 5 * time.Second // AdvertSuppress is advert suppression threshold AdvertSuppress = 2000 - // AdvertRecover is advert suppression recovery threshold + // AdvertRecover is advert recovery threshold AdvertRecover = 750 - // PenaltyDecay is the "half-life" of the penalty + // DefaultAdvertTTL is default advertisement TTL + DefaultAdvertTTL = time.Minute + // PenaltyDecay is the penalty decay PenaltyDecay = 1.15 + // Delete penalises route addition and deletion + Delete = 1000 + // UpdatePenalty penalises route updates + UpdatePenalty = 500 ) // router provides default router implementation @@ -93,8 +95,8 @@ func (r *router) Network() string { return r.opts.Network } -// manageServiceRoutes manages the routes for a given service. -// It returns error of the routing table action fails with error. +// manageServiceRoutes manages routes for a given service. +// It returns error of the routing table action fails. func (r *router) manageServiceRoutes(service *registry.Service, action string, metric int) error { // action is the routing table action action = strings.ToLower(action) @@ -124,7 +126,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m } // manageRegistryRoutes manages routes for each service found in the registry. -// It returns error if either the services failed to be listed or if the routing table action fails wirh error +// It returns error if either the services failed to be listed or the routing table action fails. func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metric int) error { services, err := reg.ListServices() if err != nil { @@ -222,66 +224,60 @@ func (r *router) watchTable(w table.Watcher) error { return watchErr } -func eventFlap(curr, prev *table.Event) bool { +// isFlapping detects if the event is flapping based on the current and previous event status. +func isFlapping(curr, prev *table.Event) bool { if curr.Type == table.Update && prev.Type == table.Update { - // update flap: this can be either metric or whatnot - log.Logf("eventFlap(): Update flap") + log.Logf("isFlapping(): Update flap") return true } - if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create { - log.Logf("eventFlap(): Create/Delete flap") + if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert { + log.Logf("isFlapping(): Create/Delete flap") return true } return false } +// updateEvent is a table event enriched with advertisement data +type updateEvent struct { + *table.Event + // timestamp marks the time the event has been received + timestamp time.Time + // penalty is current event penalty + penalty float64 + // isSuppressed flags if the event should be considered for flap detection + isSuppressed bool + // isFlapping marks the event as flapping event + isFlapping bool +} + // processEvents processes routing table events. // It suppresses unhealthy flapping events and advertises healthy events upstream. func (r *router) processEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseTick) - - // advertEvent is a table event enriched with advert data - type advertEvent struct { - *table.Event - timestamp time.Time - penalty float64 - isSuppressed bool - isFlapping bool - } - - // eventMap is a map of advert events that might end up being advertised - eventMap := make(map[uint64]*advertEvent) + // eventMap is a map of advert events + eventMap := make(map[uint64]*updateEvent) // lock to protect access to eventMap mu := &sync.RWMutex{} // waitgroup to manage advertisement goroutines var wg sync.WaitGroup -process: +processLoop: for { select { case <-ticker.C: var events []*table.Event - // decay the penalties of existing events + // collect all events which are not flapping mu.Lock() - for advert, event := range eventMap { - delta := time.Since(event.timestamp).Seconds() - event.penalty = event.penalty * math.Exp(delta) - // suppress or recover the event based on its current penalty - if !event.isSuppressed && event.penalty > AdvertSuppress { - event.isSuppressed = true - } else if event.penalty < AdvertRecover { - event.isSuppressed = false - event.isFlapping = false - } - if !event.isFlapping { + for key, event := range eventMap { + if !event.isFlapping && !event.isSuppressed { e := new(table.Event) *e = *event.Event events = append(events, e) // this deletes the advertised event from the map - delete(eventMap, advert) + delete(eventMap, key) } } mu.Unlock() @@ -301,12 +297,6 @@ process: select { case r.advertChan <- a: - mu.Lock() - // once we've advertised the events, we need to delete them - for _, event := range a.Events { - delete(eventMap, event.Route.Hash()) - } - mu.Unlock() case <-r.exit: log.Logf("go advertise(): exit") return @@ -315,7 +305,9 @@ process: }(events) } case e := <-r.eventChan: - // if event is nil, break + // event timestamp + now := time.Now() + // if event is nil, continue if e == nil { continue } @@ -324,15 +316,15 @@ process: var penalty float64 switch e.Type { case table.Update: - penalty = UpdateRoutePenalty - case table.Create, table.Delete: - penalty = DeleteRoutePenalty + penalty = UpdatePenalty + case table.Delete: + penalty = Delete } // we use route hash as eventMap key hash := e.Route.Hash() event, ok := eventMap[hash] if !ok { - event = &advertEvent{ + event = &updateEvent{ Event: e, penalty: penalty, timestamp: time.Now(), @@ -342,8 +334,8 @@ process: } // update penalty for existing event: decay existing and add new penalty delta := time.Since(event.timestamp).Seconds() - event.penalty = event.penalty*math.Exp(delta) + penalty - event.timestamp = time.Now() + event.penalty = event.penalty*math.Exp(-delta) + penalty + event.timestamp = now // suppress or recover the event based on its current penalty if !event.isSuppressed && event.penalty > AdvertSuppress { event.isSuppressed = true @@ -352,11 +344,11 @@ process: } // if not suppressed decide if if its flapping if !event.isSuppressed { - // detect if its flapping - event.isFlapping = eventFlap(e, event.Event) + // detect if its flapping by comparing current and previous event + event.isFlapping = isFlapping(e, event.Event) } case <-r.exit: - break process + break processLoop } } @@ -438,8 +430,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { } } - // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped - // TODO: these channels most likely won't have to be the struct fields + // NOTE: we only need to recreate these if the router errored or was stopped if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) r.eventChan = make(chan *table.Event) @@ -490,6 +481,9 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.wg.Add(1) go r.watchErrors(errChan) + // TODO: send router announcement update comes here + // the announcement update contains routes from routing table + // mark router as running and set its Error to nil status := Status{ Code: Running, @@ -520,7 +514,6 @@ func (r *router) Update(a *Advert) error { Router: event.Route.Router, Network: event.Route.Network, Metric: event.Route.Metric, - Policy: table.Insert, } if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) diff --git a/network/router/router.go b/network/router/router.go index e0f7d676..4fd53c07 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -20,12 +20,12 @@ type Router interface { Options() Options // ID returns the ID of the router ID() string - // Table returns the routing table - Table() table.Table // Address returns the router adddress Address() string // Network returns the network address of the router Network() string + // Table returns the routing table + Table() table.Table // Advertise advertises routes to the network Advertise() (<-chan *Advert, error) // Update updates the routing table @@ -69,6 +69,9 @@ type Advert struct { ID string // Timestamp marks the time when the update is sent Timestamp time.Time + // TTL is Advert TTL + // TODO: not used + TTL time.Time // Events is a list of routing table events to advertise Events []*table.Event } diff --git a/network/router/table/default.go b/network/router/table/default.go index 3480183e..7a95716c 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -67,27 +67,14 @@ func (t *table) Add(r Route) error { if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) + go t.sendEvent(&Event{Type: Insert, Route: r}) return nil } // add new route to the table for the route destination if _, ok := t.m[destAddr][sum]; !ok { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) - return nil - } - - // only add the route if the route override is explicitly requested - if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { - t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Update, Route: r}) - return nil - } - - // if we reached this point the route must already exist - // we return nil only if explicitly requested by the client - if r.Policy == Skip { + go t.sendEvent(&Event{Type: Insert, Route: r}) return nil } @@ -122,23 +109,9 @@ func (t *table) Update(r Route) error { // check if the route destination has any routes in the table if _, ok := t.m[destAddr]; !ok { - if r.Policy == Insert { - t.m[destAddr] = make(map[uint64]Route) - t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) - return nil - } return ErrRouteNotFound } - // check if the route for the route destination already exists - // NOTE: We only insert the route if explicitly requested by the client - if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { - t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) - return nil - } - // if the route has been found update it if _, ok := t.m[destAddr][sum]; ok { t.m[destAddr][sum] = r diff --git a/network/router/table/default_test.go b/network/router/table/default_test.go index a0c364b7..6447cf03 100644 --- a/network/router/table/default_test.go +++ b/network/router/table/default_test.go @@ -33,36 +33,13 @@ func TestAdd(t *testing.T) { } testTableSize += 1 - // overrides an existing route - route.Metric = 100 - route.Policy = Override - - if err := table.Add(route); err != nil { - t.Errorf("error adding route: %s", err) - } - - // the size of the table should not change when Override policy is used if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) - } - - // dont add new route if it already exists - route.Policy = Skip - - if err := table.Add(route); err != nil { - t.Errorf("error adding route: %s", err) - } - - // the size of the table should not change if Skip policy is used - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } // adding the same route under Insert policy must error - route.Policy = Insert - if err := table.Add(route); err != ErrDuplicateRoute { - t.Errorf("error adding route. Expected error: %s, Given: %s", ErrDuplicateRoute, err) + t.Errorf("error adding route. Expected error: %s, found: %s", ErrDuplicateRoute, err) } } @@ -80,7 +57,7 @@ func TestDelete(t *testing.T) { route.Destination = "randDest" if err := table.Delete(route); err != ErrRouteNotFound { - t.Errorf("error deleting route. Expected error: %s, given: %s", ErrRouteNotFound, err) + t.Errorf("error deleting route. Expected error: %s, found: %s", ErrRouteNotFound, err) } // we should be able to delete the existing route @@ -92,7 +69,7 @@ func TestDelete(t *testing.T) { testTableSize -= 1 if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } } @@ -114,44 +91,18 @@ func TestUpdate(t *testing.T) { // the size of the table should not change as we're only updating the metric of an existing route if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } - // this should add a new route - route.Destination = "new.dest" - - if err := table.Update(route); err != nil { - t.Errorf("error updating route: %s", err) - } - testTableSize += 1 - - // Default policy is Insert so the new route will be added here since the route does not exist - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) - } - - // this should add a new route - route.Gateway = "new.gw" - - if err := table.Update(route); err != nil { - t.Errorf("error updating route: %s", err) - } - testTableSize += 1 - - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) - } - - // this should NOT add a new route as we are setting the policy to Skip + // this should error as the destination does not exist route.Destination = "rand.dest" - route.Policy = Skip if err := table.Update(route); err != ErrRouteNotFound { - t.Errorf("error updating route. Expected error: %s, given: %s", ErrRouteNotFound, err) + t.Errorf("error updating route. Expected error: %s, found: %s", ErrRouteNotFound, err) } - if table.Size() != 3 { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + if table.Size() != testTableSize { + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } } @@ -173,10 +124,104 @@ func TestList(t *testing.T) { } if len(routes) != len(dest) { - t.Errorf("incorrect number of routes listed. Expected: %d, Given: %d", len(dest), len(routes)) + t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(dest), len(routes)) } if len(routes) != table.Size() { t.Errorf("mismatch number of routes and table size. Routes: %d, Size: %d", len(routes), table.Size()) } } + +func TestLookup(t *testing.T) { + table, route := testSetup() + + dest := []string{"svc1", "svc2", "svc3"} + net := []string{"net1", "net2", "net1"} + rtr := []string{"router1", "router2", "router3"} + + for i := 0; i < len(dest); i++ { + route.Destination = dest[i] + route.Network = net[i] + route.Router = rtr[i] + if err := table.Add(route); err != nil { + t.Errorf("error adding route: %s", err) + } + } + + // return all routes + query := NewQuery() + + routes, err := table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != table.Size() { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", table.Size(), len(routes)) + } + + // query particular net + query = NewQuery(QueryNetwork("net1")) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 2 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 2, len(routes)) + } + + // query particular router + router := "router1" + query = NewQuery(QueryRouter(router)) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 1 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes)) + } + + if routes[0].Router != router { + t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + } + + // query particular route + network := "net1" + query = NewQuery( + QueryRouter(router), + QueryNetwork(network), + ) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 1 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes)) + } + + if routes[0].Router != router { + t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + } + + if routes[0].Network != network { + t.Errorf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network) + } + + // bullshit route query + query = NewQuery(QueryDestination("foobar")) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 0 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 0, len(routes)) + } +} diff --git a/network/router/table/query.go b/network/router/table/query.go index ccb7d396..42b44db6 100644 --- a/network/router/table/query.go +++ b/network/router/table/query.go @@ -90,6 +90,7 @@ func NewQuery(opts ...QueryOption) Query { // NOTE: by default we use DefaultNetworkMetric qopts := QueryOptions{ Destination: "*", + Router: "*", Network: "*", Policy: DiscardIfNone, } diff --git a/network/router/table/route.go b/network/router/table/route.go index 3d9c3bcb..cbb32bf1 100644 --- a/network/router/table/route.go +++ b/network/router/table/route.go @@ -15,46 +15,18 @@ var ( DefaultNetworkMetric = 10 ) -// RoutePolicy defines routing table policy -type RoutePolicy int - -const ( - // Insert inserts a new route if it does not already exist - Insert RoutePolicy = iota - // Override overrides the route if it already exists - Override - // Skip skips modifying the route if it already exists - Skip -) - -// String returns human reprensentation of policy -func (p RoutePolicy) String() string { - switch p { - case Insert: - return "INSERT" - case Override: - return "OVERRIDE" - case Skip: - return "SKIP" - default: - return "UNKNOWN" - } -} - // Route is network route type Route struct { // Destination is destination address Destination string // Gateway is route gateway Gateway string - // Router is the router address - Router string // Network is network address Network string + // Router is the router address + Router string // Metric is the route cost metric Metric int - // Policy defines route policy - Policy RoutePolicy } // Hash returns route hash sum. diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go index 850a9089..90e46360 100644 --- a/network/router/table/watcher.go +++ b/network/router/table/watcher.go @@ -19,8 +19,8 @@ var ( type EventType int const ( - // Create is emitted when a new route has been created - Create EventType = iota + // Insert is emitted when a new route has been inserted + Insert EventType = iota // Delete is emitted when an existing route has been deleted Delete // Update is emitted when an existing route has been updated @@ -30,8 +30,8 @@ const ( // String returns string representation of the event func (et EventType) String() string { switch et { - case Create: - return "CREATE" + case Insert: + return "INSERT" case Delete: return "DELETE" case Update: @@ -126,7 +126,7 @@ func (w *tableWatcher) Stop() { } // String prints debug information -func (w *tableWatcher) String() string { +func (w tableWatcher) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb)