From a72a2f717de8e17be9a17a27ad5ce38fdaacd167 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 16 Sep 2019 19:22:55 +0100 Subject: [PATCH] Prune stale nodes in the whole topology. --- network/default.go | 53 ++++------- network/node.go | 207 +++++++++++++++++++++++++++---------------- network/node_test.go | 40 +++++++++ 3 files changed, 188 insertions(+), 112 deletions(-) diff --git a/network/default.go b/network/default.go index 35d72184..0a0faf90 100644 --- a/network/default.go +++ b/network/default.go @@ -262,11 +262,11 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen peers: make(map[string]*node), lastSeen: now, } - if ok := n.node.AddPeer(peer); !ok { + if err := n.node.AddPeer(peer); err == ErrPeerExists { log.Debugf("Network peer exists, refreshing: %s", peer.id) // update lastSeen time for the existing node - if ok := n.RefreshPeer(peer.id, now); !ok { - log.Debugf("Network failed refreshing peer: %s", peer.id) + if err := n.RefreshPeer(peer.id, now); err != nil { + log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) } continue } @@ -299,7 +299,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen peers: make(map[string]*node), lastSeen: now, } - if ok := n.node.AddPeer(peer); ok { + if err := n.node.AddPeer(peer); err == ErrPeerExists { // send a solicit message when discovering new peer msg := &pbRtr.Solicit{ Id: n.options.Id, @@ -311,14 +311,14 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen } log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) // update lastSeen time for the peer - if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok { - log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id) + if err := n.RefreshPeer(pbNetPeer.Node.Id, now); err != nil { + log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) } // NOTE: we don't unpack MaxDepth toplogy peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1) log.Debugf("Network updating topology of node: %s", n.node.id) - if ok := n.node.UpdatePeer(peer); !ok { - log.Debugf("Network failed to update peers") + if err := n.node.UpdatePeer(peer); err != nil { + log.Debugf("Network failed to update peers: %v", err) } case "close": pbNetClose := &pbNet.Close{} @@ -331,15 +331,16 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen continue } log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) - n.node.Lock() peer := &node{ id: pbNetClose.Node.Id, address: pbNetClose.Node.Address, } - if err := n.prunePeer(peer); err != nil { - log.Debugf("Network failed to prune node %s routes: %v", peer.id, err) + if err := n.DeletePeerNode(peer.id); err != nil { + log.Debugf("Network failed to delete node %s routes: %v", peer.id, err) + } + if err := n.prunePeerRoutes(peer); err != nil { + log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) } - n.node.Unlock() } case <-n.closed: return @@ -421,7 +422,6 @@ func (n *network) prunePeerRoutes(peer *node) error { router.QueryRouter(peer.id), ) if err := n.pruneRoutes(q); err != nil { - log.Debugf("Network failed deleting routes originated by %s: %s", peer.id, err) return err } @@ -430,23 +430,12 @@ func (n *network) prunePeerRoutes(peer *node) error { router.QueryGateway(peer.address), ) if err := n.pruneRoutes(q); err != nil { - log.Debugf("Network failed deleting routes routable via gateway %s: %s", peer.address, err) return err } return nil } -// prunePeer prune peer from network node as well as all all the routes associated with it -func (n *network) prunePeer(peer *node) error { - delete(n.node.peers, peer.id) - if err := n.prunePeerRoutes(peer); err != nil { - log.Debugf("Network failed to prune %s routes: %v", peer.id, err) - return err - } - return nil -} - // prune deltes node peers that have not been seen for longer than PruneTime seconds // prune also removes all the routes either originated by or routable by the stale nodes func (n *network) prune() { @@ -458,19 +447,13 @@ func (n *network) prune() { case <-n.closed: return case <-prune.C: - n.node.Lock() - for id, peer := range n.peers { - if id == n.options.Id { - continue - } - if time.Since(peer.lastSeen) > PruneTime { - log.Debugf("Network peer exceeded prune time: %s", id) - if err := n.prunePeer(peer); err != nil { - log.Debugf("Network failed to prune %s: %s", id, err) - } + pruned := n.PruneStalePeerNodes(PruneTime) + for id, peer := range pruned { + log.Debugf("Network peer exceeded prune time: %s", id) + if err := n.prunePeerRoutes(peer); err != nil { + log.Debugf("Network failed pruning peer %s routes: %v", id, err) } } - n.node.Unlock() } } } diff --git a/network/node.go b/network/node.go index 86cc83fb..93bffa41 100644 --- a/network/node.go +++ b/network/node.go @@ -2,6 +2,7 @@ package network import ( "container/list" + "errors" "sync" "time" @@ -13,6 +14,13 @@ var ( MaxDepth uint = 3 ) +var ( + // ErrPeerExists is returned when adding a peer which already exists + ErrPeerExists = errors.New("peer already exists") + // ErrPeerNotFound is returned when a peer could not be found in node topology + ErrPeerNotFound = errors.New("peer not found") +) + // node is network node type node struct { sync.RWMutex @@ -22,6 +30,8 @@ type node struct { address string // peers are nodes with direct link to this node peers map[string]*node + // edges store the node edges + edges map[string]map[string]*node // network returns the node network network Network // lastSeen keeps track of node lifetime and updates @@ -43,74 +53,8 @@ func (n *node) Network() Network { return n.network } -// AddPeer adds a new peer to node topology -// It returns false if the peer already exists -func (n *node) AddPeer(peer *node) bool { - n.Lock() - defer n.Unlock() - - if _, ok := n.peers[peer.id]; !ok { - n.peers[peer.id] = peer - return true - } - - return false -} - -// UpdatePeer updates a peer if it exists -// It returns false if the peer does not exist -func (n *node) UpdatePeer(peer *node) bool { - n.Lock() - defer n.Unlock() - - if _, ok := n.peers[peer.id]; ok { - n.peers[peer.id] = peer - return true - } - - return false -} - -// DeletePeer deletes a peer from node peers -// It returns true if the peers has been deleted -func (n *node) DeletePeer(id string) bool { - n.Lock() - defer n.Unlock() - - delete(n.peers, id) - - return true -} - -// HasPeer returns true if node has peer with given id -func (n *node) HasPeer(id string) bool { - n.RLock() - defer n.RUnlock() - - _, ok := n.peers[id] - return ok -} - -// RefreshPeer updates node timestamp -// It returns false if the peer has not been found. -func (n *node) RefreshPeer(id string, now time.Time) bool { - n.Lock() - defer n.Unlock() - - peer, ok := n.peers[id] - if !ok { - return false - } - - if peer.lastSeen.Before(now) { - peer.lastSeen = now - } - - return true -} - // walk walks the node graph until some condition is met -func (n *node) walk(until func(peer *node) bool) map[string]*node { +func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node { // track the visited nodes visited := make(map[string]*node) // queue of the nodes to visit @@ -125,15 +69,16 @@ func (n *node) walk(until func(peer *node) bool) map[string]*node { for queue.Len() > 0 { // pop the node from the front of the queue qnode := queue.Front() + if until(qnode.Value.(*node)) { + return visited + } // iterate through all of the node peers // mark the visited nodes; enqueue the non-visted - for id, node := range qnode.Value.(*node).peers { + for id, peer := range qnode.Value.(*node).peers { if _, ok := visited[id]; !ok { - visited[id] = node - queue.PushBack(node) - } - if until(node) { - return visited + visited[id] = peer + action(qnode.Value.(*node), peer) + queue.PushBack(peer) } } // remove the node from the queue @@ -143,6 +88,63 @@ func (n *node) walk(until func(peer *node) bool) map[string]*node { return visited } +// AddPeer adds a new peer to node topology +// It returns false if the peer already exists +func (n *node) AddPeer(peer *node) error { + n.Lock() + defer n.Unlock() + + if _, ok := n.peers[peer.id]; !ok { + n.peers[peer.id] = peer + return nil + } + + return ErrPeerExists +} + +// DeletePeer deletes a peer from node peers +// It returns true if the peer has been deleted +func (n *node) DeletePeer(id string) bool { + n.Lock() + defer n.Unlock() + + delete(n.peers, id) + + return true +} + +// UpdatePeer updates a peer if it already exists +// It returns error if the peer does not exist +func (n *node) UpdatePeer(peer *node) error { + n.Lock() + defer n.Unlock() + + if _, ok := n.peers[peer.id]; ok { + n.peers[peer.id] = peer + return nil + } + + return ErrPeerNotFound +} + +// RefreshPeer updates node timestamp +// It returns false if the peer has not been found. +func (n *node) RefreshPeer(id string, now time.Time) error { + n.Lock() + defer n.Unlock() + + peer, ok := n.peers[id] + if !ok { + return ErrPeerNotFound + } + + if peer.lastSeen.Before(now) { + peer.lastSeen = now + } + + return nil +} + // Nodes returns a slice of all nodes in the whole node topology func (n *node) Nodes() []Node { // we need to freeze the network graph here @@ -151,11 +153,12 @@ func (n *node) Nodes() []Node { defer n.RUnlock() // NOTE: this should never be true - untilNoMorePeers := func(n *node) bool { - return n == nil + untilNoMorePeers := func(node *node) bool { + return node == nil } + justWalk := func(parent, node *node) {} - visited := n.walk(untilNoMorePeers) + visited := n.walk(untilNoMorePeers, justWalk) var nodes []Node // collect all the nodes and return them @@ -178,8 +181,9 @@ func (n *node) GetPeerNode(id string) *node { untilFoundPeer := func(n *node) bool { return n.id == id } + justWalk := func(paent, node *node) {} - visited := top.walk(untilFoundPeer) + visited := top.walk(untilFoundPeer, justWalk) peerNode, ok := visited[id] if !ok { @@ -189,6 +193,55 @@ func (n *node) GetPeerNode(id string) *node { return peerNode } +// DeletePeerNode removes peer node from node topology +func (n *node) DeletePeerNode(id string) error { + n.Lock() + n.Unlock() + + untilNoMorePeers := func(node *node) bool { + return node == nil + } + + deleted := make(map[string]*node) + deletePeer := func(parent, node *node) { + if node.id != n.id && node.id == id { + delete(parent.peers, node.id) + deleted[node.id] = node + } + } + + n.walk(untilNoMorePeers, deletePeer) + + if _, ok := deleted[id]; !ok { + return ErrPeerNotFound + } + + return nil +} + +// PruneStalePeerNodes prune the peers that have not been seen for longer than given time +// It returns a map of the the nodes that got pruned +func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node { + n.Lock() + n.Unlock() + + untilNoMorePeers := func(node *node) bool { + return node == nil + } + + pruned := make(map[string]*node) + pruneStalePeer := func(parent, node *node) { + if node.id != n.id && time.Since(node.lastSeen) > PruneTime { + delete(parent.peers, node.id) + pruned[node.id] = node + } + } + + n.walk(untilNoMorePeers, pruneStalePeer) + + return pruned +} + // Topology returns a copy of the node topology down to given depth // NOTE: the returned node is a node graph - not a single node func (n *node) Topology(depth uint) *node { diff --git a/network/node_test.go b/network/node_test.go index 10bc17bc..80a41c70 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -192,6 +192,46 @@ func TestPeers(t *testing.T) { } } +func TestDeletePeerNode(t *testing.T) { + // complicated node graph + node := testSetup() + + nodeCount := len(node.Nodes()) + + // should not find non-existent peer node + if err := node.DeletePeerNode("foobar"); err != ErrPeerNotFound { + t.Errorf("Expected: %v, got: %v", ErrPeerNotFound, err) + } + + // lets pick one of the peer1 peers + if err := node.DeletePeerNode(testPeerOfPeerIds[0]); err != nil { + t.Errorf("Error deleting peer node: %v", err) + } + + nodeDelCount := len(node.Nodes()) + + if nodeDelCount != nodeCount-1 { + t.Errorf("Expected node count: %d, got: %d", nodeCount-1, nodeDelCount) + } +} + +func TestPruneStalePeerNodes(t *testing.T) { + // complicated node graph + node := testSetup() + + nodes := node.Nodes() + + pruneTime := 10 * time.Millisecond + time.Sleep(pruneTime) + + // should delete all nodes besides node + pruned := node.PruneStalePeerNodes(pruneTime) + + if len(pruned) != len(nodes)-1 { + t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned)) + } +} + func TestUnpackPeerTopology(t *testing.T) { pbPeer := &pb.Peer{ Node: &pb.Node{