From 6ab86c9e576139c337ed03d5b0b17e85a5e93efa Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 28 Aug 2019 23:12:22 +0100 Subject: [PATCH 1/2] Don't process unless connected, and only fire loopback messages back up the loopback --- tunnel/default.go | 106 +++++++++++++++++++++++++++++++-------------- tunnel/listener.go | 2 + tunnel/socket.go | 7 ++- 3 files changed, 82 insertions(+), 33 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 69a5d878..bd6af7dd 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -49,8 +49,18 @@ type tun struct { type link struct { transport.Socket - id string - loopback bool + // unique id of this link e.g uuid + // which we define for ourselves + id string + // whether its a loopback connection + loopback bool + // whether its actually connected + // dialled side sets it to connected + // after sending the message. the + // listener waits for the connect + connected bool + // the last time we received a keepalive + // on this link from the remote side lastKeepAlive time.Time } @@ -190,9 +200,25 @@ func (t *tun) process() { log.Debugf("No links to send to") } for node, link := range t.links { + // if the link is not connected skip it + if !link.connected { + log.Debugf("Link for node %s not connected", node) + continue + } + + // if the link was a loopback accepted connection + // and the message is being sent outbound via + // a dialled connection don't use this link if link.loopback && msg.outbound { continue } + + // if the message was being returned by the loopback listener + // send it back up the loopback link only + if msg.loopback && !link.loopback { + continue + } + log.Debugf("Sending %+v to %s", newMsg, node) if err := link.Send(newMsg); err != nil { log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, err) @@ -209,15 +235,22 @@ func (t *tun) process() { // process incoming messages func (t *tun) listen(link *link) { + // remove the link on exit + defer func() { + log.Debugf("Tunnel deleting connection from %s", link.Remote()) + t.Lock() + delete(t.links, link.Remote()) + t.Unlock() + }() + + // let us know if its a loopback + var loopback bool + for { // process anything via the net interface msg := new(transport.Message) - err := link.Recv(msg) - if err != nil { + if err := link.Recv(msg); err != nil { log.Debugf("Tunnel link %s receive error: %#v", link.Remote(), err) - t.Lock() - delete(t.links, link.Remote()) - t.Unlock() return } @@ -232,11 +265,18 @@ func (t *tun) listen(link *link) { // are we connecting to ourselves? if token == t.token { - t.Lock() link.loopback = true - t.Unlock() + loopback = true } + // set as connected + link.connected = true + + // save the link once connected + t.Lock() + t.links[link.Remote()] = link + t.Unlock() + // nothing more to do continue case "close": @@ -258,6 +298,11 @@ func (t *tun) listen(link *link) { continue } + // if its not connected throw away the link + if !link.connected { + return + } + // strip message header delete(msg.Header, "Micro-Tunnel") @@ -283,8 +328,10 @@ func (t *tun) listen(link *link) { var s *socket var exists bool + // If its a loopback connection then we've enabled link direction + // listening side is used for listening, the dialling side for dialling switch { - case link.loopback: + case loopback: s, exists = t.getSocket(id, "listener") default: // get the socket based on the tunnel id and session @@ -298,6 +345,7 @@ func (t *tun) listen(link *link) { s, exists = t.getSocket(id, "listener") } } + // bail if no socket has been found if !exists { log.Debugf("Tunnel skipping no socket exists") @@ -337,9 +385,10 @@ func (t *tun) listen(link *link) { // construct the internal message imsg := &message{ - id: id, - session: session, - data: tmsg, + id: id, + session: session, + data: tmsg, + loopback: loopback, } // append to recv backlog @@ -399,13 +448,14 @@ func (t *tun) setupLink(node string) (*link, error) { return nil, err } - // save the link - id := uuid.New().String() + // create a new link link := &link{ Socket: c, - id: id, + id: uuid.New().String(), + // we made the outbound connection + // and sent the connect message + connected: true, } - t.links[node] = link // process incoming messages go t.listen(link) @@ -430,25 +480,16 @@ func (t *tun) connect() error { // accept inbound connections err := l.Accept(func(sock transport.Socket) { log.Debugf("Tunnel accepted connection from %s", sock.Remote()) - // save the link - id := uuid.New().String() - t.Lock() + + // create a new link link := &link{ Socket: sock, - id: id, + id: uuid.New().String(), } - t.links[sock.Remote()] = link - t.Unlock() - // delete the link - defer func() { - log.Debugf("Tunnel deleting connection from %s", sock.Remote()) - t.Lock() - delete(t.links, sock.Remote()) - t.Unlock() - }() - - // listen for inbound messages + // listen for inbound messages. + // only save the link once connected. + // we do this inside liste t.listen(link) }) @@ -473,6 +514,7 @@ func (t *tun) connect() error { log.Debugf("Tunnel failed to establish node link to %s: %v", node, err) continue } + // save the link t.links[node] = link } diff --git a/tunnel/listener.go b/tunnel/listener.go index 3002e7b6..b953601b 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -41,6 +41,8 @@ func (t *tunListener) process() { id: m.id, // the session id session: m.session, + // is loopback conn + loopback: m.loopback, // close chan closed: make(chan bool), // recv called by the acceptor diff --git a/tunnel/socket.go b/tunnel/socket.go index 2590a48e..921d4cf0 100644 --- a/tunnel/socket.go +++ b/tunnel/socket.go @@ -25,8 +25,10 @@ type socket struct { recv chan *message // wait until we have a connection wait chan bool - // outbound marks the socket as outbound + // outbound marks the socket as outbound dialled connection outbound bool + // lookback marks the socket as a loopback on the inbound + loopback bool } // message is sent over the send channel @@ -37,6 +39,8 @@ type message struct { session string // outbound marks the message as outbound outbound bool + // loopback marks the message intended for loopback + loopback bool // transport data data *transport.Message } @@ -80,6 +84,7 @@ func (s *socket) Send(m *transport.Message) error { id: s.id, session: s.session, outbound: s.outbound, + loopback: s.loopback, data: data, } log.Debugf("Appending %+v to send backlog", msg) From 00ab58f61b8eac3dc990c5ba39216fe72d4ddeb4 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 29 Aug 2019 12:42:27 +0100 Subject: [PATCH 2/2] Fix loopback cruft --- tunnel/default.go | 40 +++++++++++++++++++++++++--------------- tunnel/link.go | 13 +++++++++++++ tunnel/listener.go | 2 ++ tunnel/socket.go | 10 ++++++++++ 4 files changed, 50 insertions(+), 15 deletions(-) create mode 100644 tunnel/link.go diff --git a/tunnel/default.go b/tunnel/default.go index bd6af7dd..f03016ed 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -53,6 +53,8 @@ type link struct { // which we define for ourselves id string // whether its a loopback connection + // this flag is used by the transport listener + // which accepts inbound quic connections loopback bool // whether its actually connected // dialled side sets it to connected @@ -183,7 +185,7 @@ func (t *tun) process() { } // set message head - newMsg.Header["Micro-Tunnel"] = "message" + newMsg.Header["Micro-Tunnel"] = msg.typ // set the tunnel id on the outgoing message newMsg.Header["Micro-Tunnel-Id"] = msg.id @@ -196,9 +198,11 @@ func (t *tun) process() { // send the message via the interface t.Lock() + if len(t.links) == 0 { log.Debugf("No links to send to") } + for node, link := range t.links { // if the link is not connected skip it if !link.connected { @@ -206,6 +210,13 @@ func (t *tun) process() { continue } + // if we're picking the link check the id + // this is where we explicitly set the link + // in a message received via the listen method + if len(msg.link) > 0 && link.id != msg.link { + continue + } + // if the link was a loopback accepted connection // and the message is being sent outbound via // a dialled connection don't use this link @@ -219,6 +230,7 @@ func (t *tun) process() { continue } + // send the message via the current link log.Debugf("Sending %+v to %s", newMsg, node) if err := link.Send(newMsg); err != nil { log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, err) @@ -226,6 +238,7 @@ func (t *tun) process() { continue } } + t.Unlock() case <-t.closed: return @@ -283,10 +296,11 @@ func (t *tun) listen(link *link) { log.Debugf("Tunnel link %s closing connection", link.Remote()) // TODO: handle the close message // maybe report io.EOF or kill the link - continue + return case "keepalive": log.Debugf("Tunnel link %s received keepalive", link.Remote()) t.Lock() + // save the keepalive link.lastKeepAlive = time.Now() t.Unlock() continue @@ -300,6 +314,7 @@ func (t *tun) listen(link *link) { // if its not connected throw away the link if !link.connected { + log.Debugf("Tunnel link %s not connected", link.id) return } @@ -388,6 +403,7 @@ func (t *tun) listen(link *link) { id: id, session: session, data: tmsg, + link: link.id, loopback: loopback, } @@ -449,13 +465,10 @@ func (t *tun) setupLink(node string) (*link, error) { } // create a new link - link := &link{ - Socket: c, - id: uuid.New().String(), - // we made the outbound connection - // and sent the connect message - connected: true, - } + link := newLink(c) + link.connected = true + // we made the outbound connection + // and sent the connect message // process incoming messages go t.listen(link) @@ -482,10 +495,7 @@ func (t *tun) connect() error { log.Debugf("Tunnel accepted connection from %s", sock.Remote()) // create a new link - link := &link{ - Socket: sock, - id: uuid.New().String(), - } + link := newLink(sock) // listen for inbound messages. // only save the link once connected. @@ -493,8 +503,8 @@ func (t *tun) connect() error { t.listen(link) }) - t.Lock() - defer t.Unlock() + t.RLock() + defer t.RUnlock() // still connected but the tunnel died if err != nil && t.connected { diff --git a/tunnel/link.go b/tunnel/link.go new file mode 100644 index 00000000..6b8f30aa --- /dev/null +++ b/tunnel/link.go @@ -0,0 +1,13 @@ +package tunnel + +import ( + "github.com/google/uuid" + "github.com/micro/go-micro/transport" +) + +func newLink(s transport.Socket) *link { + return &link{ + Socket: s, + id: uuid.New().String(), + } +} diff --git a/tunnel/listener.go b/tunnel/listener.go index b953601b..42aadfd1 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -43,6 +43,8 @@ func (t *tunListener) process() { session: m.session, // is loopback conn loopback: m.loopback, + // the link the message was received on + link: m.link, // close chan closed: make(chan bool), // recv called by the acceptor diff --git a/tunnel/socket.go b/tunnel/socket.go index 921d4cf0..789d9555 100644 --- a/tunnel/socket.go +++ b/tunnel/socket.go @@ -29,10 +29,14 @@ type socket struct { outbound bool // lookback marks the socket as a loopback on the inbound loopback bool + // the link on which this message was received + link string } // message is sent over the send channel type message struct { + // type of message + typ string // tunnel id id string // the session id @@ -41,6 +45,8 @@ type message struct { outbound bool // loopback marks the message intended for loopback loopback bool + // the link to send the message on + link string // transport data data *transport.Message } @@ -81,11 +87,15 @@ func (s *socket) Send(m *transport.Message) error { // append to backlog msg := &message{ + typ: "message", id: s.id, session: s.session, outbound: s.outbound, loopback: s.loopback, data: data, + // specify the link on which to send this + // it will be blank for dialled sockets + link: s.link, } log.Debugf("Appending %+v to send backlog", msg) s.send <- msg