diff --git a/network/default.go b/network/default.go index b0c77f62..6e91c59c 100644 --- a/network/default.go +++ b/network/default.go @@ -36,8 +36,10 @@ type node struct { address string // neighbours maps the node neighbourhood neighbours map[string]*node - // network returns network node is in + // network returns the node network network Network + // lastSeen stores the time the node has been seen last time + lastSeen time.Time } // Id is node ide @@ -307,6 +309,7 @@ func (n *network) processNetChan(l tunnel.Listener) { id: pbNetNeighbour.Node.Id, address: pbNetNeighbour.Node.Address, neighbours: make(map[string]*node), + lastSeen: time.Now(), } n.Lock() // we override the existing neighbour map @@ -331,7 +334,10 @@ func (n *network) processNetChan(l tunnel.Listener) { continue } n.Lock() - delete(n.neighbours, pbNetClose.Node.Id) + if err := n.pruneNode(pbNetClose.Node.Id); err != nil { + log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err) + continue + } n.Unlock() } case <-n.closed: @@ -393,6 +399,55 @@ func (n *network) announce(client transport.Client) { } } +// pruneNode removes a node with given id from the list of neighbours. It also removes all routes originted by this node. +// NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment +func (n *network) pruneNode(id string) error { + delete(n.neighbours, id) + // lookup all the routes originated at this node + q := router.NewQuery( + router.QueryRouter(id), + ) + routes, err := n.Router.Table().Query(q) + if err != nil && err != router.ErrRouteNotFound { + return err + } + // delete the found routes + for _, route := range routes { + if err := n.Router.Table().Delete(route); err != nil && err != router.ErrRouteNotFound { + return err + } + } + + return nil +} + +// prune the nodes that have not been seen for certain period of time defined by PruneTime +// Additionally, prune also removes all the routes originated by these nodes +func (n *network) prune() { + prune := time.NewTicker(PruneTime) + defer prune.Stop() + + for { + select { + case <-n.closed: + return + case <-prune.C: + n.Lock() + for id, node := range n.neighbours { + nodeAge := time.Since(node.lastSeen) + if nodeAge > PruneTime { + log.Debugf("Network deleting node %s: reached prune time threshold", id) + if err := n.pruneNode(id); err != nil { + log.Debugf("Network failed to prune the node %s: %v", id, err) + continue + } + } + } + n.Unlock() + } + } +} + // handleCtrlConn handles ControlChannel connections func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) { for { @@ -695,6 +750,8 @@ func (n *network) Connect() error { go n.resolve() // broadcast neighbourhood go n.announce(netClient) + // prune stale nodes + go n.prune() // listen to network messages go n.processNetChan(netListener) // advertise service routes diff --git a/network/network.go b/network/network.go index f68cc147..5a9b7f6a 100644 --- a/network/network.go +++ b/network/network.go @@ -17,6 +17,9 @@ var ( ResolveTime = 1 * time.Minute // AnnounceTime defines time interval to periodically announce node neighbours AnnounceTime = 30 * time.Second + // PruneTime defines time interval to periodically check nodes that need to be pruned + // due to their not announcing their presence within this time interval + PruneTime = 90 * time.Second ) // Node is network node