Merge pull request #734 from micro/tunnel
Update tunnel to send discovery on connect and multicast messages. An…
This commit is contained in:
		| @@ -13,6 +13,8 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
|  | 	// DiscoverTime sets the time at which we fire discover messages | ||||||
|  | 	DiscoverTime = 60 * time.Second | ||||||
| 	// KeepAliveTime defines time interval we send keepalive messages to outbound links | 	// KeepAliveTime defines time interval we send keepalive messages to outbound links | ||||||
| 	KeepAliveTime = 30 * time.Second | 	KeepAliveTime = 30 * time.Second | ||||||
| 	// ReconnectTime defines time interval we periodically attempt to reconnect dead links | 	// ReconnectTime defines time interval we periodically attempt to reconnect dead links | ||||||
| @@ -94,6 +96,21 @@ func (t *tun) delSession(channel, session string) { | |||||||
| 	t.Unlock() | 	t.Unlock() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // listChannels returns a list of listening channels | ||||||
|  | func (t *tun) listChannels() []string { | ||||||
|  | 	t.RLock() | ||||||
|  | 	defer t.RUnlock() | ||||||
|  |  | ||||||
|  | 	var channels []string | ||||||
|  | 	for _, session := range t.sessions { | ||||||
|  | 		if session.session != "listener" { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		channels = append(channels, session.channel) | ||||||
|  | 	} | ||||||
|  | 	return channels | ||||||
|  | } | ||||||
|  |  | ||||||
| // newSession creates a new session and saves it | // newSession creates a new session and saves it | ||||||
| func (t *tun) newSession(channel, sessionId string) (*session, bool) { | func (t *tun) newSession(channel, sessionId string) (*session, bool) { | ||||||
| 	// new session | 	// new session | ||||||
| @@ -129,6 +146,52 @@ func (t *tun) newSessionId() string { | |||||||
| 	return uuid.New().String() | 	return uuid.New().String() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (t *tun) announce(channel, session string, link *link) { | ||||||
|  | 	// create the "announce" response message for a discover request | ||||||
|  | 	msg := &transport.Message{ | ||||||
|  | 		Header: map[string]string{ | ||||||
|  | 			"Micro-Tunnel":         "announce", | ||||||
|  | 			"Micro-Tunnel-Id":      t.id, | ||||||
|  | 			"Micro-Tunnel-Channel": channel, | ||||||
|  | 			"Micro-Tunnel-Session": session, | ||||||
|  | 			"Micro-Tunnel-Link":    link.id, | ||||||
|  | 			"Micro-Tunnel-Token":   t.token, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// if no channel is present we've been asked to discover all channels | ||||||
|  | 	if len(channel) == 0 { | ||||||
|  | 		// get the list of channels | ||||||
|  | 		t.RLock() | ||||||
|  | 		channels := t.listChannels() | ||||||
|  | 		t.RUnlock() | ||||||
|  |  | ||||||
|  | 		// if there are no channels continue | ||||||
|  | 		if len(channels) == 0 { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// create a list of channels as comma separated list | ||||||
|  | 		channel = strings.Join(channels, ",") | ||||||
|  | 		// set channels as header | ||||||
|  | 		msg.Header["Micro-Tunnel-Channel"] = channel | ||||||
|  | 	} else { | ||||||
|  | 		// otherwise look for a single channel mapping | ||||||
|  | 		// looking for existing mapping as a listener | ||||||
|  | 		_, exists := t.getSession(channel, "listener") | ||||||
|  | 		if !exists { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel) | ||||||
|  |  | ||||||
|  | 	// send back the announcement | ||||||
|  | 	if err := link.Send(msg); err != nil { | ||||||
|  | 		log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // monitor monitors outbound links and attempts to reconnect to the failed ones | // monitor monitors outbound links and attempts to reconnect to the failed ones | ||||||
| func (t *tun) monitor() { | func (t *tun) monitor() { | ||||||
| 	reconnect := time.NewTicker(ReconnectTime) | 	reconnect := time.NewTicker(ReconnectTime) | ||||||
| @@ -383,6 +446,8 @@ func (t *tun) listen(link *link) { | |||||||
| 			t.links[link.Remote()] = link | 			t.links[link.Remote()] = link | ||||||
| 			t.Unlock() | 			t.Unlock() | ||||||
|  |  | ||||||
|  | 			// send back a discovery | ||||||
|  | 			go t.announce("", "", link) | ||||||
| 			// nothing more to do | 			// nothing more to do | ||||||
| 			continue | 			continue | ||||||
| 		case "close": | 		case "close": | ||||||
| @@ -439,11 +504,21 @@ func (t *tun) listen(link *link) { | |||||||
| 			log.Debugf("Received %+v from %s", msg, link.Remote()) | 			log.Debugf("Received %+v from %s", msg, link.Remote()) | ||||||
| 		// an announcement of a channel listener | 		// an announcement of a channel listener | ||||||
| 		case "announce": | 		case "announce": | ||||||
|  | 			// process the announcement | ||||||
|  | 			channels := strings.Split(channel, ",") | ||||||
|  |  | ||||||
| 			// update mapping in the link | 			// update mapping in the link | ||||||
| 			link.Lock() | 			link.Lock() | ||||||
| 			link.channels[channel] = time.Now() | 			for _, channel := range channels { | ||||||
|  | 				link.channels[channel] = time.Now() | ||||||
|  | 			} | ||||||
| 			link.Unlock() | 			link.Unlock() | ||||||
|  |  | ||||||
|  | 			// this was an announcement not intended for anything | ||||||
|  | 			if sessionId == "listener" || sessionId == "" { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  |  | ||||||
| 			// get the session that asked for the discovery | 			// get the session that asked for the discovery | ||||||
| 			s, exists := t.getSession(channel, sessionId) | 			s, exists := t.getSession(channel, sessionId) | ||||||
| 			if exists { | 			if exists { | ||||||
| @@ -463,22 +538,8 @@ func (t *tun) listen(link *link) { | |||||||
| 			} | 			} | ||||||
| 			continue | 			continue | ||||||
| 		case "discover": | 		case "discover": | ||||||
| 			// looking for existing mapping | 			// send back an announcement | ||||||
| 			_, exists := t.getSession(channel, "listener") | 			go t.announce(channel, sessionId, link) | ||||||
| 			if exists { |  | ||||||
| 				log.Debugf("Tunnel sending announce for discovery of channel %s", channel) |  | ||||||
| 				// send back the announcement |  | ||||||
| 				link.Send(&transport.Message{ |  | ||||||
| 					Header: map[string]string{ |  | ||||||
| 						"Micro-Tunnel":         "announce", |  | ||||||
| 						"Micro-Tunnel-Id":      t.id, |  | ||||||
| 						"Micro-Tunnel-Channel": channel, |  | ||||||
| 						"Micro-Tunnel-Session": sessionId, |  | ||||||
| 						"Micro-Tunnel-Link":    link.id, |  | ||||||
| 						"Micro-Tunnel-Token":   t.token, |  | ||||||
| 					}, |  | ||||||
| 				}) |  | ||||||
| 			} |  | ||||||
| 			continue | 			continue | ||||||
| 		default: | 		default: | ||||||
| 			// blackhole it | 			// blackhole it | ||||||
| @@ -587,6 +648,30 @@ func (t *tun) listen(link *link) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // discover sends channel discover requests periodically | ||||||
|  | func (t *tun) discover(link *link) { | ||||||
|  | 	tick := time.NewTicker(DiscoverTime) | ||||||
|  | 	defer tick.Stop() | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-tick.C: | ||||||
|  | 			// send a discovery message to all links | ||||||
|  | 			if err := link.Send(&transport.Message{ | ||||||
|  | 				Header: map[string]string{ | ||||||
|  | 					"Micro-Tunnel":       "discover", | ||||||
|  | 					"Micro-Tunnel-Id":    t.id, | ||||||
|  | 					"Micro-Tunnel-Token": t.token, | ||||||
|  | 				}, | ||||||
|  | 			}); err != nil { | ||||||
|  | 				log.Debugf("Tunnel failed to send discover to link %s: %v", link.id, err) | ||||||
|  | 			} | ||||||
|  | 		case <-t.closed: | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // keepalive periodically sends keepalive messages to link | // keepalive periodically sends keepalive messages to link | ||||||
| func (t *tun) keepalive(link *link) { | func (t *tun) keepalive(link *link) { | ||||||
| 	keepalive := time.NewTicker(KeepAliveTime) | 	keepalive := time.NewTicker(KeepAliveTime) | ||||||
| @@ -650,6 +735,9 @@ func (t *tun) setupLink(node string) (*link, error) { | |||||||
| 	// start keepalive monitor | 	// start keepalive monitor | ||||||
| 	go t.keepalive(link) | 	go t.keepalive(link) | ||||||
|  |  | ||||||
|  | 	// discover things on the remote side | ||||||
|  | 	go t.discover(link) | ||||||
|  |  | ||||||
| 	return link, nil | 	return link, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -757,6 +845,7 @@ func (t *tun) close() error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// close the listener | 	// close the listener | ||||||
|  | 	// this appears to be blocking | ||||||
| 	return t.listener.Close() | 	return t.listener.Close() | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -866,6 +955,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
|  |  | ||||||
| 	// shit fuck | 	// shit fuck | ||||||
| 	if !c.discovered { | 	if !c.discovered { | ||||||
|  | 		// create a new discovery message for this channel | ||||||
| 		msg := c.newMessage("discover") | 		msg := c.newMessage("discover") | ||||||
| 		msg.broadcast = true | 		msg.broadcast = true | ||||||
| 		msg.outbound = true | 		msg.outbound = true | ||||||
| @@ -874,14 +964,6 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
| 		// send the discovery message | 		// send the discovery message | ||||||
| 		t.send <- msg | 		t.send <- msg | ||||||
|  |  | ||||||
| 		// don't bother waiting around |  | ||||||
| 		// we're just going to assume things come online |  | ||||||
| 		if c.multicast { |  | ||||||
| 			c.discovered = true |  | ||||||
| 			c.accepted = true |  | ||||||
| 			return c, nil |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		select { | 		select { | ||||||
| 		case <-time.After(after()): | 		case <-time.After(after()): | ||||||
| 			return nil, ErrDialTimeout | 			return nil, ErrDialTimeout | ||||||
| @@ -891,17 +973,45 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		var err error | ||||||
|  |  | ||||||
|  | 		// set a dialTimeout | ||||||
|  | 		dialTimeout := after() | ||||||
|  |  | ||||||
|  | 		// set a shorter delay for multicast | ||||||
|  | 		if c.multicast { | ||||||
|  | 			// shorten this | ||||||
|  | 			dialTimeout = time.Millisecond * 500 | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		// wait for announce | 		// wait for announce | ||||||
| 		select { | 		select { | ||||||
| 		case msg := <-c.recv: | 		case msg := <-c.recv: | ||||||
| 			if msg.typ != "announce" { | 			if msg.typ != "announce" { | ||||||
| 				return nil, errors.New("failed to discover channel") | 				err = ErrDiscoverChan | ||||||
| 			} | 			} | ||||||
| 		case <-time.After(after()): | 		case <-time.After(dialTimeout): | ||||||
| 			return nil, ErrDialTimeout | 			err = ErrDialTimeout | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// if its multicast just go ahead because this is best effort | ||||||
|  | 		if c.multicast { | ||||||
|  | 			c.discovered = true | ||||||
|  | 			c.accepted = true | ||||||
|  | 			return c, nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// otherwise return an error | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// set discovered to true | ||||||
|  | 		c.discovered = true | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// a unicast session so we call "open" and wait for an "accept" | ||||||
|  |  | ||||||
| 	// try to open the session | 	// try to open the session | ||||||
| 	err := c.Open() | 	err := c.Open() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|   | |||||||
| @@ -24,7 +24,7 @@ type tunListener struct { | |||||||
|  |  | ||||||
| // periodically announce self | // periodically announce self | ||||||
| func (t *tunListener) announce() { | func (t *tunListener) announce() { | ||||||
| 	tick := time.NewTicker(time.Minute) | 	tick := time.NewTicker(time.Second * 30) | ||||||
| 	defer tick.Stop() | 	defer tick.Stop() | ||||||
|  |  | ||||||
| 	// first announcement | 	// first announcement | ||||||
|   | |||||||
| @@ -180,6 +180,8 @@ func (s *session) Announce() error { | |||||||
| 	msg := s.newMessage("announce") | 	msg := s.newMessage("announce") | ||||||
| 	// we don't need an error back | 	// we don't need an error back | ||||||
| 	msg.errChan = nil | 	msg.errChan = nil | ||||||
|  | 	// announce to all | ||||||
|  | 	msg.broadcast = true | ||||||
| 	// we don't need the link | 	// we don't need the link | ||||||
| 	msg.link = "" | 	msg.link = "" | ||||||
|  |  | ||||||
|   | |||||||
| @@ -9,10 +9,12 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	// ErrDialTimeout is returned by a call to Dial where the timeout occurs |  | ||||||
| 	ErrDialTimeout = errors.New("dial timeout") |  | ||||||
| 	// DefaultDialTimeout is the dial timeout if none is specified | 	// DefaultDialTimeout is the dial timeout if none is specified | ||||||
| 	DefaultDialTimeout = time.Second * 5 | 	DefaultDialTimeout = time.Second * 5 | ||||||
|  | 	// ErrDialTimeout is returned by a call to Dial where the timeout occurs | ||||||
|  | 	ErrDialTimeout = errors.New("dial timeout") | ||||||
|  | 	// ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery | ||||||
|  | 	ErrDiscoverChan = errors.New("failed to discover channel") | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Tunnel creates a gre tunnel on top of the go-micro/transport. | // Tunnel creates a gre tunnel on top of the go-micro/transport. | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user