diff --git a/network/default.go b/network/default.go
index b037e0aa..cd5d6967 100644
--- a/network/default.go
+++ b/network/default.go
@@ -202,6 +202,18 @@ func (n *network) Name() string {
 	return n.options.Name
 }
 
+func (n *network) initNodes() {
+	nodes, err := n.resolveNodes()
+	if err != nil {
+		log.Debugf("Network failed to resolve nodes: %v", err)
+		return
+	}
+	// initialize the tunnel
+	n.tunnel.Init(
+		tunnel.Nodes(nodes...),
+	)
+}
+
 // resolveNodes resolves network nodes to addresses
 func (n *network) resolveNodes() ([]string, error) {
 	// resolve the network address to network nodes
@@ -253,29 +265,6 @@ func (n *network) resolveNodes() ([]string, error) {
 	return nodes, err
 }
 
-// resolve continuously resolves network nodes and initializes network tunnel with resolved addresses
-func (n *network) resolve() {
-	resolve := time.NewTicker(ResolveTime)
-	defer resolve.Stop()
-
-	for {
-		select {
-		case <-n.closed:
-			return
-		case <-resolve.C:
-			nodes, err := n.resolveNodes()
-			if err != nil {
-				log.Debugf("Network failed to resolve nodes: %v", err)
-				continue
-			}
-			// initialize the tunnel
-			n.tunnel.Init(
-				tunnel.Nodes(nodes...),
-			)
-		}
-	}
-}
-
 // handleNetConn handles network announcement messages
 func (n *network) handleNetConn(s tunnel.Session, msg chan *message) {
 	for {
@@ -565,12 +554,14 @@ func (n *network) prunePeerRoutes(peer *node) error {
 
 // manage the process of announcing to peers and prune any peer nodes that have not been
 // seen for a period of time. Also removes all the routes either originated by or routable
-//by the stale nodes
+// by the stale nodes. It also resolves nodes periodically and adds them to the tunnel
 func (n *network) manage() {
 	announce := time.NewTicker(AnnounceTime)
 	defer announce.Stop()
 	prune := time.NewTicker(PruneTime)
 	defer prune.Stop()
+	resolve := time.NewTicker(ResolveTime)
+	defer resolve.Stop()
 
 	for {
 		select {
@@ -584,11 +575,14 @@ func (n *network) manage() {
 			}
 		case <-prune.C:
 			pruned := n.PruneStalePeers(PruneTime)
+
 			for id, peer := range pruned {
 				log.Debugf("Network peer exceeded prune time: %s", id)
+
 				n.Lock()
 				delete(n.peerLinks, peer.address)
 				n.Unlock()
+
 				if err := n.prunePeerRoutes(peer); err != nil {
 					log.Debugf("Network failed pruning peer %s routes: %v", id, err)
 				}
@@ -622,6 +616,8 @@ func (n *network) manage() {
 					log.Debugf("Network failed deleting routes by %s: %v", route.Router, err)
 				}
 			}
+		case <-resolve.C:
+			n.initNodes()
 		}
 	}
 }
@@ -1165,25 +1161,19 @@ func (n *network) Connect() error {
 	n.Lock()
 	defer n.Unlock()
 
-	// try to resolve network nodes
-	nodes, err := n.resolveNodes()
-	if err != nil {
-		log.Debugf("Network failed to resolve nodes: %v", err)
-	}
-
-	// initialize the tunnel to resolved nodes
-	n.tunnel.Init(
-		tunnel.Nodes(nodes...),
-	)
-
 	// connect network tunnel
 	if err := n.tunnel.Connect(); err != nil {
-		n.Unlock()
 		return err
 	}
 
+	// initialise the nodes
+	n.initNodes()
+
 	// return if already connected
	if n.connected {
+		// immediately resolve
+		n.initNodes()
+
 		// send the connect message
 		n.sendConnect()
 		return nil
@@ -1250,11 +1240,9 @@ func (n *network) Connect() error {
 		return err
 	}
 
-	// send connect after there's a link established
+	// manage connection once links are established
 	go n.connect()
-	// resolve network nodes and re-init the tunnel
-	go n.resolve()
-	// broadcast announcements and prune stale nodes
+	// resolve nodes, broadcast announcements and prune stale nodes
 	go n.manage()
 	// advertise service routes
 	go n.advertise(advertChan)
diff --git a/tunnel/default.go b/tunnel/default.go
index 985912d0..1fa2fd47 100644
--- a/tunnel/default.go
+++ b/tunnel/default.go
@@ -202,7 +202,7 @@ func (t *tun) manage() {
 	reconnect := time.NewTicker(ReconnectTime)
 	defer reconnect.Stop()
 
-	// do it immediately
+	// do immediately
 	t.manageLinks()
 
 	for {
@@ -215,6 +215,47 @@
 		}
 	}
 }
+// manageLink sends channel discover requests periodically and
+// keepalive messages to link
+func (t *tun) manageLink(link *link) {
+	keepalive := time.NewTicker(KeepAliveTime)
+	defer keepalive.Stop()
+	discover := time.NewTicker(DiscoverTime)
+	defer discover.Stop()
+
+	for {
+		select {
+		case <-t.closed:
+			return
+		case <-link.closed:
+			return
+		case <-discover.C:
+			// send a discovery message to all links
+			if err := link.Send(&transport.Message{
+				Header: map[string]string{
+					"Micro-Tunnel": "discover",
+					"Micro-Tunnel-Id": t.id,
+				},
+			}); err != nil {
+				log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
+			}
+		case <-keepalive.C:
+			// send keepalive message
+			log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
+			if err := link.Send(&transport.Message{
+				Header: map[string]string{
+					"Micro-Tunnel": "keepalive",
+					"Micro-Tunnel-Id": t.id,
+				},
+			}); err != nil {
+				log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
+				t.delLink(link.Remote())
+				return
+			}
+		}
+	}
+}
+
 // manageLinks is a function that can be called to immediately to link setup
 func (t *tun) manageLinks() {
 	var delLinks []string
@@ -278,8 +319,15 @@
 
 			// save the link
 			t.Lock()
+			defer t.Unlock()
+
+			// just check nothing else was setup in the interim
+			if _, ok := t.links[node]; ok {
+				link.Close()
+				return
+			}
+
+			// save the link
 			t.links[node] = link
-			t.Unlock()
 		}(node)
 	}
@@ -723,59 +771,6 @@ func (t *tun) listen(link *link) {
 	}
 }
 
-// discover sends channel discover requests periodically
-func (t *tun) discover(link *link) {
-	tick := time.NewTicker(DiscoverTime)
-	defer tick.Stop()
-
-	for {
-		select {
-		case <-tick.C:
-			// send a discovery message to all links
-			if err := link.Send(&transport.Message{
-				Header: map[string]string{
-					"Micro-Tunnel": "discover",
-					"Micro-Tunnel-Id": t.id,
-				},
-			}); err != nil {
-				log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
-			}
-		case <-link.closed:
-			return
-		case <-t.closed:
-			return
-		}
-	}
-}
-
-// keepalive periodically sends keepalive messages to link
-func (t *tun) keepalive(link *link) {
-	keepalive := time.NewTicker(KeepAliveTime)
-	defer keepalive.Stop()
-
-	for {
-		select {
-		case <-t.closed:
-			return
-		case <-link.closed:
-			return
-		case <-keepalive.C:
-			// send keepalive message
-			log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
-			if err := link.Send(&transport.Message{
-				Header: map[string]string{
-					"Micro-Tunnel": "keepalive",
-					"Micro-Tunnel-Id": t.id,
-				},
-			}); err != nil {
-				log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
-				t.delLink(link.Remote())
-				return
-			}
-		}
-	}
-}
-
 // setupLink connects to node and returns link if successful
 // It returns error if the link failed to be established
 func (t *tun) setupLink(node string) (*link, error) {
@@ -812,11 +807,8 @@ func (t *tun) setupLink(node string) (*link, error) {
 	// process incoming messages
 	go t.listen(link)
 
-	// start keepalive monitor
-	go t.keepalive(link)
-
-	// discover things on the remote side
-	go t.discover(link)
+	// manage keepalives and discovery messages
+	go t.manageLink(link)
 
 	return link, nil
 }
@@ -839,11 +831,8 @@ func (t *tun) connect() error {
 	// create a new link
 	link := newLink(sock)
 
-	// start keepalive monitor
-	go t.keepalive(link)
-
-	// discover things on the remote side
-	go t.discover(link)
+	// manage the link
+	go t.manageLink(link)
 
 	// listen for inbound messages.
 	// only save the link once connected.
@@ -870,6 +859,8 @@ func (t *tun) Connect() error {
 
 	// already connected
 	if t.connected {
+		// do it immediately
+		t.manageLinks()
 		// setup links
 		return nil
 	}
@@ -884,13 +875,13 @@
 	// create new close channel
 	t.closed = make(chan bool)
 
-	// manage the links
-	go t.manage()
-
 	// process outbound messages to be sent
 	// process sends to all links
 	go t.process()
 
+	// manage the links
+	go t.manage()
+
 	return nil
 }
 
diff --git a/tunnel/reconnect_test.go b/tunnel/reconnect_test.go
index 2c78b8b2..3e9331c3 100644
--- a/tunnel/reconnect_test.go
+++ b/tunnel/reconnect_test.go
@@ -9,15 +9,19 @@ import (
 )
 
 func TestReconnectTunnel(t *testing.T) {
+	// we manually override the tunnel.ReconnectTime value here
+	// this is so that we make the reconnects faster than the default 5s
+	ReconnectTime = 100 * time.Millisecond
+
 	// create a new tunnel client
 	tunA := NewTunnel(
-		Address("127.0.0.1:9096"),
-		Nodes("127.0.0.1:9097"),
+		Address("127.0.0.1:9098"),
+		Nodes("127.0.0.1:9099"),
 	)
 
 	// create a new tunnel server
 	tunB := NewTunnel(
-		Address("127.0.0.1:9097"),
+		Address("127.0.0.1:9099"),
 	)
 
 	// start tunnel
@@ -27,10 +31,6 @@ func TestReconnectTunnel(t *testing.T) {
 	}
 	defer tunB.Close()
 
-	// we manually override the tunnel.ReconnectTime value here
-	// this is so that we make the reconnects faster than the default 5s
-	ReconnectTime = 200 * time.Millisecond
-
 	// start tunnel
 	err = tunA.Connect()
 	if err != nil {
@@ -48,7 +48,7 @@ func TestReconnectTunnel(t *testing.T) {
 	wg.Add(1)
 
 	// start tunnel sender
-	go testBrokenTunSend(t, tunA, wait, &wg)
+	go testBrokenTunSend(t, tunA, wait, &wg, ReconnectTime)
 
 	// wait until done
 	wg.Wait()
diff --git a/tunnel/session.go b/tunnel/session.go
index b43f3e75..84153541 100644
--- a/tunnel/session.go
+++ b/tunnel/session.go
@@ -420,7 +420,7 @@ func (s *session) Close() error {
 	default:
 		close(s.closed)
 
-		// don't send close on multicast
+		// don't send close on multicast or broadcast
 		if s.mode != Unicast {
 			return nil
 		}
diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go
index 8c3119da..839beeb1 100644
--- a/tunnel/tunnel_test.go
+++ b/tunnel/tunnel_test.go
@@ -206,7 +206,7 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait
 	wait <- true
 }
 
-func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
+func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) {
 	defer wg.Done()
 
 	// wait for the listener to get ready
@@ -234,7 +234,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr
 	<-wait
 
 	// give it time to reconnect
-	time.Sleep(5 * ReconnectTime)
+	time.Sleep(10 * reconnect)
 
 	// send the message
 	if err := c.Send(&m); err != nil {