Add a discover ticker, announce on connect and refactor
This commit is contained in:
		@@ -13,6 +13,8 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	// DiscoverTime sets the time at which we fire discover messages
 | 
			
		||||
	DiscoverTime = 60 * time.Second
 | 
			
		||||
	// KeepAliveTime defines time interval we send keepalive messages to outbound links
 | 
			
		||||
	KeepAliveTime = 30 * time.Second
 | 
			
		||||
	// ReconnectTime defines time interval we periodically attempt to reconnect dead links
 | 
			
		||||
@@ -144,6 +146,52 @@ func (t *tun) newSessionId() string {
 | 
			
		||||
	return uuid.New().String()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tun) announce(channel, session string, link *link) {
 | 
			
		||||
	// create the "announce" response message for a discover request
 | 
			
		||||
	msg := &transport.Message{
 | 
			
		||||
		Header: map[string]string{
 | 
			
		||||
			"Micro-Tunnel":         "announce",
 | 
			
		||||
			"Micro-Tunnel-Id":      t.id,
 | 
			
		||||
			"Micro-Tunnel-Channel": channel,
 | 
			
		||||
			"Micro-Tunnel-Session": session,
 | 
			
		||||
			"Micro-Tunnel-Link":    link.id,
 | 
			
		||||
			"Micro-Tunnel-Token":   t.token,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// if no channel is present we've been asked to discover all channels
 | 
			
		||||
	if len(channel) == 0 {
 | 
			
		||||
		// get the list of channels
 | 
			
		||||
		t.RLock()
 | 
			
		||||
		channels := t.listChannels()
 | 
			
		||||
		t.RUnlock()
 | 
			
		||||
 | 
			
		||||
		// if there are no channels continue
 | 
			
		||||
		if len(channels) == 0 {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// create a list of channels as comma separated list
 | 
			
		||||
		channel = strings.Join(channels, ",")
 | 
			
		||||
		// set channels as header
 | 
			
		||||
		msg.Header["Micro-Tunnel-Channel"] = channel
 | 
			
		||||
	} else {
 | 
			
		||||
		// otherwise look for a single channel mapping
 | 
			
		||||
		// looking for existing mapping as a listener
 | 
			
		||||
		_, exists := t.getSession(channel, "listener")
 | 
			
		||||
		if !exists {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel)
 | 
			
		||||
 | 
			
		||||
	// send back the announcement
 | 
			
		||||
	if err := link.Send(msg); err != nil {
 | 
			
		||||
		log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// monitor monitors outbound links and attempts to reconnect to the failed ones
 | 
			
		||||
func (t *tun) monitor() {
 | 
			
		||||
	reconnect := time.NewTicker(ReconnectTime)
 | 
			
		||||
@@ -398,6 +446,8 @@ func (t *tun) listen(link *link) {
 | 
			
		||||
			t.links[link.Remote()] = link
 | 
			
		||||
			t.Unlock()
 | 
			
		||||
 | 
			
		||||
			// send back a discovery
 | 
			
		||||
			go t.announce("", "", link)
 | 
			
		||||
			// nothing more to do
 | 
			
		||||
			continue
 | 
			
		||||
		case "close":
 | 
			
		||||
@@ -454,6 +504,7 @@ func (t *tun) listen(link *link) {
 | 
			
		||||
			log.Debugf("Received %+v from %s", msg, link.Remote())
 | 
			
		||||
		// an announcement of a channel listener
 | 
			
		||||
		case "announce":
 | 
			
		||||
			// process the announcement
 | 
			
		||||
			channels := strings.Split(channel, ",")
 | 
			
		||||
 | 
			
		||||
			// update mapping in the link
 | 
			
		||||
@@ -487,46 +538,8 @@ func (t *tun) listen(link *link) {
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		case "discover":
 | 
			
		||||
			// create the "announce" response message for a discover request
 | 
			
		||||
			msg := &transport.Message{
 | 
			
		||||
				Header: map[string]string{
 | 
			
		||||
					"Micro-Tunnel":         "announce",
 | 
			
		||||
					"Micro-Tunnel-Id":      t.id,
 | 
			
		||||
					"Micro-Tunnel-Channel": channel,
 | 
			
		||||
					"Micro-Tunnel-Session": sessionId,
 | 
			
		||||
					"Micro-Tunnel-Link":    link.id,
 | 
			
		||||
					"Micro-Tunnel-Token":   t.token,
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// if no channel is present we've been asked to discover all channels
 | 
			
		||||
			if len(channel) == 0 {
 | 
			
		||||
				// get the list of channels
 | 
			
		||||
				t.RLock()
 | 
			
		||||
				channels := t.listChannels()
 | 
			
		||||
				t.RUnlock()
 | 
			
		||||
 | 
			
		||||
				// if there are no channels continue
 | 
			
		||||
				if len(channels) == 0 {
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				// create a list of channels as comma separated list
 | 
			
		||||
				list := strings.Join(channels, ",")
 | 
			
		||||
				// set channels as header
 | 
			
		||||
				msg.Header["Micro-Tunnel-Channel"] = list
 | 
			
		||||
			} else {
 | 
			
		||||
				// otherwise look for a single channel mapping
 | 
			
		||||
				// looking for existing mapping as a listener
 | 
			
		||||
				_, exists := t.getSession(channel, "listener")
 | 
			
		||||
				if !exists {
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				log.Debugf("Tunnel sending announce for discovery of channel %s", channel)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// send back the announcement
 | 
			
		||||
			link.Send(msg)
 | 
			
		||||
			// send back an announcement
 | 
			
		||||
			go t.announce(channel, sessionId, link)
 | 
			
		||||
			continue
 | 
			
		||||
		default:
 | 
			
		||||
			// blackhole it
 | 
			
		||||
@@ -635,6 +648,30 @@ func (t *tun) listen(link *link) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// discover sends channel discover requests periodically
 | 
			
		||||
func (t *tun) discover(link *link) {
 | 
			
		||||
	tick := time.NewTicker(DiscoverTime)
 | 
			
		||||
	defer tick.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-tick.C:
 | 
			
		||||
			// send a discovery message to all links
 | 
			
		||||
			if err := link.Send(&transport.Message{
 | 
			
		||||
				Header: map[string]string{
 | 
			
		||||
					"Micro-Tunnel":       "discover",
 | 
			
		||||
					"Micro-Tunnel-Id":    t.id,
 | 
			
		||||
					"Micro-Tunnel-Token": t.token,
 | 
			
		||||
				},
 | 
			
		||||
			}); err != nil {
 | 
			
		||||
				log.Debugf("Tunnel failed to send discover to link %s: %v", link.id, err)
 | 
			
		||||
			}
 | 
			
		||||
		case <-t.closed:
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// keepalive periodically sends keepalive messages to link
 | 
			
		||||
func (t *tun) keepalive(link *link) {
 | 
			
		||||
	keepalive := time.NewTicker(KeepAliveTime)
 | 
			
		||||
@@ -698,6 +735,9 @@ func (t *tun) setupLink(node string) (*link, error) {
 | 
			
		||||
	// start keepalive monitor
 | 
			
		||||
	go t.keepalive(link)
 | 
			
		||||
 | 
			
		||||
	// discover things on the remote side
 | 
			
		||||
	go t.discover(link)
 | 
			
		||||
 | 
			
		||||
	return link, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -776,9 +816,6 @@ func (t *tun) Connect() error {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// request a discovery
 | 
			
		||||
	t.discover()
 | 
			
		||||
 | 
			
		||||
	// set as connected
 | 
			
		||||
	t.connected = true
 | 
			
		||||
	// create new close channel
 | 
			
		||||
@@ -787,19 +824,6 @@ func (t *tun) Connect() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tun) discover() {
 | 
			
		||||
	// send a discovery message to all links
 | 
			
		||||
	for _, link := range t.links {
 | 
			
		||||
		link.Send(&transport.Message{
 | 
			
		||||
			Header: map[string]string{
 | 
			
		||||
				"Micro-Tunnel":       "discover",
 | 
			
		||||
				"Micro-Tunnel-Id":    t.id,
 | 
			
		||||
				"Micro-Tunnel-Token": t.token,
 | 
			
		||||
			},
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tun) close() error {
 | 
			
		||||
	// close all the sessions
 | 
			
		||||
	for id, s := range t.sessions {
 | 
			
		||||
@@ -964,7 +988,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
 | 
			
		||||
		select {
 | 
			
		||||
		case msg := <-c.recv:
 | 
			
		||||
			if msg.typ != "announce" {
 | 
			
		||||
				err = errors.New("failed to discover channel")
 | 
			
		||||
				err = ErrDiscoverChan
 | 
			
		||||
			}
 | 
			
		||||
		case <-time.After(dialTimeout):
 | 
			
		||||
			err = ErrDialTimeout
 | 
			
		||||
 
 | 
			
		||||
@@ -9,10 +9,12 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	// ErrDialTimeout is returned by a call to Dial where the timeout occurs
 | 
			
		||||
	ErrDialTimeout = errors.New("dial timeout")
 | 
			
		||||
	// DefaultDialTimeout is the dial timeout if none is specified
 | 
			
		||||
	DefaultDialTimeout = time.Second * 5
 | 
			
		||||
	// ErrDialTimeout is returned by a call to Dial where the timeout occurs
 | 
			
		||||
	ErrDialTimeout = errors.New("dial timeout")
 | 
			
		||||
	// ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery
 | 
			
		||||
	ErrDiscoverChan = errors.New("failed to discover channel")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Tunnel creates a gre tunnel on top of the go-micro/transport.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user