Neighbour is now peer. Neighbourhood is Peers. Small refactor.

This commit is contained in:
Milos Gajdos
2019-09-10 01:14:23 +01:00
parent f91d0408ab
commit 195c6a8c90
7 changed files with 385 additions and 394 deletions

View File

@@ -1,7 +1,6 @@
package network
import (
"container/list"
"errors"
"sync"
"time"
@@ -107,9 +106,9 @@ func newNetwork(opts ...Option) Network {
network := &network{
node: &node{
id: options.Id,
address: options.Address,
neighbours: make(map[string]*node),
id: options.Id,
address: options.Address,
peers: make(map[string]*node),
},
options: options,
Router: options.Router,
@@ -261,70 +260,69 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
n.Lock()
log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id)
// if the entry already exists skip adding it
if neighbour, ok := n.neighbours[pbNetConnect.Node.Id]; ok {
if peer, ok := n.peers[pbNetConnect.Node.Id]; ok {
// update lastSeen timestamp
if n.neighbours[pbNetConnect.Node.Id].lastSeen.Before(now) {
neighbour.lastSeen = now
if n.peers[pbNetConnect.Node.Id].lastSeen.Before(now) {
peer.lastSeen = now
}
n.Unlock()
continue
}
// add a new neighbour
// NOTE: new node does not have any neighbours
n.neighbours[pbNetConnect.Node.Id] = &node{
id: pbNetConnect.Node.Id,
address: pbNetConnect.Node.Address,
neighbours: make(map[string]*node),
lastSeen: now,
// add a new peer to the node peers
// NOTE: new node does not have any peers, yet
n.peers[pbNetConnect.Node.Id] = &node{
id: pbNetConnect.Node.Id,
address: pbNetConnect.Node.Address,
peers: make(map[string]*node),
lastSeen: now,
}
n.Unlock()
// advertise yourself to the network
if err := n.sendMsg("neighbour", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
if err := n.sendMsg("peer", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise peers: %v", err)
}
// advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
case "neighbour":
case "peer":
// mark the time the message has been received
now := time.Now()
pbNetNeighbour := &pbNet.Neighbour{}
if err := proto.Unmarshal(m.Body, pbNetNeighbour); err != nil {
log.Debugf("Network tunnel [%s] neighbour unmarshal error: %v", NetworkChannel, err)
pbNetPeer := &pbNet.Peer{}
if err := proto.Unmarshal(m.Body, pbNetPeer); err != nil {
log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err)
continue
}
// don't process your own messages
if pbNetNeighbour.Node.Id == n.options.Id {
if pbNetPeer.Node.Id == n.options.Id {
continue
}
n.Lock()
log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id)
// only add the neighbour if it is NOT already in node's list of neighbours
_, exists := n.neighbours[pbNetNeighbour.Node.Id]
log.Debugf("Network received peer message from: %s", pbNetPeer.Node.Id)
// only add the peer if it is NOT already in node's list of peers
_, exists := n.peers[pbNetPeer.Node.Id]
if !exists {
n.neighbours[pbNetNeighbour.Node.Id] = &node{
id: pbNetNeighbour.Node.Id,
address: pbNetNeighbour.Node.Address,
neighbours: make(map[string]*node),
lastSeen: now,
n.peers[pbNetPeer.Node.Id] = &node{
id: pbNetPeer.Node.Id,
address: pbNetPeer.Node.Address,
peers: make(map[string]*node),
lastSeen: now,
}
n.Unlock()
// send a solicit message when discovering new neighbour
// send a solicit message when discovering new peer
// NOTE: we need to release the Lock here as sendMsg locsk, too
if err := n.sendMsg("solicit", ControlChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
continue
}
// update/store the node neighbour neighbours up to (MaxDepth-1) topology depth
// NOTE: we don't update max topology depth as we dont include this network node
if err := n.node.updateNeighbour(pbNetNeighbour, MaxDepth-1); err != nil {
log.Debugf("Network failed to update neighbours")
if err := n.node.updatePeerTopology(pbNetPeer, MaxDepth-1); err != nil {
log.Debugf("Network failed to update peers")
}
// update lastSeen timestamp if outdated
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) {
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
if n.peers[pbNetPeer.Node.Id].lastSeen.Before(now) {
n.peers[pbNetPeer.Node.Id].lastSeen = now
}
n.Unlock()
case "close":
@@ -373,13 +371,13 @@ func (n *network) sendMsg(msgType string, channel string) error {
protoMsg = &pbNet.Solicit{
Node: node,
}
case "neighbour":
case "peer":
n.RLock()
var err error
// get all the neighbours down to MaxDepth
protoMsg, err = n.node.getNeighbours(MaxDepth)
// get all the node peers down to MaxDepth
protoMsg, err = n.node.getProtoTopology(MaxDepth)
if err != nil {
log.Debugf("Network unable to retrieve node neighbours: %s", err)
log.Debugf("Network unable to retrieve node peers: %s", err)
return err
}
n.RUnlock()
@@ -416,7 +414,7 @@ func (n *network) sendMsg(msgType string, channel string) error {
return nil
}
// announce announces node neighbourhood to the network
// announce announces node peers to the network
func (n *network) announce(client transport.Client) {
announce := time.NewTicker(AnnounceTime)
defer announce.Stop()
@@ -427,18 +425,18 @@ func (n *network) announce(client transport.Client) {
return
case <-announce.C:
// advertise yourself to the network
if err := n.sendMsg("neighbour", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
if err := n.sendMsg("peer", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise peers: %v", err)
continue
}
}
}
}
// pruneNode removes a node with given id from the list of neighbours. It also removes all routes originted by this node.
// pruneNode removes a node with given id from the list of peers. It also removes all routes originted by this node.
// NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment
func (n *network) pruneNode(id string) error {
delete(n.neighbours, id)
delete(n.peers, id)
// lookup all the routes originated at this node
q := router.NewQuery(
router.QueryRouter(id),
@@ -470,7 +468,7 @@ func (n *network) prune() {
return
case <-prune.C:
n.Lock()
for id, node := range n.neighbours {
for id, node := range n.peers {
if id == n.options.Id {
continue
}
@@ -528,8 +526,8 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message
// setRouteMetric calculates metric of the route and updates it in place
// - Local route metric is 1
// - Routes with ID of adjacent neighbour are 10
// - Routes of neighbours of the advertiser are 100
// - Routes with ID of adjacent nodes are 10
// - Routes by peers of the advertiser are 100
// - Routes beyond your neighbourhood are 1000
func (n *network) setRouteMetric(route *router.Route) {
// we are the origin of the route
@@ -539,16 +537,16 @@ func (n *network) setRouteMetric(route *router.Route) {
}
n.RLock()
// check if the route origin is our neighbour
if _, ok := n.neighbours[route.Router]; ok {
// check if the route origin is our peer
if _, ok := n.peers[route.Router]; ok {
route.Metric = 10
n.RUnlock()
return
}
// check if the route origin is the neighbour of our neighbour
for _, node := range n.neighbours {
for id := range node.neighbours {
// check if the route origin is the peer of our peer
for _, peer := range n.peers {
for id := range peer.peers {
if route.Router == id {
route.Metric = 100
n.RUnlock()
@@ -586,19 +584,19 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
if pbRtrAdvert.Id == n.options.Id {
continue
}
// loookup advertising node in our neighbourhood
// loookup advertising node in our peers
n.RLock()
log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id)
advertNode, ok := n.neighbours[pbRtrAdvert.Id]
advertNode, ok := n.peers[pbRtrAdvert.Id]
if !ok {
// advertising node has not been registered as our neighbour, yet
// let's add it to the map of our neighbours
// advertising node has not been registered as our peer, yet
// let's add it to our peers
advertNode = &node{
id: pbRtrAdvert.Id,
neighbours: make(map[string]*node),
lastSeen: now,
id: pbRtrAdvert.Id,
peers: make(map[string]*node),
lastSeen: now,
}
n.neighbours[pbRtrAdvert.Id] = advertNode
n.peers[pbRtrAdvert.Id] = advertNode
// send a solicit message when discovering a new node
if err := n.sendMsg("solicit", NetworkChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
@@ -611,16 +609,16 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
// set the address of the advertising node
// we know Route.Gateway is the address of advertNode
// NOTE: this is true only when advertNode had not been registered
// as our neighbour when we received the advert from it
// as our peer when we received the advert from it
if advertNode.address == "" {
advertNode.address = event.Route.Gateway
}
// if advertising node id is not the same as Route.Router
// we know the advertising node is not the origin of the route
if advertNode.id != event.Route.Router {
// if the origin router is not in the advertising node neighbourhood
// if the origin router is not the advertising node peer
// we can't rule out potential routing loops so we bail here
if _, ok := advertNode.neighbours[event.Route.Router]; !ok {
if _, ok := advertNode.peers[event.Route.Router]; !ok {
continue
}
}
@@ -821,7 +819,7 @@ func (n *network) Connect() error {
// go resolving network nodes
go n.resolve()
// broadcast neighbourhood
// broadcast peers
go n.announce(netClient)
// prune stale nodes
go n.prune()
@@ -839,48 +837,6 @@ func (n *network) Connect() error {
return nil
}
// Nodes returns a list of all network nodes
func (n *network) Nodes() []Node {
//track the visited nodes
visited := make(map[string]*node)
// queue of the nodes to visit
queue := list.New()
// we need to freeze the network graph here
// otherwise we might get invalid results
n.RLock()
defer n.RUnlock()
// push network node to the back of queue
queue.PushBack(n.node)
// mark the node as visited
visited[n.node.id] = n.node
// keep iterating over the queue until its empty
for queue.Len() > 0 {
// pop the node from the front of the queue
qnode := queue.Front()
// iterate through all of its neighbours
// mark the visited nodes; enqueue the non-visted
for id, node := range qnode.Value.(*node).neighbours {
if _, ok := visited[id]; !ok {
visited[id] = node
queue.PushBack(node)
}
}
// remove the node from the queue
queue.Remove(qnode)
}
var nodes []Node
// collect all the nodes and return them
for _, node := range visited {
nodes = append(nodes, node)
}
return nodes
}
func (n *network) close() error {
// stop the server
if err := n.server.Stop(); err != nil {