diff --git a/network/router/default.go b/network/router/default.go index 9340e8df..cf98da3c 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" ) @@ -43,12 +42,12 @@ var ( // router implements default router type router struct { // embed the table - table.Table + *Table opts Options status Status exit chan struct{} errChan chan error - eventChan chan *table.Event + eventChan chan *Event advertChan chan *Advert advertWg *sync.WaitGroup wg *sync.WaitGroup @@ -92,18 +91,18 @@ func (r *router) Options() Options { } // manageRoute applies action on a given route -func (r *router) manageRoute(route table.Route, action string) error { +func (r *router) manageRoute(route Route, action string) error { switch action { case "create": - if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute { + if err := r.Create(route); err != nil && err != ErrDuplicateRoute { return fmt.Errorf("failed adding route for service %s: %s", route.Service, err) } case "update": - if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute { + if err := r.Update(route); err != nil && err != ErrDuplicateRoute { return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) } case "delete": - if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound { + if err := r.Delete(route); err != nil && err != ErrRouteNotFound { return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) } default: @@ -121,13 +120,13 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e // take route action on each service node for _, node := range service.Nodes { - route := table.Route{ + route := Route{ Service: service.Name, Address: node.Address, Gateway: "", Network: r.opts.Network, - Link: table.DefaultLink, - Metric: table.DefaultLocalMetric, + Link: DefaultLink, + Metric: DefaultLocalMetric, } if err := r.manageRoute(route, action); err != nil { @@ -197,7 +196,7 @@ func (r *router) watchRegistry(w registry.Watcher) error { // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // It returns error if the locally registered services either fails to be added/deleted to/from network registry. -func (r *router) watchTable(w table.Watcher) error { +func (r *router) watchTable(w Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -212,7 +211,7 @@ func (r *router) watchTable(w table.Watcher) error { for { event, err := w.Next() if err != nil { - if err != table.ErrWatcherStopped { + if err != ErrWatcherStopped { watchErr = err } break @@ -234,7 +233,7 @@ func (r *router) watchTable(w table.Watcher) error { // publishAdvert publishes router advert to advert channel // NOTE: this might cease to be a dedicated method in the future -func (r *router) publishAdvert(advType AdvertType, events []*table.Event) { +func (r *router) publishAdvert(advType AdvertType, events []*Event) { defer r.advertWg.Done() a := &Advert{ @@ -266,10 +265,10 @@ func (r *router) advertiseTable() error { return fmt.Errorf("failed listing routes: %s", err) } // collect all the added routes before we attempt to add default gateway - events := make([]*table.Event, len(routes)) + events := make([]*Event, len(routes)) for i, route := range routes { - event := &table.Event{ - Type: table.Update, + event := &Event{ + Type: Update, Timestamp: time.Now(), Route: route, } @@ -279,7 +278,7 @@ func (r *router) advertiseTable() error { // advertise all routes as Update events to subscribers if len(events) > 0 { r.advertWg.Add(1) - go r.publishAdvert(Update, events) + go r.publishAdvert(RouteUpdate, events) } case <-r.exit: return nil @@ -289,7 +288,7 @@ func (r *router) advertiseTable() error { // routeAdvert contains a list of route events to be advertised type routeAdvert struct { - events []*table.Event + events []*Event // lastUpdate records the time of the last advert update lastUpdate time.Time // penalty is current advert penalty @@ -326,7 +325,7 @@ func (r *router) advertiseEvents() error { for { select { case <-ticker.C: - var events []*table.Event + var events []*Event // collect all events which are not flapping for key, advert := range advertMap { // decay the event penalty @@ -352,7 +351,7 @@ func (r *router) advertiseEvents() error { if !advert.isSuppressed { for _, event := range advert.events { - e := new(table.Event) + e := new(Event) *e = *event events = append(events, e) // delete the advert from the advertMap @@ -364,7 +363,7 @@ func (r *router) advertiseEvents() error { // advertise all Update events to subscribers if len(events) > 0 { r.advertWg.Add(1) - go r.publishAdvert(Update, events) + go r.publishAdvert(RouteUpdate, events) } case e := <-r.eventChan: // if event is nil, continue @@ -375,9 +374,9 @@ func (r *router) advertiseEvents() error { // determine the event penalty var penalty float64 switch e.Type { - case table.Update: + case Update: penalty = UpdatePenalty - case table.Delete: + case Delete: penalty = DeletePenalty } @@ -386,7 +385,7 @@ func (r *router) advertiseEvents() error { hash := e.Route.Hash() advert, ok := advertMap[hash] if !ok { - events := []*table.Event{e} + events := []*Event{e} advert = &routeAdvert{ events: events, penalty: penalty, @@ -462,12 +461,12 @@ func (r *router) run() { // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway - route := table.Route{ + route := Route{ Service: "*", Address: "*", Gateway: r.opts.Gateway, Network: "*", - Metric: table.DefaultLocalMetric, + Metric: DefaultLocalMetric, } if err := r.Create(route); err != nil { r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)} @@ -528,10 +527,10 @@ func (r *router) Advertise() (<-chan *Advert, error) { return nil, fmt.Errorf("failed listing routes: %s", err) } // collect all the added routes before we attempt to add default gateway - events := make([]*table.Event, len(routes)) + events := make([]*Event, len(routes)) for i, route := range routes { - event := &table.Event{ - Type: table.Create, + event := &Event{ + Type: Create, Timestamp: time.Now(), Route: route, } @@ -540,7 +539,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { // create advertise and event channels r.advertChan = make(chan *Advert) - r.eventChan = make(chan *table.Event) + r.eventChan = make(chan *Event) // advertise your presence r.advertWg.Add(1) @@ -580,7 +579,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { func (r *router) Process(a *Advert) error { // NOTE: event sorting might not be necessary // copy update events intp new slices - events := make([]*table.Event, len(a.Events)) + events := make([]*Event, len(a.Events)) copy(events, a.Events) // sort events by timestamp sort.Slice(events, func(i, j int) bool { diff --git a/network/router/options.go b/network/router/options.go index d64aca01..ac108b32 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -2,7 +2,6 @@ package router import ( "github.com/google/uuid" - "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" ) @@ -26,7 +25,7 @@ type Options struct { // Registry is the local registry Registry registry.Registry // Table is routing table - Table table.Table + Table *Table } // Id sets Router Id @@ -64,8 +63,8 @@ func Registry(r registry.Registry) Option { } } -// Table sets the routing table -func Table(t table.Table) Option { +// RoutingTable sets the routing table +func RoutingTable(t *Table) Option { return func(o *Options) { o.Table = t } @@ -78,6 +77,6 @@ func DefaultOptions() Options { Address: DefaultAddress, Network: DefaultNetwork, Registry: registry.DefaultRegistry, - Table: table.NewTable(), + Table: NewTable(), } } diff --git a/network/router/table/query.go b/network/router/query.go similarity index 63% rename from network/router/table/query.go rename to network/router/query.go index 7703e3b3..68fb3588 100644 --- a/network/router/table/query.go +++ b/network/router/query.go @@ -1,26 +1,4 @@ -package table - -// LookupPolicy defines query policy -type LookupPolicy int - -const ( - // DiscardIfNone discards query when no route is found - DiscardIfNone LookupPolicy = iota - // ClosestMatch returns closest match to supplied query - ClosestMatch -) - -// String returns human representation of LookupPolicy -func (lp LookupPolicy) String() string { - switch lp { - case DiscardIfNone: - return "DISCARD" - case ClosestMatch: - return "CLOSEST" - default: - return "UNKNOWN" - } -} +package router // QueryOption sets routing table query options type QueryOption func(*QueryOptions) @@ -33,8 +11,6 @@ type QueryOptions struct { Gateway string // Network is network address Network string - // Policy is query lookup policy - Policy LookupPolicy } // QueryService sets destination address @@ -58,14 +34,6 @@ func QueryNetwork(n string) QueryOption { } } -// QueryPolicy sets query policy -// NOTE: this might be renamed to filter or some such -func QueryPolicy(p LookupPolicy) QueryOption { - return func(o *QueryOptions) { - o.Policy = p - } -} - // Query is routing table query type Query interface { // Options returns query options @@ -80,12 +48,10 @@ type query struct { // NewQuery creates new query and returns it func NewQuery(opts ...QueryOption) Query { // default options - // NOTE: by default we use DefaultNetworkMetric qopts := QueryOptions{ Service: "*", Gateway: "*", Network: "*", - Policy: DiscardIfNone, } for _, o := range opts { diff --git a/network/router/table/route.go b/network/router/route.go similarity index 98% rename from network/router/table/route.go rename to network/router/route.go index 50a52068..fa9dbc72 100644 --- a/network/router/table/route.go +++ b/network/router/route.go @@ -1,4 +1,4 @@ -package table +package router import ( "hash/fnv" diff --git a/network/router/route_test.go b/network/router/route_test.go new file mode 100644 index 00000000..483b852c --- /dev/null +++ b/network/router/route_test.go @@ -0,0 +1,24 @@ +package router + +import "testing" + +func TestHash(t *testing.T) { + route1 := Route{ + Service: "dest.svc", + Gateway: "dest.gw", + Network: "dest.network", + Link: "det.link", + Metric: 10, + } + + // make a copy + route2 := route1 + + route1Hash := route1.Hash() + route2Hash := route2.Hash() + + // we should get the same hash + if route1Hash != route2Hash { + t.Errorf("identical routes result in different hashes") + } +} diff --git a/network/router/router.go b/network/router/router.go index 8ce84fb1..0a0e4788 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -3,8 +3,6 @@ package router import ( "time" - - "github.com/micro/go-micro/network/router/table" ) var ( @@ -14,8 +12,6 @@ var ( // Router is an interface for a routing control plane type Router interface { - // Router provides a routing table - table.Table // Init initializes the router with options Init(...Option) error // Options returns the router options @@ -24,6 +20,18 @@ type Router interface { Advertise() (<-chan *Advert, error) // Process processes incoming adverts Process(*Advert) error + // Create new route in the routing table + Create(Route) error + // Delete deletes existing route from the routing table + Delete(Route) error + // Update updates route in the routing table + Update(Route) error + // List returns the list of all routes in the table + List() ([]Route, error) + // Lookup looks up routes in the routing table and returns them + Lookup(Query) ([]Route, error) + // Watch returns a watcher which allows to track updates to the routing table + Watch(opts ...WatchOption) (Watcher, error) // Status returns router status Status() Status // Stop stops the router @@ -63,10 +71,22 @@ type AdvertType int const ( // Announce is advertised when the router announces itself Announce AdvertType = iota - // Update advertises route updates - Update + // RouteUpdate advertises route updates + RouteUpdate ) +// String returns human readable advertisement type +func (t AdvertType) String() string { + switch t { + case Announce: + return "announce" + case RouteUpdate: + return "update" + default: + return "unknown" + } +} + // Advert contains a list of events advertised by the router to the network type Advert struct { // Id is the router Id @@ -78,7 +98,7 @@ type Advert struct { // TTL is Advert TTL TTL time.Duration // Events is a list of routing table events to advertise - Events []*table.Event + Events []*Event } // NewRouter creates new Router and returns it diff --git a/network/router/table/default.go b/network/router/table.go similarity index 54% rename from network/router/table/default.go rename to network/router/table.go index bce75935..cb746cbb 100644 --- a/network/router/table/default.go +++ b/network/router/table.go @@ -1,59 +1,39 @@ -package table +package router import ( + "errors" "sync" "time" "github.com/google/uuid" ) -// Options specify routing table options -// TODO: table options TBD in the future -type Options struct{} +var ( + // ErrRouteNotFound is returned when no route was found in the routing table + ErrRouteNotFound = errors.New("route not found") + // ErrDuplicateRoute is returned when the route already exists + ErrDuplicateRoute = errors.New("duplicate route") +) -// table is an in memory routing table -type table struct { - // opts are table options - opts Options - // m stores routing table map - m map[string]map[uint64]Route - // w is a list of table watchers - w map[string]*tableWatcher +// Table is an in memory routing table +type Table struct { + // routes stores service routes + routes map[string]map[uint64]Route + // watchers stores table watchers + watchers map[string]*tableWatcher sync.RWMutex } -// newTable creates a new routing table and returns it -func newTable(opts ...Option) Table { - // default options - var options Options - - // apply requested options - for _, o := range opts { - o(&options) +// NewTable creates a new routing table and returns it +func NewTable(opts ...Option) *Table { + return &Table{ + routes: make(map[string]map[uint64]Route), + watchers: make(map[string]*tableWatcher), } - - return &table{ - opts: options, - m: make(map[string]map[uint64]Route), - w: make(map[string]*tableWatcher), - } -} - -// Init initializes routing table with options -func (t *table) Init(opts ...Option) error { - for _, o := range opts { - o(&t.opts) - } - return nil -} - -// Options returns routing table options -func (t *table) Options() Options { - return t.opts } // Create creates new route in the routing table -func (t *table) Create(r Route) error { +func (t *Table) Create(r Route) error { service := r.Service sum := r.Hash() @@ -61,16 +41,16 @@ func (t *table) Create(r Route) error { defer t.Unlock() // check if there are any routes in the table for the route destination - if _, ok := t.m[service]; !ok { - t.m[service] = make(map[uint64]Route) - t.m[service][sum] = r + if _, ok := t.routes[service]; !ok { + t.routes[service] = make(map[uint64]Route) + t.routes[service][sum] = r go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) return nil } // add new route to the table for the route destination - if _, ok := t.m[service][sum]; !ok { - t.m[service][sum] = r + if _, ok := t.routes[service][sum]; !ok { + t.routes[service][sum] = r go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) return nil } @@ -79,25 +59,25 @@ func (t *table) Create(r Route) error { } // Delete deletes the route from the routing table -func (t *table) Delete(r Route) error { +func (t *Table) Delete(r Route) error { service := r.Service sum := r.Hash() t.Lock() defer t.Unlock() - if _, ok := t.m[service]; !ok { + if _, ok := t.routes[service]; !ok { return ErrRouteNotFound } - delete(t.m[service], sum) + delete(t.routes[service], sum) go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r}) return nil } // Update updates routing table with the new route -func (t *table) Update(r Route) error { +func (t *Table) Update(r Route) error { service := r.Service sum := r.Hash() @@ -105,26 +85,26 @@ func (t *table) Update(r Route) error { defer t.Unlock() // check if the route destination has any routes in the table - if _, ok := t.m[service]; !ok { - t.m[service] = make(map[uint64]Route) - t.m[service][sum] = r + if _, ok := t.routes[service]; !ok { + t.routes[service] = make(map[uint64]Route) + t.routes[service][sum] = r go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) return nil } - t.m[service][sum] = r + t.routes[service][sum] = r go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) return nil } // List returns a list of all routes in the table -func (t *table) List() ([]Route, error) { +func (t *Table) List() ([]Route, error) { t.RLock() defer t.RUnlock() var routes []Route - for _, rmap := range t.m { + for _, rmap := range t.routes { for _, route := range rmap { routes = append(routes, route) } @@ -155,21 +135,20 @@ func findRoutes(routes map[uint64]Route, network, router string) []Route { } // Lookup queries routing table and returns all routes that match the lookup query -func (t *table) Lookup(q Query) ([]Route, error) { +func (t *Table) Lookup(q Query) ([]Route, error) { t.RLock() defer t.RUnlock() if q.Options().Service != "*" { - // no routes found for the destination and query policy is not a DiscardIfNone - if _, ok := t.m[q.Options().Service]; !ok && q.Options().Policy != DiscardIfNone { + if _, ok := t.routes[q.Options().Service]; !ok { return nil, ErrRouteNotFound } - return findRoutes(t.m[q.Options().Service], q.Options().Network, q.Options().Gateway), nil + return findRoutes(t.routes[q.Options().Service], q.Options().Network, q.Options().Gateway), nil } var results []Route // search through all destinations - for _, routes := range t.m { + for _, routes := range t.routes { results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...) } @@ -177,7 +156,7 @@ func (t *table) Lookup(q Query) ([]Route, error) { } // Watch returns routing table entry watcher -func (t *table) Watch(opts ...WatchOption) (Watcher, error) { +func (t *Table) Watch(opts ...WatchOption) (Watcher, error) { // by default watch everything wopts := WatchOptions{ Service: "*", @@ -187,25 +166,25 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) { o(&wopts) } - watcher := &tableWatcher{ + w := &tableWatcher{ opts: wopts, resChan: make(chan *Event, 10), done: make(chan struct{}), } t.Lock() - t.w[uuid.New().String()] = watcher + t.watchers[uuid.New().String()] = w t.Unlock() - return watcher, nil + return w, nil } // sendEvent sends rules to all subscribe watchers -func (t *table) sendEvent(r *Event) { +func (t *Table) sendEvent(r *Event) { t.RLock() defer t.RUnlock() - for _, w := range t.w { + for _, w := range t.watchers { select { case w.resChan <- r: case <-w.done: @@ -213,20 +192,7 @@ func (t *table) sendEvent(r *Event) { } } -// Size returns the size of the routing table -func (t *table) Size() int { - t.RLock() - defer t.RUnlock() - - size := 0 - for dest := range t.m { - size += len(t.m[dest]) - } - - return size -} - // String returns debug information -func (t *table) String() string { - return "default" +func (t *Table) String() string { + return "table" } diff --git a/network/router/table/table.go b/network/router/table/table.go deleted file mode 100644 index 2836e3d6..00000000 --- a/network/router/table/table.go +++ /dev/null @@ -1,38 +0,0 @@ -package table - -import ( - "errors" -) - -var ( - // ErrRouteNotFound is returned when no route was found in the routing table - ErrRouteNotFound = errors.New("route not found") - // ErrDuplicateRoute is returned when the route already exists - ErrDuplicateRoute = errors.New("duplicate route") -) - -// Table defines routing table interface -type Table interface { - // Create new route in the routing table - Create(Route) error - // Delete deletes existing route from the routing table - Delete(Route) error - // Update updates route in the routing table - Update(Route) error - // List returns the list of all routes in the table - List() ([]Route, error) - // Lookup looks up routes in the routing table and returns them - Lookup(Query) ([]Route, error) - // Watch returns a watcher which allows to track updates to the routing table - Watch(opts ...WatchOption) (Watcher, error) - // Size returns the size of the routing table - Size() int -} - -// Option used by the routing table -type Option func(*Options) - -// NewTable creates new routing table and returns it -func NewTable(opts ...Option) Table { - return newTable(opts...) -} diff --git a/network/router/table/default_test.go b/network/router/table_test.go similarity index 78% rename from network/router/table/default_test.go rename to network/router/table_test.go index 275da8c2..eee734a2 100644 --- a/network/router/table/default_test.go +++ b/network/router/table_test.go @@ -1,8 +1,8 @@ -package table +package router import "testing" -func testSetup() (Table, Route) { +func testSetup() (*Table, Route) { table := NewTable() route := Route{ @@ -18,12 +18,10 @@ func testSetup() (Table, Route) { func TestCreate(t *testing.T) { table, route := testSetup() - testTableSize := table.Size() if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize++ // adds new route for the original destination route.Gateway = "dest.gw2" @@ -31,11 +29,6 @@ func TestCreate(t *testing.T) { if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize++ - - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) - } // adding the same route under Insert policy must error if err := table.Create(route); err != ErrDuplicateRoute { @@ -45,12 +38,10 @@ func TestCreate(t *testing.T) { func TestDelete(t *testing.T) { table, route := testSetup() - testTableSize := table.Size() if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize++ // should fail to delete non-existant route prevSvc := route.Service @@ -66,21 +57,14 @@ func TestDelete(t *testing.T) { if err := table.Delete(route); err != nil { t.Errorf("error deleting route: %s", err) } - testTableSize-- - - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) - } } func TestUpdate(t *testing.T) { table, route := testSetup() - testTableSize := table.Size() if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } - testTableSize++ // change the metric of the original route route.Metric = 200 @@ -89,22 +73,12 @@ func TestUpdate(t *testing.T) { t.Errorf("error updating route: %s", err) } - // the size of the table should not change as we're only updating the metric of an existing route - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) - } - // this should add a new route route.Service = "rand.dest" if err := table.Update(route); err != nil { t.Errorf("error updating route: %s", err) } - testTableSize++ - - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) - } } func TestList(t *testing.T) { @@ -127,10 +101,6 @@ func TestList(t *testing.T) { if len(routes) != len(svc) { t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes)) } - - if len(routes) != table.Size() { - t.Errorf("mismatch number of routes and table size. Expected: %d, found: %d", len(routes), table.Size()) - } } func TestLookup(t *testing.T) { @@ -157,10 +127,6 @@ func TestLookup(t *testing.T) { t.Errorf("error looking up routes: %s", err) } - if len(routes) != table.Size() { - t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", table.Size(), len(routes)) - } - // query particular net query = NewQuery(QueryNetwork("net1")) @@ -218,8 +184,8 @@ func TestLookup(t *testing.T) { query = NewQuery(QueryService("foobar")) routes, err = table.Lookup(query) - if err != nil { - t.Errorf("error looking up routes: %s", err) + if err != ErrRouteNotFound { + t.Errorf("error looking up routes. Expected: %s, found: %s", ErrRouteNotFound, err) } if len(routes) != 0 { diff --git a/network/router/table/watcher.go b/network/router/watcher.go similarity index 88% rename from network/router/table/watcher.go rename to network/router/watcher.go index 503993ff..d1ff3ea7 100644 --- a/network/router/table/watcher.go +++ b/network/router/watcher.go @@ -1,4 +1,4 @@ -package table +package router import ( "errors" @@ -22,11 +22,9 @@ const ( Update ) -// String implements fmt.Stringer -// NOTE: we need this as this makes converting the numeric codes -// into miro style string actions very simple -func (et EventType) String() string { - switch et { +// String returns human readable event type +func (t EventType) String() string { + switch t { case Create: return "create" case Delete: @@ -83,8 +81,7 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly; -// right now we only allow to watch service +// TODO: think this through properly; right now we only watch service func (w *tableWatcher) Next() (*Event, error) { for { select {