Update neighbours when neighbour message is received

This commit is contained in:
Milos Gajdos 2019-09-09 23:34:27 +01:00
parent f0a1031e97
commit eec780aaa7
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -131,21 +131,52 @@ func (n *node) getNeighbours(pbNeighbours *pbNet.Neighbour, depth int) error {
return nil return nil
} }
// updateNeighbour updates node neighbour up to given depth // unpackNeighbour unpacks pbNet.Neighbour into node of given depth
func (n *node) updateNeighbour(neighbour *pbNet.Neighbour, depth int) error { func unpackNeighbour(pbNeighbour *pbNet.Neighbour, depth int) (*node, error) {
if neighbour == nil { if pbNeighbour == nil {
return errors.New("neighbour not initialized") return nil, errors.New("neighbour not initialized")
}
neighbourNode := &node{
id: pbNeighbour.Node.Id,
address: pbNeighbour.Node.Address,
neighbours: make(map[string]*node),
} }
// return if have either reached the depth or have no more neighbours // return if have either reached the depth or have no more neighbours
if depth == 0 { if depth == 0 || len(pbNeighbour.Neighbours) == 0 {
return nil return neighbourNode, nil
} }
// decrement the depth // decrement the depth
depth-- depth--
// TODO: implement this neighbours := make(map[string]*node)
for _, pbNode := range pbNeighbour.Neighbours {
node, err := unpackNeighbour(pbNode, depth)
if err != nil {
return nil, err
}
neighbours[pbNode.Node.Id] = node
}
neighbourNode.neighbours = neighbours
return neighbourNode, nil
}
// updateNeighbour updates node neighbour up to given depth
func (n *node) updateNeighbour(neighbour *pbNet.Neighbour, depth int) error {
// unpack neighbour into topology of size MaxDepth-1
// NOTE: we need MaxDepth-1 because node n is the parent adding which
// gives us the max neighbour topology we maintain and propagate
node, err := unpackNeighbour(neighbour, MaxDepth-1)
if err != nil {
return err
}
// update node neighbours with new topology
n.neighbours[neighbour.Node.Id] = node
return nil return nil
} }
@ -423,31 +454,24 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
neighbours: make(map[string]*node), neighbours: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
} n.Unlock()
// update lastSeen timestamp // send a solicit message when discovering new neighbour
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) { // NOTE: we need to release the Lock here as sendMsg locsk, too
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
}
// update/store the node neighbour neighbours up to TopologyDepth
// NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour
// * even though we are NOT interested in neighbours over TopologyDepth
// we still allocate the map of neighbours for each of them
//for _, pbNeighbour := range pbNetNeighbour.Neighbours {
// neighbourNode := &node{
// id: pbNeighbour.Node.Id,
// address: pbNeighbour.Node.Address,
// neighbours: make(map[string]*node),
// }
// n.neighbours[pbNetNeighbour.Node.Id].neighbours[pbNeighbour.Node.Id] = neighbourNode
//}
n.Unlock()
// send a solicit message when discovering a new node
// NOTE: we need to send the solicit message here after the Lock is released as sendMsg locks, too
if !exists {
if err := n.sendMsg("solicit", ControlChannel); err != nil { if err := n.sendMsg("solicit", ControlChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err) log.Debugf("Network failed to send solicit message: %s", err)
} }
continue
} }
// update/store the node neighbour neighbours up to (MaxDepth-1) topology depth
// NOTE: we don't update max topology depth as we dont include this network node
if err := n.node.updateNeighbour(pbNetNeighbour, MaxDepth-1); err != nil {
log.Debugf("Network failed to update neighbours")
}
// update lastSeen timestamp if outdated
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) {
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
}
n.Unlock()
case "close": case "close":
pbNetClose := &pbNet.Close{} pbNetClose := &pbNet.Close{}
if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { if err := proto.Unmarshal(m.Body, pbNetClose); err != nil {
@ -1031,32 +1055,26 @@ func (n *network) close() error {
// Close closes network connection // Close closes network connection
func (n *network) Close() error { func (n *network) Close() error {
// lock this operation
n.Lock() n.Lock()
defer n.Unlock()
if !n.connected { if !n.connected {
n.Unlock()
return nil return nil
} }
select { select {
case <-n.closed: case <-n.closed:
n.Unlock()
return nil return nil
default: default:
// TODO: send close message to the network channel
close(n.closed)
// set connected to false
n.connected = false
// unlock the lock otherwise we'll deadlock sending the close
n.Unlock()
// send close message only if we managed to connect to NetworkChannel // send close message only if we managed to connect to NetworkChannel
log.Debugf("Sending close message from: %s", n.options.Id) log.Debugf("Sending close message from: %s", n.options.Id)
if err := n.sendMsg("close", NetworkChannel); err != nil { if err := n.sendMsg("close", NetworkChannel); err != nil {
log.Debugf("Network failed to send close message: %s", err) log.Debugf("Network failed to send close message: %s", err)
} }
// TODO: send close message to the network channel
close(n.closed)
// set connected to false
n.connected = false
} }
return n.close() return n.close()