From 1840b5bd74f2c26a3eb63bbc756befce71875086 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 5 Sep 2019 15:16:11 +0100 Subject: [PATCH] Update tunnel to send discovery on connect and multicast messages. Announce as broadcast --- tunnel/default.go | 130 +++++++++++++++++++++++++++++++++++---------- tunnel/listener.go | 2 +- tunnel/session.go | 2 + 3 files changed, 105 insertions(+), 29 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 1aa5d2bf..6b74bfa8 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -94,6 +94,21 @@ func (t *tun) delSession(channel, session string) { t.Unlock() } +// listChannels returns a list of listening channels +func (t *tun) listChannels() []string { + t.RLock() + defer t.RUnlock() + + var channels []string + for _, session := range t.sessions { + if session.session != "listener" { + continue + } + channels = append(channels, session.channel) + } + return channels +} + // newSession creates a new session and saves it func (t *tun) newSession(channel, sessionId string) (*session, bool) { // new session @@ -439,11 +454,20 @@ func (t *tun) listen(link *link) { log.Debugf("Received %+v from %s", msg, link.Remote()) // an announcement of a channel listener case "announce": + channels := strings.Split(channel, ",") + // update mapping in the link link.Lock() - link.channels[channel] = time.Now() + for _, channel := range channels { + link.channels[channel] = time.Now() + } link.Unlock() + // this was an announcement not intended for anything + if sessionId == "listener" || sessionId == "" { + continue + } + // get the session that asked for the discovery s, exists := t.getSession(channel, sessionId) if exists { @@ -463,22 +487,46 @@ func (t *tun) listen(link *link) { } continue case "discover": - // looking for existing mapping - _, exists := t.getSession(channel, "listener") - if exists { - log.Debugf("Tunnel sending announce for discovery of channel %s", channel) - // send back the announcement - link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "announce", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Channel": channel, - "Micro-Tunnel-Session": sessionId, - "Micro-Tunnel-Link": link.id, - "Micro-Tunnel-Token": t.token, - }, - }) + // create the "announce" response message for a discover request + msg := &transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "announce", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Channel": channel, + "Micro-Tunnel-Session": sessionId, + "Micro-Tunnel-Link": link.id, + "Micro-Tunnel-Token": t.token, + }, } + + // if no channel is present we've been asked to discover all channels + if len(channel) == 0 { + // get the list of channels + t.RLock() + channels := t.listChannels() + t.RUnlock() + + // if there are no channels continue + if len(channels) == 0 { + continue + } + + // create a list of channels as comma separated list + list := strings.Join(channels, ",") + // set channels as header + msg.Header["Micro-Tunnel-Channel"] = list + } else { + // otherwise look for a single channel mapping + // looking for existing mapping as a listener + _, exists := t.getSession(channel, "listener") + if !exists { + continue + } + log.Debugf("Tunnel sending announce for discovery of channel %s", channel) + } + + // send back the announcement + link.Send(msg) continue default: // blackhole it @@ -728,6 +776,9 @@ func (t *tun) Connect() error { return err } + // request a discovery + t.discover() + // set as connected t.connected = true // create new close channel @@ -736,6 +787,19 @@ func (t *tun) Connect() error { return nil } +func (t *tun) discover() { + // send a discovery message to all links + for _, link := range t.links { + link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "discover", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Token": t.token, + }, + }) + } +} + func (t *tun) close() error { // close all the sessions for id, s := range t.sessions { @@ -757,7 +821,8 @@ func (t *tun) close() error { } // close the listener - return t.listener.Close() + //return t.listener.Close() + return nil } func (t *tun) Address() string { @@ -856,7 +921,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { t.RUnlock() // discovered so set the link if not multicast - // TODO: pick the link efficiently based + // TODO: pick the link efficiently based // on link status and saturation. if c.discovered && !c.multicast { // set the link @@ -866,6 +931,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // shit fuck if !c.discovered { + // create a new discovery message for this channel msg := c.newMessage("discover") msg.broadcast = true msg.outbound = true @@ -874,14 +940,6 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // send the discovery message t.send <- msg - // don't bother waiting around - // we're just going to assume things come online - if c.multicast { - c.discovered = true - c.accepted = true - return c, nil - } - select { case <-time.After(after()): return nil, ErrDialTimeout @@ -891,17 +949,33 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } } + var err error + // wait for announce select { case msg := <-c.recv: if msg.typ != "announce" { - return nil, errors.New("failed to discover channel") + err = errors.New("failed to discover channel") } case <-time.After(after()): - return nil, ErrDialTimeout + err = ErrDialTimeout + } + + // don't both sending the error for multicast + // we're just going to assume things come online + if err == nil || c.multicast { + c.discovered = true + return c, nil + } + + // return the error if unicast + if err != nil { + return nil, err } } + // a unicast session so we call "open" and wait for an "accept" + // try to open the session err := c.Open() if err != nil { diff --git a/tunnel/listener.go b/tunnel/listener.go index e60ff396..ee394519 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -24,7 +24,7 @@ type tunListener struct { // periodically announce self func (t *tunListener) announce() { - tick := time.NewTicker(time.Minute) + tick := time.NewTicker(time.Second * 30) defer tick.Stop() // first announcement diff --git a/tunnel/session.go b/tunnel/session.go index 58246fdd..a4c7a2fe 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -180,6 +180,8 @@ func (s *session) Announce() error { msg := s.newMessage("announce") // we don't need an error back msg.errChan = nil + // announce to all + msg.broadcast = true // we don't need the link msg.link = ""