diff --git a/tunnel/default.go b/tunnel/default.go index 1170a00d..7d265544 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -13,6 +13,8 @@ import ( ) var ( + // DiscoverTime sets the time at which we fire discover messages + DiscoverTime = 60 * time.Second // KeepAliveTime defines time interval we send keepalive messages to outbound links KeepAliveTime = 30 * time.Second // ReconnectTime defines time interval we periodically attempt to reconnect dead links @@ -144,6 +146,52 @@ func (t *tun) newSessionId() string { return uuid.New().String() } +func (t *tun) announce(channel, session string, link *link) { + // create the "announce" response message for a discover request + msg := &transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "announce", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Channel": channel, + "Micro-Tunnel-Session": session, + "Micro-Tunnel-Link": link.id, + "Micro-Tunnel-Token": t.token, + }, + } + + // if no channel is present we've been asked to discover all channels + if len(channel) == 0 { + // get the list of channels + t.RLock() + channels := t.listChannels() + t.RUnlock() + + // if there are no channels continue + if len(channels) == 0 { + return + } + + // create a list of channels as comma separated list + channel = strings.Join(channels, ",") + // set channels as header + msg.Header["Micro-Tunnel-Channel"] = channel + } else { + // otherwise look for a single channel mapping + // looking for existing mapping as a listener + _, exists := t.getSession(channel, "listener") + if !exists { + return + } + } + + log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel) + + // send back the announcement + if err := link.Send(msg); err != nil { + log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err) + } +} + // monitor monitors outbound links and attempts to reconnect to the failed ones func (t *tun) monitor() { reconnect := time.NewTicker(ReconnectTime) @@ -398,6 +446,8 @@ func (t *tun) listen(link *link) { t.links[link.Remote()] = link t.Unlock() + // send back a discovery + go t.announce("", "", link) // nothing more to do continue case "close": @@ -454,6 +504,7 @@ func (t *tun) listen(link *link) { log.Debugf("Received %+v from %s", msg, link.Remote()) // an announcement of a channel listener case "announce": + // process the announcement channels := strings.Split(channel, ",") // update mapping in the link @@ -487,46 +538,8 @@ func (t *tun) listen(link *link) { } continue case "discover": - // create the "announce" response message for a discover request - msg := &transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "announce", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Channel": channel, - "Micro-Tunnel-Session": sessionId, - "Micro-Tunnel-Link": link.id, - "Micro-Tunnel-Token": t.token, - }, - } - - // if no channel is present we've been asked to discover all channels - if len(channel) == 0 { - // get the list of channels - t.RLock() - channels := t.listChannels() - t.RUnlock() - - // if there are no channels continue - if len(channels) == 0 { - continue - } - - // create a list of channels as comma separated list - list := strings.Join(channels, ",") - // set channels as header - msg.Header["Micro-Tunnel-Channel"] = list - } else { - // otherwise look for a single channel mapping - // looking for existing mapping as a listener - _, exists := t.getSession(channel, "listener") - if !exists { - continue - } - log.Debugf("Tunnel sending announce for discovery of channel %s", channel) - } - - // send back the announcement - link.Send(msg) + // send back an announcement + go t.announce(channel, sessionId, link) continue default: // blackhole it @@ -635,6 +648,30 @@ func (t *tun) listen(link *link) { } } +// discover sends channel discover requests periodically +func (t *tun) discover(link *link) { + tick := time.NewTicker(DiscoverTime) + defer tick.Stop() + + for { + select { + case <-tick.C: + // send a discovery message to all links + if err := link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "discover", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Token": t.token, + }, + }); err != nil { + log.Debugf("Tunnel failed to send discover to link %s: %v", link.id, err) + } + case <-t.closed: + return + } + } +} + // keepalive periodically sends keepalive messages to link func (t *tun) keepalive(link *link) { keepalive := time.NewTicker(KeepAliveTime) @@ -698,6 +735,9 @@ func (t *tun) setupLink(node string) (*link, error) { // start keepalive monitor go t.keepalive(link) + // discover things on the remote side + go t.discover(link) + return link, nil } @@ -776,9 +816,6 @@ func (t *tun) Connect() error { return err } - // request a discovery - t.discover() - // set as connected t.connected = true // create new close channel @@ -787,19 +824,6 @@ func (t *tun) Connect() error { return nil } -func (t *tun) discover() { - // send a discovery message to all links - for _, link := range t.links { - link.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Tunnel": "discover", - "Micro-Tunnel-Id": t.id, - "Micro-Tunnel-Token": t.token, - }, - }) - } -} - func (t *tun) close() error { // close all the sessions for id, s := range t.sessions { @@ -964,7 +988,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { select { case msg := <-c.recv: if msg.typ != "announce" { - err = errors.New("failed to discover channel") + err = ErrDiscoverChan } case <-time.After(dialTimeout): err = ErrDialTimeout diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 7c9e2afc..14604215 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -9,10 +9,12 @@ import ( ) var ( - // ErrDialTimeout is returned by a call to Dial where the timeout occurs - ErrDialTimeout = errors.New("dial timeout") // DefaultDialTimeout is the dial timeout if none is specified DefaultDialTimeout = time.Second * 5 + // ErrDialTimeout is returned by a call to Dial where the timeout occurs + ErrDialTimeout = errors.New("dial timeout") + // ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery + ErrDiscoverChan = errors.New("failed to discover channel") ) // Tunnel creates a gre tunnel on top of the go-micro/transport.