diff --git a/network/router/default_router.go b/network/router/default_router.go index 9a7b933e..340c63f5 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -2,6 +2,7 @@ package router import ( "fmt" + "sort" "strings" "sync" "time" @@ -10,6 +11,15 @@ import ( "github.com/olekukonko/tablewriter" ) +const ( + // UpdateRoutePenalty penalises route updates + UpdateRoutePenalty = 500 + // DeleteRoutePenalty penalises route deletes + DeleteRoutePenalty = 1000 + // AdvertiseTick is time interval in which we advertise route updates + AdvertiseTick = 5 * time.Second +) + // router provides default router implementation type router struct { opts Options @@ -79,7 +89,7 @@ func (r *router) Network() string { func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { - return fmt.Errorf("failed to list services: %v", err) + return fmt.Errorf("failed listing services: %v", err) } // add each service node as a separate route @@ -148,12 +158,12 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { case "create": // only return error if the route is not duplicate, but something else has failed if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("failed to add route for service %v: %s", res.Service.Name, err) + return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) } case "delete": // only return error if the route is not in the table, but something else has failed if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed to delete route for service %v: %s", res.Service.Name, err) + return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) } } } @@ -175,7 +185,6 @@ func (r *router) watchTable(w Watcher) error { var watchErr error -exit: for { event, err := w.Next() if err != nil { @@ -188,12 +197,13 @@ exit: u := &Update{ ID: r.ID(), Timestamp: time.Now(), - Event: event, + Events: []*Event{event}, } select { case <-r.exit: - break exit + close(r.advertChan) + return watchErr case r.advertChan <- u: } } @@ -258,7 +268,7 @@ func (r *router) Advertise() (<-chan *Update, error) { Metric: DefaultLocalMetric, } if err := r.opts.Table.Add(route); err != nil { - return nil, fmt.Errorf("error to add default gateway route: %s", err) + return nil, fmt.Errorf("failed adding default gateway route: %s", err) } } @@ -271,12 +281,12 @@ func (r *router) Advertise() (<-chan *Update, error) { // routing table watcher which watches all routes i.e. to every destination tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) if err != nil { - return nil, fmt.Errorf("failed to create routing table watcher: %v", err) + return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } // registry watcher regWatcher, err := r.opts.Registry.Watch() if err != nil { - return nil, fmt.Errorf("failed to create registry watcher: %v", err) + return nil, fmt.Errorf("failed creating registry watcher: %v", err) } // error channel collecting goroutine errors @@ -311,18 +321,32 @@ func (r *router) Advertise() (<-chan *Update, error) { } // Update updates the routing table using the advertised values -func (r *router) Update(a *Update) error { - // we extract the route from advertisement and update the routing table - route := Route{ - Destination: a.Event.Route.Destination, - Gateway: a.Event.Route.Gateway, - Router: a.Event.Route.Router, - Network: a.Event.Route.Network, - Metric: a.Event.Route.Metric, - Policy: AddIfNotExists, +func (r *router) Update(u *Update) error { + // NOTE: event sorting might not be necessary + // copy update events intp new slices + events := make([]*Event, len(u.Events)) + copy(events, u.Events) + // sort events by timestamp + sort.Slice(events, func(i, j int) bool { + return events[i].Timestamp.Before(events[j].Timestamp) + }) + + for _, event := range events { + // we extract the route from advertisement and update the routing table + route := Route{ + Destination: event.Route.Destination, + Gateway: event.Route.Gateway, + Router: event.Route.Router, + Network: event.Route.Network, + Metric: event.Route.Metric, + Policy: AddIfNotExists, + } + if err := r.opts.Table.Update(route); err != nil { + return fmt.Errorf("failed updating routing table: %v", err) + } } - return r.opts.Table.Update(route) + return nil } // Status returns router status diff --git a/network/router/default_table.go b/network/router/default_table.go index 3297c028..bf4e9646 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -138,6 +138,7 @@ func (t *table) Update(r Route) error { return ErrRouteNotFound } + // check if destination has this particular router in the table if _, ok := t.m[destAddr][sum]; !ok && r.Policy == AddIfNotExists { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) diff --git a/network/router/router.go b/network/router/router.go index e45c0baa..e5ff9a18 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -34,14 +34,39 @@ type Router interface { String() string } +// Option used by the router +type Option func(*Options) + +// UpdateType is route advertisement update type +type UpdateType int + +const ( + // Announce is advertised when the router announces itself + Announce UpdateType = iota + // RouteEvent advertises route events + RouteEvent +) + +// String returns string representation of update event +func (ut UpdateType) String() string { + switch ut { + case Announce: + return "ANNOUNCE" + case RouteEvent: + return "ROUTE" + default: + return "UNKNOWN" + } +} + // Update is sent by the router to the network type Update struct { // ID is the router ID ID string - // Timestamp marks the time when update is sent + // Timestamp marks the time when the update is sent Timestamp time.Time - // Event defines advertisement even - Event *Event + // Events is a list of events to advertise + Events []*Event } // StatusCode defines router status @@ -58,12 +83,12 @@ type Status struct { const ( // Init means the rotuer has just been initialized Init StatusCode = iota - // Running means the router is running + // Running means the router is up and running Running - // Error means the router has crashed with error - Error - // Stopped means the router has stopped + // Stopped means the router has been stopped Stopped + // Error means the router has encountered error + Error ) // String returns human readable status code @@ -73,18 +98,15 @@ func (sc StatusCode) String() string { return "INITIALIZED" case Running: return "RUNNING" - case Error: - return "ERROR" case Stopped: return "STOPPED" + case Error: + return "ERROR" default: return "UNKNOWN" } } -// Option used by the router -type Option func(*Options) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 91411247..976fa8af 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -3,6 +3,7 @@ package router import ( "errors" "strings" + "time" "github.com/olekukonko/tablewriter" ) @@ -16,11 +17,11 @@ var ( type EventType int const ( - // CreateEvent is emitted when new route has been created + // CreateEvent is emitted when a new route has been created CreateEvent EventType = iota // DeleteEvent is emitted when an existing route has been deleted DeleteEvent - // UpdateEvent is emitted when a routing table has been updated + // UpdateEvent is emitted when an existing route has been updated UpdateEvent ) @@ -42,6 +43,8 @@ func (et EventType) String() string { type Event struct { // Type defines type of event Type EventType + // Timestamp is event timestamp + Timestamp time.Time // Route is table rout Route Route } @@ -81,18 +84,16 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly; we only allow watching particular route destination for now +// TODO: this needs to be thought through properly; +// right now we only allow to watch destination func (w *tableWatcher) Next() (*Event, error) { for { select { case res := <-w.resChan: switch w.opts.Destination { - case "*", "": + case res.Route.Destination, "*": return res, nil default: - if w.opts.Destination == res.Route.Destination { - return res, nil - } continue } case <-w.done: