Merge pull request #726 from milosgajdos83/prune-nodes
Prune nodes that have not announced themselves for certain time period.
This commit is contained in:
		| @@ -36,8 +36,10 @@ type node struct { | |||||||
| 	address string | 	address string | ||||||
| 	// neighbours maps the node neighbourhood | 	// neighbours maps the node neighbourhood | ||||||
| 	neighbours map[string]*node | 	neighbours map[string]*node | ||||||
| 	// network returns network node is in | 	// network returns the node network | ||||||
| 	network Network | 	network Network | ||||||
|  | 	// lastSeen stores the time the node has been seen last time | ||||||
|  | 	lastSeen time.Time | ||||||
| } | } | ||||||
|  |  | ||||||
| // Id is node ide | // Id is node ide | ||||||
| @@ -307,6 +309,7 @@ func (n *network) processNetChan(l tunnel.Listener) { | |||||||
| 					id:         pbNetNeighbour.Node.Id, | 					id:         pbNetNeighbour.Node.Id, | ||||||
| 					address:    pbNetNeighbour.Node.Address, | 					address:    pbNetNeighbour.Node.Address, | ||||||
| 					neighbours: make(map[string]*node), | 					neighbours: make(map[string]*node), | ||||||
|  | 					lastSeen:   time.Now(), | ||||||
| 				} | 				} | ||||||
| 				n.Lock() | 				n.Lock() | ||||||
| 				// we override the existing neighbour map | 				// we override the existing neighbour map | ||||||
| @@ -331,7 +334,10 @@ func (n *network) processNetChan(l tunnel.Listener) { | |||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| 				n.Lock() | 				n.Lock() | ||||||
| 				delete(n.neighbours, pbNetClose.Node.Id) | 				if err := n.pruneNode(pbNetClose.Node.Id); err != nil { | ||||||
|  | 					log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
| 				n.Unlock() | 				n.Unlock() | ||||||
| 			} | 			} | ||||||
| 		case <-n.closed: | 		case <-n.closed: | ||||||
| @@ -393,6 +399,55 @@ func (n *network) announce(client transport.Client) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // pruneNode removes a node with given id from the list of neighbours. It also removes all routes originted by this node. | ||||||
|  | // NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment | ||||||
|  | func (n *network) pruneNode(id string) error { | ||||||
|  | 	delete(n.neighbours, id) | ||||||
|  | 	// lookup all the routes originated at this node | ||||||
|  | 	q := router.NewQuery( | ||||||
|  | 		router.QueryRouter(id), | ||||||
|  | 	) | ||||||
|  | 	routes, err := n.Router.Table().Query(q) | ||||||
|  | 	if err != nil && err != router.ErrRouteNotFound { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	// delete the found routes | ||||||
|  | 	for _, route := range routes { | ||||||
|  | 		if err := n.Router.Table().Delete(route); err != nil && err != router.ErrRouteNotFound { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // prune the nodes that have not been seen for certain period of time defined by PruneTime | ||||||
|  | // Additionally, prune also removes all the routes originated by these nodes | ||||||
|  | func (n *network) prune() { | ||||||
|  | 	prune := time.NewTicker(PruneTime) | ||||||
|  | 	defer prune.Stop() | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-n.closed: | ||||||
|  | 			return | ||||||
|  | 		case <-prune.C: | ||||||
|  | 			n.Lock() | ||||||
|  | 			for id, node := range n.neighbours { | ||||||
|  | 				nodeAge := time.Since(node.lastSeen) | ||||||
|  | 				if nodeAge > PruneTime { | ||||||
|  | 					log.Debugf("Network deleting node %s: reached prune time threshold", id) | ||||||
|  | 					if err := n.pruneNode(id); err != nil { | ||||||
|  | 						log.Debugf("Network failed to prune the node %s: %v", id, err) | ||||||
|  | 						continue | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			n.Unlock() | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // handleCtrlConn handles ControlChannel connections | // handleCtrlConn handles ControlChannel connections | ||||||
| func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) { | func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) { | ||||||
| 	for { | 	for { | ||||||
| @@ -695,6 +750,8 @@ func (n *network) Connect() error { | |||||||
| 	go n.resolve() | 	go n.resolve() | ||||||
| 	// broadcast neighbourhood | 	// broadcast neighbourhood | ||||||
| 	go n.announce(netClient) | 	go n.announce(netClient) | ||||||
|  | 	// prune stale nodes | ||||||
|  | 	go n.prune() | ||||||
| 	// listen to network messages | 	// listen to network messages | ||||||
| 	go n.processNetChan(netListener) | 	go n.processNetChan(netListener) | ||||||
| 	// advertise service routes | 	// advertise service routes | ||||||
|   | |||||||
| @@ -17,6 +17,9 @@ var ( | |||||||
| 	ResolveTime = 1 * time.Minute | 	ResolveTime = 1 * time.Minute | ||||||
| 	// AnnounceTime defines time interval to periodically announce node neighbours | 	// AnnounceTime defines time interval to periodically announce node neighbours | ||||||
| 	AnnounceTime = 30 * time.Second | 	AnnounceTime = 30 * time.Second | ||||||
|  | 	// PruneTime defines time interval to periodically check nodes that need to be pruned | ||||||
|  | 	// due to their not announcing their presence within this time interval | ||||||
|  | 	PruneTime = 90 * time.Second | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Node is network node | // Node is network node | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user