add further comments to tunnel

This commit is contained in:
Asim Aslam 2019-07-10 18:35:10 +01:00
parent 89231f701b
commit 0f16eb2858

View File

@ -13,19 +13,21 @@ import (
// tun represents a network tunnel // tun represents a network tunnel
type tun struct { type tun struct {
// the link on top of which we build a tunnel
link link.Link link link.Link
sync.RWMutex sync.RWMutex
// connect // to indicate if we're connected or not
connected bool connected bool
// the send channel // the send channel for all messages
send chan *message send chan *message
// close channel // close channel
closed chan bool closed chan bool
// sockets // a map of sockets based on Micro-Tunnel-Id
sockets map[string]*socket sockets map[string]*socket
} }
@ -39,6 +41,7 @@ func newTunnel(link link.Link) *tun {
} }
} }
// getSocket returns a socket from the internal socket map
func (t *tun) getSocket(id string) (*socket, bool) { func (t *tun) getSocket(id string) (*socket, bool) {
// get the socket // get the socket
t.RLock() t.RLock()
@ -47,6 +50,7 @@ func (t *tun) getSocket(id string) (*socket, bool) {
return s, ok return s, ok
} }
// newSocket creates a new socket and saves it
func (t *tun) newSocket(id string) *socket { func (t *tun) newSocket(id string) *socket {
// new id if it doesn't exist // new id if it doesn't exist
if len(id) == 0 { if len(id) == 0 {
@ -81,7 +85,7 @@ func (t *tun) newSession() string {
return uuid.New().String() return uuid.New().String()
} }
// process outgoing messages // process outgoing messages sent by all local sockets
func (t *tun) process() { func (t *tun) process() {
// manage the send buffer // manage the send buffer
// all pseudo sockets throw everything down this // all pseudo sockets throw everything down this
@ -129,7 +133,8 @@ func (t *tun) listen() {
// get the socket // get the socket
s, exists := t.getSocket(id) s, exists := t.getSocket(id)
if !exists { if !exists {
// no op // drop it, we don't care about
// messages we don't know about
continue continue
} }
@ -145,24 +150,35 @@ func (t *tun) listen() {
// is the socket new? // is the socket new?
select { select {
// if its new it will block here // if its new the socket is actually blocked waiting
// for a connection. so we check if its waiting.
case <-s.wait: case <-s.wait:
// its not new // if its waiting e.g its new then we close it
default: default:
// its new
// set remote address of the socket // set remote address of the socket
s.remote = msg.Header["Remote"] s.remote = msg.Header["Remote"]
close(s.wait) close(s.wait)
} }
// construct a new transport message
tmsg := &transport.Message{ tmsg := &transport.Message{
Header: msg.Header, Header: msg.Header,
Body: msg.Body, Body: msg.Body,
} }
// TODO: don't block on queuing // construct the internal message
imsg := &message{
id: id,
session: session,
data: tmsg,
}
// append to recv backlog // append to recv backlog
s.recv <- &message{id: id, session: session, data: tmsg} // we don't block if we can't pass it on
select {
case s.recv <- imsg:
default:
}
} }
} }