diff --git a/network/default.go b/network/default.go index 18baab4b..e8489f34 100644 --- a/network/default.go +++ b/network/default.go @@ -170,7 +170,7 @@ func newNetwork(opts ...Option) Network { tunClient: make(map[string]transport.Client), peerLinks: make(map[string]tunnel.Link), discovered: make(chan bool, 1), - solicited: make(chan *node, 1), + solicited: make(chan *node, 32), } network.node.network = network @@ -667,7 +667,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { log.Debugf("Network failed to process advert %s: %v", advert.Id, err) } case "solicit": - pbRtrSolicit := &pbRtr.Solicit{} + pbRtrSolicit := new(pbRtr.Solicit) if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil { log.Debugf("Network fail to unmarshal solicit message: %v", err) continue @@ -768,14 +768,19 @@ func (n *network) processNetChan(listener tunnel.Listener) { msg := PeersToProto(n.node, MaxDepth) go func() { - // advertise yourself to the network + // advertise yourself to the new node if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { log.Debugf("Network failed to advertise peers: %v", err) } <-time.After(time.Millisecond * 100) - // specify that we're soliciting + // ask for the new nodes routes + if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil { + log.Debugf("Network failed to send solicit message: %s", err) + } + + // now advertise our own routes select { case n.solicited <- peer: default: @@ -786,7 +791,6 @@ func (n *network) processNetChan(listener tunnel.Listener) { if err := n.router.Solicit(); err != nil { log.Debugf("Network failed to solicit routes: %s", err) } - }() case "peer": // mark the time the message has been received @@ -838,6 +842,18 @@ func (n *network) processNetChan(listener tunnel.Listener) { if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil { log.Debugf("Network failed to send solicit message: %s", err) } + + // now advertise our own routes + select { + case n.solicited <- peer: + default: + // don't block + } + + // advertise all the routes when a new node has connected + if err := n.router.Solicit(); err != nil { + log.Debugf("Network failed to solicit routes: %s", err) + } }() continue @@ -850,12 +866,15 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Tracef("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) // update lastSeen time for the peer - if err := n.RefreshPeer(pbNetPeer.Node.Id, peer.link, now); err != nil { + if err := n.RefreshPeer(peer.id, peer.link, now); err != nil { log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) } // NOTE: we don't unpack MaxDepth toplogy peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1) + // update the link + peer.link = m.msg.Header["Micro-Link"] + log.Tracef("Network updating topology of node: %s", n.node.id) if err := n.node.UpdatePeer(peer); err != nil { log.Debugf("Network failed to update peers: %v", err) @@ -954,22 +973,109 @@ func (n *network) manage() { resolve := time.NewTicker(ResolveTime) defer resolve.Stop() + // list of links we've sent to + links := make(map[string]time.Time) + for { select { case <-n.closed: return case <-announce.C: - // jitter - j := rand.Int63n(int64(AnnounceTime.Seconds() / 2.0)) - time.Sleep(time.Duration(j) * time.Second) + current := make(map[string]time.Time) - // TODO: intermittently flip between peer selection - // and full broadcast pick a random set of peers + // build link map of current links + for _, link := range n.tunnel.Links() { + if n.isLoopback(link) { + continue + } + // get an existing timestamp if it exists + current[link.Id()] = links[link.Id()] + } + // replace link map + // we do this because a growing map is not + // garbage collected + links = current + + n.RLock() + var i int + // create a list of peers to send to + var peers []*node + + // check peers to see if they need to be sent to + for _, peer := range n.peers { + if i >= 3 { + break + } + + // get last sent + lastSent := links[peer.link] + + // check when we last sent to the peer + // and send a peer message if we havent + if lastSent.IsZero() || time.Since(lastSent) > KeepAliveTime { + link := peer.link + id := peer.id + + // might not exist for some weird reason + if len(link) == 0 { + // set the link via peer links + l, ok := n.peerLinks[peer.address] + if ok { + log.Debugf("Network link not found for peer %s cannot announce", peer.id) + continue + } + link = l.Id() + } + + // add to the list of peers we're going to send to + peers = append(peers, &node{ + id: id, + link: link, + }) + + // increment our count + i++ + } + } + + n.RUnlock() + + // peers to proto msg := PeersToProto(n.node, MaxDepth) - // advertise yourself to the network - if err := n.sendMsg("peer", NetworkChannel, msg); err != nil { - log.Debugf("Network failed to advertise peers: %v", err) + + // we're only going to send to max 3 peers at any given tick + for _, peer := range peers { + + // advertise yourself to the network + if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to advertise peer %s: %v", peer.id, err) + continue + } + + // update last sent time + links[peer.link] = time.Now() + } + + // now look at links we may not have sent to. this may occur + // where a connect message was lost + for link, lastSent := range links { + if !lastSent.IsZero() { + continue + } + + peer := &node{ + // unknown id of the peer + link: link, + } + + // unknown link and peer so lets do the connect flow + if err := n.sendTo("connect", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to advertise peer %s: %v", peer.id, err) + continue + } + + links[peer.link] = time.Now() } case <-prune.C: pruned := n.PruneStalePeers(PruneTime) @@ -1052,15 +1158,27 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) } defer c.Close() - log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, peer.id) + id := peer.id - return c.Send(&transport.Message{ + if len(id) == 0 { + id = peer.link + } + + log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, id) + + tmsg := &transport.Message{ Header: map[string]string{ "Micro-Method": method, - "Micro-Peer": peer.id, }, Body: body, - }) + } + + // setting the peer header + if len(peer.id) > 0 { + tmsg.Header["Micro-Peer"] = peer.id + } + + return c.Send(tmsg) } // sendMsg sends a message to the tunnel channel @@ -1128,6 +1246,27 @@ func (n *network) updatePeerLinks(peer *node) error { return nil } +// isLoopback checks if a link is a loopback to ourselves +func (n *network) isLoopback(link tunnel.Link) bool { + // our advertise address + loopback := n.server.Options().Advertise + // actual address + address := n.tunnel.Address() + + // skip loopback + if link.Loopback() { + return true + } + + // if remote is ourselves + switch link.Remote() { + case loopback, address: + return true + } + + return false +} + // connect will wait for a link to be established and send the connect // message. We're trying to ensure convergence pretty quickly. So we want // to hear back. In the case we become completely disconnected we'll @@ -1137,11 +1276,6 @@ func (n *network) connect() { var discovered bool var attempts int - // our advertise address - loopback := n.server.Options().Advertise - // actual address - address := n.tunnel.Address() - for { // connected is used to define if the link is connected var connected bool @@ -1149,13 +1283,7 @@ func (n *network) connect() { // check the links state for _, link := range n.tunnel.Links() { // skip loopback - if link.Loopback() { - continue - } - - // if remote is ourselves - switch link.Remote() { - case loopback, address: + if n.isLoopback(link) { continue } @@ -1239,7 +1367,6 @@ func (n *network) Connect() error { netListener, err := n.tunnel.Listen( NetworkChannel, tunnel.ListenMode(tunnel.Multicast), - tunnel.ListenTimeout(AnnounceTime*2), ) if err != nil { return err @@ -1249,7 +1376,6 @@ func (n *network) Connect() error { ctrlListener, err := n.tunnel.Listen( ControlChannel, tunnel.ListenMode(tunnel.Multicast), - tunnel.ListenTimeout(router.AdvertiseTableTick*2), ) if err != nil { return err diff --git a/network/network.go b/network/network.go index e927241b..571deee1 100644 --- a/network/network.go +++ b/network/network.go @@ -16,7 +16,9 @@ var ( // ResolveTime defines time interval to periodically resolve network nodes ResolveTime = 1 * time.Minute // AnnounceTime defines time interval to periodically announce node neighbours - AnnounceTime = 30 * time.Second + AnnounceTime = 1 * time.Second + // KeepAliveTime is the time in which we want to have sent a message to a peer + KeepAliveTime = 30 * time.Second // PruneTime defines time interval to periodically check nodes that need to be pruned // due to their not announcing their presence within this time interval PruneTime = 90 * time.Second diff --git a/network/node.go b/network/node.go index 658c0964..0d79ffc5 100644 --- a/network/node.go +++ b/network/node.go @@ -140,10 +140,8 @@ func (n *node) RefreshPeer(id, link string, now time.Time) error { // set peer link peer.link = link - - if peer.lastSeen.Before(now) { - peer.lastSeen = now - } + // set last seen + peer.lastSeen = now return nil } diff --git a/tunnel/default.go b/tunnel/default.go index 41e13c5f..89de3dfb 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -484,6 +484,11 @@ func (t *tun) sendTo(links []*link, msg *message) error { // error channel for call errChan := make(chan error, len(links)) + // execute in parallel + sendTo := func(l *link, m *transport.Message, errChan chan error) { + errChan <- send(l, m) + } + // send the message for _, link := range links { // send the message via the current link @@ -501,10 +506,7 @@ func (t *tun) sendTo(links []*link, msg *message) error { m.Header[k] = v } - // execute in parallel - go func() { - errChan <- send(link, m) - }() + go sendTo(link, m, errChan) continue }