fix some tunnel bugs like races and duplicate messages...
This commit is contained in:
		| @@ -111,25 +111,26 @@ func (t *tun) process() { | |||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case msg := <-t.send: | 		case msg := <-t.send: | ||||||
| 			nmsg := &transport.Message{ | 			newMsg := &transport.Message{ | ||||||
| 				Header: msg.data.Header, | 				Header: make(map[string]string), | ||||||
| 				Body:   msg.data.Body, | 				Body:   msg.data.Body, | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			if nmsg.Header == nil { | 			for k, v := range msg.data.Header { | ||||||
| 				nmsg.Header = make(map[string]string) | 				newMsg.Header[k] = v | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// set the tunnel id on the outgoing message | 			// set the tunnel id on the outgoing message | ||||||
| 			nmsg.Header["Micro-Tunnel-Id"] = msg.id | 			newMsg.Header["Micro-Tunnel-Id"] = msg.id | ||||||
|  |  | ||||||
| 			// set the session id | 			// set the session id | ||||||
| 			nmsg.Header["Micro-Tunnel-Session"] = msg.session | 			newMsg.Header["Micro-Tunnel-Session"] = msg.session | ||||||
|  |  | ||||||
| 			// send the message via the interface | 			// send the message via the interface | ||||||
| 			t.RLock() | 			t.RLock() | ||||||
| 			for _, link := range t.links { | 			for _, link := range t.links { | ||||||
| 				link.Send(nmsg) | 				log.Debugf("Sending %+v to %s", newMsg, link.Remote()) | ||||||
|  | 				link.Send(newMsg) | ||||||
| 			} | 			} | ||||||
| 			t.RUnlock() | 			t.RUnlock() | ||||||
| 		case <-t.closed: | 		case <-t.closed: | ||||||
| @@ -170,6 +171,7 @@ func (t *tun) listen(link transport.Socket, listener bool) { | |||||||
| 		var s *socket | 		var s *socket | ||||||
| 		var exists bool | 		var exists bool | ||||||
|  |  | ||||||
|  | 		log.Debugf("Received %+v from %s", msg, link.Remote()) | ||||||
| 		// if its a local listener then we use that as the session id | 		// if its a local listener then we use that as the session id | ||||||
| 		// e.g we're using a loopback connecting to ourselves | 		// e.g we're using a loopback connecting to ourselves | ||||||
| 		if listener { | 		if listener { | ||||||
| @@ -189,10 +191,12 @@ func (t *tun) listen(link transport.Socket, listener bool) { | |||||||
|  |  | ||||||
| 		// no socket in existence | 		// no socket in existence | ||||||
| 		if !exists { | 		if !exists { | ||||||
|  | 			log.Debugf("Skipping") | ||||||
| 			// drop it, we don't care about | 			// drop it, we don't care about | ||||||
| 			// messages we don't know about | 			// messages we don't know about | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		log.Debugf("Using socket %s %s", s.id, s.session) | ||||||
|  |  | ||||||
| 		// is the socket closed? | 		// is the socket closed? | ||||||
| 		select { | 		select { | ||||||
| @@ -398,6 +402,7 @@ func (t *tun) Init(opts ...Option) error { | |||||||
|  |  | ||||||
| // Dial an address | // Dial an address | ||||||
| func (t *tun) Dial(addr string) (Conn, error) { | func (t *tun) Dial(addr string) (Conn, error) { | ||||||
|  | 	log.Debugf("Tunnel dialing %s", addr) | ||||||
| 	c, ok := t.newSocket(addr, t.newSession()) | 	c, ok := t.newSocket(addr, t.newSession()) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return nil, errors.New("error dialing " + addr) | 		return nil, errors.New("error dialing " + addr) | ||||||
| @@ -413,6 +418,7 @@ func (t *tun) Dial(addr string) (Conn, error) { | |||||||
|  |  | ||||||
| // Accept a connection on the address | // Accept a connection on the address | ||||||
| func (t *tun) Listen(addr string) (Listener, error) { | func (t *tun) Listen(addr string) (Listener, error) { | ||||||
|  | 	log.Debugf("Tunnel listening on %s", addr) | ||||||
| 	// create a new socket by hashing the address | 	// create a new socket by hashing the address | ||||||
| 	c, ok := t.newSocket(addr, "listener") | 	c, ok := t.newSocket(addr, "listener") | ||||||
| 	if !ok { | 	if !ok { | ||||||
|   | |||||||
| @@ -48,9 +48,6 @@ func (t *tunListener) process() { | |||||||
| 					wait: make(chan bool), | 					wait: make(chan bool), | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| 				// first message |  | ||||||
| 				sock.recv <- m |  | ||||||
|  |  | ||||||
| 				// save the socket | 				// save the socket | ||||||
| 				conns[m.session] = sock | 				conns[m.session] = sock | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user