diff --git a/tunnel/default.go b/tunnel/default.go index bd6af7dd..f03016ed 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -53,6 +53,8 @@ type link struct { // which we define for ourselves id string // whether its a loopback connection + // this flag is used by the transport listener + // which accepts inbound quic connections loopback bool // whether its actually connected // dialled side sets it to connected @@ -183,7 +185,7 @@ func (t *tun) process() { } // set message head - newMsg.Header["Micro-Tunnel"] = "message" + newMsg.Header["Micro-Tunnel"] = msg.typ // set the tunnel id on the outgoing message newMsg.Header["Micro-Tunnel-Id"] = msg.id @@ -196,9 +198,11 @@ func (t *tun) process() { // send the message via the interface t.Lock() + if len(t.links) == 0 { log.Debugf("No links to send to") } + for node, link := range t.links { // if the link is not connected skip it if !link.connected { @@ -206,6 +210,13 @@ func (t *tun) process() { continue } + // if we're picking the link check the id + // this is where we explicitly set the link + // in a message received via the listen method + if len(msg.link) > 0 && link.id != msg.link { + continue + } + // if the link was a loopback accepted connection // and the message is being sent outbound via // a dialled connection don't use this link @@ -219,6 +230,7 @@ func (t *tun) process() { continue } + // send the message via the current link log.Debugf("Sending %+v to %s", newMsg, node) if err := link.Send(newMsg); err != nil { log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, err) @@ -226,6 +238,7 @@ func (t *tun) process() { continue } } + t.Unlock() case <-t.closed: return @@ -283,10 +296,11 @@ func (t *tun) listen(link *link) { log.Debugf("Tunnel link %s closing connection", link.Remote()) // TODO: handle the close message // maybe report io.EOF or kill the link - continue + return case "keepalive": log.Debugf("Tunnel link %s received keepalive", link.Remote()) t.Lock() + // save the keepalive link.lastKeepAlive = time.Now() t.Unlock() continue @@ -300,6 +314,7 @@ func (t *tun) listen(link *link) { // if its not connected throw away the link if !link.connected { + log.Debugf("Tunnel link %s not connected", link.id) return } @@ -388,6 +403,7 @@ func (t *tun) listen(link *link) { id: id, session: session, data: tmsg, + link: link.id, loopback: loopback, } @@ -449,13 +465,10 @@ func (t *tun) setupLink(node string) (*link, error) { } // create a new link - link := &link{ - Socket: c, - id: uuid.New().String(), - // we made the outbound connection - // and sent the connect message - connected: true, - } + link := newLink(c) + link.connected = true + // we made the outbound connection + // and sent the connect message // process incoming messages go t.listen(link) @@ -482,10 +495,7 @@ func (t *tun) connect() error { log.Debugf("Tunnel accepted connection from %s", sock.Remote()) // create a new link - link := &link{ - Socket: sock, - id: uuid.New().String(), - } + link := newLink(sock) // listen for inbound messages. // only save the link once connected. @@ -493,8 +503,8 @@ func (t *tun) connect() error { t.listen(link) }) - t.Lock() - defer t.Unlock() + t.RLock() + defer t.RUnlock() // still connected but the tunnel died if err != nil && t.connected { diff --git a/tunnel/link.go b/tunnel/link.go new file mode 100644 index 00000000..6b8f30aa --- /dev/null +++ b/tunnel/link.go @@ -0,0 +1,13 @@ +package tunnel + +import ( + "github.com/google/uuid" + "github.com/micro/go-micro/transport" +) + +func newLink(s transport.Socket) *link { + return &link{ + Socket: s, + id: uuid.New().String(), + } +} diff --git a/tunnel/listener.go b/tunnel/listener.go index b953601b..42aadfd1 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -43,6 +43,8 @@ func (t *tunListener) process() { session: m.session, // is loopback conn loopback: m.loopback, + // the link the message was received on + link: m.link, // close chan closed: make(chan bool), // recv called by the acceptor diff --git a/tunnel/socket.go b/tunnel/socket.go index 921d4cf0..789d9555 100644 --- a/tunnel/socket.go +++ b/tunnel/socket.go @@ -29,10 +29,14 @@ type socket struct { outbound bool // lookback marks the socket as a loopback on the inbound loopback bool + // the link on which this message was received + link string } // message is sent over the send channel type message struct { + // type of message + typ string // tunnel id id string // the session id @@ -41,6 +45,8 @@ type message struct { outbound bool // loopback marks the message intended for loopback loopback bool + // the link to send the message on + link string // transport data data *transport.Message } @@ -81,11 +87,15 @@ func (s *socket) Send(m *transport.Message) error { // append to backlog msg := &message{ + typ: "message", id: s.id, session: s.session, outbound: s.outbound, loopback: s.loopback, data: data, + // specify the link on which to send this + // it will be blank for dialled sockets + link: s.link, } log.Debugf("Appending %+v to send backlog", msg) s.send <- msg