Prune nodes that have not announced themselves for certain time period.
This commit is contained in:
parent
5440325a18
commit
ec6318befc
@ -36,8 +36,10 @@ type node struct {
|
|||||||
address string
|
address string
|
||||||
// neighbours maps the node neighbourhood
|
// neighbours maps the node neighbourhood
|
||||||
neighbours map[string]*node
|
neighbours map[string]*node
|
||||||
// network returns network node is in
|
// network returns the node network
|
||||||
network Network
|
network Network
|
||||||
|
// lastSeen stores the time the node has been seen last time
|
||||||
|
lastSeen time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Id is node ide
|
// Id is node ide
|
||||||
@ -307,6 +309,7 @@ func (n *network) processNetChan(l tunnel.Listener) {
|
|||||||
id: pbNetNeighbour.Node.Id,
|
id: pbNetNeighbour.Node.Id,
|
||||||
address: pbNetNeighbour.Node.Address,
|
address: pbNetNeighbour.Node.Address,
|
||||||
neighbours: make(map[string]*node),
|
neighbours: make(map[string]*node),
|
||||||
|
lastSeen: time.Now(),
|
||||||
}
|
}
|
||||||
n.Lock()
|
n.Lock()
|
||||||
// we override the existing neighbour map
|
// we override the existing neighbour map
|
||||||
@ -331,7 +334,10 @@ func (n *network) processNetChan(l tunnel.Listener) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
n.Lock()
|
n.Lock()
|
||||||
delete(n.neighbours, pbNetClose.Node.Id)
|
if err := n.pruneNode(pbNetClose.Node.Id); err != nil {
|
||||||
|
log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
n.Unlock()
|
n.Unlock()
|
||||||
}
|
}
|
||||||
case <-n.closed:
|
case <-n.closed:
|
||||||
@ -393,6 +399,55 @@ func (n *network) announce(client transport.Client) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pruneNode removes a node with given id from the list of neighbours. It also removes all routes originted by this node.
|
||||||
|
// NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment
|
||||||
|
func (n *network) pruneNode(id string) error {
|
||||||
|
delete(n.neighbours, id)
|
||||||
|
// lookup all the routes originated at this node
|
||||||
|
q := router.NewQuery(
|
||||||
|
router.QueryRouter(id),
|
||||||
|
)
|
||||||
|
routes, err := n.Router.Table().Query(q)
|
||||||
|
if err != nil && err != router.ErrRouteNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// delete the found routes
|
||||||
|
for _, route := range routes {
|
||||||
|
if err := n.Router.Table().Delete(route); err != nil && err != router.ErrRouteNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// prune the nodes that have not been seen for certain period of time defined by PruneTime
|
||||||
|
// Additionally, prune also removes all the routes originated by these nodes
|
||||||
|
func (n *network) prune() {
|
||||||
|
prune := time.NewTicker(PruneTime)
|
||||||
|
defer prune.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-n.closed:
|
||||||
|
return
|
||||||
|
case <-prune.C:
|
||||||
|
n.Lock()
|
||||||
|
for id, node := range n.neighbours {
|
||||||
|
nodeAge := time.Since(node.lastSeen)
|
||||||
|
if nodeAge > PruneTime {
|
||||||
|
log.Debugf("Network deleting node %s: reached prune time threshold", id)
|
||||||
|
if err := n.pruneNode(id); err != nil {
|
||||||
|
log.Debugf("Network failed to prune the node %s: %v", id, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
n.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// handleCtrlConn handles ControlChannel connections
|
// handleCtrlConn handles ControlChannel connections
|
||||||
func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) {
|
func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) {
|
||||||
for {
|
for {
|
||||||
@ -695,6 +750,8 @@ func (n *network) Connect() error {
|
|||||||
go n.resolve()
|
go n.resolve()
|
||||||
// broadcast neighbourhood
|
// broadcast neighbourhood
|
||||||
go n.announce(netClient)
|
go n.announce(netClient)
|
||||||
|
// prune stale nodes
|
||||||
|
go n.prune()
|
||||||
// listen to network messages
|
// listen to network messages
|
||||||
go n.processNetChan(netListener)
|
go n.processNetChan(netListener)
|
||||||
// advertise service routes
|
// advertise service routes
|
||||||
|
@ -17,6 +17,9 @@ var (
|
|||||||
ResolveTime = 1 * time.Minute
|
ResolveTime = 1 * time.Minute
|
||||||
// AnnounceTime defines time interval to periodically announce node neighbours
|
// AnnounceTime defines time interval to periodically announce node neighbours
|
||||||
AnnounceTime = 30 * time.Second
|
AnnounceTime = 30 * time.Second
|
||||||
|
// PruneTime defines time interval to periodically check nodes that need to be pruned
|
||||||
|
// due to their not announcing their presence within this time interval
|
||||||
|
PruneTime = 90 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Node is network node
|
// Node is network node
|
||||||
|
Loading…
Reference in New Issue
Block a user