diff --git a/tunnel/default.go b/tunnel/default.go index 1aa5d2bf..7d265544 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -13,6 +13,8 @@ import ( ) var ( + // DiscoverTime sets the time at which we fire discover messages + DiscoverTime = 60 * time.Second // KeepAliveTime defines time interval we send keepalive messages to outbound links KeepAliveTime = 30 * time.Second // ReconnectTime defines time interval we periodically attempt to reconnect dead links @@ -94,6 +96,21 @@ func (t *tun) delSession(channel, session string) { t.Unlock() } +// listChannels returns a list of listening channels +func (t *tun) listChannels() []string { + t.RLock() + defer t.RUnlock() + + var channels []string + for _, session := range t.sessions { + if session.session != "listener" { + continue + } + channels = append(channels, session.channel) + } + return channels +} + // newSession creates a new session and saves it func (t *tun) newSession(channel, sessionId string) (*session, bool) { // new session @@ -129,6 +146,52 @@ func (t *tun) newSessionId() string { return uuid.New().String() } +func (t *tun) announce(channel, session string, link *link) { + // create the "announce" response message for a discover request + msg := &transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "announce", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Channel": channel, + "Micro-Tunnel-Session": session, + "Micro-Tunnel-Link": link.id, + "Micro-Tunnel-Token": t.token, + }, + } + + // if no channel is present we've been asked to discover all channels + if len(channel) == 0 { + // get the list of channels + t.RLock() + channels := t.listChannels() + t.RUnlock() + + // if there are no channels continue + if len(channels) == 0 { + return + } + + // create a list of channels as comma separated list + channel = strings.Join(channels, ",") + // set channels as header + msg.Header["Micro-Tunnel-Channel"] = channel + } else { + // otherwise look for a single channel mapping + // looking for existing mapping as a listener + _, exists := t.getSession(channel, "listener") + if !exists { + return + } + } + + log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel) + + // send back the announcement + if err := link.Send(msg); err != nil { + log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err) + } +} + // monitor monitors outbound links and attempts to reconnect to the failed ones func (t *tun) monitor() { reconnect := time.NewTicker(ReconnectTime) @@ -383,6 +446,8 @@ func (t *tun) listen(link *link) { t.links[link.Remote()] = link t.Unlock() + // send back a discovery + go t.announce("", "", link) // nothing more to do continue case "close": @@ -439,11 +504,21 @@ func (t *tun) listen(link *link) { log.Debugf("Received %+v from %s", msg, link.Remote()) // an announcement of a channel listener case "announce": + // process the announcement + channels := strings.Split(channel, ",") + // update mapping in the link link.Lock() - link.channels[channel] = time.Now() + for _, channel := range channels { + link.channels[channel] = time.Now() + } link.Unlock() + // this was an announcement not intended for anything + if sessionId == "listener" || sessionId == "" { + continue + } + // get the session that asked for the discovery s, exists := t.getSession(channel, sessionId) if exists { @@ -463,22 +538,8 @@ func (t *tun) listen(link *link) { } continue case "discover": - // looking for existing mapping - _, exists := t.getSession(channel, "listener") - if exists { - log.Debugf("Tunnel sending announce for discovery of channel %s", channel) - // send back the announcement - link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "announce", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Channel": channel, - "Micro-Tunnel-Session": sessionId, - "Micro-Tunnel-Link": link.id, - "Micro-Tunnel-Token": t.token, - }, - }) - } + // send back an announcement + go t.announce(channel, sessionId, link) continue default: // blackhole it @@ -587,6 +648,30 @@ func (t *tun) listen(link *link) { } } +// discover sends channel discover requests periodically +func (t *tun) discover(link *link) { + tick := time.NewTicker(DiscoverTime) + defer tick.Stop() + + for { + select { + case <-tick.C: + // send a discovery message to all links + if err := link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "discover", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Token": t.token, + }, + }); err != nil { + log.Debugf("Tunnel failed to send discover to link %s: %v", link.id, err) + } + case <-t.closed: + return + } + } +} + // keepalive periodically sends keepalive messages to link func (t *tun) keepalive(link *link) { keepalive := time.NewTicker(KeepAliveTime) @@ -650,6 +735,9 @@ func (t *tun) setupLink(node string) (*link, error) { // start keepalive monitor go t.keepalive(link) + // discover things on the remote side + go t.discover(link) + return link, nil } @@ -757,6 +845,7 @@ func (t *tun) close() error { } // close the listener + // this appears to be blocking return t.listener.Close() } @@ -856,7 +945,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { t.RUnlock() // discovered so set the link if not multicast - // TODO: pick the link efficiently based + // TODO: pick the link efficiently based // on link status and saturation. if c.discovered && !c.multicast { // set the link @@ -866,6 +955,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // shit fuck if !c.discovered { + // create a new discovery message for this channel msg := c.newMessage("discover") msg.broadcast = true msg.outbound = true @@ -874,14 +964,6 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // send the discovery message t.send <- msg - // don't bother waiting around - // we're just going to assume things come online - if c.multicast { - c.discovered = true - c.accepted = true - return c, nil - } - select { case <-time.After(after()): return nil, ErrDialTimeout @@ -891,17 +973,45 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } } + var err error + + // set a dialTimeout + dialTimeout := after() + + // set a shorter delay for multicast + if c.multicast { + // shorten this + dialTimeout = time.Millisecond * 500 + } + // wait for announce select { case msg := <-c.recv: if msg.typ != "announce" { - return nil, errors.New("failed to discover channel") + err = ErrDiscoverChan } - case <-time.After(after()): - return nil, ErrDialTimeout + case <-time.After(dialTimeout): + err = ErrDialTimeout } + + // if its multicast just go ahead because this is best effort + if c.multicast { + c.discovered = true + c.accepted = true + return c, nil + } + + // otherwise return an error + if err != nil { + return nil, err + } + + // set discovered to true + c.discovered = true } + // a unicast session so we call "open" and wait for an "accept" + // try to open the session err := c.Open() if err != nil { diff --git a/tunnel/listener.go b/tunnel/listener.go index e60ff396..ee394519 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -24,7 +24,7 @@ type tunListener struct { // periodically announce self func (t *tunListener) announce() { - tick := time.NewTicker(time.Minute) + tick := time.NewTicker(time.Second * 30) defer tick.Stop() // first announcement diff --git a/tunnel/session.go b/tunnel/session.go index 58246fdd..a4c7a2fe 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -180,6 +180,8 @@ func (s *session) Announce() error { msg := s.newMessage("announce") // we don't need an error back msg.errChan = nil + // announce to all + msg.broadcast = true // we don't need the link msg.link = "" diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 7c9e2afc..14604215 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -9,10 +9,12 @@ import ( ) var ( - // ErrDialTimeout is returned by a call to Dial where the timeout occurs - ErrDialTimeout = errors.New("dial timeout") // DefaultDialTimeout is the dial timeout if none is specified DefaultDialTimeout = time.Second * 5 + // ErrDialTimeout is returned by a call to Dial where the timeout occurs + ErrDialTimeout = errors.New("dial timeout") + // ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery + ErrDiscoverChan = errors.New("failed to discover channel") ) // Tunnel creates a gre tunnel on top of the go-micro/transport.