Remove Solicitation from the network

Instead, when a new peer is discovered it is sent a sync message i.e. we
do the full sync when discovering peers
This commit is contained in:
Milos Gajdos
2020-01-16 19:43:10 +00:00
parent ba12513199
commit 7f9b3b5556
7 changed files with 130 additions and 374 deletions

View File

@@ -80,8 +80,6 @@ type network struct {
closed chan bool
// whether we've discovered by the network
discovered chan bool
// solicted checks whether routes were solicited by one node
solicited chan *node
}
// message is network message
@@ -176,7 +174,6 @@ func newNetwork(opts ...Option) Network {
tunClient: make(map[string]tunnel.Session),
peerLinks: make(map[string]tunnel.Link),
discovered: make(chan bool, 1),
solicited: make(chan *node, 32),
}
network.node.network = network
@@ -346,60 +343,23 @@ func (n *network) advertise(advertChan <-chan *router.Advert) {
Events: events,
}
// send the advert to a select number of random peers
if advert.Type != router.Solicitation {
// get a list of node peers
peers := n.Peers()
// there is no one to send to
if len(peers) == 0 {
continue
}
// advertise to max 3 peers
max := len(peers)
if max > 3 {
max = 3
}
for i := 0; i < max; i++ {
if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil {
if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err)
}
}
}
// get a list of node peers
peers := n.Peers()
// continue if there is no one to send to
if len(peers) == 0 {
continue
}
// it's a solication, someone asked for it
// so we're going to pick off the node and send it
select {
case peer := <-n.solicited:
// someone requested the route
n.sendTo("advert", ControlChannel, peer, msg)
default:
// get a list of node peers
peers := n.Peers()
// advertise to max 3 peers
max := len(peers)
if max > 3 {
max = 3
}
// only proceed if we have a peer
if len(peers) == 0 {
continue
}
// pick a random peer from the list of peers
peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id())
// only proceed with a peer
if peer == nil {
continue
}
// attempt to send the advert to the peer
if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", peer.Id(), err)
// send a multicast message if we fail to send Unicast message
if err := n.sendMsg("advert", ControlChannel, msg); err != nil {
for i := 0; i < max; i++ {
if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil {
if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err)
}
}
@@ -740,38 +700,6 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
if err := n.router.Process(advert); err != nil {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
}
case "solicit":
pbRtrSolicit := new(pbRtr.Solicit)
if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil {
log.Debugf("Network fail to unmarshal solicit message: %v", err)
continue
}
log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id)
// ignore solicitation when requested by you
if pbRtrSolicit.Id == n.options.Id {
continue
}
log.Tracef("Network router flushing routes for: %s", pbRtrSolicit.Id)
peer := &node{
id: pbRtrSolicit.Id,
link: m.msg.Header["Micro-Link"],
}
// specify that someone solicited the route
select {
case n.solicited <- peer:
default:
// don't block
}
// advertise all the routes when a new node has connected
if err := n.router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
}
case <-n.closed:
return
@@ -851,54 +779,17 @@ func (n *network) processNetChan(listener tunnel.Listener) {
}
// get a list of the best routes for each service in our routing table
q := []router.QueryOption{
router.QueryStrategy(n.router.Options().Advertise),
}
routes, err := n.router.Table().Query(q...)
switch err {
case nil:
// encode the routes to protobuf
pbRoutes := make([]*pbRtr.Route, 0, len(routes))
for _, route := range routes {
// generate new route proto
pbRoute := pbUtil.RouteToProto(route)
// mask the route before outbounding
n.maskRoute(pbRoute)
// add to list of routes
pbRoutes = append(pbRoutes, pbRoute)
}
// pack the routes into the sync message
msg.Routes = pbRoutes
default:
// we can't list the routes
routes, err := n.getProtoRoutes()
if err != nil {
log.Debugf("Network node %s failed listing routes: %v", n.id, err)
}
// attached the routes to the message
msg.Routes = routes
// send sync message to the newly connected peer
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
log.Debugf("Network failed to send sync message: %v", err)
}
// wait for a short period of time before sending a solicit message
<-time.After(time.Millisecond * 100)
// send a solicit message when discovering new peer
// this triggers the node to flush its routing table to the network
// and leads to faster convergence of the network
solicit := &pbRtr.Solicit{
Id: n.options.Id,
}
// ask for the new nodes routes
if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
// now advertise our own routes
select {
case n.solicited <- peer:
default:
// don't block
}
}()
case "peer":
// mark the time the message has been received
@@ -934,38 +825,27 @@ func (n *network) processNetChan(listener tunnel.Listener) {
log.Debugf("Network failed updating peer links: %s", err)
}
// if it's a new peer i.e. we do not have it in our graph, we solicit its routes
// if it's a new peer i.e. we do not have it in our graph, we request full sync
if err := n.node.AddPeer(peer); err == nil {
go func() {
msg := PeersToProto(n.node, MaxDepth)
// marshal node graph into protobuf
node := PeersToProto(n.node, MaxDepth)
// advertise yourself to the peer
if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise peers: %v", err)
msg := &pbNet.Sync{
Peer: node,
}
<-time.After(time.Millisecond * 100)
// send a solicit message when discovering new peer
solicit := &pbRtr.Solicit{
Id: n.options.Id,
// get a list of the best routes for each service in our routing table
routes, err := n.getProtoRoutes()
if err != nil {
log.Debugf("Network node %s failed listing routes: %v", n.id, err)
}
// attached the routes to the message
msg.Routes = routes
// then solicit this peer
if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
// now advertise our own routes
select {
case n.solicited <- peer:
default:
// don't block
}
// advertise all the routes when a new node has connected
if err := n.router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
// send sync message to the newly connected peer
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
log.Debugf("Network failed to send sync message: %v", err)
}
}()
@@ -1377,28 +1257,12 @@ func (n *network) manage() {
}
// get a list of the best routes for each service in our routing table
q := []router.QueryOption{
router.QueryStrategy(n.router.Options().Advertise),
}
routes, err := n.router.Table().Query(q...)
switch err {
case nil:
// encode the routes to protobuf
pbRoutes := make([]*pbRtr.Route, 0, len(routes))
for _, route := range routes {
// generate new route proto
pbRoute := pbUtil.RouteToProto(route)
// mask the route before outbounding
n.maskRoute(pbRoute)
// add to list of routes
pbRoutes = append(pbRoutes, pbRoute)
}
// pack the routes into the sync message
msg.Routes = pbRoutes
default:
// we can't list the routes
routes, err := n.getProtoRoutes()
if err != nil {
log.Debugf("Network node %s failed listing routes: %v", n.id, err)
}
// attached the routes to the message
msg.Routes = routes
// send sync message to the newly connected peer
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
@@ -1411,6 +1275,33 @@ func (n *network) manage() {
}
}
// getAdvertProtoRoutes returns a list of routes to advertise to remote peer
// based on the advertisement strategy encoded in protobuf
// It returns error if the routes failed to be retrieved from the routing table
func (n *network) getProtoRoutes() ([]*pbRtr.Route, error) {
// get a list of the best routes for each service in our routing table
q := []router.QueryOption{
router.QueryStrategy(n.router.Options().Advertise),
}
routes, err := n.router.Table().Query(q...)
if err != nil {
return nil, err
}
// encode the routes to protobuf
pbRoutes := make([]*pbRtr.Route, 0, len(routes))
for _, route := range routes {
// generate new route proto
pbRoute := pbUtil.RouteToProto(route)
// mask the route before outbounding
n.maskRoute(pbRoute)
// add to list of routes
pbRoutes = append(pbRoutes, pbRoute)
}
return pbRoutes, nil
}
func (n *network) sendConnect() {
// send connect message to NetworkChannel
// NOTE: in theory we could do this as soon as
@@ -1822,7 +1713,6 @@ func (n *network) Close() error {
n.Unlock()
return nil
default:
// TODO: send close message to the network channel
close(n.closed)
// set connected to false