From 398acc67cada3349df5245d37e29452a6555b356 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 8 Dec 2019 13:45:24 +0000 Subject: [PATCH] fix broken test --- tunnel/default.go | 72 +++++++++----- tunnel/tunnel_test.go | 214 +++++++++++++++++++++++++----------------- 2 files changed, 179 insertions(+), 107 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 1fa2fd47..a6cb08c9 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -202,9 +202,6 @@ func (t *tun) manage() { reconnect := time.NewTicker(ReconnectTime) defer reconnect.Stop() - // do immediately - t.manageLinks() - for { select { case <-t.closed: @@ -231,23 +228,13 @@ func (t *tun) manageLink(link *link) { return case <-discover.C: // send a discovery message to all links - if err := link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "discover", - "Micro-Tunnel-Id": t.id, - }, - }); err != nil { + if err := t.sendMsg("discover", link); err != nil { log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err) } case <-keepalive.C: // send keepalive message log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) - if err := link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "keepalive", - "Micro-Tunnel-Id": t.id, - }, - }); err != nil { + if err := t.sendMsg("keepalive", link); err != nil { log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) t.delLink(link.Remote()) return @@ -570,8 +557,10 @@ func (t *tun) listen(link *link) { t.links[link.Remote()] = link t.Unlock() - // send back a discovery + // send back an announcement of our channels discovery go t.announce("", "", link) + // ask for the things on the other wise + go t.sendMsg("discover", link) // nothing more to do continue case "close": @@ -771,6 +760,15 @@ func (t *tun) listen(link *link) { } } +func (t *tun) sendMsg(method string, link *link) error { + return link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": method, + "Micro-Tunnel-Id": t.id, + }, + }) +} + // setupLink connects to node and returns link if successful // It returns error if the link failed to be established func (t *tun) setupLink(node string) (*link, error) { @@ -790,12 +788,7 @@ func (t *tun) setupLink(node string) (*link, error) { link.id = c.Remote() // send the first connect message - if err := link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "connect", - "Micro-Tunnel-Id": t.id, - }, - }); err != nil { + if err := t.sendMsg("connect", link); err != nil { link.Close() return nil, err } @@ -813,6 +806,36 @@ func (t *tun) setupLink(node string) (*link, error) { return link, nil } +func (t *tun) setupLinks() { + var wg sync.WaitGroup + + for _, node := range t.options.Nodes { + wg.Add(1) + + go func(node string) { + defer wg.Done() + + // we're not trying to fix existing links + if _, ok := t.links[node]; ok { + return + } + + // create new link + link, err := t.setupLink(node) + if err != nil { + log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) + return + } + + // save the link + t.links[node] = link + }(node) + } + + // wait for all threads to finish + wg.Wait() +} + // connect the tunnel to all the nodes and listen for incoming tunnel connections func (t *tun) connect() error { l, err := t.options.Transport.Listen(t.options.Address) @@ -860,7 +883,7 @@ func (t *tun) Connect() error { // already connected if t.connected { // do it immediately - t.manageLinks() + t.setupLinks() // setup links return nil } @@ -879,6 +902,9 @@ func (t *tun) Connect() error { // process sends to all links go t.process() + // call setup before managing them + t.setupLinks() + // manage the links go t.manage() diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 839beeb1..9633e53d 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -8,6 +8,90 @@ import ( "github.com/micro/go-micro/transport" ) +func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { + defer wg.Done() + + // listen on some virtual address + tl, err := tun.Listen("test-tunnel") + if err != nil { + t.Fatal(err) + } + + // receiver ready; notify sender + wait <- true + + // accept a connection + c, err := tl.Accept() + if err != nil { + t.Fatal(err) + } + + // accept the message and close the tunnel + // we do this to simulate loss of network connection + m := new(transport.Message) + if err := c.Recv(m); err != nil { + t.Fatal(err) + } + + // close all the links + for _, link := range tun.Links() { + link.Close() + } + + // receiver ready; notify sender + wait <- true + + // accept the message + m = new(transport.Message) + if err := c.Recv(m); err != nil { + t.Fatal(err) + } + + // notify the sender we have received + wait <- true +} + +func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) { + defer wg.Done() + + // wait for the listener to get ready + <-wait + + // dial a new session + c, err := tun.Dial("test-tunnel") + if err != nil { + t.Fatal(err) + } + defer c.Close() + + m := transport.Message{ + Header: map[string]string{ + "test": "send", + }, + } + + // send the message + if err := c.Send(&m); err != nil { + t.Fatal(err) + } + + // wait for the listener to get ready + <-wait + + // give it time to reconnect + time.Sleep(reconnect) + + // send the message + if err := c.Send(&m); err != nil { + t.Fatal(err) + } + + // wait for the listener to receive the message + // c.Send merely enqueues the message to the link send queue and returns + // in order to verify it was received we wait for the listener to tell us + <-wait +} + // testAccept will accept connections on the transport, create a new link and tunnel on top func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { defer wg.Done() @@ -163,90 +247,6 @@ func TestLoopbackTunnel(t *testing.T) { wg.Wait() } -func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { - defer wg.Done() - - // listen on some virtual address - tl, err := tun.Listen("test-tunnel") - if err != nil { - t.Fatal(err) - } - - // receiver ready; notify sender - wait <- true - - // accept a connection - c, err := tl.Accept() - if err != nil { - t.Fatal(err) - } - - // accept the message and close the tunnel - // we do this to simulate loss of network connection - m := new(transport.Message) - if err := c.Recv(m); err != nil { - t.Fatal(err) - } - - // close all the links - for _, link := range tun.Links() { - link.Close() - } - - // receiver ready; notify sender - wait <- true - - // accept the message - m = new(transport.Message) - if err := c.Recv(m); err != nil { - t.Fatal(err) - } - - // notify the sender we have received - wait <- true -} - -func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) { - defer wg.Done() - - // wait for the listener to get ready - <-wait - - // dial a new session - c, err := tun.Dial("test-tunnel") - if err != nil { - t.Fatal(err) - } - defer c.Close() - - m := transport.Message{ - Header: map[string]string{ - "test": "send", - }, - } - - // send the message - if err := c.Send(&m); err != nil { - t.Fatal(err) - } - - // wait for the listener to get ready - <-wait - - // give it time to reconnect - time.Sleep(10 * reconnect) - - // send the message - if err := c.Send(&m); err != nil { - t.Fatal(err) - } - - // wait for the listener to receive the message - // c.Send merely enqueues the message to the link send queue and returns - // in order to verify it was received we wait for the listener to tell us - <-wait -} - func TestTunnelRTTRate(t *testing.T) { // create a new tunnel client tunA := NewTunnel( @@ -296,3 +296,49 @@ func TestTunnelRTTRate(t *testing.T) { t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate()) } } + +func TestReconnectTunnel(t *testing.T) { + // we manually override the tunnel.ReconnectTime value here + // this is so that we make the reconnects faster than the default 5s + ReconnectTime = 200 * time.Millisecond + + // create a new tunnel client + tunA := NewTunnel( + Address("127.0.0.1:9098"), + Nodes("127.0.0.1:9099"), + ) + + // create a new tunnel server + tunB := NewTunnel( + Address("127.0.0.1:9099"), + ) + + // start tunnel + err := tunB.Connect() + if err != nil { + t.Fatal(err) + } + defer tunB.Close() + + // start tunnel + err = tunA.Connect() + if err != nil { + t.Fatal(err) + } + defer tunA.Close() + + wait := make(chan bool) + + var wg sync.WaitGroup + + wg.Add(1) + // start tunnel listener + go testBrokenTunAccept(t, tunB, wait, &wg) + + wg.Add(1) + // start tunnel sender + go testBrokenTunSend(t, tunA, wait, &wg, ReconnectTime*5) + + // wait until done + wg.Wait() +}