Update tunnel to send discovery on connect and multicast messages. Announce as broadcast
This commit is contained in:
		| @@ -94,6 +94,21 @@ func (t *tun) delSession(channel, session string) { | ||||
| 	t.Unlock() | ||||
| } | ||||
|  | ||||
| // listChannels returns a list of listening channels | ||||
| func (t *tun) listChannels() []string { | ||||
| 	t.RLock() | ||||
| 	defer t.RUnlock() | ||||
|  | ||||
| 	var channels []string | ||||
| 	for _, session := range t.sessions { | ||||
| 		if session.session != "listener" { | ||||
| 			continue | ||||
| 		} | ||||
| 		channels = append(channels, session.channel) | ||||
| 	} | ||||
| 	return channels | ||||
| } | ||||
|  | ||||
| // newSession creates a new session and saves it | ||||
| func (t *tun) newSession(channel, sessionId string) (*session, bool) { | ||||
| 	// new session | ||||
| @@ -439,11 +454,20 @@ func (t *tun) listen(link *link) { | ||||
| 			log.Debugf("Received %+v from %s", msg, link.Remote()) | ||||
| 		// an announcement of a channel listener | ||||
| 		case "announce": | ||||
| 			channels := strings.Split(channel, ",") | ||||
|  | ||||
| 			// update mapping in the link | ||||
| 			link.Lock() | ||||
| 			link.channels[channel] = time.Now() | ||||
| 			for _, channel := range channels { | ||||
| 				link.channels[channel] = time.Now() | ||||
| 			} | ||||
| 			link.Unlock() | ||||
|  | ||||
| 			// this was an announcement not intended for anything | ||||
| 			if sessionId == "listener" || sessionId == "" { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// get the session that asked for the discovery | ||||
| 			s, exists := t.getSession(channel, sessionId) | ||||
| 			if exists { | ||||
| @@ -463,22 +487,46 @@ func (t *tun) listen(link *link) { | ||||
| 			} | ||||
| 			continue | ||||
| 		case "discover": | ||||
| 			// looking for existing mapping | ||||
| 			_, exists := t.getSession(channel, "listener") | ||||
| 			if exists { | ||||
| 				log.Debugf("Tunnel sending announce for discovery of channel %s", channel) | ||||
| 				// send back the announcement | ||||
| 				link.Send(&transport.Message{ | ||||
| 					Header: map[string]string{ | ||||
| 						"Micro-Tunnel":         "announce", | ||||
| 						"Micro-Tunnel-Id":      t.id, | ||||
| 						"Micro-Tunnel-Channel": channel, | ||||
| 						"Micro-Tunnel-Session": sessionId, | ||||
| 						"Micro-Tunnel-Link":    link.id, | ||||
| 						"Micro-Tunnel-Token":   t.token, | ||||
| 					}, | ||||
| 				}) | ||||
| 			// create the "announce" response message for a discover request | ||||
| 			msg := &transport.Message{ | ||||
| 				Header: map[string]string{ | ||||
| 					"Micro-Tunnel":         "announce", | ||||
| 					"Micro-Tunnel-Id":      t.id, | ||||
| 					"Micro-Tunnel-Channel": channel, | ||||
| 					"Micro-Tunnel-Session": sessionId, | ||||
| 					"Micro-Tunnel-Link":    link.id, | ||||
| 					"Micro-Tunnel-Token":   t.token, | ||||
| 				}, | ||||
| 			} | ||||
|  | ||||
| 			// if no channel is present we've been asked to discover all channels | ||||
| 			if len(channel) == 0 { | ||||
| 				// get the list of channels | ||||
| 				t.RLock() | ||||
| 				channels := t.listChannels() | ||||
| 				t.RUnlock() | ||||
|  | ||||
| 				// if there are no channels continue | ||||
| 				if len(channels) == 0 { | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				// create a list of channels as comma separated list | ||||
| 				list := strings.Join(channels, ",") | ||||
| 				// set channels as header | ||||
| 				msg.Header["Micro-Tunnel-Channel"] = list | ||||
| 			} else { | ||||
| 				// otherwise look for a single channel mapping | ||||
| 				// looking for existing mapping as a listener | ||||
| 				_, exists := t.getSession(channel, "listener") | ||||
| 				if !exists { | ||||
| 					continue | ||||
| 				} | ||||
| 				log.Debugf("Tunnel sending announce for discovery of channel %s", channel) | ||||
| 			} | ||||
|  | ||||
| 			// send back the announcement | ||||
| 			link.Send(msg) | ||||
| 			continue | ||||
| 		default: | ||||
| 			// blackhole it | ||||
| @@ -728,6 +776,9 @@ func (t *tun) Connect() error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// request a discovery | ||||
| 	t.discover() | ||||
|  | ||||
| 	// set as connected | ||||
| 	t.connected = true | ||||
| 	// create new close channel | ||||
| @@ -736,6 +787,19 @@ func (t *tun) Connect() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *tun) discover() { | ||||
| 	// send a discovery message to all links | ||||
| 	for _, link := range t.links { | ||||
| 		link.Send(&transport.Message{ | ||||
| 			Header: map[string]string{ | ||||
| 				"Micro-Tunnel":       "discover", | ||||
| 				"Micro-Tunnel-Id":    t.id, | ||||
| 				"Micro-Tunnel-Token": t.token, | ||||
| 			}, | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (t *tun) close() error { | ||||
| 	// close all the sessions | ||||
| 	for id, s := range t.sessions { | ||||
| @@ -757,7 +821,8 @@ func (t *tun) close() error { | ||||
| 	} | ||||
|  | ||||
| 	// close the listener | ||||
| 	return t.listener.Close() | ||||
| 	//return t.listener.Close() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *tun) Address() string { | ||||
| @@ -856,7 +921,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | ||||
| 	t.RUnlock() | ||||
|  | ||||
| 	// discovered so set the link if not multicast | ||||
| 	// TODO: pick the link efficiently based  | ||||
| 	// TODO: pick the link efficiently based | ||||
| 	// on link status and saturation. | ||||
| 	if c.discovered && !c.multicast { | ||||
| 		// set the link | ||||
| @@ -866,6 +931,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | ||||
|  | ||||
| 	// shit fuck | ||||
| 	if !c.discovered { | ||||
| 		// create a new discovery message for this channel | ||||
| 		msg := c.newMessage("discover") | ||||
| 		msg.broadcast = true | ||||
| 		msg.outbound = true | ||||
| @@ -874,14 +940,6 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | ||||
| 		// send the discovery message | ||||
| 		t.send <- msg | ||||
|  | ||||
| 		// don't bother waiting around | ||||
| 		// we're just going to assume things come online | ||||
| 		if c.multicast { | ||||
| 			c.discovered = true | ||||
| 			c.accepted = true | ||||
| 			return c, nil | ||||
| 		} | ||||
|  | ||||
| 		select { | ||||
| 		case <-time.After(after()): | ||||
| 			return nil, ErrDialTimeout | ||||
| @@ -891,17 +949,33 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		var err error | ||||
|  | ||||
| 		// wait for announce | ||||
| 		select { | ||||
| 		case msg := <-c.recv: | ||||
| 			if msg.typ != "announce" { | ||||
| 				return nil, errors.New("failed to discover channel") | ||||
| 				err = errors.New("failed to discover channel") | ||||
| 			} | ||||
| 		case <-time.After(after()): | ||||
| 			return nil, ErrDialTimeout | ||||
| 			err = ErrDialTimeout | ||||
| 		} | ||||
|  | ||||
| 		// don't both sending the error for multicast | ||||
| 		// we're just going to assume things come online | ||||
| 		if err == nil || c.multicast { | ||||
| 			c.discovered = true | ||||
| 			return c, nil | ||||
| 		} | ||||
|  | ||||
| 		// return the error if unicast | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// a unicast session so we call "open" and wait for an "accept" | ||||
|  | ||||
| 	// try to open the session | ||||
| 	err := c.Open() | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -24,7 +24,7 @@ type tunListener struct { | ||||
|  | ||||
| // periodically announce self | ||||
| func (t *tunListener) announce() { | ||||
| 	tick := time.NewTicker(time.Minute) | ||||
| 	tick := time.NewTicker(time.Second * 30) | ||||
| 	defer tick.Stop() | ||||
|  | ||||
| 	// first announcement | ||||
|   | ||||
| @@ -180,6 +180,8 @@ func (s *session) Announce() error { | ||||
| 	msg := s.newMessage("announce") | ||||
| 	// we don't need an error back | ||||
| 	msg.errChan = nil | ||||
| 	// announce to all | ||||
| 	msg.broadcast = true | ||||
| 	// we don't need the link | ||||
| 	msg.link = "" | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user