diff --git a/network/tunnel/default.go b/network/tunnel/default.go index e8d32bdd..1067924e 100644 --- a/network/tunnel/default.go +++ b/network/tunnel/default.go @@ -13,19 +13,21 @@ import ( // tun represents a network tunnel type tun struct { + // the link on top of which we build a tunnel link link.Link sync.RWMutex - // connect + // to indicate if we're connected or not connected bool - // the send channel + // the send channel for all messages send chan *message + // close channel closed chan bool - // sockets + // a map of sockets based on Micro-Tunnel-Id sockets map[string]*socket } @@ -39,6 +41,7 @@ func newTunnel(link link.Link) *tun { } } +// getSocket returns a socket from the internal socket map func (t *tun) getSocket(id string) (*socket, bool) { // get the socket t.RLock() @@ -47,6 +50,7 @@ func (t *tun) getSocket(id string) (*socket, bool) { return s, ok } +// newSocket creates a new socket and saves it func (t *tun) newSocket(id string) *socket { // new id if it doesn't exist if len(id) == 0 { @@ -81,7 +85,7 @@ func (t *tun) newSession() string { return uuid.New().String() } -// process outgoing messages +// process outgoing messages sent by all local sockets func (t *tun) process() { // manage the send buffer // all pseudo sockets throw everything down this @@ -129,7 +133,8 @@ func (t *tun) listen() { // get the socket s, exists := t.getSocket(id) if !exists { - // no op + // drop it, we don't care about + // messages we don't know about continue } @@ -145,24 +150,35 @@ func (t *tun) listen() { // is the socket new? select { - // if its new it will block here + // if its new the socket is actually blocked waiting + // for a connection. so we check if its waiting. case <-s.wait: - // its not new + // if its waiting e.g its new then we close it default: - // its new // set remote address of the socket s.remote = msg.Header["Remote"] close(s.wait) } + // construct a new transport message tmsg := &transport.Message{ Header: msg.Header, Body: msg.Body, } - // TODO: don't block on queuing + // construct the internal message + imsg := &message{ + id: id, + session: session, + data: tmsg, + } + // append to recv backlog - s.recv <- &message{id: id, session: session, data: tmsg} + // we don't block if we can't pass it on + select { + case s.recv <- imsg: + default: + } } }