Optimise the multicast to use one session in the tunnel

This commit is contained in:
Asim Aslam 2019-12-05 23:11:42 +00:00
parent ac9001ac74
commit 219efd27e9
4 changed files with 137 additions and 121 deletions

View File

@ -127,7 +127,6 @@ func (t *tun) newSession(channel, sessionId string) (*session, bool) {
closed: make(chan bool), closed: make(chan bool),
recv: make(chan *message, 128), recv: make(chan *message, 128),
send: t.send, send: t.send,
wait: make(chan bool),
errChan: make(chan error, 1), errChan: make(chan error, 1),
} }
@ -470,6 +469,9 @@ func (t *tun) listen(link *link) {
return return
} }
// this state machine block handles the only message types
// that we know or care about; connect, close, open, accept,
// discover, announce, session, keepalive
switch mtype { switch mtype {
case "connect": case "connect":
log.Debugf("Tunnel link %s received connect message", link.Remote()) log.Debugf("Tunnel link %s received connect message", link.Remote())
@ -500,9 +502,6 @@ func (t *tun) listen(link *link) {
// nothing more to do // nothing more to do
continue continue
case "close": case "close":
// TODO: handle the close message
// maybe report io.EOF or kill the link
// if there is no channel then we close the link // if there is no channel then we close the link
// as its a signal from the other side to close the connection // as its a signal from the other side to close the connection
if len(channel) == 0 { if len(channel) == 0 {
@ -521,6 +520,8 @@ func (t *tun) listen(link *link) {
// try get the dialing socket // try get the dialing socket
s, exists := t.getSession(channel, sessionId) s, exists := t.getSession(channel, sessionId)
if exists && !loopback { if exists && !loopback {
// only delete the session if its unicast
// otherwise ignore close on the multicast
if s.mode == Unicast { if s.mode == Unicast {
// only delete this if its unicast // only delete this if its unicast
// but not if its a loopback conn // but not if its a loopback conn
@ -541,14 +542,16 @@ func (t *tun) listen(link *link) {
// an accept returned by the listener // an accept returned by the listener
case "accept": case "accept":
s, exists := t.getSession(channel, sessionId) s, exists := t.getSession(channel, sessionId)
// we don't need this // just set accepted on anything not unicast
if exists && s.mode > Unicast { if exists && s.mode > Unicast {
s.accepted = true s.accepted = true
continue continue
} }
// if its already accepted move on
if exists && s.accepted { if exists && s.accepted {
continue continue
} }
// otherwise we're going to process to accept
// a continued session // a continued session
case "session": case "session":
// process message // process message
@ -562,7 +565,10 @@ func (t *tun) listen(link *link) {
link.setChannel(channels...) link.setChannel(channels...)
// this was an announcement not intended for anything // this was an announcement not intended for anything
if sessionId == "listener" || sessionId == "" { // if the dialing side sent "discover" then a session
// id would be present. We skip in case of multicast.
switch sessionId {
case "listener", "multicast", "":
continue continue
} }
@ -574,14 +580,19 @@ func (t *tun) listen(link *link) {
continue continue
} }
// send the announce back to the caller msg := &message{
s.recv <- &message{
typ: "announce", typ: "announce",
tunnel: id, tunnel: id,
channel: channel, channel: channel,
session: sessionId, session: sessionId,
link: link.id, link: link.id,
} }
// send the announce back to the caller
select {
case <-s.closed:
case s.recv <- msg:
}
} }
continue continue
case "discover": case "discover":
@ -651,22 +662,10 @@ func (t *tun) listen(link *link) {
delete(t.sessions, channel) delete(t.sessions, channel)
continue continue
default: default:
// process // otherwise process
} }
log.Debugf("Tunnel using channel %s session %s", s.channel, s.session) log.Debugf("Tunnel using channel %s session %s type %s", s.channel, s.session, mtype)
// is the session new?
select {
// if its new the session is actually blocked waiting
// for a connection. so we check if its waiting.
case <-s.wait:
// if its waiting e.g its new then we close it
default:
// set remote address of the session
s.remote = msg.Header["Remote"]
close(s.wait)
}
// construct a new transport message // construct a new transport message
tmsg := &transport.Message{ tmsg := &transport.Message{
@ -1052,7 +1051,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
t.RUnlock() t.RUnlock()
// link not found // link not found and one was specified so error out
if len(links) == 0 && len(options.Link) > 0 { if len(links) == 0 && len(options.Link) > 0 {
// delete session and return error // delete session and return error
t.delSession(c.channel, c.session) t.delSession(c.channel, c.session)
@ -1061,15 +1060,14 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
// discovered so set the link if not multicast // discovered so set the link if not multicast
// TODO: pick the link efficiently based
// on link status and saturation.
if c.discovered && c.mode == Unicast { if c.discovered && c.mode == Unicast {
// pickLink will pick the best link // pickLink will pick the best link
link := t.pickLink(links) link := t.pickLink(links)
// set the link
c.link = link.id c.link = link.id
} }
// shit fuck // if its not already discovered we need to attempt to do so
if !c.discovered { if !c.discovered {
// piggy back roundtrip // piggy back roundtrip
nowRTT := time.Now() nowRTT := time.Now()
@ -1098,7 +1096,15 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
} }
// a unicast session so we call "open" and wait for an "accept" // return early if its not unicast
// we will not call "open" for multicast
if c.mode != Unicast {
return c, nil
}
// Note: we go no further for multicast or broadcast.
// This is a unicast session so we call "open" and wait
// for an "accept"
// reset now in case we use it // reset now in case we use it
now := time.Now() now := time.Now()
@ -1115,7 +1121,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
d := time.Since(now) d := time.Since(now)
// if we haven't measured the roundtrip do it now // if we haven't measured the roundtrip do it now
if !measured && c.mode == Unicast { if !measured {
// set the link time // set the link time
t.RLock() t.RLock()
link, ok := t.links[c.link] link, ok := t.links[c.link]
@ -1145,6 +1151,7 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
return nil, errors.New("already listening on " + channel) return nil, errors.New("already listening on " + channel)
} }
// delete function removes the session when closed
delFunc := func() { delFunc := func() {
t.delSession(channel, "listener") t.delSession(channel, "listener")
} }

View File

@ -24,7 +24,7 @@ type tunListener struct {
delFunc func() delFunc func()
} }
// periodically announce self // periodically announce self the channel being listened on
func (t *tunListener) announce() { func (t *tunListener) announce() {
tick := time.NewTicker(time.Second * 30) tick := time.NewTicker(time.Second * 30)
defer tick.Stop() defer tick.Stop()
@ -48,9 +48,12 @@ func (t *tunListener) process() {
defer func() { defer func() {
// close the sessions // close the sessions
for _, conn := range conns { for id, conn := range conns {
conn.Close() conn.Close()
delete(conns, id)
} }
// unassign
conns = nil
}() }()
for { for {
@ -62,9 +65,22 @@ func (t *tunListener) process() {
return return
// receive a new message // receive a new message
case m := <-t.session.recv: case m := <-t.session.recv:
var sessionId string
// get the session id
switch m.mode {
case Multicast, Broadcast:
// use channel name if multicast/broadcast
sessionId = m.channel
log.Tracef("Tunnel listener using session %s for real session %s", sessionId, m.session)
default:
// use session id if unicast
sessionId = m.session
}
// get a session // get a session
sess, ok := conns[m.session] sess, ok := conns[sessionId]
log.Debugf("Tunnel listener received channel %s session %s exists: %t", m.channel, m.session, ok) log.Debugf("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok)
if !ok { if !ok {
// we only process open and session types // we only process open and session types
switch m.typ { switch m.typ {
@ -80,7 +96,7 @@ func (t *tunListener) process() {
// the channel // the channel
channel: m.channel, channel: m.channel,
// the session id // the session id
session: m.session, session: sessionId,
// tunnel token // tunnel token
token: t.token, token: t.token,
// is loopback conn // is loopback conn
@ -95,14 +111,12 @@ func (t *tunListener) process() {
recv: make(chan *message, 128), recv: make(chan *message, 128),
// use the internal send buffer // use the internal send buffer
send: t.session.send, send: t.session.send,
// wait
wait: make(chan bool),
// error channel // error channel
errChan: make(chan error, 1), errChan: make(chan error, 1),
} }
// save the session // save the session
conns[m.session] = sess conns[sessionId] = sess
select { select {
case <-t.closed: case <-t.closed:
@ -114,17 +128,21 @@ func (t *tunListener) process() {
// an existing session was found // an existing session was found
// received a close message
switch m.typ { switch m.typ {
case "close": case "close":
// received a close message
select { select {
// check if the session is closed
case <-sess.closed: case <-sess.closed:
// no op // no op
delete(conns, m.session) delete(conns, sessionId)
default: default:
if sess.mode == Unicast {
// only close if unicast session
// close and delete session // close and delete session
close(sess.closed) close(sess.closed)
delete(conns, m.session) delete(conns, sessionId)
}
} }
// continue // continue
@ -139,9 +157,9 @@ func (t *tunListener) process() {
// send this to the accept chan // send this to the accept chan
select { select {
case <-sess.closed: case <-sess.closed:
delete(conns, m.session) delete(conns, sessionId)
case sess.recv <- m: case sess.recv <- m:
log.Debugf("Tunnel listener sent to recv chan channel %s session %s", m.channel, m.session) log.Debugf("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ)
} }
} }
} }

View File

@ -30,8 +30,6 @@ type session struct {
send chan *message send chan *message
// recv chan // recv chan
recv chan *message recv chan *message
// wait until we have a connection
wait chan bool
// if the discovery worked // if the discovery worked
discovered bool discovered bool
// if the session was accepted // if the session was accepted
@ -109,6 +107,29 @@ func (s *session) newMessage(typ string) *message {
} }
} }
func (s *session) sendMsg(msg *message) error {
select {
case <-s.closed:
return io.EOF
case s.send <- msg:
return nil
}
}
func (s *session) wait(msg *message) error {
// wait for an error response
select {
case err := <-msg.errChan:
if err != nil {
return err
}
case <-s.closed:
return io.EOF
}
return nil
}
// waitFor waits for the message type required until the timeout specified // waitFor waits for the message type required until the timeout specified
func (s *session) waitFor(msgType string, timeout time.Duration) (*message, error) { func (s *session) waitFor(msgType string, timeout time.Duration) (*message, error) {
now := time.Now() now := time.Now()
@ -144,20 +165,32 @@ func (s *session) waitFor(msgType string, timeout time.Duration) (*message, erro
} }
} }
// Discover attempts to discover the link for a specific channel // Discover attempts to discover the link for a specific channel.
// This is only used by the tunnel.Dial when first connecting.
func (s *session) Discover() error { func (s *session) Discover() error {
// create a new discovery message for this channel // create a new discovery message for this channel
msg := s.newMessage("discover") msg := s.newMessage("discover")
// broadcast the message to all links
msg.mode = Broadcast msg.mode = Broadcast
// its an outbound connection since we're dialling
msg.outbound = true msg.outbound = true
// don't set the link since we don't know where it is
msg.link = "" msg.link = ""
// send the discovery message // if multicast then set that as session
s.send <- msg if s.mode == Multicast {
msg.session = "multicast"
}
// send discover message
if err := s.sendMsg(msg); err != nil {
return err
}
// set time now // set time now
now := time.Now() now := time.Now()
// after strips down the dial timeout
after := func() time.Duration { after := func() time.Duration {
d := time.Since(now) d := time.Since(now)
// dial timeout minus time since // dial timeout minus time since
@ -168,6 +201,7 @@ func (s *session) Discover() error {
return wait return wait
} }
// the discover message is sent out, now
// wait to hear back about the sent message // wait to hear back about the sent message
select { select {
case <-time.After(after()): case <-time.After(after()):
@ -178,27 +212,16 @@ func (s *session) Discover() error {
} }
} }
var err error // bail early if its not unicast
// we don't need to wait for the announce
// set a new dialTimeout
dialTimeout := after()
// set a shorter delay for multicast
if s.mode != Unicast {
// shorten this
dialTimeout = time.Millisecond * 500
}
// wait for announce
_, err = s.waitFor("announce", dialTimeout)
// if its multicast just go ahead because this is best effort
if s.mode != Unicast { if s.mode != Unicast {
s.discovered = true s.discovered = true
s.accepted = true s.accepted = true
return nil return nil
} }
// wait for announce
_, err := s.waitFor("announce", after())
if err != nil { if err != nil {
return err return err
} }
@ -210,30 +233,22 @@ func (s *session) Discover() error {
} }
// Open will fire the open message for the session. This is called by the dialler. // Open will fire the open message for the session. This is called by the dialler.
// This is to indicate that we want to create a new session.
func (s *session) Open() error { func (s *session) Open() error {
// create a new message // create a new message
msg := s.newMessage("open") msg := s.newMessage("open")
// send open message // send open message
s.send <- msg if err := s.sendMsg(msg); err != nil {
// wait for an error response for send
select {
case err := <-msg.errChan:
if err != nil {
return err return err
} }
case <-s.closed:
return io.EOF // wait for an error response for send
if err := s.wait(msg); err != nil {
return err
} }
// don't wait on multicast/broadcast // now wait for the accept message to be returned
if s.mode == Multicast {
s.accepted = true
return nil
}
// now wait for the accept
msg, err := s.waitFor("accept", s.timeout) msg, err := s.waitFor("accept", s.timeout)
if err != nil { if err != nil {
return err return err
@ -252,32 +267,16 @@ func (s *session) Accept() error {
msg := s.newMessage("accept") msg := s.newMessage("accept")
// send the accept message // send the accept message
select { if err := s.sendMsg(msg); err != nil {
case <-s.closed: return err
return io.EOF
case s.send <- msg:
// no op here
}
// don't wait on multicast/broadcast
if s.mode == Multicast {
return nil
} }
// wait for send response // wait for send response
select { return s.wait(msg)
case err := <-s.errChan:
if err != nil {
return err
}
case <-s.closed:
return io.EOF
}
return nil
} }
// Announce sends an announcement to notify that this session exists. This is primarily used by the listener. // Announce sends an announcement to notify that this session exists.
// This is primarily used by the listener.
func (s *session) Announce() error { func (s *session) Announce() error {
msg := s.newMessage("announce") msg := s.newMessage("announce")
// we don't need an error back // we don't need an error back
@ -287,23 +286,12 @@ func (s *session) Announce() error {
// we don't need the link // we don't need the link
msg.link = "" msg.link = ""
select { // send announce message
case s.send <- msg: return s.sendMsg(msg)
return nil
case <-s.closed:
return io.EOF
}
} }
// Send is used to send a message // Send is used to send a message
func (s *session) Send(m *transport.Message) error { func (s *session) Send(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// encrypt the transport message payload // encrypt the transport message payload
body, err := Encrypt(m.Body, s.token+s.channel+s.session) body, err := Encrypt(m.Body, s.token+s.channel+s.session)
if err != nil { if err != nil {
@ -335,21 +323,19 @@ func (s *session) Send(m *transport.Message) error {
msg.data = data msg.data = data
// if multicast don't set the link // if multicast don't set the link
if s.mode == Multicast { if s.mode != Unicast {
msg.link = "" msg.link = ""
} }
log.Tracef("Appending %+v to send backlog", msg) log.Tracef("Appending %+v to send backlog", msg)
// send the actual message // send the actual message
s.send <- msg if err := s.sendMsg(msg); err != nil {
return err
}
// wait for an error response // wait for an error response
select { return s.wait(msg)
case err := <-msg.errChan:
return err
case <-s.closed:
return io.EOF
}
} }
// Recv is used to receive a message // Recv is used to receive a message
@ -413,6 +399,11 @@ func (s *session) Close() error {
default: default:
close(s.closed) close(s.closed)
// don't send close on multicast
if s.mode != Unicast {
return nil
}
// append to backlog // append to backlog
msg := s.newMessage("close") msg := s.newMessage("close")
// no error response on close // no error response on close
@ -421,7 +412,7 @@ func (s *session) Close() error {
// send the close message // send the close message
select { select {
case s.send <- msg: case s.send <- msg:
default: case <-time.After(time.Millisecond * 10):
} }
} }