Strip Advertise/Process from router
This commit is contained in:
@@ -317,44 +317,43 @@ func (n *mucpNetwork) maskRoute(r *pb.Route) {
|
||||
}
|
||||
|
||||
// advertise advertises routes to the network
|
||||
func (n *mucpNetwork) advertise(advertChan <-chan *router.Advert) {
|
||||
func (n *mucpNetwork) advertise(eventChan <-chan *router.Event) {
|
||||
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
for {
|
||||
select {
|
||||
// process local adverts and randomly fire them at other nodes
|
||||
case advert := <-advertChan:
|
||||
// process local events and randomly fire them at other nodes
|
||||
case event := <-eventChan:
|
||||
// create a proto advert
|
||||
var events []*pb.Event
|
||||
var pbEvents []*pb.Event
|
||||
|
||||
for _, event := range advert.Events {
|
||||
// make a copy of the route
|
||||
route := &pb.Route{
|
||||
Service: event.Route.Service,
|
||||
Address: event.Route.Address,
|
||||
Gateway: event.Route.Gateway,
|
||||
Network: event.Route.Network,
|
||||
Router: event.Route.Router,
|
||||
Link: event.Route.Link,
|
||||
Metric: event.Route.Metric,
|
||||
}
|
||||
|
||||
// override the various values
|
||||
n.maskRoute(route)
|
||||
|
||||
e := &pb.Event{
|
||||
Type: pb.EventType(event.Type),
|
||||
Timestamp: event.Timestamp.UnixNano(),
|
||||
Route: route,
|
||||
}
|
||||
|
||||
events = append(events, e)
|
||||
// make a copy of the route
|
||||
route := &pb.Route{
|
||||
Service: event.Route.Service,
|
||||
Address: event.Route.Address,
|
||||
Gateway: event.Route.Gateway,
|
||||
Network: event.Route.Network,
|
||||
Router: event.Route.Router,
|
||||
Link: event.Route.Link,
|
||||
Metric: event.Route.Metric,
|
||||
}
|
||||
|
||||
// override the various values
|
||||
n.maskRoute(route)
|
||||
|
||||
e := &pb.Event{
|
||||
Type: pb.EventType(event.Type),
|
||||
Timestamp: event.Timestamp.UnixNano(),
|
||||
Route: route,
|
||||
}
|
||||
|
||||
pbEvents = append(pbEvents, e)
|
||||
|
||||
msg := &pb.Advert{
|
||||
Id: advert.Id,
|
||||
Type: pb.AdvertType(advert.Type),
|
||||
Timestamp: advert.Timestamp.UnixNano(),
|
||||
Events: events,
|
||||
Id: n.Id(),
|
||||
Type: pb.AdvertType(event.Type),
|
||||
Timestamp: event.Timestamp.UnixNano(),
|
||||
Events: pbEvents,
|
||||
}
|
||||
|
||||
// get a list of node peers
|
||||
@@ -673,9 +672,10 @@ func (n *mucpNetwork) processCtrlChan(listener tunnel.Listener) {
|
||||
}
|
||||
|
||||
// don't process your own messages
|
||||
if pbAdvert.Id == n.options.Id {
|
||||
if pbAdvert.Id == n.Id() {
|
||||
continue
|
||||
}
|
||||
|
||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||
logger.Debugf("Network received advert message from: %s", pbAdvert.Id)
|
||||
}
|
||||
@@ -690,8 +690,6 @@ func (n *mucpNetwork) processCtrlChan(listener tunnel.Listener) {
|
||||
continue
|
||||
}
|
||||
|
||||
var events []*router.Event
|
||||
|
||||
for _, event := range pbAdvert.Events {
|
||||
// for backwards compatibility reasons
|
||||
if event == nil || event.Route == nil {
|
||||
@@ -736,38 +734,11 @@ func (n *mucpNetwork) processCtrlChan(listener tunnel.Listener) {
|
||||
route.Metric = d
|
||||
}
|
||||
|
||||
// create router event
|
||||
e := &router.Event{
|
||||
Type: router.EventType(event.Type),
|
||||
Timestamp: time.Unix(0, pbAdvert.Timestamp),
|
||||
Route: route,
|
||||
}
|
||||
events = append(events, e)
|
||||
}
|
||||
|
||||
// if no events are eligible for processing continue
|
||||
if len(events) == 0 {
|
||||
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
||||
logger.Tracef("Network no events to be processed by router: %s", n.options.Id)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// create an advert and process it
|
||||
advert := &router.Advert{
|
||||
Id: pbAdvert.Id,
|
||||
Type: router.AdvertType(pbAdvert.Type),
|
||||
Timestamp: time.Unix(0, pbAdvert.Timestamp),
|
||||
TTL: time.Duration(pbAdvert.Ttl),
|
||||
Events: events,
|
||||
}
|
||||
|
||||
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
||||
logger.Tracef("Network router %s processing advert: %s", n.Id(), advert.Id)
|
||||
}
|
||||
if err := n.router.Process(advert); err != nil {
|
||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||
logger.Debugf("Network failed to process advert %s: %v", advert.Id, err)
|
||||
// update the local table
|
||||
if err := n.router.Table().Update(route); err != nil {
|
||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||
logger.Debugf("Network failed to process advert %s: %v", event.Id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1024,13 +995,9 @@ func (n *mucpNetwork) processNetChan(listener tunnel.Listener) {
|
||||
route.Metric = d
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
// maybe we should not be this clever ¯\_(ツ)_/¯ //
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
// lookup best routes for the services in the just received route
|
||||
q := []router.QueryOption{
|
||||
router.QueryService(route.Service),
|
||||
router.QueryStrategy(n.router.Options().Advertise),
|
||||
router.QueryLink(route.Link),
|
||||
}
|
||||
|
||||
routes, err := n.router.Table().Query(q...)
|
||||
@@ -1068,8 +1035,6 @@ func (n *mucpNetwork) processNetChan(listener tunnel.Listener) {
|
||||
if bestRoute.Metric <= route.Metric {
|
||||
continue
|
||||
}
|
||||
///////////////////////////////////////////////////////////////////////
|
||||
///////////////////////////////////////////////////////////////////////
|
||||
|
||||
// add route to the routing table
|
||||
if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute {
|
||||
@@ -1168,6 +1133,7 @@ func (n *mucpNetwork) prunePeerRoutes(peer *node) error {
|
||||
// lookup all routes originated by router
|
||||
q := []router.QueryOption{
|
||||
router.QueryRouter(peer.id),
|
||||
router.QueryLink("*"),
|
||||
}
|
||||
if err := n.pruneRoutes(q...); err != nil {
|
||||
return err
|
||||
@@ -1176,6 +1142,7 @@ func (n *mucpNetwork) prunePeerRoutes(peer *node) error {
|
||||
// lookup all routes routable via gw
|
||||
q = []router.QueryOption{
|
||||
router.QueryGateway(peer.address),
|
||||
router.QueryLink("*"),
|
||||
}
|
||||
if err := n.pruneRoutes(q...); err != nil {
|
||||
return err
|
||||
@@ -1417,12 +1384,7 @@ func (n *mucpNetwork) manage() {
|
||||
// based on the advertisement strategy encoded in protobuf
|
||||
// It returns error if the routes failed to be retrieved from the routing table
|
||||
func (n *mucpNetwork) getProtoRoutes() ([]*pb.Route, error) {
|
||||
// get a list of the best routes for each service in our routing table
|
||||
q := []router.QueryOption{
|
||||
router.QueryStrategy(n.router.Options().Advertise),
|
||||
}
|
||||
|
||||
routes, err := n.router.Table().Query(q...)
|
||||
routes, err := n.router.Table().List()
|
||||
if err != nil && err != router.ErrRouteNotFound {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1768,7 +1730,12 @@ func (n *mucpNetwork) Connect() error {
|
||||
n.closed = make(chan bool)
|
||||
|
||||
// start advertising routes
|
||||
advertChan, err := n.options.Router.Advertise()
|
||||
watcher, err := n.options.Router.Watch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
advertChan, err := watcher.Chan()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user