Merge pull request #670 from milosgajdos83/loopback-msg-fix
Fixing the tunnel loopback messaging
This commit is contained in:
		| @@ -42,6 +42,7 @@ type tun struct { | |||||||
| type link struct { | type link struct { | ||||||
| 	transport.Socket | 	transport.Socket | ||||||
| 	id       string | 	id       string | ||||||
|  | 	loopback bool | ||||||
| } | } | ||||||
|  |  | ||||||
| // create new tunnel on top of a link | // create new tunnel on top of a link | ||||||
| @@ -148,8 +149,11 @@ func (t *tun) process() { | |||||||
| 				log.Debugf("Zero links to send to") | 				log.Debugf("Zero links to send to") | ||||||
| 			} | 			} | ||||||
| 			for _, link := range t.links { | 			for _, link := range t.links { | ||||||
| 				log.Debugf("Sending %+v to %s", newMsg, link.Remote()) |  | ||||||
| 				// TODO: error check and reconnect | 				// TODO: error check and reconnect | ||||||
|  | 				log.Debugf("Sending %+v to %s", newMsg, link.Remote()) | ||||||
|  | 				if link.loopback && msg.outbound { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
| 				link.Send(newMsg) | 				link.Send(newMsg) | ||||||
| 			} | 			} | ||||||
| 			t.RUnlock() | 			t.RUnlock() | ||||||
| @@ -160,7 +164,7 @@ func (t *tun) process() { | |||||||
| } | } | ||||||
|  |  | ||||||
| // process incoming messages | // process incoming messages | ||||||
| func (t *tun) listen(link transport.Socket) { | func (t *tun) listen(link *link) { | ||||||
| 	// loopback flag | 	// loopback flag | ||||||
| 	var loopback bool | 	var loopback bool | ||||||
|  |  | ||||||
| @@ -184,6 +188,7 @@ func (t *tun) listen(link transport.Socket) { | |||||||
| 			// are we connecting to ourselves? | 			// are we connecting to ourselves? | ||||||
| 			if token == t.token { | 			if token == t.token { | ||||||
| 				loopback = true | 				loopback = true | ||||||
|  | 				link.loopback = true | ||||||
| 			} | 			} | ||||||
| 			continue | 			continue | ||||||
| 		case "close": | 		case "close": | ||||||
| @@ -298,10 +303,11 @@ func (t *tun) connect() error { | |||||||
| 			// save the link | 			// save the link | ||||||
| 			id := uuid.New().String() | 			id := uuid.New().String() | ||||||
| 			t.Lock() | 			t.Lock() | ||||||
| 			t.links[id] = &link{ | 			link := &link{ | ||||||
| 				Socket: sock, | 				Socket: sock, | ||||||
| 				id:     id, | 				id:     id, | ||||||
| 			} | 			} | ||||||
|  | 			t.links[id] = link | ||||||
| 			t.Unlock() | 			t.Unlock() | ||||||
|  |  | ||||||
| 			// delete the link | 			// delete the link | ||||||
| @@ -313,7 +319,7 @@ func (t *tun) connect() error { | |||||||
| 			}() | 			}() | ||||||
|  |  | ||||||
| 			// listen for inbound messages | 			// listen for inbound messages | ||||||
| 			t.listen(sock) | 			t.listen(link) | ||||||
| 		}) | 		}) | ||||||
|  |  | ||||||
| 		t.Lock() | 		t.Lock() | ||||||
| @@ -349,15 +355,16 @@ func (t *tun) connect() error { | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// process incoming messages |  | ||||||
| 		go t.listen(c) |  | ||||||
|  |  | ||||||
| 		// save the link | 		// save the link | ||||||
| 		id := uuid.New().String() | 		id := uuid.New().String() | ||||||
| 		t.links[id] = &link{ | 		link := &link{ | ||||||
| 			Socket: c, | 			Socket: c, | ||||||
| 			id:     id, | 			id:     id, | ||||||
| 		} | 		} | ||||||
|  | 		t.links[id] = link | ||||||
|  |  | ||||||
|  | 		// process incoming messages | ||||||
|  | 		go t.listen(link) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// process outbound messages to be sent | 	// process outbound messages to be sent | ||||||
| @@ -448,6 +455,8 @@ func (t *tun) Dial(addr string) (Conn, error) { | |||||||
| 	c.remote = addr | 	c.remote = addr | ||||||
| 	// set local | 	// set local | ||||||
| 	c.local = "local" | 	c.local = "local" | ||||||
|  | 	// outbound socket | ||||||
|  | 	c.outbound = true | ||||||
|  |  | ||||||
| 	return c, nil | 	return c, nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -25,6 +25,8 @@ type socket struct { | |||||||
| 	recv chan *message | 	recv chan *message | ||||||
| 	// wait until we have a connection | 	// wait until we have a connection | ||||||
| 	wait chan bool | 	wait chan bool | ||||||
|  | 	// outbound marks the socket as outbound | ||||||
|  | 	outbound bool | ||||||
| } | } | ||||||
|  |  | ||||||
| // message is sent over the send channel | // message is sent over the send channel | ||||||
| @@ -33,6 +35,8 @@ type message struct { | |||||||
| 	id string | 	id string | ||||||
| 	// the session id | 	// the session id | ||||||
| 	session string | 	session string | ||||||
|  | 	// outbound marks the message as outbound | ||||||
|  | 	outbound bool | ||||||
| 	// transport data | 	// transport data | ||||||
| 	data *transport.Message | 	data *transport.Message | ||||||
| } | } | ||||||
| @@ -72,7 +76,12 @@ func (s *socket) Send(m *transport.Message) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// append to backlog | 	// append to backlog | ||||||
| 	msg := &message{id: s.id, session: s.session, data: data} | 	msg := &message{ | ||||||
|  | 		id:       s.id, | ||||||
|  | 		session:  s.session, | ||||||
|  | 		outbound: s.outbound, | ||||||
|  | 		data:     data, | ||||||
|  | 	} | ||||||
| 	log.Debugf("Appending %+v to send backlog", msg) | 	log.Debugf("Appending %+v to send backlog", msg) | ||||||
| 	s.send <- msg | 	s.send <- msg | ||||||
| 	return nil | 	return nil | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user