From 1840b5bd74f2c26a3eb63bbc756befce71875086 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 5 Sep 2019 15:16:11 +0100 Subject: [PATCH 1/4] Update tunnel to send discovery on connect and multicast messages. Announce as broadcast --- tunnel/default.go | 130 +++++++++++++++++++++++++++++++++++---------- tunnel/listener.go | 2 +- tunnel/session.go | 2 + 3 files changed, 105 insertions(+), 29 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 1aa5d2bf..6b74bfa8 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -94,6 +94,21 @@ func (t *tun) delSession(channel, session string) { t.Unlock() } +// listChannels returns a list of listening channels +func (t *tun) listChannels() []string { + t.RLock() + defer t.RUnlock() + + var channels []string + for _, session := range t.sessions { + if session.session != "listener" { + continue + } + channels = append(channels, session.channel) + } + return channels +} + // newSession creates a new session and saves it func (t *tun) newSession(channel, sessionId string) (*session, bool) { // new session @@ -439,11 +454,20 @@ func (t *tun) listen(link *link) { log.Debugf("Received %+v from %s", msg, link.Remote()) // an announcement of a channel listener case "announce": + channels := strings.Split(channel, ",") + // update mapping in the link link.Lock() - link.channels[channel] = time.Now() + for _, channel := range channels { + link.channels[channel] = time.Now() + } link.Unlock() + // this was an announcement not intended for anything + if sessionId == "listener" || sessionId == "" { + continue + } + // get the session that asked for the discovery s, exists := t.getSession(channel, sessionId) if exists { @@ -463,22 +487,46 @@ func (t *tun) listen(link *link) { } continue case "discover": - // looking for existing mapping - _, exists := t.getSession(channel, "listener") - if exists { - log.Debugf("Tunnel sending announce for discovery of channel %s", channel) - // send back the announcement - link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "announce", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Channel": channel, - "Micro-Tunnel-Session": sessionId, - "Micro-Tunnel-Link": link.id, - "Micro-Tunnel-Token": t.token, - }, - }) + // create the "announce" response message for a discover request + msg := &transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "announce", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Channel": channel, + "Micro-Tunnel-Session": sessionId, + "Micro-Tunnel-Link": link.id, + "Micro-Tunnel-Token": t.token, + }, } + + // if no channel is present we've been asked to discover all channels + if len(channel) == 0 { + // get the list of channels + t.RLock() + channels := t.listChannels() + t.RUnlock() + + // if there are no channels continue + if len(channels) == 0 { + continue + } + + // create a list of channels as comma separated list + list := strings.Join(channels, ",") + // set channels as header + msg.Header["Micro-Tunnel-Channel"] = list + } else { + // otherwise look for a single channel mapping + // looking for existing mapping as a listener + _, exists := t.getSession(channel, "listener") + if !exists { + continue + } + log.Debugf("Tunnel sending announce for discovery of channel %s", channel) + } + + // send back the announcement + link.Send(msg) continue default: // blackhole it @@ -728,6 +776,9 @@ func (t *tun) Connect() error { return err } + // request a discovery + t.discover() + // set as connected t.connected = true // create new close channel @@ -736,6 +787,19 @@ func (t *tun) Connect() error { return nil } +func (t *tun) discover() { + // send a discovery message to all links + for _, link := range t.links { + link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "discover", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Token": t.token, + }, + }) + } +} + func (t *tun) close() error { // close all the sessions for id, s := range t.sessions { @@ -757,7 +821,8 @@ func (t *tun) close() error { } // close the listener - return t.listener.Close() + //return t.listener.Close() + return nil } func (t *tun) Address() string { @@ -856,7 +921,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { t.RUnlock() // discovered so set the link if not multicast - // TODO: pick the link efficiently based + // TODO: pick the link efficiently based // on link status and saturation. if c.discovered && !c.multicast { // set the link @@ -866,6 +931,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // shit fuck if !c.discovered { + // create a new discovery message for this channel msg := c.newMessage("discover") msg.broadcast = true msg.outbound = true @@ -874,14 +940,6 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // send the discovery message t.send <- msg - // don't bother waiting around - // we're just going to assume things come online - if c.multicast { - c.discovered = true - c.accepted = true - return c, nil - } - select { case <-time.After(after()): return nil, ErrDialTimeout @@ -891,17 +949,33 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } } + var err error + // wait for announce select { case msg := <-c.recv: if msg.typ != "announce" { - return nil, errors.New("failed to discover channel") + err = errors.New("failed to discover channel") } case <-time.After(after()): - return nil, ErrDialTimeout + err = ErrDialTimeout + } + + // don't both sending the error for multicast + // we're just going to assume things come online + if err == nil || c.multicast { + c.discovered = true + return c, nil + } + + // return the error if unicast + if err != nil { + return nil, err } } + // a unicast session so we call "open" and wait for an "accept" + // try to open the session err := c.Open() if err != nil { diff --git a/tunnel/listener.go b/tunnel/listener.go index e60ff396..ee394519 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -24,7 +24,7 @@ type tunListener struct { // periodically announce self func (t *tunListener) announce() { - tick := time.NewTicker(time.Minute) + tick := time.NewTicker(time.Second * 30) defer tick.Stop() // first announcement diff --git a/tunnel/session.go b/tunnel/session.go index 58246fdd..a4c7a2fe 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -180,6 +180,8 @@ func (s *session) Announce() error { msg := s.newMessage("announce") // we don't need an error back msg.errChan = nil + // announce to all + msg.broadcast = true // we don't need the link msg.link = "" From d198765c6c004a4f9a5440a23b9ec2fc4a389b9b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 5 Sep 2019 15:23:19 +0100 Subject: [PATCH 2/4] Put back close of listener --- tunnel/default.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 6b74bfa8..0cb35eba 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -821,8 +821,8 @@ func (t *tun) close() error { } // close the listener - //return t.listener.Close() - return nil + // this appears to be blocking + return t.listener.Close() } func (t *tun) Address() string { From 1527a842976c69724a261ddd6e37795d922c9b9f Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 5 Sep 2019 17:40:41 +0100 Subject: [PATCH 3/4] Shorten multicast discovery --- tunnel/default.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 0cb35eba..1170a00d 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -951,27 +951,39 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { var err error + // set a dialTimeout + dialTimeout := after() + + // set a shorter delay for multicast + if c.multicast { + // shorten this + dialTimeout = time.Millisecond * 500 + } + // wait for announce select { case msg := <-c.recv: if msg.typ != "announce" { err = errors.New("failed to discover channel") } - case <-time.After(after()): + case <-time.After(dialTimeout): err = ErrDialTimeout } - // don't both sending the error for multicast - // we're just going to assume things come online - if err == nil || c.multicast { + // if its multicast just go ahead because this is best effort + if c.multicast { c.discovered = true + c.accepted = true return c, nil } - // return the error if unicast + // otherwise return an error if err != nil { return nil, err } + + // set discovered to true + c.discovered = true } // a unicast session so we call "open" and wait for an "accept" From ed1faa7a5ccf66541f7ed5f4eeccf8047185b241 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 5 Sep 2019 18:13:02 +0100 Subject: [PATCH 4/4] Add a discover ticker, announce on connect and refactor --- tunnel/default.go | 138 +++++++++++++++++++++++++++------------------- tunnel/tunnel.go | 6 +- 2 files changed, 85 insertions(+), 59 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 1170a00d..7d265544 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -13,6 +13,8 @@ import ( ) var ( + // DiscoverTime sets the time at which we fire discover messages + DiscoverTime = 60 * time.Second // KeepAliveTime defines time interval we send keepalive messages to outbound links KeepAliveTime = 30 * time.Second // ReconnectTime defines time interval we periodically attempt to reconnect dead links @@ -144,6 +146,52 @@ func (t *tun) newSessionId() string { return uuid.New().String() } +func (t *tun) announce(channel, session string, link *link) { + // create the "announce" response message for a discover request + msg := &transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "announce", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Channel": channel, + "Micro-Tunnel-Session": session, + "Micro-Tunnel-Link": link.id, + "Micro-Tunnel-Token": t.token, + }, + } + + // if no channel is present we've been asked to discover all channels + if len(channel) == 0 { + // get the list of channels + t.RLock() + channels := t.listChannels() + t.RUnlock() + + // if there are no channels continue + if len(channels) == 0 { + return + } + + // create a list of channels as comma separated list + channel = strings.Join(channels, ",") + // set channels as header + msg.Header["Micro-Tunnel-Channel"] = channel + } else { + // otherwise look for a single channel mapping + // looking for existing mapping as a listener + _, exists := t.getSession(channel, "listener") + if !exists { + return + } + } + + log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel) + + // send back the announcement + if err := link.Send(msg); err != nil { + log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err) + } +} + // monitor monitors outbound links and attempts to reconnect to the failed ones func (t *tun) monitor() { reconnect := time.NewTicker(ReconnectTime) @@ -398,6 +446,8 @@ func (t *tun) listen(link *link) { t.links[link.Remote()] = link t.Unlock() + // send back a discovery + go t.announce("", "", link) // nothing more to do continue case "close": @@ -454,6 +504,7 @@ func (t *tun) listen(link *link) { log.Debugf("Received %+v from %s", msg, link.Remote()) // an announcement of a channel listener case "announce": + // process the announcement channels := strings.Split(channel, ",") // update mapping in the link @@ -487,46 +538,8 @@ func (t *tun) listen(link *link) { } continue case "discover": - // create the "announce" response message for a discover request - msg := &transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "announce", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Channel": channel, - "Micro-Tunnel-Session": sessionId, - "Micro-Tunnel-Link": link.id, - "Micro-Tunnel-Token": t.token, - }, - } - - // if no channel is present we've been asked to discover all channels - if len(channel) == 0 { - // get the list of channels - t.RLock() - channels := t.listChannels() - t.RUnlock() - - // if there are no channels continue - if len(channels) == 0 { - continue - } - - // create a list of channels as comma separated list - list := strings.Join(channels, ",") - // set channels as header - msg.Header["Micro-Tunnel-Channel"] = list - } else { - // otherwise look for a single channel mapping - // looking for existing mapping as a listener - _, exists := t.getSession(channel, "listener") - if !exists { - continue - } - log.Debugf("Tunnel sending announce for discovery of channel %s", channel) - } - - // send back the announcement - link.Send(msg) + // send back an announcement + go t.announce(channel, sessionId, link) continue default: // blackhole it @@ -635,6 +648,30 @@ func (t *tun) listen(link *link) { } } +// discover sends channel discover requests periodically +func (t *tun) discover(link *link) { + tick := time.NewTicker(DiscoverTime) + defer tick.Stop() + + for { + select { + case <-tick.C: + // send a discovery message to all links + if err := link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "discover", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Token": t.token, + }, + }); err != nil { + log.Debugf("Tunnel failed to send discover to link %s: %v", link.id, err) + } + case <-t.closed: + return + } + } +} + // keepalive periodically sends keepalive messages to link func (t *tun) keepalive(link *link) { keepalive := time.NewTicker(KeepAliveTime) @@ -698,6 +735,9 @@ func (t *tun) setupLink(node string) (*link, error) { // start keepalive monitor go t.keepalive(link) + // discover things on the remote side + go t.discover(link) + return link, nil } @@ -776,9 +816,6 @@ func (t *tun) Connect() error { return err } - // request a discovery - t.discover() - // set as connected t.connected = true // create new close channel @@ -787,19 +824,6 @@ func (t *tun) Connect() error { return nil } -func (t *tun) discover() { - // send a discovery message to all links - for _, link := range t.links { - link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "discover", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Token": t.token, - }, - }) - } -} - func (t *tun) close() error { // close all the sessions for id, s := range t.sessions { @@ -964,7 +988,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { select { case msg := <-c.recv: if msg.typ != "announce" { - err = errors.New("failed to discover channel") + err = ErrDiscoverChan } case <-time.After(dialTimeout): err = ErrDialTimeout diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 7c9e2afc..14604215 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -9,10 +9,12 @@ import ( ) var ( - // ErrDialTimeout is returned by a call to Dial where the timeout occurs - ErrDialTimeout = errors.New("dial timeout") // DefaultDialTimeout is the dial timeout if none is specified DefaultDialTimeout = time.Second * 5 + // ErrDialTimeout is returned by a call to Dial where the timeout occurs + ErrDialTimeout = errors.New("dial timeout") + // ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery + ErrDiscoverChan = errors.New("failed to discover channel") ) // Tunnel creates a gre tunnel on top of the go-micro/transport.