Add sessions to tunnel
This commit is contained in:
		| @@ -61,6 +61,7 @@ func (t *tun) newSocket(id string) *socket { | |||||||
| 	// new socket | 	// new socket | ||||||
| 	s := &socket{ | 	s := &socket{ | ||||||
| 		id:      id, | 		id:      id, | ||||||
|  | 		session: t.newSession(), | ||||||
| 		closed:  make(chan bool), | 		closed:  make(chan bool), | ||||||
| 		recv:    make(chan *message, 128), | 		recv:    make(chan *message, 128), | ||||||
| 		send:    t.send, | 		send:    t.send, | ||||||
| @@ -75,6 +76,11 @@ func (t *tun) newSocket(id string) *socket { | |||||||
| 	return s | 	return s | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // TODO: use tunnel id as part of the session | ||||||
|  | func (t *tun) newSession() string { | ||||||
|  | 	return uuid.New().String() | ||||||
|  | } | ||||||
|  |  | ||||||
| // process outgoing messages | // process outgoing messages | ||||||
| func (t *tun) process() { | func (t *tun) process() { | ||||||
| 	// manage the send buffer | 	// manage the send buffer | ||||||
| @@ -87,8 +93,11 @@ func (t *tun) process() { | |||||||
| 				Body:   msg.data.Body, | 				Body:   msg.data.Body, | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// set the stream id on the outgoing message | 			// set the tunnel id on the outgoing message | ||||||
| 			nmsg.Header["Micro-Tunnel"] = msg.id | 			nmsg.Header["Micro-Tunnel-Id"] = msg.id | ||||||
|  |  | ||||||
|  | 			// set the session id | ||||||
|  | 			nmsg.Header["Micro-Tunnel-Session"] = msg.session | ||||||
|  |  | ||||||
| 			// send the message via the interface | 			// send the message via the interface | ||||||
| 			if err := t.link.Send(nmsg); err != nil { | 			if err := t.link.Send(nmsg); err != nil { | ||||||
| @@ -111,8 +120,11 @@ func (t *tun) listen() { | |||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// a stream id | 		// the tunnel id | ||||||
| 		id := msg.Header["Micro-Tunnel"] | 		id := msg.Header["Micro-Tunnel-Id"] | ||||||
|  |  | ||||||
|  | 		// the session id | ||||||
|  | 		session := msg.Header["Micro-Tunnel-Session"] | ||||||
|  |  | ||||||
| 		// get the socket | 		// get the socket | ||||||
| 		s, exists := t.getSocket(id) | 		s, exists := t.getSocket(id) | ||||||
| @@ -150,7 +162,7 @@ func (t *tun) listen() { | |||||||
|  |  | ||||||
| 		// TODO: don't block on queuing | 		// TODO: don't block on queuing | ||||||
| 		// append to recv backlog | 		// append to recv backlog | ||||||
| 		s.recv <- &message{id: id, data: tmsg} | 		s.recv <- &message{id: id, session: session, data: tmsg} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -209,10 +221,12 @@ func (t *tun) Dial(addr string) (Conn, error) { | |||||||
| 	c.remote = addr | 	c.remote = addr | ||||||
| 	// set local | 	// set local | ||||||
| 	c.local = t.link.Local() | 	c.local = t.link.Local() | ||||||
|  |  | ||||||
| 	return c, nil | 	return c, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t *tun) Accept(addr string) (Conn, error) { | // Accept a connection on the address | ||||||
|  | func (t *tun) Listen(addr string) (Listener, error) { | ||||||
| 	// create a new socket by hashing the address | 	// create a new socket by hashing the address | ||||||
| 	c := t.newSocket(addr) | 	c := t.newSocket(addr) | ||||||
| 	// set remote. it will be replaced by the first message received | 	// set remote. it will be replaced by the first message received | ||||||
| @@ -227,6 +241,20 @@ func (t *tun) Accept(addr string) (Conn, error) { | |||||||
| 	case <-c.wait: | 	case <-c.wait: | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// return socket | 	tl := &tunListener{ | ||||||
| 	return c, nil | 		addr: addr, | ||||||
|  | 		// the accept channel | ||||||
|  | 		accept: make(chan *socket, 128), | ||||||
|  | 		// the channel to close | ||||||
|  | 		closed: make(chan bool), | ||||||
|  | 		// the connection | ||||||
|  | 		conn: c, | ||||||
|  | 		// the listener socket | ||||||
|  | 		socket: c, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	go tl.process() | ||||||
|  |  | ||||||
|  | 	// return the listener | ||||||
|  | 	return tl, nil | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										89
									
								
								network/tunnel/listener.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								network/tunnel/listener.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,89 @@ | |||||||
|  | package tunnel | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"io" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type tunListener struct { | ||||||
|  | 	// address of the listener | ||||||
|  | 	addr string | ||||||
|  | 	// the accept channel | ||||||
|  | 	accept chan *socket | ||||||
|  | 	// the channel to close | ||||||
|  | 	closed chan bool | ||||||
|  | 	// the connection | ||||||
|  | 	conn Conn | ||||||
|  | 	// the listener socket | ||||||
|  | 	socket *socket | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunListener) process() { | ||||||
|  | 	// our connection map for session | ||||||
|  | 	conns := make(map[string]*socket) | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-t.closed: | ||||||
|  | 			return | ||||||
|  | 		// receive a new message | ||||||
|  | 		case m := <-t.socket.recv: | ||||||
|  | 			// get a socket | ||||||
|  | 			sock, ok := conns[m.session] | ||||||
|  | 			if !ok { | ||||||
|  | 				// create a new socket session | ||||||
|  | 				sock = &socket{ | ||||||
|  | 					// our tunnel id | ||||||
|  | 					id: m.id, | ||||||
|  | 					// the session id | ||||||
|  | 					session: m.session, | ||||||
|  | 					// close chan | ||||||
|  | 					closed: make(chan bool), | ||||||
|  | 					// recv called by the acceptor | ||||||
|  | 					recv: make(chan *message, 128), | ||||||
|  | 					// use the internal send buffer | ||||||
|  | 					send: t.socket.send, | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				// save the socket | ||||||
|  | 				conns[m.session] = sock | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// send this to the accept chan | ||||||
|  | 			select { | ||||||
|  | 			case <-t.closed: | ||||||
|  | 				return | ||||||
|  | 			case t.accept <- sock: | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunListener) Addr() string { | ||||||
|  | 	return t.addr | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunListener) Close() error { | ||||||
|  | 	select { | ||||||
|  | 	case <-t.closed: | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		close(t.closed) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Everytime accept is called we essentially block till we get a new connection | ||||||
|  | func (t *tunListener) Accept() (Conn, error) { | ||||||
|  | 	select { | ||||||
|  | 	// if the socket is closed return | ||||||
|  | 	case <-t.closed: | ||||||
|  | 		return nil, io.EOF | ||||||
|  | 	// wait for a new connection | ||||||
|  | 	case c, ok := <-t.accept: | ||||||
|  | 		if !ok { | ||||||
|  | 			return nil, io.EOF | ||||||
|  | 		} | ||||||
|  | 		return c, nil | ||||||
|  | 	} | ||||||
|  | 	return nil, nil | ||||||
|  | } | ||||||
| @@ -10,6 +10,8 @@ import ( | |||||||
| type socket struct { | type socket struct { | ||||||
| 	// socket id based on Micro-Tunnel | 	// socket id based on Micro-Tunnel | ||||||
| 	id string | 	id string | ||||||
|  | 	// the session id based on Micro.Tunnel-Session | ||||||
|  | 	session string | ||||||
| 	// closed | 	// closed | ||||||
| 	closed chan bool | 	closed chan bool | ||||||
| 	// remote addr | 	// remote addr | ||||||
| @@ -26,8 +28,10 @@ type socket struct { | |||||||
|  |  | ||||||
| // message is sent over the send channel | // message is sent over the send channel | ||||||
| type message struct { | type message struct { | ||||||
| 	// socket id | 	// tunnel id | ||||||
| 	id string | 	id string | ||||||
|  | 	// the session id | ||||||
|  | 	session string | ||||||
| 	// transport data | 	// transport data | ||||||
| 	data *transport.Message | 	data *transport.Message | ||||||
| } | } | ||||||
| @@ -52,7 +56,7 @@ func (s *socket) Send(m *transport.Message) error { | |||||||
| 		// no op | 		// no op | ||||||
| 	} | 	} | ||||||
| 	// append to backlog | 	// append to backlog | ||||||
| 	s.send <- &message{id: s.id, data: m} | 	s.send <- &message{id: s.id, session: s.session, data: m} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -17,7 +17,13 @@ type Tunnel interface { | |||||||
| 	// Dial an endpoint | 	// Dial an endpoint | ||||||
| 	Dial(addr string) (Conn, error) | 	Dial(addr string) (Conn, error) | ||||||
| 	// Accept connections | 	// Accept connections | ||||||
| 	Accept(addr string) (Conn, error) | 	Listen(addr string) (Listener, error) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Listener interface { | ||||||
|  | 	Addr() string | ||||||
|  | 	Close() error | ||||||
|  | 	Accept() (Conn, error) | ||||||
| } | } | ||||||
|  |  | ||||||
| type Conn interface { | type Conn interface { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user