final updates

This commit is contained in:
Asim Aslam 2019-12-13 15:27:47 +00:00
parent caa74d1b5f
commit b0b6b8fce2
4 changed files with 169 additions and 41 deletions

View File

@ -170,7 +170,7 @@ func newNetwork(opts ...Option) Network {
tunClient: make(map[string]transport.Client), tunClient: make(map[string]transport.Client),
peerLinks: make(map[string]tunnel.Link), peerLinks: make(map[string]tunnel.Link),
discovered: make(chan bool, 1), discovered: make(chan bool, 1),
solicited: make(chan *node, 1), solicited: make(chan *node, 32),
} }
network.node.network = network network.node.network = network
@ -667,7 +667,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err) log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
} }
case "solicit": case "solicit":
pbRtrSolicit := &pbRtr.Solicit{} pbRtrSolicit := new(pbRtr.Solicit)
if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil { if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil {
log.Debugf("Network fail to unmarshal solicit message: %v", err) log.Debugf("Network fail to unmarshal solicit message: %v", err)
continue continue
@ -768,14 +768,19 @@ func (n *network) processNetChan(listener tunnel.Listener) {
msg := PeersToProto(n.node, MaxDepth) msg := PeersToProto(n.node, MaxDepth)
go func() { go func() {
// advertise yourself to the network // advertise yourself to the new node
if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise peers: %v", err) log.Debugf("Network failed to advertise peers: %v", err)
} }
<-time.After(time.Millisecond * 100) <-time.After(time.Millisecond * 100)
// specify that we're soliciting // ask for the new nodes routes
if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
// now advertise our own routes
select { select {
case n.solicited <- peer: case n.solicited <- peer:
default: default:
@ -786,7 +791,6 @@ func (n *network) processNetChan(listener tunnel.Listener) {
if err := n.router.Solicit(); err != nil { if err := n.router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err) log.Debugf("Network failed to solicit routes: %s", err)
} }
}() }()
case "peer": case "peer":
// mark the time the message has been received // mark the time the message has been received
@ -838,6 +842,18 @@ func (n *network) processNetChan(listener tunnel.Listener) {
if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil { if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil {
log.Debugf("Network failed to send solicit message: %s", err) log.Debugf("Network failed to send solicit message: %s", err)
} }
// now advertise our own routes
select {
case n.solicited <- peer:
default:
// don't block
}
// advertise all the routes when a new node has connected
if err := n.router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
}() }()
continue continue
@ -850,12 +866,15 @@ func (n *network) processNetChan(listener tunnel.Listener) {
log.Tracef("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) log.Tracef("Network peer exists, refreshing: %s", pbNetPeer.Node.Id)
// update lastSeen time for the peer // update lastSeen time for the peer
if err := n.RefreshPeer(pbNetPeer.Node.Id, peer.link, now); err != nil { if err := n.RefreshPeer(peer.id, peer.link, now); err != nil {
log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
} }
// NOTE: we don't unpack MaxDepth toplogy // NOTE: we don't unpack MaxDepth toplogy
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1) peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
// update the link
peer.link = m.msg.Header["Micro-Link"]
log.Tracef("Network updating topology of node: %s", n.node.id) log.Tracef("Network updating topology of node: %s", n.node.id)
if err := n.node.UpdatePeer(peer); err != nil { if err := n.node.UpdatePeer(peer); err != nil {
log.Debugf("Network failed to update peers: %v", err) log.Debugf("Network failed to update peers: %v", err)
@ -954,22 +973,109 @@ func (n *network) manage() {
resolve := time.NewTicker(ResolveTime) resolve := time.NewTicker(ResolveTime)
defer resolve.Stop() defer resolve.Stop()
// list of links we've sent to
links := make(map[string]time.Time)
for { for {
select { select {
case <-n.closed: case <-n.closed:
return return
case <-announce.C: case <-announce.C:
// jitter current := make(map[string]time.Time)
j := rand.Int63n(int64(AnnounceTime.Seconds() / 2.0))
time.Sleep(time.Duration(j) * time.Second)
// TODO: intermittently flip between peer selection // build link map of current links
// and full broadcast pick a random set of peers for _, link := range n.tunnel.Links() {
if n.isLoopback(link) {
continue
}
// get an existing timestamp if it exists
current[link.Id()] = links[link.Id()]
}
// replace link map
// we do this because a growing map is not
// garbage collected
links = current
n.RLock()
var i int
// create a list of peers to send to
var peers []*node
// check peers to see if they need to be sent to
for _, peer := range n.peers {
if i >= 3 {
break
}
// get last sent
lastSent := links[peer.link]
// check when we last sent to the peer
// and send a peer message if we havent
if lastSent.IsZero() || time.Since(lastSent) > KeepAliveTime {
link := peer.link
id := peer.id
// might not exist for some weird reason
if len(link) == 0 {
// set the link via peer links
l, ok := n.peerLinks[peer.address]
if ok {
log.Debugf("Network link not found for peer %s cannot announce", peer.id)
continue
}
link = l.Id()
}
// add to the list of peers we're going to send to
peers = append(peers, &node{
id: id,
link: link,
})
// increment our count
i++
}
}
n.RUnlock()
// peers to proto
msg := PeersToProto(n.node, MaxDepth) msg := PeersToProto(n.node, MaxDepth)
// advertise yourself to the network
if err := n.sendMsg("peer", NetworkChannel, msg); err != nil { // we're only going to send to max 3 peers at any given tick
log.Debugf("Network failed to advertise peers: %v", err) for _, peer := range peers {
// advertise yourself to the network
if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise peer %s: %v", peer.id, err)
continue
}
// update last sent time
links[peer.link] = time.Now()
}
// now look at links we may not have sent to. this may occur
// where a connect message was lost
for link, lastSent := range links {
if !lastSent.IsZero() {
continue
}
peer := &node{
// unknown id of the peer
link: link,
}
// unknown link and peer so lets do the connect flow
if err := n.sendTo("connect", NetworkChannel, peer, msg); err != nil {
log.Debugf("Network failed to advertise peer %s: %v", peer.id, err)
continue
}
links[peer.link] = time.Now()
} }
case <-prune.C: case <-prune.C:
pruned := n.PruneStalePeers(PruneTime) pruned := n.PruneStalePeers(PruneTime)
@ -1052,15 +1158,27 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message)
} }
defer c.Close() defer c.Close()
log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, peer.id) id := peer.id
return c.Send(&transport.Message{ if len(id) == 0 {
id = peer.link
}
log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, id)
tmsg := &transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Method": method, "Micro-Method": method,
"Micro-Peer": peer.id,
}, },
Body: body, Body: body,
}) }
// setting the peer header
if len(peer.id) > 0 {
tmsg.Header["Micro-Peer"] = peer.id
}
return c.Send(tmsg)
} }
// sendMsg sends a message to the tunnel channel // sendMsg sends a message to the tunnel channel
@ -1128,6 +1246,27 @@ func (n *network) updatePeerLinks(peer *node) error {
return nil return nil
} }
// isLoopback checks if a link is a loopback to ourselves
func (n *network) isLoopback(link tunnel.Link) bool {
// our advertise address
loopback := n.server.Options().Advertise
// actual address
address := n.tunnel.Address()
// skip loopback
if link.Loopback() {
return true
}
// if remote is ourselves
switch link.Remote() {
case loopback, address:
return true
}
return false
}
// connect will wait for a link to be established and send the connect // connect will wait for a link to be established and send the connect
// message. We're trying to ensure convergence pretty quickly. So we want // message. We're trying to ensure convergence pretty quickly. So we want
// to hear back. In the case we become completely disconnected we'll // to hear back. In the case we become completely disconnected we'll
@ -1137,11 +1276,6 @@ func (n *network) connect() {
var discovered bool var discovered bool
var attempts int var attempts int
// our advertise address
loopback := n.server.Options().Advertise
// actual address
address := n.tunnel.Address()
for { for {
// connected is used to define if the link is connected // connected is used to define if the link is connected
var connected bool var connected bool
@ -1149,13 +1283,7 @@ func (n *network) connect() {
// check the links state // check the links state
for _, link := range n.tunnel.Links() { for _, link := range n.tunnel.Links() {
// skip loopback // skip loopback
if link.Loopback() { if n.isLoopback(link) {
continue
}
// if remote is ourselves
switch link.Remote() {
case loopback, address:
continue continue
} }
@ -1239,7 +1367,6 @@ func (n *network) Connect() error {
netListener, err := n.tunnel.Listen( netListener, err := n.tunnel.Listen(
NetworkChannel, NetworkChannel,
tunnel.ListenMode(tunnel.Multicast), tunnel.ListenMode(tunnel.Multicast),
tunnel.ListenTimeout(AnnounceTime*2),
) )
if err != nil { if err != nil {
return err return err
@ -1249,7 +1376,6 @@ func (n *network) Connect() error {
ctrlListener, err := n.tunnel.Listen( ctrlListener, err := n.tunnel.Listen(
ControlChannel, ControlChannel,
tunnel.ListenMode(tunnel.Multicast), tunnel.ListenMode(tunnel.Multicast),
tunnel.ListenTimeout(router.AdvertiseTableTick*2),
) )
if err != nil { if err != nil {
return err return err

View File

@ -16,7 +16,9 @@ var (
// ResolveTime defines time interval to periodically resolve network nodes // ResolveTime defines time interval to periodically resolve network nodes
ResolveTime = 1 * time.Minute ResolveTime = 1 * time.Minute
// AnnounceTime defines time interval to periodically announce node neighbours // AnnounceTime defines time interval to periodically announce node neighbours
AnnounceTime = 30 * time.Second AnnounceTime = 1 * time.Second
// KeepAliveTime is the time in which we want to have sent a message to a peer
KeepAliveTime = 30 * time.Second
// PruneTime defines time interval to periodically check nodes that need to be pruned // PruneTime defines time interval to periodically check nodes that need to be pruned
// due to their not announcing their presence within this time interval // due to their not announcing their presence within this time interval
PruneTime = 90 * time.Second PruneTime = 90 * time.Second

View File

@ -140,10 +140,8 @@ func (n *node) RefreshPeer(id, link string, now time.Time) error {
// set peer link // set peer link
peer.link = link peer.link = link
// set last seen
if peer.lastSeen.Before(now) { peer.lastSeen = now
peer.lastSeen = now
}
return nil return nil
} }

View File

@ -484,6 +484,11 @@ func (t *tun) sendTo(links []*link, msg *message) error {
// error channel for call // error channel for call
errChan := make(chan error, len(links)) errChan := make(chan error, len(links))
// execute in parallel
sendTo := func(l *link, m *transport.Message, errChan chan error) {
errChan <- send(l, m)
}
// send the message // send the message
for _, link := range links { for _, link := range links {
// send the message via the current link // send the message via the current link
@ -501,10 +506,7 @@ func (t *tun) sendTo(links []*link, msg *message) error {
m.Header[k] = v m.Header[k] = v
} }
// execute in parallel go sendTo(link, m, errChan)
go func() {
errChan <- send(link, m)
}()
continue continue
} }