diff --git a/tunnel/default.go b/tunnel/default.go index e4328837..69a5d878 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -73,6 +73,8 @@ func newTunnel(opts ...Option) *tun { // Init initializes tunnel options func (t *tun) Init(opts ...Option) error { + t.Lock() + defer t.Unlock() for _, o := range opts { o(&t.options) } @@ -230,7 +232,9 @@ func (t *tun) listen(link *link) { // are we connecting to ourselves? if token == t.token { + t.Lock() link.loopback = true + t.Unlock() } // nothing more to do @@ -242,7 +246,9 @@ func (t *tun) listen(link *link) { continue case "keepalive": log.Debugf("Tunnel link %s received keepalive", link.Remote()) + t.Lock() link.lastKeepAlive = time.Now() + t.Unlock() continue case "message": // process message diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 5b6cfbc0..3fc84b6d 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -10,6 +10,8 @@ import ( // testAccept will accept connections on the transport, create a new link and tunnel on top func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { + defer wg.Done() + // listen on some virtual address tl, err := tun.Listen("test-tunnel") if err != nil { @@ -43,7 +45,8 @@ func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { t.Fatal(err) } - wg.Done() + wait <- true + return } } @@ -79,6 +82,8 @@ func testSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { t.Fatal(err) } + <-wait + if v := mr.Header["test"]; v != "accept" { t.Fatalf("Message not received from accepted side. Received: %s", v) } @@ -140,6 +145,8 @@ func TestLoopbackTunnel(t *testing.T) { } defer tun.Close() + time.Sleep(500 * time.Millisecond) + wait := make(chan bool) var wg sync.WaitGroup