Merge pull request #862 from milosgajdos83/tunnel-cleanup
Cleanup of tunnel.Dial(). Clean up network channel processors
This commit is contained in:
		| @@ -296,7 +296,7 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) | |||||||
| } | } | ||||||
|  |  | ||||||
| // processNetChan processes messages received on NetworkChannel | // processNetChan processes messages received on NetworkChannel | ||||||
| func (n *network) processNetChan(client transport.Client, listener tunnel.Listener) { | func (n *network) processNetChan(listener tunnel.Listener) { | ||||||
| 	// receive network message queue | 	// receive network message queue | ||||||
| 	recv := make(chan *transport.Message, 128) | 	recv := make(chan *transport.Message, 128) | ||||||
|  |  | ||||||
| @@ -629,7 +629,7 @@ func (n *network) setRouteMetric(route *router.Route) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // processCtrlChan processes messages received on ControlChannel | // processCtrlChan processes messages received on ControlChannel | ||||||
| func (n *network) processCtrlChan(client transport.Client, listener tunnel.Listener) { | func (n *network) processCtrlChan(listener tunnel.Listener) { | ||||||
| 	// receive control message queue | 	// receive control message queue | ||||||
| 	recv := make(chan *transport.Message, 128) | 	recv := make(chan *transport.Message, 128) | ||||||
|  |  | ||||||
| @@ -739,7 +739,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste | |||||||
| } | } | ||||||
|  |  | ||||||
| // advertise advertises routes to the network | // advertise advertises routes to the network | ||||||
| func (n *network) advertise(client transport.Client, advertChan <-chan *router.Advert) { | func (n *network) advertise(advertChan <-chan *router.Advert) { | ||||||
| 	hasher := fnv.New64() | 	hasher := fnv.New64() | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| @@ -910,11 +910,11 @@ func (n *network) Connect() error { | |||||||
| 	// prune stale nodes | 	// prune stale nodes | ||||||
| 	go n.prune() | 	go n.prune() | ||||||
| 	// listen to network messages | 	// listen to network messages | ||||||
| 	go n.processNetChan(netClient, netListener) | 	go n.processNetChan(netListener) | ||||||
| 	// advertise service routes | 	// advertise service routes | ||||||
| 	go n.advertise(ctrlClient, advertChan) | 	go n.advertise(advertChan) | ||||||
| 	// accept and process routes | 	// accept and process routes | ||||||
| 	go n.processCtrlChan(ctrlClient, ctrlListener) | 	go n.processCtrlChan(ctrlListener) | ||||||
|  |  | ||||||
| 	n.Lock() | 	n.Lock() | ||||||
| 	n.connected = true | 	n.connected = true | ||||||
|   | |||||||
| @@ -982,28 +982,32 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
| 	var links []string | 	var links []string | ||||||
|  |  | ||||||
| 	// non multicast so we need to find the link | 	// non multicast so we need to find the link | ||||||
| 	t.RLock() | 	if id := options.Link; id != "" { | ||||||
| 	for _, link := range t.links { | 		t.RLock() | ||||||
| 		// use the link specified it its available | 		for _, link := range t.links { | ||||||
| 		if id := options.Link; len(id) > 0 && link.id != id { | 			// use the link specified it its available | ||||||
| 			continue | 			if link.id != id { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			link.RLock() | ||||||
|  | 			_, ok := link.channels[channel] | ||||||
|  | 			link.RUnlock() | ||||||
|  |  | ||||||
|  | 			// we have at least one channel mapping | ||||||
|  | 			if ok { | ||||||
|  | 				c.discovered = true | ||||||
|  | 				links = append(links, link.id) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		t.RUnlock() | ||||||
|  | 		// link not found | ||||||
|  | 		if len(links) == 0 { | ||||||
|  | 			// delete session and return error | ||||||
|  | 			t.delSession(c.channel, c.session) | ||||||
|  | 			return nil, ErrLinkNotFound | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		link.RLock() |  | ||||||
| 		_, ok := link.channels[channel] |  | ||||||
| 		link.RUnlock() |  | ||||||
|  |  | ||||||
| 		// we have at least one channel mapping |  | ||||||
| 		if ok { |  | ||||||
| 			c.discovered = true |  | ||||||
| 			links = append(links, link.id) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	t.RUnlock() |  | ||||||
|  |  | ||||||
| 	// link not found |  | ||||||
| 	if len(links) == 0 && len(options.Link) > 0 { |  | ||||||
| 		return nil, ErrLinkNotFound |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// discovered so set the link if not multicast | 	// discovered so set the link if not multicast | ||||||
| @@ -1028,9 +1032,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
|  |  | ||||||
| 		select { | 		select { | ||||||
| 		case <-time.After(after()): | 		case <-time.After(after()): | ||||||
|  | 			t.delSession(c.channel, c.session) | ||||||
| 			return nil, ErrDialTimeout | 			return nil, ErrDialTimeout | ||||||
| 		case err := <-c.errChan: | 		case err := <-c.errChan: | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
|  | 				t.delSession(c.channel, c.session) | ||||||
| 				return nil, err | 				return nil, err | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -1041,7 +1047,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
| 		dialTimeout := after() | 		dialTimeout := after() | ||||||
|  |  | ||||||
| 		// set a shorter delay for multicast | 		// set a shorter delay for multicast | ||||||
| 		if c.mode > Unicast { | 		if c.mode != Unicast { | ||||||
| 			// shorten this | 			// shorten this | ||||||
| 			dialTimeout = time.Millisecond * 500 | 			dialTimeout = time.Millisecond * 500 | ||||||
| 		} | 		} | ||||||
| @@ -1057,7 +1063,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// if its multicast just go ahead because this is best effort | 		// if its multicast just go ahead because this is best effort | ||||||
| 		if c.mode > Unicast { | 		if c.mode != Unicast { | ||||||
| 			c.discovered = true | 			c.discovered = true | ||||||
| 			c.accepted = true | 			c.accepted = true | ||||||
| 			return c, nil | 			return c, nil | ||||||
| @@ -1065,6 +1071,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { | |||||||
|  |  | ||||||
| 		// otherwise return an error | 		// otherwise return an error | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | 			t.delSession(c.channel, c.session) | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user