Merge pull request #761 from milosgajdos83/delete-peer-gw

Delete dead peer [gateway] routes
This commit is contained in:
Asim Aslam 2019-09-17 16:54:35 +01:00 committed by GitHub
commit e586763301
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 224 additions and 113 deletions

View File

@ -263,11 +263,11 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
peers: make(map[string]*node),
lastSeen: now,
}
if ok := n.node.AddPeer(peer); !ok {
if err := n.node.AddPeer(peer); err == ErrPeerExists {
log.Debugf("Network peer exists, refreshing: %s", peer.id)
// update lastSeen time for the existing node
if ok := n.RefreshPeer(peer.id, now); !ok {
log.Debugf("Network failed refreshing peer: %s", peer.id)
if err := n.RefreshPeer(peer.id, now); err != nil {
log.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
}
continue
}
@ -300,7 +300,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
peers: make(map[string]*node),
lastSeen: now,
}
if ok := n.node.AddPeer(peer); ok {
if err := n.node.AddPeer(peer); err == ErrPeerExists {
// send a solicit message when discovering new peer
msg := &pbRtr.Solicit{
Id: n.options.Id,
@ -312,14 +312,14 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
}
log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id)
// update lastSeen time for the peer
if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok {
log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id)
if err := n.RefreshPeer(pbNetPeer.Node.Id, now); err != nil {
log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
}
// NOTE: we don't unpack MaxDepth toplogy
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
log.Debugf("Network updating topology of node: %s", n.node.id)
if ok := n.node.UpdatePeer(peer); !ok {
log.Debugf("Network failed to update peers")
if err := n.node.UpdatePeer(peer); err != nil {
log.Debugf("Network failed to update peers: %v", err)
}
case "close":
pbNetClose := &pbNet.Close{}
@ -332,9 +332,15 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
continue
}
log.Debugf("Network received close message from: %s", pbNetClose.Node.Id)
if err := n.pruneNode(pbNetClose.Node.Id); err != nil {
log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err)
continue
peer := &node{
id: pbNetClose.Node.Id,
address: pbNetClose.Node.Address,
}
if err := n.DeletePeerNode(peer.id); err != nil {
log.Debugf("Network failed to delete node %s routes: %v", peer.id, err)
}
if err := n.prunePeerRoutes(peer); err != nil {
log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err)
}
}
case <-n.closed:
@ -394,20 +400,13 @@ func (n *network) announce(client transport.Client) {
}
}
// pruneNode removes a node with given id from the list of peers. It also removes all routes originted by this node.
func (n *network) pruneNode(id string) error {
// DeletePeer serializes access
n.node.DeletePeer(id)
// lookup all the routes originated at this node
q := router.NewQuery(
router.QueryRouter(id),
)
// pruneRoutes prunes routes return by given query
func (n *network) pruneRoutes(q router.Query) error {
routes, err := n.Router.Table().Query(q)
if err != nil && err != router.ErrRouteNotFound {
return err
}
// delete the found routes
log.Logf("Network deleting routes originated by router: %s", id)
for _, route := range routes {
if err := n.Router.Table().Delete(route); err != nil && err != router.ErrRouteNotFound {
return err
@ -417,8 +416,29 @@ func (n *network) pruneNode(id string) error {
return nil
}
// prune the nodes that have not been seen for certain period of time defined by PruneTime
// Additionally, prune also removes all the routes originated by these nodes
// pruneNodeRoutes prunes routes that were either originated by or routable via given node
func (n *network) prunePeerRoutes(peer *node) error {
// lookup all routes originated by router
q := router.NewQuery(
router.QueryRouter(peer.id),
)
if err := n.pruneRoutes(q); err != nil {
return err
}
// lookup all routes routable via gw
q = router.NewQuery(
router.QueryGateway(peer.id),
)
if err := n.pruneRoutes(q); err != nil {
return err
}
return nil
}
// prune deltes node peers that have not been seen for longer than PruneTime seconds
// prune also removes all the routes either originated by or routable by the stale nodes
func (n *network) prune() {
prune := time.NewTicker(PruneTime)
defer prune.Stop()
@ -428,20 +448,13 @@ func (n *network) prune() {
case <-n.closed:
return
case <-prune.C:
n.Lock()
for id, node := range n.peers {
if id == n.options.Id {
continue
}
if time.Since(node.lastSeen) > PruneTime {
log.Debugf("Network deleting node %s: reached prune time threshold", id)
if err := n.pruneNode(id); err != nil {
log.Debugf("Network failed to prune the node %s: %v", id, err)
continue
}
pruned := n.PruneStalePeerNodes(PruneTime)
for id, peer := range pruned {
log.Debugf("Network peer exceeded prune time: %s", id)
if err := n.prunePeerRoutes(peer); err != nil {
log.Debugf("Network failed pruning peer %s routes: %v", id, err)
}
}
n.Unlock()
}
}
}

View File

@ -2,6 +2,7 @@ package network
import (
"container/list"
"errors"
"sync"
"time"
@ -13,6 +14,13 @@ var (
MaxDepth uint = 3
)
var (
// ErrPeerExists is returned when adding a peer which already exists
ErrPeerExists = errors.New("peer already exists")
// ErrPeerNotFound is returned when a peer could not be found in node topology
ErrPeerNotFound = errors.New("peer not found")
)
// node is network node
type node struct {
sync.RWMutex
@ -22,6 +30,8 @@ type node struct {
address string
// peers are nodes with direct link to this node
peers map[string]*node
// edges store the node edges
edges map[string]map[string]*node
// network returns the node network
network Network
// lastSeen keeps track of node lifetime and updates
@ -43,70 +53,8 @@ func (n *node) Network() Network {
return n.network
}
// AddPeer adds a new peer to node topology
// It returns false if the peer already exists
func (n *node) AddPeer(peer *node) bool {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; !ok {
n.peers[peer.id] = peer
return true
}
return false
}
// UpdatePeer updates a peer if it exists
// It returns false if the peer does not exist
func (n *node) UpdatePeer(peer *node) bool {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; ok {
n.peers[peer.id] = peer
return true
}
return false
}
// DeletePeer deletes a peer if it exists
func (n *node) DeletePeer(id string) {
n.Lock()
defer n.Unlock()
delete(n.peers, id)
}
// HasPeer returns true if node has peer with given id
func (n *node) HasPeer(id string) bool {
n.RLock()
defer n.RUnlock()
_, ok := n.peers[id]
return ok
}
// RefreshPeer updates node timestamp
// It returns false if the peer has not been found.
func (n *node) RefreshPeer(id string, now time.Time) bool {
n.Lock()
defer n.Unlock()
peer, ok := n.peers[id]
if !ok {
return false
}
if peer.lastSeen.Before(now) {
peer.lastSeen = now
}
return true
}
func (n *node) walk(until func(peer *node) bool) map[string]*node {
// walk walks the node graph until some condition is met
func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node {
// track the visited nodes
visited := make(map[string]*node)
// queue of the nodes to visit
@ -121,15 +69,16 @@ func (n *node) walk(until func(peer *node) bool) map[string]*node {
for queue.Len() > 0 {
// pop the node from the front of the queue
qnode := queue.Front()
if until(qnode.Value.(*node)) {
return visited
}
// iterate through all of the node peers
// mark the visited nodes; enqueue the non-visted
for id, node := range qnode.Value.(*node).peers {
for id, peer := range qnode.Value.(*node).peers {
if _, ok := visited[id]; !ok {
visited[id] = node
queue.PushBack(node)
}
if until(node) {
return visited
visited[id] = peer
action(qnode.Value.(*node), peer)
queue.PushBack(peer)
}
}
// remove the node from the queue
@ -139,7 +88,64 @@ func (n *node) walk(until func(peer *node) bool) map[string]*node {
return visited
}
// Nodes returns a slice if all nodes in node topology
// AddPeer adds a new peer to node topology
// It returns false if the peer already exists
func (n *node) AddPeer(peer *node) error {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; !ok {
n.peers[peer.id] = peer
return nil
}
return ErrPeerExists
}
// DeletePeer deletes a peer from node peers
// It returns true if the peer has been deleted
func (n *node) DeletePeer(id string) bool {
n.Lock()
defer n.Unlock()
delete(n.peers, id)
return true
}
// UpdatePeer updates a peer if it already exists
// It returns error if the peer does not exist
func (n *node) UpdatePeer(peer *node) error {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; ok {
n.peers[peer.id] = peer
return nil
}
return ErrPeerNotFound
}
// RefreshPeer updates node timestamp
// It returns false if the peer has not been found.
func (n *node) RefreshPeer(id string, now time.Time) error {
n.Lock()
defer n.Unlock()
peer, ok := n.peers[id]
if !ok {
return ErrPeerNotFound
}
if peer.lastSeen.Before(now) {
peer.lastSeen = now
}
return nil
}
// Nodes returns a slice of all nodes in the whole node topology
func (n *node) Nodes() []Node {
// we need to freeze the network graph here
// otherwise we might get inconsisten results
@ -147,11 +153,12 @@ func (n *node) Nodes() []Node {
defer n.RUnlock()
// NOTE: this should never be true
untilNoMorePeers := func(n *node) bool {
return n == nil
untilNoMorePeers := func(node *node) bool {
return node == nil
}
justWalk := func(parent, node *node) {}
visited := n.walk(untilNoMorePeers)
visited := n.walk(untilNoMorePeers, justWalk)
var nodes []Node
// collect all the nodes and return them
@ -162,8 +169,8 @@ func (n *node) Nodes() []Node {
return nodes
}
// GetPeerNode returns a peer from node topology i.e. up to MaxDepth
// It returns nil if the peer was not found in the node topology
// GetPeerNode returns a node from node MaxDepth topology
// It returns nil if the peer was not found
func (n *node) GetPeerNode(id string) *node {
n.RLock()
defer n.RUnlock()
@ -174,8 +181,9 @@ func (n *node) GetPeerNode(id string) *node {
untilFoundPeer := func(n *node) bool {
return n.id == id
}
justWalk := func(paent, node *node) {}
visited := top.walk(untilFoundPeer)
visited := top.walk(untilFoundPeer, justWalk)
peerNode, ok := visited[id]
if !ok {
@ -185,7 +193,57 @@ func (n *node) GetPeerNode(id string) *node {
return peerNode
}
// Topology returns a copy of th node topology down to given depth
// DeletePeerNode removes peer node from node topology
func (n *node) DeletePeerNode(id string) error {
n.Lock()
n.Unlock()
untilNoMorePeers := func(node *node) bool {
return node == nil
}
deleted := make(map[string]*node)
deletePeer := func(parent, node *node) {
if node.id != n.id && node.id == id {
delete(parent.peers, node.id)
deleted[node.id] = node
}
}
n.walk(untilNoMorePeers, deletePeer)
if _, ok := deleted[id]; !ok {
return ErrPeerNotFound
}
return nil
}
// PruneStalePeerNodes prune the peers that have not been seen for longer than given time
// It returns a map of the the nodes that got pruned
func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node {
n.Lock()
n.Unlock()
untilNoMorePeers := func(node *node) bool {
return node == nil
}
pruned := make(map[string]*node)
pruneStalePeer := func(parent, node *node) {
if node.id != n.id && time.Since(node.lastSeen) > PruneTime {
delete(parent.peers, node.id)
pruned[node.id] = node
}
}
n.walk(untilNoMorePeers, pruneStalePeer)
return pruned
}
// Topology returns a copy of the node topology down to given depth
// NOTE: the returned node is a node graph - not a single node
func (n *node) Topology(depth uint) *node {
n.RLock()
defer n.RUnlock()

View File

@ -192,6 +192,46 @@ func TestPeers(t *testing.T) {
}
}
func TestDeletePeerNode(t *testing.T) {
// complicated node graph
node := testSetup()
nodeCount := len(node.Nodes())
// should not find non-existent peer node
if err := node.DeletePeerNode("foobar"); err != ErrPeerNotFound {
t.Errorf("Expected: %v, got: %v", ErrPeerNotFound, err)
}
// lets pick one of the peer1 peers
if err := node.DeletePeerNode(testPeerOfPeerIds[0]); err != nil {
t.Errorf("Error deleting peer node: %v", err)
}
nodeDelCount := len(node.Nodes())
if nodeDelCount != nodeCount-1 {
t.Errorf("Expected node count: %d, got: %d", nodeCount-1, nodeDelCount)
}
}
func TestPruneStalePeerNodes(t *testing.T) {
// complicated node graph
node := testSetup()
nodes := node.Nodes()
pruneTime := 10 * time.Millisecond
time.Sleep(pruneTime)
// should delete all nodes besides node
pruned := node.PruneStalePeerNodes(pruneTime)
if len(pruned) != len(nodes)-1 {
t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned))
}
}
func TestUnpackPeerTopology(t *testing.T) {
pbPeer := &pb.Peer{
Node: &pb.Node{