diff --git a/network/tunnel/default.go b/network/tunnel/default.go index c9f274f0..e8d32bdd 100644 --- a/network/tunnel/default.go +++ b/network/tunnel/default.go @@ -60,10 +60,11 @@ func (t *tun) newSocket(id string) *socket { // new socket s := &socket{ - id: id, - closed: make(chan bool), - recv: make(chan *message, 128), - send: t.send, + id: id, + session: t.newSession(), + closed: make(chan bool), + recv: make(chan *message, 128), + send: t.send, } // save socket @@ -75,6 +76,11 @@ func (t *tun) newSocket(id string) *socket { return s } +// TODO: use tunnel id as part of the session +func (t *tun) newSession() string { + return uuid.New().String() +} + // process outgoing messages func (t *tun) process() { // manage the send buffer @@ -87,8 +93,11 @@ func (t *tun) process() { Body: msg.data.Body, } - // set the stream id on the outgoing message - nmsg.Header["Micro-Tunnel"] = msg.id + // set the tunnel id on the outgoing message + nmsg.Header["Micro-Tunnel-Id"] = msg.id + + // set the session id + nmsg.Header["Micro-Tunnel-Session"] = msg.session // send the message via the interface if err := t.link.Send(nmsg); err != nil { @@ -111,8 +120,11 @@ func (t *tun) listen() { return } - // a stream id - id := msg.Header["Micro-Tunnel"] + // the tunnel id + id := msg.Header["Micro-Tunnel-Id"] + + // the session id + session := msg.Header["Micro-Tunnel-Session"] // get the socket s, exists := t.getSocket(id) @@ -150,7 +162,7 @@ func (t *tun) listen() { // TODO: don't block on queuing // append to recv backlog - s.recv <- &message{id: id, data: tmsg} + s.recv <- &message{id: id, session: session, data: tmsg} } } @@ -209,10 +221,12 @@ func (t *tun) Dial(addr string) (Conn, error) { c.remote = addr // set local c.local = t.link.Local() + return c, nil } -func (t *tun) Accept(addr string) (Conn, error) { +// Accept a connection on the address +func (t *tun) Listen(addr string) (Listener, error) { // create a new socket by hashing the address c := t.newSocket(addr) // set remote. it will be replaced by the first message received @@ -227,6 +241,20 @@ func (t *tun) Accept(addr string) (Conn, error) { case <-c.wait: } - // return socket - return c, nil + tl := &tunListener{ + addr: addr, + // the accept channel + accept: make(chan *socket, 128), + // the channel to close + closed: make(chan bool), + // the connection + conn: c, + // the listener socket + socket: c, + } + + go tl.process() + + // return the listener + return tl, nil } diff --git a/network/tunnel/listener.go b/network/tunnel/listener.go new file mode 100644 index 00000000..6a8d1ba3 --- /dev/null +++ b/network/tunnel/listener.go @@ -0,0 +1,89 @@ +package tunnel + +import ( + "io" +) + +type tunListener struct { + // address of the listener + addr string + // the accept channel + accept chan *socket + // the channel to close + closed chan bool + // the connection + conn Conn + // the listener socket + socket *socket +} + +func (t *tunListener) process() { + // our connection map for session + conns := make(map[string]*socket) + + for { + select { + case <-t.closed: + return + // receive a new message + case m := <-t.socket.recv: + // get a socket + sock, ok := conns[m.session] + if !ok { + // create a new socket session + sock = &socket{ + // our tunnel id + id: m.id, + // the session id + session: m.session, + // close chan + closed: make(chan bool), + // recv called by the acceptor + recv: make(chan *message, 128), + // use the internal send buffer + send: t.socket.send, + } + + // save the socket + conns[m.session] = sock + } + + // send this to the accept chan + select { + case <-t.closed: + return + case t.accept <- sock: + } + } + } +} + +func (t *tunListener) Addr() string { + return t.addr +} + +func (t *tunListener) Close() error { + select { + case <-t.closed: + return nil + default: + close(t.closed) + } + return nil +} + +// Everytime accept is called we essentially block till we get a new connection +func (t *tunListener) Accept() (Conn, error) { + select { + // if the socket is closed return + case <-t.closed: + return nil, io.EOF + // wait for a new connection + case c, ok := <-t.accept: + if !ok { + return nil, io.EOF + } + return c, nil + } + return nil, nil +} diff --git a/network/tunnel/socket.go b/network/tunnel/socket.go index 861e3f4c..96d16567 100644 --- a/network/tunnel/socket.go +++ b/network/tunnel/socket.go @@ -10,6 +10,8 @@ import ( type socket struct { // socket id based on Micro-Tunnel id string + // the session id based on Micro.Tunnel-Session + session string // closed closed chan bool // remote addr @@ -26,8 +28,10 @@ type socket struct { // message is sent over the send channel type message struct { - // socket id + // tunnel id id string + // the session id + session string // transport data data *transport.Message } @@ -52,7 +56,7 @@ func (s *socket) Send(m *transport.Message) error { // no op } // append to backlog - s.send <- &message{id: s.id, data: m} + s.send <- &message{id: s.id, session: s.session, data: m} return nil } diff --git a/network/tunnel/tunnel.go b/network/tunnel/tunnel.go index 3189e81d..60ad0f25 100644 --- a/network/tunnel/tunnel.go +++ b/network/tunnel/tunnel.go @@ -17,7 +17,13 @@ type Tunnel interface { // Dial an endpoint Dial(addr string) (Conn, error) // Accept connections - Accept(addr string) (Conn, error) + Listen(addr string) (Listener, error) +} + +type Listener interface { + Addr() string + Close() error + Accept() (Conn, error) } type Conn interface {