table package is no more. Cleaned up unnecessary code, too.
This commit is contained in:
parent
4e27aac398
commit
e22c4b4c07
@ -8,7 +8,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-micro/network/router/table"
|
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -43,12 +42,12 @@ var (
|
|||||||
// router implements default router
|
// router implements default router
|
||||||
type router struct {
|
type router struct {
|
||||||
// embed the table
|
// embed the table
|
||||||
table.Table
|
*Table
|
||||||
opts Options
|
opts Options
|
||||||
status Status
|
status Status
|
||||||
exit chan struct{}
|
exit chan struct{}
|
||||||
errChan chan error
|
errChan chan error
|
||||||
eventChan chan *table.Event
|
eventChan chan *Event
|
||||||
advertChan chan *Advert
|
advertChan chan *Advert
|
||||||
advertWg *sync.WaitGroup
|
advertWg *sync.WaitGroup
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
@ -92,18 +91,18 @@ func (r *router) Options() Options {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// manageRoute applies action on a given route
|
// manageRoute applies action on a given route
|
||||||
func (r *router) manageRoute(route table.Route, action string) error {
|
func (r *router) manageRoute(route Route, action string) error {
|
||||||
switch action {
|
switch action {
|
||||||
case "create":
|
case "create":
|
||||||
if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute {
|
if err := r.Create(route); err != nil && err != ErrDuplicateRoute {
|
||||||
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
|
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
|
||||||
}
|
}
|
||||||
case "update":
|
case "update":
|
||||||
if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute {
|
if err := r.Update(route); err != nil && err != ErrDuplicateRoute {
|
||||||
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
|
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
|
||||||
}
|
}
|
||||||
case "delete":
|
case "delete":
|
||||||
if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound {
|
if err := r.Delete(route); err != nil && err != ErrRouteNotFound {
|
||||||
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
|
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -121,13 +120,13 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
|
|||||||
|
|
||||||
// take route action on each service node
|
// take route action on each service node
|
||||||
for _, node := range service.Nodes {
|
for _, node := range service.Nodes {
|
||||||
route := table.Route{
|
route := Route{
|
||||||
Service: service.Name,
|
Service: service.Name,
|
||||||
Address: node.Address,
|
Address: node.Address,
|
||||||
Gateway: "",
|
Gateway: "",
|
||||||
Network: r.opts.Network,
|
Network: r.opts.Network,
|
||||||
Link: table.DefaultLink,
|
Link: DefaultLink,
|
||||||
Metric: table.DefaultLocalMetric,
|
Metric: DefaultLocalMetric,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.manageRoute(route, action); err != nil {
|
if err := r.manageRoute(route, action); err != nil {
|
||||||
@ -197,7 +196,7 @@ func (r *router) watchRegistry(w registry.Watcher) error {
|
|||||||
|
|
||||||
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
|
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
|
||||||
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
|
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
|
||||||
func (r *router) watchTable(w table.Watcher) error {
|
func (r *router) watchTable(w Watcher) error {
|
||||||
// wait in the background for the router to stop
|
// wait in the background for the router to stop
|
||||||
// when the router stops, stop the watcher and exit
|
// when the router stops, stop the watcher and exit
|
||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
@ -212,7 +211,7 @@ func (r *router) watchTable(w table.Watcher) error {
|
|||||||
for {
|
for {
|
||||||
event, err := w.Next()
|
event, err := w.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != table.ErrWatcherStopped {
|
if err != ErrWatcherStopped {
|
||||||
watchErr = err
|
watchErr = err
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@ -234,7 +233,7 @@ func (r *router) watchTable(w table.Watcher) error {
|
|||||||
|
|
||||||
// publishAdvert publishes router advert to advert channel
|
// publishAdvert publishes router advert to advert channel
|
||||||
// NOTE: this might cease to be a dedicated method in the future
|
// NOTE: this might cease to be a dedicated method in the future
|
||||||
func (r *router) publishAdvert(advType AdvertType, events []*table.Event) {
|
func (r *router) publishAdvert(advType AdvertType, events []*Event) {
|
||||||
defer r.advertWg.Done()
|
defer r.advertWg.Done()
|
||||||
|
|
||||||
a := &Advert{
|
a := &Advert{
|
||||||
@ -266,10 +265,10 @@ func (r *router) advertiseTable() error {
|
|||||||
return fmt.Errorf("failed listing routes: %s", err)
|
return fmt.Errorf("failed listing routes: %s", err)
|
||||||
}
|
}
|
||||||
// collect all the added routes before we attempt to add default gateway
|
// collect all the added routes before we attempt to add default gateway
|
||||||
events := make([]*table.Event, len(routes))
|
events := make([]*Event, len(routes))
|
||||||
for i, route := range routes {
|
for i, route := range routes {
|
||||||
event := &table.Event{
|
event := &Event{
|
||||||
Type: table.Update,
|
Type: Update,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
Route: route,
|
Route: route,
|
||||||
}
|
}
|
||||||
@ -279,7 +278,7 @@ func (r *router) advertiseTable() error {
|
|||||||
// advertise all routes as Update events to subscribers
|
// advertise all routes as Update events to subscribers
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
r.advertWg.Add(1)
|
r.advertWg.Add(1)
|
||||||
go r.publishAdvert(Update, events)
|
go r.publishAdvert(RouteUpdate, events)
|
||||||
}
|
}
|
||||||
case <-r.exit:
|
case <-r.exit:
|
||||||
return nil
|
return nil
|
||||||
@ -289,7 +288,7 @@ func (r *router) advertiseTable() error {
|
|||||||
|
|
||||||
// routeAdvert contains a list of route events to be advertised
|
// routeAdvert contains a list of route events to be advertised
|
||||||
type routeAdvert struct {
|
type routeAdvert struct {
|
||||||
events []*table.Event
|
events []*Event
|
||||||
// lastUpdate records the time of the last advert update
|
// lastUpdate records the time of the last advert update
|
||||||
lastUpdate time.Time
|
lastUpdate time.Time
|
||||||
// penalty is current advert penalty
|
// penalty is current advert penalty
|
||||||
@ -326,7 +325,7 @@ func (r *router) advertiseEvents() error {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
var events []*table.Event
|
var events []*Event
|
||||||
// collect all events which are not flapping
|
// collect all events which are not flapping
|
||||||
for key, advert := range advertMap {
|
for key, advert := range advertMap {
|
||||||
// decay the event penalty
|
// decay the event penalty
|
||||||
@ -352,7 +351,7 @@ func (r *router) advertiseEvents() error {
|
|||||||
|
|
||||||
if !advert.isSuppressed {
|
if !advert.isSuppressed {
|
||||||
for _, event := range advert.events {
|
for _, event := range advert.events {
|
||||||
e := new(table.Event)
|
e := new(Event)
|
||||||
*e = *event
|
*e = *event
|
||||||
events = append(events, e)
|
events = append(events, e)
|
||||||
// delete the advert from the advertMap
|
// delete the advert from the advertMap
|
||||||
@ -364,7 +363,7 @@ func (r *router) advertiseEvents() error {
|
|||||||
// advertise all Update events to subscribers
|
// advertise all Update events to subscribers
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
r.advertWg.Add(1)
|
r.advertWg.Add(1)
|
||||||
go r.publishAdvert(Update, events)
|
go r.publishAdvert(RouteUpdate, events)
|
||||||
}
|
}
|
||||||
case e := <-r.eventChan:
|
case e := <-r.eventChan:
|
||||||
// if event is nil, continue
|
// if event is nil, continue
|
||||||
@ -375,9 +374,9 @@ func (r *router) advertiseEvents() error {
|
|||||||
// determine the event penalty
|
// determine the event penalty
|
||||||
var penalty float64
|
var penalty float64
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
case table.Update:
|
case Update:
|
||||||
penalty = UpdatePenalty
|
penalty = UpdatePenalty
|
||||||
case table.Delete:
|
case Delete:
|
||||||
penalty = DeletePenalty
|
penalty = DeletePenalty
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -386,7 +385,7 @@ func (r *router) advertiseEvents() error {
|
|||||||
hash := e.Route.Hash()
|
hash := e.Route.Hash()
|
||||||
advert, ok := advertMap[hash]
|
advert, ok := advertMap[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
events := []*table.Event{e}
|
events := []*Event{e}
|
||||||
advert = &routeAdvert{
|
advert = &routeAdvert{
|
||||||
events: events,
|
events: events,
|
||||||
penalty: penalty,
|
penalty: penalty,
|
||||||
@ -462,12 +461,12 @@ func (r *router) run() {
|
|||||||
// add default gateway into routing table
|
// add default gateway into routing table
|
||||||
if r.opts.Gateway != "" {
|
if r.opts.Gateway != "" {
|
||||||
// note, the only non-default value is the gateway
|
// note, the only non-default value is the gateway
|
||||||
route := table.Route{
|
route := Route{
|
||||||
Service: "*",
|
Service: "*",
|
||||||
Address: "*",
|
Address: "*",
|
||||||
Gateway: r.opts.Gateway,
|
Gateway: r.opts.Gateway,
|
||||||
Network: "*",
|
Network: "*",
|
||||||
Metric: table.DefaultLocalMetric,
|
Metric: DefaultLocalMetric,
|
||||||
}
|
}
|
||||||
if err := r.Create(route); err != nil {
|
if err := r.Create(route); err != nil {
|
||||||
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)}
|
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)}
|
||||||
@ -528,10 +527,10 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
|||||||
return nil, fmt.Errorf("failed listing routes: %s", err)
|
return nil, fmt.Errorf("failed listing routes: %s", err)
|
||||||
}
|
}
|
||||||
// collect all the added routes before we attempt to add default gateway
|
// collect all the added routes before we attempt to add default gateway
|
||||||
events := make([]*table.Event, len(routes))
|
events := make([]*Event, len(routes))
|
||||||
for i, route := range routes {
|
for i, route := range routes {
|
||||||
event := &table.Event{
|
event := &Event{
|
||||||
Type: table.Create,
|
Type: Create,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
Route: route,
|
Route: route,
|
||||||
}
|
}
|
||||||
@ -540,7 +539,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
|||||||
|
|
||||||
// create advertise and event channels
|
// create advertise and event channels
|
||||||
r.advertChan = make(chan *Advert)
|
r.advertChan = make(chan *Advert)
|
||||||
r.eventChan = make(chan *table.Event)
|
r.eventChan = make(chan *Event)
|
||||||
|
|
||||||
// advertise your presence
|
// advertise your presence
|
||||||
r.advertWg.Add(1)
|
r.advertWg.Add(1)
|
||||||
@ -580,7 +579,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
|||||||
func (r *router) Process(a *Advert) error {
|
func (r *router) Process(a *Advert) error {
|
||||||
// NOTE: event sorting might not be necessary
|
// NOTE: event sorting might not be necessary
|
||||||
// copy update events intp new slices
|
// copy update events intp new slices
|
||||||
events := make([]*table.Event, len(a.Events))
|
events := make([]*Event, len(a.Events))
|
||||||
copy(events, a.Events)
|
copy(events, a.Events)
|
||||||
// sort events by timestamp
|
// sort events by timestamp
|
||||||
sort.Slice(events, func(i, j int) bool {
|
sort.Slice(events, func(i, j int) bool {
|
||||||
|
@ -2,7 +2,6 @@ package router
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/micro/go-micro/network/router/table"
|
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -26,7 +25,7 @@ type Options struct {
|
|||||||
// Registry is the local registry
|
// Registry is the local registry
|
||||||
Registry registry.Registry
|
Registry registry.Registry
|
||||||
// Table is routing table
|
// Table is routing table
|
||||||
Table table.Table
|
Table *Table
|
||||||
}
|
}
|
||||||
|
|
||||||
// Id sets Router Id
|
// Id sets Router Id
|
||||||
@ -64,8 +63,8 @@ func Registry(r registry.Registry) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Table sets the routing table
|
// RoutingTable sets the routing table
|
||||||
func Table(t table.Table) Option {
|
func RoutingTable(t *Table) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Table = t
|
o.Table = t
|
||||||
}
|
}
|
||||||
@ -78,6 +77,6 @@ func DefaultOptions() Options {
|
|||||||
Address: DefaultAddress,
|
Address: DefaultAddress,
|
||||||
Network: DefaultNetwork,
|
Network: DefaultNetwork,
|
||||||
Registry: registry.DefaultRegistry,
|
Registry: registry.DefaultRegistry,
|
||||||
Table: table.NewTable(),
|
Table: NewTable(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,26 +1,4 @@
|
|||||||
package table
|
package router
|
||||||
|
|
||||||
// LookupPolicy defines query policy
|
|
||||||
type LookupPolicy int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// DiscardIfNone discards query when no route is found
|
|
||||||
DiscardIfNone LookupPolicy = iota
|
|
||||||
// ClosestMatch returns closest match to supplied query
|
|
||||||
ClosestMatch
|
|
||||||
)
|
|
||||||
|
|
||||||
// String returns human representation of LookupPolicy
|
|
||||||
func (lp LookupPolicy) String() string {
|
|
||||||
switch lp {
|
|
||||||
case DiscardIfNone:
|
|
||||||
return "DISCARD"
|
|
||||||
case ClosestMatch:
|
|
||||||
return "CLOSEST"
|
|
||||||
default:
|
|
||||||
return "UNKNOWN"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryOption sets routing table query options
|
// QueryOption sets routing table query options
|
||||||
type QueryOption func(*QueryOptions)
|
type QueryOption func(*QueryOptions)
|
||||||
@ -33,8 +11,6 @@ type QueryOptions struct {
|
|||||||
Gateway string
|
Gateway string
|
||||||
// Network is network address
|
// Network is network address
|
||||||
Network string
|
Network string
|
||||||
// Policy is query lookup policy
|
|
||||||
Policy LookupPolicy
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryService sets destination address
|
// QueryService sets destination address
|
||||||
@ -58,14 +34,6 @@ func QueryNetwork(n string) QueryOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryPolicy sets query policy
|
|
||||||
// NOTE: this might be renamed to filter or some such
|
|
||||||
func QueryPolicy(p LookupPolicy) QueryOption {
|
|
||||||
return func(o *QueryOptions) {
|
|
||||||
o.Policy = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query is routing table query
|
// Query is routing table query
|
||||||
type Query interface {
|
type Query interface {
|
||||||
// Options returns query options
|
// Options returns query options
|
||||||
@ -80,12 +48,10 @@ type query struct {
|
|||||||
// NewQuery creates new query and returns it
|
// NewQuery creates new query and returns it
|
||||||
func NewQuery(opts ...QueryOption) Query {
|
func NewQuery(opts ...QueryOption) Query {
|
||||||
// default options
|
// default options
|
||||||
// NOTE: by default we use DefaultNetworkMetric
|
|
||||||
qopts := QueryOptions{
|
qopts := QueryOptions{
|
||||||
Service: "*",
|
Service: "*",
|
||||||
Gateway: "*",
|
Gateway: "*",
|
||||||
Network: "*",
|
Network: "*",
|
||||||
Policy: DiscardIfNone,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
@ -1,4 +1,4 @@
|
|||||||
package table
|
package router
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"hash/fnv"
|
"hash/fnv"
|
24
network/router/route_test.go
Normal file
24
network/router/route_test.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package router
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestHash(t *testing.T) {
|
||||||
|
route1 := Route{
|
||||||
|
Service: "dest.svc",
|
||||||
|
Gateway: "dest.gw",
|
||||||
|
Network: "dest.network",
|
||||||
|
Link: "det.link",
|
||||||
|
Metric: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a copy
|
||||||
|
route2 := route1
|
||||||
|
|
||||||
|
route1Hash := route1.Hash()
|
||||||
|
route2Hash := route2.Hash()
|
||||||
|
|
||||||
|
// we should get the same hash
|
||||||
|
if route1Hash != route2Hash {
|
||||||
|
t.Errorf("identical routes result in different hashes")
|
||||||
|
}
|
||||||
|
}
|
@ -3,8 +3,6 @@ package router
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-micro/network/router/table"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -14,8 +12,6 @@ var (
|
|||||||
|
|
||||||
// Router is an interface for a routing control plane
|
// Router is an interface for a routing control plane
|
||||||
type Router interface {
|
type Router interface {
|
||||||
// Router provides a routing table
|
|
||||||
table.Table
|
|
||||||
// Init initializes the router with options
|
// Init initializes the router with options
|
||||||
Init(...Option) error
|
Init(...Option) error
|
||||||
// Options returns the router options
|
// Options returns the router options
|
||||||
@ -24,6 +20,18 @@ type Router interface {
|
|||||||
Advertise() (<-chan *Advert, error)
|
Advertise() (<-chan *Advert, error)
|
||||||
// Process processes incoming adverts
|
// Process processes incoming adverts
|
||||||
Process(*Advert) error
|
Process(*Advert) error
|
||||||
|
// Create new route in the routing table
|
||||||
|
Create(Route) error
|
||||||
|
// Delete deletes existing route from the routing table
|
||||||
|
Delete(Route) error
|
||||||
|
// Update updates route in the routing table
|
||||||
|
Update(Route) error
|
||||||
|
// List returns the list of all routes in the table
|
||||||
|
List() ([]Route, error)
|
||||||
|
// Lookup looks up routes in the routing table and returns them
|
||||||
|
Lookup(Query) ([]Route, error)
|
||||||
|
// Watch returns a watcher which allows to track updates to the routing table
|
||||||
|
Watch(opts ...WatchOption) (Watcher, error)
|
||||||
// Status returns router status
|
// Status returns router status
|
||||||
Status() Status
|
Status() Status
|
||||||
// Stop stops the router
|
// Stop stops the router
|
||||||
@ -63,10 +71,22 @@ type AdvertType int
|
|||||||
const (
|
const (
|
||||||
// Announce is advertised when the router announces itself
|
// Announce is advertised when the router announces itself
|
||||||
Announce AdvertType = iota
|
Announce AdvertType = iota
|
||||||
// Update advertises route updates
|
// RouteUpdate advertises route updates
|
||||||
Update
|
RouteUpdate
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// String returns human readable advertisement type
|
||||||
|
func (t AdvertType) String() string {
|
||||||
|
switch t {
|
||||||
|
case Announce:
|
||||||
|
return "announce"
|
||||||
|
case RouteUpdate:
|
||||||
|
return "update"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Advert contains a list of events advertised by the router to the network
|
// Advert contains a list of events advertised by the router to the network
|
||||||
type Advert struct {
|
type Advert struct {
|
||||||
// Id is the router Id
|
// Id is the router Id
|
||||||
@ -78,7 +98,7 @@ type Advert struct {
|
|||||||
// TTL is Advert TTL
|
// TTL is Advert TTL
|
||||||
TTL time.Duration
|
TTL time.Duration
|
||||||
// Events is a list of routing table events to advertise
|
// Events is a list of routing table events to advertise
|
||||||
Events []*table.Event
|
Events []*Event
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRouter creates new Router and returns it
|
// NewRouter creates new Router and returns it
|
||||||
|
@ -1,59 +1,39 @@
|
|||||||
package table
|
package router
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options specify routing table options
|
var (
|
||||||
// TODO: table options TBD in the future
|
// ErrRouteNotFound is returned when no route was found in the routing table
|
||||||
type Options struct{}
|
ErrRouteNotFound = errors.New("route not found")
|
||||||
|
// ErrDuplicateRoute is returned when the route already exists
|
||||||
|
ErrDuplicateRoute = errors.New("duplicate route")
|
||||||
|
)
|
||||||
|
|
||||||
// table is an in memory routing table
|
// Table is an in memory routing table
|
||||||
type table struct {
|
type Table struct {
|
||||||
// opts are table options
|
// routes stores service routes
|
||||||
opts Options
|
routes map[string]map[uint64]Route
|
||||||
// m stores routing table map
|
// watchers stores table watchers
|
||||||
m map[string]map[uint64]Route
|
watchers map[string]*tableWatcher
|
||||||
// w is a list of table watchers
|
|
||||||
w map[string]*tableWatcher
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// newTable creates a new routing table and returns it
|
// NewTable creates a new routing table and returns it
|
||||||
func newTable(opts ...Option) Table {
|
func NewTable(opts ...Option) *Table {
|
||||||
// default options
|
return &Table{
|
||||||
var options Options
|
routes: make(map[string]map[uint64]Route),
|
||||||
|
watchers: make(map[string]*tableWatcher),
|
||||||
// apply requested options
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &table{
|
|
||||||
opts: options,
|
|
||||||
m: make(map[string]map[uint64]Route),
|
|
||||||
w: make(map[string]*tableWatcher),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init initializes routing table with options
|
|
||||||
func (t *table) Init(opts ...Option) error {
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&t.opts)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Options returns routing table options
|
|
||||||
func (t *table) Options() Options {
|
|
||||||
return t.opts
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create creates new route in the routing table
|
// Create creates new route in the routing table
|
||||||
func (t *table) Create(r Route) error {
|
func (t *Table) Create(r Route) error {
|
||||||
service := r.Service
|
service := r.Service
|
||||||
sum := r.Hash()
|
sum := r.Hash()
|
||||||
|
|
||||||
@ -61,16 +41,16 @@ func (t *table) Create(r Route) error {
|
|||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
// check if there are any routes in the table for the route destination
|
// check if there are any routes in the table for the route destination
|
||||||
if _, ok := t.m[service]; !ok {
|
if _, ok := t.routes[service]; !ok {
|
||||||
t.m[service] = make(map[uint64]Route)
|
t.routes[service] = make(map[uint64]Route)
|
||||||
t.m[service][sum] = r
|
t.routes[service][sum] = r
|
||||||
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
|
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// add new route to the table for the route destination
|
// add new route to the table for the route destination
|
||||||
if _, ok := t.m[service][sum]; !ok {
|
if _, ok := t.routes[service][sum]; !ok {
|
||||||
t.m[service][sum] = r
|
t.routes[service][sum] = r
|
||||||
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
|
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -79,25 +59,25 @@ func (t *table) Create(r Route) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes the route from the routing table
|
// Delete deletes the route from the routing table
|
||||||
func (t *table) Delete(r Route) error {
|
func (t *Table) Delete(r Route) error {
|
||||||
service := r.Service
|
service := r.Service
|
||||||
sum := r.Hash()
|
sum := r.Hash()
|
||||||
|
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
if _, ok := t.m[service]; !ok {
|
if _, ok := t.routes[service]; !ok {
|
||||||
return ErrRouteNotFound
|
return ErrRouteNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(t.m[service], sum)
|
delete(t.routes[service], sum)
|
||||||
go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
|
go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update updates routing table with the new route
|
// Update updates routing table with the new route
|
||||||
func (t *table) Update(r Route) error {
|
func (t *Table) Update(r Route) error {
|
||||||
service := r.Service
|
service := r.Service
|
||||||
sum := r.Hash()
|
sum := r.Hash()
|
||||||
|
|
||||||
@ -105,26 +85,26 @@ func (t *table) Update(r Route) error {
|
|||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
// check if the route destination has any routes in the table
|
// check if the route destination has any routes in the table
|
||||||
if _, ok := t.m[service]; !ok {
|
if _, ok := t.routes[service]; !ok {
|
||||||
t.m[service] = make(map[uint64]Route)
|
t.routes[service] = make(map[uint64]Route)
|
||||||
t.m[service][sum] = r
|
t.routes[service][sum] = r
|
||||||
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
|
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
t.m[service][sum] = r
|
t.routes[service][sum] = r
|
||||||
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
|
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of all routes in the table
|
// List returns a list of all routes in the table
|
||||||
func (t *table) List() ([]Route, error) {
|
func (t *Table) List() ([]Route, error) {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
defer t.RUnlock()
|
defer t.RUnlock()
|
||||||
|
|
||||||
var routes []Route
|
var routes []Route
|
||||||
for _, rmap := range t.m {
|
for _, rmap := range t.routes {
|
||||||
for _, route := range rmap {
|
for _, route := range rmap {
|
||||||
routes = append(routes, route)
|
routes = append(routes, route)
|
||||||
}
|
}
|
||||||
@ -155,21 +135,20 @@ func findRoutes(routes map[uint64]Route, network, router string) []Route {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Lookup queries routing table and returns all routes that match the lookup query
|
// Lookup queries routing table and returns all routes that match the lookup query
|
||||||
func (t *table) Lookup(q Query) ([]Route, error) {
|
func (t *Table) Lookup(q Query) ([]Route, error) {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
defer t.RUnlock()
|
defer t.RUnlock()
|
||||||
|
|
||||||
if q.Options().Service != "*" {
|
if q.Options().Service != "*" {
|
||||||
// no routes found for the destination and query policy is not a DiscardIfNone
|
if _, ok := t.routes[q.Options().Service]; !ok {
|
||||||
if _, ok := t.m[q.Options().Service]; !ok && q.Options().Policy != DiscardIfNone {
|
|
||||||
return nil, ErrRouteNotFound
|
return nil, ErrRouteNotFound
|
||||||
}
|
}
|
||||||
return findRoutes(t.m[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
|
return findRoutes(t.routes[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var results []Route
|
var results []Route
|
||||||
// search through all destinations
|
// search through all destinations
|
||||||
for _, routes := range t.m {
|
for _, routes := range t.routes {
|
||||||
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
|
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,7 +156,7 @@ func (t *table) Lookup(q Query) ([]Route, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch returns routing table entry watcher
|
// Watch returns routing table entry watcher
|
||||||
func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
|
func (t *Table) Watch(opts ...WatchOption) (Watcher, error) {
|
||||||
// by default watch everything
|
// by default watch everything
|
||||||
wopts := WatchOptions{
|
wopts := WatchOptions{
|
||||||
Service: "*",
|
Service: "*",
|
||||||
@ -187,25 +166,25 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
|
|||||||
o(&wopts)
|
o(&wopts)
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := &tableWatcher{
|
w := &tableWatcher{
|
||||||
opts: wopts,
|
opts: wopts,
|
||||||
resChan: make(chan *Event, 10),
|
resChan: make(chan *Event, 10),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Lock()
|
t.Lock()
|
||||||
t.w[uuid.New().String()] = watcher
|
t.watchers[uuid.New().String()] = w
|
||||||
t.Unlock()
|
t.Unlock()
|
||||||
|
|
||||||
return watcher, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendEvent sends rules to all subscribe watchers
|
// sendEvent sends rules to all subscribe watchers
|
||||||
func (t *table) sendEvent(r *Event) {
|
func (t *Table) sendEvent(r *Event) {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
defer t.RUnlock()
|
defer t.RUnlock()
|
||||||
|
|
||||||
for _, w := range t.w {
|
for _, w := range t.watchers {
|
||||||
select {
|
select {
|
||||||
case w.resChan <- r:
|
case w.resChan <- r:
|
||||||
case <-w.done:
|
case <-w.done:
|
||||||
@ -213,20 +192,7 @@ func (t *table) sendEvent(r *Event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of the routing table
|
|
||||||
func (t *table) Size() int {
|
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
size := 0
|
|
||||||
for dest := range t.m {
|
|
||||||
size += len(t.m[dest])
|
|
||||||
}
|
|
||||||
|
|
||||||
return size
|
|
||||||
}
|
|
||||||
|
|
||||||
// String returns debug information
|
// String returns debug information
|
||||||
func (t *table) String() string {
|
func (t *Table) String() string {
|
||||||
return "default"
|
return "table"
|
||||||
}
|
}
|
@ -1,38 +0,0 @@
|
|||||||
package table
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrRouteNotFound is returned when no route was found in the routing table
|
|
||||||
ErrRouteNotFound = errors.New("route not found")
|
|
||||||
// ErrDuplicateRoute is returned when the route already exists
|
|
||||||
ErrDuplicateRoute = errors.New("duplicate route")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Table defines routing table interface
|
|
||||||
type Table interface {
|
|
||||||
// Create new route in the routing table
|
|
||||||
Create(Route) error
|
|
||||||
// Delete deletes existing route from the routing table
|
|
||||||
Delete(Route) error
|
|
||||||
// Update updates route in the routing table
|
|
||||||
Update(Route) error
|
|
||||||
// List returns the list of all routes in the table
|
|
||||||
List() ([]Route, error)
|
|
||||||
// Lookup looks up routes in the routing table and returns them
|
|
||||||
Lookup(Query) ([]Route, error)
|
|
||||||
// Watch returns a watcher which allows to track updates to the routing table
|
|
||||||
Watch(opts ...WatchOption) (Watcher, error)
|
|
||||||
// Size returns the size of the routing table
|
|
||||||
Size() int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option used by the routing table
|
|
||||||
type Option func(*Options)
|
|
||||||
|
|
||||||
// NewTable creates new routing table and returns it
|
|
||||||
func NewTable(opts ...Option) Table {
|
|
||||||
return newTable(opts...)
|
|
||||||
}
|
|
@ -1,8 +1,8 @@
|
|||||||
package table
|
package router
|
||||||
|
|
||||||
import "testing"
|
import "testing"
|
||||||
|
|
||||||
func testSetup() (Table, Route) {
|
func testSetup() (*Table, Route) {
|
||||||
table := NewTable()
|
table := NewTable()
|
||||||
|
|
||||||
route := Route{
|
route := Route{
|
||||||
@ -18,12 +18,10 @@ func testSetup() (Table, Route) {
|
|||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func TestCreate(t *testing.T) {
|
||||||
table, route := testSetup()
|
table, route := testSetup()
|
||||||
testTableSize := table.Size()
|
|
||||||
|
|
||||||
if err := table.Create(route); err != nil {
|
if err := table.Create(route); err != nil {
|
||||||
t.Errorf("error adding route: %s", err)
|
t.Errorf("error adding route: %s", err)
|
||||||
}
|
}
|
||||||
testTableSize++
|
|
||||||
|
|
||||||
// adds new route for the original destination
|
// adds new route for the original destination
|
||||||
route.Gateway = "dest.gw2"
|
route.Gateway = "dest.gw2"
|
||||||
@ -31,11 +29,6 @@ func TestCreate(t *testing.T) {
|
|||||||
if err := table.Create(route); err != nil {
|
if err := table.Create(route); err != nil {
|
||||||
t.Errorf("error adding route: %s", err)
|
t.Errorf("error adding route: %s", err)
|
||||||
}
|
}
|
||||||
testTableSize++
|
|
||||||
|
|
||||||
if table.Size() != testTableSize {
|
|
||||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
// adding the same route under Insert policy must error
|
// adding the same route under Insert policy must error
|
||||||
if err := table.Create(route); err != ErrDuplicateRoute {
|
if err := table.Create(route); err != ErrDuplicateRoute {
|
||||||
@ -45,12 +38,10 @@ func TestCreate(t *testing.T) {
|
|||||||
|
|
||||||
func TestDelete(t *testing.T) {
|
func TestDelete(t *testing.T) {
|
||||||
table, route := testSetup()
|
table, route := testSetup()
|
||||||
testTableSize := table.Size()
|
|
||||||
|
|
||||||
if err := table.Create(route); err != nil {
|
if err := table.Create(route); err != nil {
|
||||||
t.Errorf("error adding route: %s", err)
|
t.Errorf("error adding route: %s", err)
|
||||||
}
|
}
|
||||||
testTableSize++
|
|
||||||
|
|
||||||
// should fail to delete non-existant route
|
// should fail to delete non-existant route
|
||||||
prevSvc := route.Service
|
prevSvc := route.Service
|
||||||
@ -66,21 +57,14 @@ func TestDelete(t *testing.T) {
|
|||||||
if err := table.Delete(route); err != nil {
|
if err := table.Delete(route); err != nil {
|
||||||
t.Errorf("error deleting route: %s", err)
|
t.Errorf("error deleting route: %s", err)
|
||||||
}
|
}
|
||||||
testTableSize--
|
|
||||||
|
|
||||||
if table.Size() != testTableSize {
|
|
||||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdate(t *testing.T) {
|
func TestUpdate(t *testing.T) {
|
||||||
table, route := testSetup()
|
table, route := testSetup()
|
||||||
testTableSize := table.Size()
|
|
||||||
|
|
||||||
if err := table.Create(route); err != nil {
|
if err := table.Create(route); err != nil {
|
||||||
t.Errorf("error adding route: %s", err)
|
t.Errorf("error adding route: %s", err)
|
||||||
}
|
}
|
||||||
testTableSize++
|
|
||||||
|
|
||||||
// change the metric of the original route
|
// change the metric of the original route
|
||||||
route.Metric = 200
|
route.Metric = 200
|
||||||
@ -89,22 +73,12 @@ func TestUpdate(t *testing.T) {
|
|||||||
t.Errorf("error updating route: %s", err)
|
t.Errorf("error updating route: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the size of the table should not change as we're only updating the metric of an existing route
|
|
||||||
if table.Size() != testTableSize {
|
|
||||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
// this should add a new route
|
// this should add a new route
|
||||||
route.Service = "rand.dest"
|
route.Service = "rand.dest"
|
||||||
|
|
||||||
if err := table.Update(route); err != nil {
|
if err := table.Update(route); err != nil {
|
||||||
t.Errorf("error updating route: %s", err)
|
t.Errorf("error updating route: %s", err)
|
||||||
}
|
}
|
||||||
testTableSize++
|
|
||||||
|
|
||||||
if table.Size() != testTableSize {
|
|
||||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
@ -127,10 +101,6 @@ func TestList(t *testing.T) {
|
|||||||
if len(routes) != len(svc) {
|
if len(routes) != len(svc) {
|
||||||
t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes))
|
t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(routes) != table.Size() {
|
|
||||||
t.Errorf("mismatch number of routes and table size. Expected: %d, found: %d", len(routes), table.Size())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLookup(t *testing.T) {
|
func TestLookup(t *testing.T) {
|
||||||
@ -157,10 +127,6 @@ func TestLookup(t *testing.T) {
|
|||||||
t.Errorf("error looking up routes: %s", err)
|
t.Errorf("error looking up routes: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(routes) != table.Size() {
|
|
||||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", table.Size(), len(routes))
|
|
||||||
}
|
|
||||||
|
|
||||||
// query particular net
|
// query particular net
|
||||||
query = NewQuery(QueryNetwork("net1"))
|
query = NewQuery(QueryNetwork("net1"))
|
||||||
|
|
||||||
@ -218,8 +184,8 @@ func TestLookup(t *testing.T) {
|
|||||||
query = NewQuery(QueryService("foobar"))
|
query = NewQuery(QueryService("foobar"))
|
||||||
|
|
||||||
routes, err = table.Lookup(query)
|
routes, err = table.Lookup(query)
|
||||||
if err != nil {
|
if err != ErrRouteNotFound {
|
||||||
t.Errorf("error looking up routes: %s", err)
|
t.Errorf("error looking up routes. Expected: %s, found: %s", ErrRouteNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(routes) != 0 {
|
if len(routes) != 0 {
|
@ -1,4 +1,4 @@
|
|||||||
package table
|
package router
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
@ -22,11 +22,9 @@ const (
|
|||||||
Update
|
Update
|
||||||
)
|
)
|
||||||
|
|
||||||
// String implements fmt.Stringer
|
// String returns human readable event type
|
||||||
// NOTE: we need this as this makes converting the numeric codes
|
func (t EventType) String() string {
|
||||||
// into miro style string actions very simple
|
switch t {
|
||||||
func (et EventType) String() string {
|
|
||||||
switch et {
|
|
||||||
case Create:
|
case Create:
|
||||||
return "create"
|
return "create"
|
||||||
case Delete:
|
case Delete:
|
||||||
@ -83,8 +81,7 @@ type tableWatcher struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Next returns the next noticed action taken on table
|
// Next returns the next noticed action taken on table
|
||||||
// TODO: this needs to be thought through properly;
|
// TODO: think this through properly; right now we only watch service
|
||||||
// right now we only allow to watch service
|
|
||||||
func (w *tableWatcher) Next() (*Event, error) {
|
func (w *tableWatcher) Next() (*Event, error) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
Loading…
x
Reference in New Issue
Block a user