Adding new peers up to given depth. Outline of node gaph Update
This commit is contained in:
@@ -27,6 +27,8 @@ var (
|
||||
ControlChannel = "control"
|
||||
// DefaultLink is default network link
|
||||
DefaultLink = "network"
|
||||
// MaxDepth defines max depth of neighbourhood topology
|
||||
MaxDepth = 3
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -85,6 +87,69 @@ func (n *node) Neighbourhood() []Node {
|
||||
return nodes
|
||||
}
|
||||
|
||||
// getNeighbours collects node neighbours up to given depth into pbNeighbours
|
||||
// NOTE: this method is not thread safe, so make sure you serialize access to it
|
||||
// NOTE: we should be able to read-Lock this, even though it's recursive
|
||||
// TODO: we should rework this so it returns pbNeighbours along with error
|
||||
func (n *node) getNeighbours(pbNeighbours *pbNet.Neighbour, depth int) error {
|
||||
if pbNeighbours == nil {
|
||||
return errors.New("neighbours not initialized")
|
||||
}
|
||||
|
||||
// return if have either reached the depth or have no more neighbours
|
||||
if depth == 0 || len(n.neighbours) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// decrement the depth
|
||||
depth--
|
||||
|
||||
var neighbours []*pbNet.Neighbour
|
||||
for _, neighbour := range n.neighbours {
|
||||
// node
|
||||
node := &pbNet.Node{
|
||||
Id: neighbour.id,
|
||||
Address: neighbour.address,
|
||||
}
|
||||
// create new neighbour
|
||||
pbNodeNeighbour := &pbNet.Neighbour{
|
||||
Node: node,
|
||||
Neighbours: make([]*pbNet.Neighbour, 0),
|
||||
}
|
||||
// get neighbours of the neighbour
|
||||
// NOTE: this is [not] a recursive call
|
||||
if err := neighbour.getNeighbours(pbNodeNeighbour, depth); err != nil {
|
||||
return err
|
||||
}
|
||||
// add current neighbour to explored neighbours
|
||||
neighbours = append(neighbours, pbNodeNeighbour)
|
||||
}
|
||||
|
||||
// add neighbours to the parent topology
|
||||
pbNeighbours.Neighbours = neighbours
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateNeighbour updates node neighbour up to given depth
|
||||
func (n *node) updateNeighbour(neighbour *pbNet.Neighbour, depth int) error {
|
||||
if neighbour == nil {
|
||||
return errors.New("neighbour not initialized")
|
||||
}
|
||||
|
||||
// return if have either reached the depth or have no more neighbours
|
||||
if depth == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// decrement the depth
|
||||
depth--
|
||||
|
||||
// TODO: implement this
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// network implements Network interface
|
||||
type network struct {
|
||||
// node is network node
|
||||
@@ -363,18 +428,18 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
||||
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) {
|
||||
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
|
||||
}
|
||||
// update/store the neighbour node neighbours
|
||||
// update/store the node neighbour neighbours up to TopologyDepth
|
||||
// NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour
|
||||
// * even though we are NOT interested in neighbours of neighbours here
|
||||
// * even though we are NOT interested in neighbours over TopologyDepth
|
||||
// we still allocate the map of neighbours for each of them
|
||||
for _, pbNeighbour := range pbNetNeighbour.Neighbours {
|
||||
neighbourNode := &node{
|
||||
id: pbNeighbour.Id,
|
||||
address: pbNeighbour.Address,
|
||||
neighbours: make(map[string]*node),
|
||||
}
|
||||
n.neighbours[pbNetNeighbour.Node.Id].neighbours[pbNeighbour.Id] = neighbourNode
|
||||
}
|
||||
//for _, pbNeighbour := range pbNetNeighbour.Neighbours {
|
||||
// neighbourNode := &node{
|
||||
// id: pbNeighbour.Node.Id,
|
||||
// address: pbNeighbour.Node.Address,
|
||||
// neighbours: make(map[string]*node),
|
||||
// }
|
||||
// n.neighbours[pbNetNeighbour.Node.Id].neighbours[pbNeighbour.Node.Id] = neighbourNode
|
||||
//}
|
||||
n.Unlock()
|
||||
// send a solicit message when discovering a new node
|
||||
// NOTE: we need to send the solicit message here after the Lock is released as sendMsg locks, too
|
||||
@@ -431,20 +496,22 @@ func (n *network) sendMsg(msgType string, channel string) error {
|
||||
}
|
||||
case "neighbour":
|
||||
n.RLock()
|
||||
nodes := make([]*pbNet.Node, len(n.neighbours))
|
||||
i := 0
|
||||
for id := range n.neighbours {
|
||||
nodes[i] = &pbNet.Node{
|
||||
Id: id,
|
||||
Address: n.neighbours[id].address,
|
||||
}
|
||||
i++
|
||||
node := &pbNet.Node{
|
||||
Id: n.node.id,
|
||||
Address: n.node.address,
|
||||
}
|
||||
n.RUnlock()
|
||||
protoMsg = &pbNet.Neighbour{
|
||||
nodeNeighbour := &pbNet.Neighbour{
|
||||
Node: node,
|
||||
Neighbours: nodes,
|
||||
Neighbours: make([]*pbNet.Neighbour, 0),
|
||||
}
|
||||
// get all the neighbours down to MaxNeighbourDepth
|
||||
if err := n.node.getNeighbours(nodeNeighbour, MaxDepth); err != nil {
|
||||
log.Debugf("Network unable to retrieve node neighbours: %s", err)
|
||||
return err
|
||||
}
|
||||
// set protoMsg for serialization
|
||||
protoMsg = nodeNeighbour
|
||||
n.RUnlock()
|
||||
default:
|
||||
return ErrMsgUnknown
|
||||
}
|
||||
@@ -461,9 +528,10 @@ func (n *network) sendMsg(msgType string, channel string) error {
|
||||
Body: body,
|
||||
}
|
||||
|
||||
// check if the channel client is initialized
|
||||
n.RLock()
|
||||
client, ok := n.tunClient[channel]
|
||||
if !ok {
|
||||
if !ok || client == nil {
|
||||
n.RUnlock()
|
||||
return ErrClientNotFound
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user