From 6e28e7a86f23ad603c09bfd9b97e0530f838aa8f Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 11 Dec 2019 14:37:03 +0000 Subject: [PATCH] Save current state of the world --- network/default.go | 3 +- runtime/default.go | 2 +- tunnel/default.go | 88 +++++++++++++++++++++++++++++++--------------- tunnel/link.go | 38 +++++++++++++------- tunnel/options.go | 10 ++++++ 5 files changed, 97 insertions(+), 44 deletions(-) diff --git a/network/default.go b/network/default.go index dfc73bd2..c3c8d5db 100644 --- a/network/default.go +++ b/network/default.go @@ -1023,7 +1023,8 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) if err != nil { return err } - c, err := n.tunnel.Dial(channel, tunnel.DialMode(tunnel.Multicast), tunnel.DialLink(peer.link)) + // Create a unicast connection to the peer but don't do the open/accept flow + c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link)) if err != nil { return err } diff --git a/runtime/default.go b/runtime/default.go index 11ab7d32..e9318d46 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -143,7 +143,7 @@ func (r *runtime) run(events <-chan Event) { } } case <-r.closed: - log.Debugf("Runtime stopped.") + log.Debugf("Runtime stopped") return } } diff --git a/tunnel/default.go b/tunnel/default.go index a6cb08c9..8abad16b 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -73,10 +73,10 @@ func newTunnel(opts ...Option) *tun { // Init initializes tunnel options func (t *tun) Init(opts ...Option) error { t.Lock() - defer t.Unlock() for _, o := range opts { o(&t.options) } + t.Unlock() return nil } @@ -103,7 +103,6 @@ func (t *tun) delSession(channel, session string) { // listChannels returns a list of listening channels func (t *tun) listChannels() []string { t.RLock() - defer t.RUnlock() //nolint:prealloc var channels []string @@ -113,6 +112,9 @@ func (t *tun) listChannels() []string { } channels = append(channels, session.channel) } + + t.RUnlock() + return channels } @@ -244,53 +246,71 @@ func (t *tun) manageLink(link *link) { } // manageLinks is a function that can be called to immediately to link setup +// it purges dead links while generating new links for any nodes not connected func (t *tun) manageLinks() { - var delLinks []string + delLinks := make(map[*link]string) + connected := make(map[string]bool) t.RLock() + // get list of nodes from options + nodes := t.options.Nodes + // check the link status and purge dead links for node, link := range t.links { // check link status switch link.State() { - case "closed": - delLinks = append(delLinks, node) - case "error": - delLinks = append(delLinks, node) + case "closed", "error": + delLinks[link] = node + default: + connected[node] = true } } t.RUnlock() + // build a list of links to connect to + var connect []string + + for _, node := range nodes { + // check if we're connected + if _, ok := connected[node]; ok { + continue + } + // add nodes to connect o + connect = append(connect, node) + } + + // delete the dead links if len(delLinks) > 0 { t.Lock() - for _, node := range delLinks { + + for link, node := range delLinks { log.Debugf("Tunnel deleting dead link for %s", node) - if link, ok := t.links[node]; ok { - link.Close() + + // check if the link exists + l, ok := t.links[node] + if ok { + // close and delete + l.Close() delete(t.links, node) } + + // if the link does not match our own + if l != link { + // close our link just in case + link.Close() + } } + t.Unlock() } - // check current link status - var connect []string - - // build list of unknown nodes to connect to - t.RLock() - - for _, node := range t.options.Nodes { - if _, ok := t.links[node]; !ok { - connect = append(connect, node) - } - } - - t.RUnlock() - var wg sync.WaitGroup + // establish new links + for _, node := range connect { wg.Add(1) @@ -298,6 +318,7 @@ func (t *tun) manageLinks() { defer wg.Done() // create new link + // if we're using quic it should be a max 10 second handshake period link, err := t.setupLink(node) if err != nil { log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) @@ -313,6 +334,7 @@ func (t *tun) manageLinks() { link.Close() return } + // save the link t.links[node] = link }(node) @@ -469,7 +491,6 @@ func (t *tun) process() { func (t *tun) delLink(remote string) { t.Lock() - defer t.Unlock() // get the link for id, link := range t.links { @@ -481,6 +502,8 @@ func (t *tun) delLink(remote string) { link.Close() delete(t.links, id) } + + t.Unlock() } // process incoming messages @@ -1035,6 +1058,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // get opts options := DialOptions{ Timeout: DefaultDialTimeout, + Wait: true, } for _, o := range opts { @@ -1119,11 +1143,16 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } // return early if its not unicast - // we will not call "open" for multicast + // we will not wait for "open" for multicast if c.mode != Unicast { return c, nil } + // if we're not told to wait + if !options.Wait { + return c, nil + } + // Note: we go no further for multicast or broadcast. // This is a unicast session so we call "open" and wait // for an "accept" @@ -1221,15 +1250,16 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { } func (t *tun) Links() []Link { - t.RLock() - defer t.RUnlock() - links := make([]Link, 0, len(t.links)) + t.RLock() + for _, link := range t.links { links = append(links, link) } + t.RUnlock() + return links } diff --git a/tunnel/link.go b/tunnel/link.go index a072a047..fe6ba83a 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -138,10 +138,10 @@ func (l *link) setRate(bits int64, delta time.Duration) { // setRTT sets a nanosecond based moving average roundtrip time for the link func (l *link) setRTT(d time.Duration) { l.Lock() - defer l.Unlock() if l.length <= 0 { l.length = d.Nanoseconds() + l.Unlock() return } @@ -149,6 +149,8 @@ func (l *link) setRTT(d time.Duration) { length := 0.8*float64(l.length) + 0.2*float64(d.Nanoseconds()) // set new length l.length = int64(length) + + l.Unlock() } func (l *link) delChannel(ch string) { @@ -159,8 +161,9 @@ func (l *link) delChannel(ch string) { func (l *link) getChannel(ch string) time.Time { l.RLock() - defer l.RUnlock() - return l.channels[ch] + t := l.channels[ch] + l.RUnlock() + return t } func (l *link) setChannel(channels ...string) { @@ -344,27 +347,31 @@ func (l *link) Delay() int64 { // Current transfer rate as bits per second (lower is better) func (l *link) Rate() float64 { l.RLock() - defer l.RUnlock() - return l.rate + r := l.rate + l.RUnlock() + return r } func (l *link) Loopback() bool { l.RLock() - defer l.RUnlock() - return l.loopback + lo := l.loopback + l.RUnlock() + return lo } // Length returns the roundtrip time as nanoseconds (lower is better). // Returns 0 where no measurement has been taken. func (l *link) Length() int64 { l.RLock() - defer l.RUnlock() - return l.length + length := l.length + l.RUnlock() + return length } func (l *link) Id() string { l.RLock() - defer l.RUnlock() + id := l.id + l.RUnlock() return l.id } @@ -413,11 +420,11 @@ func (l *link) Send(m *transport.Message) error { } l.Lock() - defer l.Unlock() // there's an error increment the counter and bail if err != nil { l.errCount++ + l.Unlock() return err } @@ -441,6 +448,8 @@ func (l *link) Send(m *transport.Message) error { l.setRate(int64(bits), time.Since(now)) } + l.Unlock() + return nil } @@ -476,10 +485,13 @@ func (l *link) State() string { return "closed" default: l.RLock() - defer l.RUnlock() - if l.errCount > 3 { + errCount := l.errCount + l.RUnlock() + + if lerrCount > 3 { return "error" } + return "connected" } } diff --git a/tunnel/options.go b/tunnel/options.go index 145db19f..262df797 100644 --- a/tunnel/options.go +++ b/tunnel/options.go @@ -38,6 +38,8 @@ type DialOptions struct { Link string // specify mode of the session Mode Mode + // Wait for connection to be accepted + Wait bool // the dial timeout Timeout time.Duration } @@ -124,6 +126,14 @@ func DialLink(id string) DialOption { } } +// DialWait specifies whether to wait for the connection +// to be accepted before returning the session +func DialWait(b bool) DialOption { + return func(o *DialOptions) { + o.Wait = b + } +} + // DefaultOptions returns router default options func DefaultOptions() Options { return Options{