From b9c437fbfeb2e2e425b2287c7f479b8fe5c013eb Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 4 Sep 2019 09:48:05 +0100 Subject: [PATCH] Tunnel discover/announce/open/session/close --- network/default.go | 4 +- tunnel/default.go | 262 ++++++++++++++++++++++++++++++++++++++++----- tunnel/link.go | 65 ++++++++++- tunnel/listener.go | 96 ++++++++++++++++- tunnel/options.go | 26 +++++ tunnel/session.go | 144 +++++++++++++++++++++++-- tunnel/tunnel.go | 11 +- 7 files changed, 565 insertions(+), 43 deletions(-) diff --git a/network/default.go b/network/default.go index d9d8cbfe..e7b8c781 100644 --- a/network/default.go +++ b/network/default.go @@ -718,7 +718,7 @@ func (n *network) Connect() error { ) // dial into ControlChannel to send route adverts - ctrlClient, err := n.Tunnel.Dial(ControlChannel) + ctrlClient, err := n.Tunnel.Dial(ControlChannel, tunnel.DialMulticast()) if err != nil { return err } @@ -732,7 +732,7 @@ func (n *network) Connect() error { } // dial into NetworkChannel to send network messages - netClient, err := n.Tunnel.Dial(NetworkChannel) + netClient, err := n.Tunnel.Dial(NetworkChannel, tunnel.DialMulticast()) if err != nil { return err } diff --git a/tunnel/default.go b/tunnel/default.go index c7bd52ee..f1a0eaac 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -87,6 +87,12 @@ func (t *tun) getSession(channel, session string) (*session, bool) { return s, ok } +func (t *tun) delSession(channel, session string) { + t.Lock() + delete(t.sessions, channel+session) + t.Unlock() +} + // newSession creates a new session and saves it func (t *tun) newSession(channel, sessionId string) (*session, bool) { // new session @@ -150,7 +156,9 @@ func (t *tun) monitor() { log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) continue } - + // set the link id to the node + // TODO: hash it + link.id = node // save the link t.Lock() t.links[node] = link @@ -169,11 +177,14 @@ func (t *tun) process() { case msg := <-t.send: newMsg := &transport.Message{ Header: make(map[string]string), - Body: msg.data.Body, } - for k, v := range msg.data.Header { - newMsg.Header[k] = v + // set the data + if msg.data != nil { + for k, v := range msg.data.Header { + newMsg.Header[k] = v + } + newMsg.Body = msg.data.Body } // set message head @@ -195,7 +206,7 @@ func (t *tun) process() { t.Lock() if len(t.links) == 0 { - log.Debugf("No links to send to") + log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) } var sent bool @@ -232,25 +243,55 @@ func (t *tun) process() { continue } + // check the multicast mappings + if msg.multicast { + link.RLock() + _, ok := link.channels[msg.channel] + link.RUnlock() + // channel mapping not found in link + if !ok { + continue + } + } + // send the message via the current link log.Debugf("Sending %+v to %s", newMsg, node) + if errr := link.Send(newMsg); errr != nil { log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, errr) err = errors.New(errr.Error()) + // kill the link + link.Close() + // delete the link delete(t.links, node) continue } + // is sent sent = true + + // keep sending broadcast messages + if msg.broadcast || msg.multicast { + continue + } + + // break on unicast + break } t.Unlock() + // set the error if not sent var gerr error if !sent { gerr = err } + // skip if its not been set + if msg.errChan == nil { + continue + } + // return error non blocking select { case msg.errChan <- gerr: @@ -262,14 +303,25 @@ func (t *tun) process() { } } +func (t *tun) delLink(id string) { + t.Lock() + defer t.Unlock() + // get the link + link, ok := t.links[id] + if !ok { + return + } + // close and delete + link.Close() + delete(t.links, id) +} + // process incoming messages func (t *tun) listen(link *link) { // remove the link on exit defer func() { log.Debugf("Tunnel deleting connection from %s", link.Remote()) - t.Lock() - delete(t.links, link.Remote()) - t.Unlock() + t.delLink(link.Remote()) }() // let us know if its a loopback @@ -301,6 +353,13 @@ func (t *tun) listen(link *link) { // the session id sessionId := msg.Header["Micro-Tunnel-Session"] + // if its not connected throw away the link + // the first message we process needs to be connect + if !link.connected && mtype != "connect" { + log.Debugf("Tunnel link %s not connected", link.id) + return + } + switch mtype { case "connect": log.Debugf("Tunnel link %s received connect message", link.Remote()) @@ -311,6 +370,8 @@ func (t *tun) listen(link *link) { loopback = true } + // set to remote node + link.id = id // set as connected link.connected = true @@ -322,10 +383,31 @@ func (t *tun) listen(link *link) { // nothing more to do continue case "close": - log.Debugf("Tunnel link %s closing connection", link.Remote()) // TODO: handle the close message // maybe report io.EOF or kill the link - return + + // close the link entirely + if len(channel) == 0 { + log.Debugf("Tunnel link %s received close message", link.Remote()) + return + } + + // the entire listener was closed so remove it from the mapping + if sessionId == "listener" { + link.Lock() + delete(link.channels, channel) + link.Unlock() + continue + } + + // try get the dialing socket + s, exists := t.getSession(channel, sessionId) + if exists { + // close and continue + s.Close() + continue + } + // otherwise its a session mapping of sorts case "keepalive": log.Debugf("Tunnel link %s received keepalive", link.Remote()) t.Lock() @@ -333,20 +415,64 @@ func (t *tun) listen(link *link) { link.lastKeepAlive = time.Now() t.Unlock() continue + // a new connection dialled outbound + case "open": + // we just let it pass through to be processed + // an accept returned by the listener + case "accept": + + // a continued session case "session": // process message log.Debugf("Received %+v from %s", msg, link.Remote()) + // an announcement of a channel listener + case "announce": + // update mapping in the link + link.Lock() + link.channels[channel] = time.Now() + link.Unlock() + + // get the session that asked for the discovery + s, exists := t.getSession(channel, sessionId) + if exists { + // don't bother it's already discovered + if s.discovered { + continue + } + + // send the announce back to the caller + s.recv <- &message{ + typ: "announce", + tunnel: id, + channel: channel, + session: sessionId, + link: link.id, + } + } + continue + case "discover": + // looking for existing mapping + _, exists := t.getSession(channel, "listener") + if exists { + log.Debugf("Tunnel sending announce for discovery of channel %s", channel) + // send back the announcement + link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "announce", + "Micro-Tunnel-Id": t.id, + "Micro-Tunnel-Channel": channel, + "Micro-Tunnel-Session": sessionId, + "Micro-Tunnel-Link": link.id, + "Micro-Tunnel-Token": t.token, + }, + }) + } + continue default: // blackhole it continue } - // if its not connected throw away the link - if !link.connected { - log.Debugf("Tunnel link %s not connected", link.id) - return - } - // strip tunnel message header for k, _ := range msg.Header { if strings.HasPrefix(k, "Micro-Tunnel") { @@ -368,8 +494,15 @@ func (t *tun) listen(link *link) { // If its a loopback connection then we've enabled link direction // listening side is used for listening, the dialling side for dialling switch { - case loopback: + case loopback, mtype == "open": s, exists = t.getSession(channel, "listener") + // only return accept to the session + case mtype == "accept": + log.Debugf("Received accept message for %s %s", channel, sessionId) + s, exists = t.getSession(channel, sessionId) + if exists && s.accepted { + continue + } default: // get the session based on the tunnel id and session // this could be something we dialed in which case @@ -383,7 +516,7 @@ func (t *tun) listen(link *link) { } } - // bail if no session has been found + // bail if no session or listener has been found if !exists { log.Debugf("Tunnel skipping no session exists") // drop it, we don't care about @@ -391,8 +524,6 @@ func (t *tun) listen(link *link) { continue } - log.Debugf("Tunnel using session %s %s", s.channel, s.session) - // is the session closed? select { case <-s.closed: @@ -403,6 +534,8 @@ func (t *tun) listen(link *link) { // process } + log.Debugf("Tunnel using channel %s session %s", s.channel, s.session) + // is the session new? select { // if its new the session is actually blocked waiting @@ -462,9 +595,7 @@ func (t *tun) keepalive(link *link) { }, }); err != nil { log.Debugf("Error sending keepalive to link %v: %v", link.Remote(), err) - t.Lock() - delete(t.links, link.Remote()) - t.Unlock() + t.delLink(link.Remote()) return } } @@ -482,6 +613,7 @@ func (t *tun) setupLink(node string) (*link, error) { } log.Debugf("Tunnel connected to %s", node) + // send the first connect message if err := c.Send(&transport.Message{ Header: map[string]string{ "Micro-Tunnel": "connect", @@ -494,9 +626,11 @@ func (t *tun) setupLink(node string) (*link, error) { // create a new link link := newLink(c) - link.connected = true + // set link id to remote side + link.id = c.Remote() // we made the outbound connection // and sent the connect message + link.connected = true // process incoming messages go t.listen(link) @@ -554,7 +688,7 @@ func (t *tun) connect() error { } // save the link - t.links[node] = link + t.links[link.Remote()] = link } // process outbound messages to be sent @@ -628,6 +762,8 @@ func (t *tun) Close() error { return nil } + log.Debug("Tunnel closing") + select { case <-t.closed: return nil @@ -651,7 +787,7 @@ func (t *tun) Close() error { } // Dial an address -func (t *tun) Dial(channel string) (Session, error) { +func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { log.Debugf("Tunnel dialing %s", channel) c, ok := t.newSession(channel, t.newSessionId()) if !ok { @@ -664,18 +800,87 @@ func (t *tun) Dial(channel string) (Session, error) { // outbound session c.outbound = true + // get opts + options := DialOptions{ + Timeout: DefaultDialTimeout, + } + + for _, o := range opts { + o(&options) + } + + // set the multicast option + c.multicast = options.Multicast + // set the dial timeout + c.timeout = options.Timeout + + t.RLock() + for _, link := range t.links { + link.RLock() + _, ok := link.channels[channel] + link.RUnlock() + + // we have at least one channel mapping + if ok { + c.discovered = true + break + } + } + t.RUnlock() + + // shit fuck + if !c.discovered { + t.send <- &message{ + typ: "discover", + tunnel: t.id, + channel: channel, + session: c.session, + broadcast: true, + outbound: true, + errChan: c.errChan, + } + + select { + case err := <-c.errChan: + if err != nil { + return nil, err + } + } + + // wait for announce + select { + case msg := <-c.recv: + if msg.typ != "announce" { + return nil, errors.New("failed to discover channel") + } + } + } + + // try to open the session + err := c.Open() + if err != nil { + // delete the session + t.delSession(c.channel, c.session) + return nil, err + } + return c, nil } // Accept a connection on the address func (t *tun) Listen(channel string) (Listener, error) { log.Debugf("Tunnel listening on %s", channel) + // create a new session by hashing the address c, ok := t.newSession(channel, "listener") if !ok { return nil, errors.New("already listening on " + channel) } + delFunc := func() { + t.delSession(channel, "listener") + } + // set remote. it will be replaced by the first message received c.remote = "remote" // set local @@ -691,6 +896,8 @@ func (t *tun) Listen(channel string) (Listener, error) { tunClosed: t.closed, // the listener session session: c, + // delete session + delFunc: delFunc, } // this kicks off the internal message processor @@ -699,6 +906,9 @@ func (t *tun) Listen(channel string) (Listener, error) { // to the existign sessions go tl.process() + // announces the listener channel to others + go tl.announce() + // return the listener return tl, nil } diff --git a/tunnel/link.go b/tunnel/link.go index fbec2e6a..1e349848 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -9,9 +9,10 @@ import ( ) type link struct { + transport.Socket + sync.RWMutex - transport.Socket // unique id of this link e.g uuid // which we define for ourselves id string @@ -27,11 +28,67 @@ type link struct { // the last time we received a keepalive // on this link from the remote side lastKeepAlive time.Time + // channels keeps a mapping of channels and last seen + channels map[string]time.Time + // stop the link + closed chan bool } func newLink(s transport.Socket) *link { - return &link{ - Socket: s, - id: uuid.New().String(), + l := &link{ + Socket: s, + id: uuid.New().String(), + channels: make(map[string]time.Time), + closed: make(chan bool), + } + go l.run() + return l +} + +func (l *link) run() { + t := time.NewTicker(time.Minute) + defer t.Stop() + + for { + select { + case <-l.closed: + return + case <-t.C: + // drop any channel mappings older than 2 minutes + var kill []string + killTime := time.Minute * 2 + + l.RLock() + for ch, t := range l.channels { + if d := time.Since(t); d > killTime { + kill = append(kill, ch) + } + } + l.RUnlock() + + // if nothing to kill don't both with a wasted lock + if len(kill) == 0 { + continue + } + + // kill the channels! + l.Lock() + for _, ch := range kill { + delete(l.channels, ch) + } + l.Unlock() + } + } +} + +func (l *link) Close() { + l.Lock() + defer l.Unlock() + + select { + case <-l.closed: + return + default: + close(l.closed) } } diff --git a/tunnel/listener.go b/tunnel/listener.go index 4bb8517a..d86e0f80 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -2,6 +2,7 @@ package tunnel import ( "io" + "time" "github.com/micro/go-micro/util/log" ) @@ -17,22 +18,77 @@ type tunListener struct { tunClosed chan bool // the listener session session *session + // del func to kill listener + delFunc func() +} + +// periodically announce self +func (t *tunListener) announce() { + tick := time.NewTicker(time.Minute) + defer tick.Stop() + + announce := func() { + msg := &message{ + typ: "announce", + tunnel: t.session.tunnel, + channel: t.session.channel, + session: t.session.session, + outbound: t.session.outbound, + loopback: t.session.loopback, + multicast: t.session.multicast, + } + + select { + case t.session.send <- msg: + case <-t.session.closed: + return + case <-t.closed: + return + } + } + + // first announcement + announce() + + for { + select { + case <-tick.C: + announce() + case <-t.closed: + return + } + } } func (t *tunListener) process() { // our connection map for session conns := make(map[string]*session) + defer func() { + // close the sessions + for _, conn := range conns { + conn.Close() + } + }() + for { select { case <-t.closed: return + case <-t.tunClosed: + t.Close() + return // receive a new message case m := <-t.session.recv: // get a session sess, ok := conns[m.session] log.Debugf("Tunnel listener received channel %s session %s exists: %t", m.channel, m.session, ok) if !ok { + // only create new sessions on open message + if m.typ != "open" { + continue + } + // create a new session session sess = &session{ // the id of the remote side @@ -45,6 +101,8 @@ func (t *tunListener) process() { loopback: m.loopback, // the link the message was received on link: m.link, + // set multicast + multicast: m.multicast, // close chan closed: make(chan bool), // recv called by the acceptor @@ -60,12 +118,39 @@ func (t *tunListener) process() { // save the session conns[m.session] = sess - // send to accept chan select { case <-t.closed: return + // send to accept chan case t.accept <- sess: } + + // continue + continue + } + + // an existing session was found + + // received a close message + switch m.typ { + case "close": + select { + case <-sess.closed: + // no op + delete(conns, m.session) + default: + // close and delete session + close(sess.closed) + delete(conns, m.session) + } + + // continue + continue + case "session": + // operate on this + default: + // non operational type + continue } // send this to the accept chan @@ -89,6 +174,9 @@ func (t *tunListener) Close() error { case <-t.closed: return nil default: + // close and delete + t.delFunc() + t.session.Close() close(t.closed) } return nil @@ -102,13 +190,17 @@ func (t *tunListener) Accept() (Session, error) { return nil, io.EOF case <-t.tunClosed: // close the listener when the tunnel closes - t.Close() return nil, io.EOF // wait for a new connection case c, ok := <-t.accept: + // check if the accept chan is closed if !ok { return nil, io.EOF } + // send back the accept + if err := c.Accept(); err != nil { + return nil, err + } return c, nil } return nil, nil diff --git a/tunnel/options.go b/tunnel/options.go index 9d612173..39795671 100644 --- a/tunnel/options.go +++ b/tunnel/options.go @@ -1,6 +1,8 @@ package tunnel import ( + "time" + "github.com/google/uuid" "github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport/quic" @@ -29,6 +31,15 @@ type Options struct { Transport transport.Transport } +type DialOption func(*DialOptions) + +type DialOptions struct { + // specify a multicast connection + Multicast bool + // the dial timeout + Timeout time.Duration +} + // The tunnel id func Id(id string) Option { return func(o *Options) { @@ -73,3 +84,18 @@ func DefaultOptions() Options { Transport: quic.NewTransport(), } } + +// Dial options + +// Dial multicast sets the multicast option to send only to those mapped +func DialMulticast() DialOption { + return func(o *DialOptions) { + o.Multicast = true + } +} + +func DialTimeout(t time.Duration) DialOption { + return func(o *DialOptions) { + o.Timeout = t + } +} diff --git a/tunnel/session.go b/tunnel/session.go index 41616c7d..464f9bc0 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -3,6 +3,7 @@ package tunnel import ( "errors" "io" + "time" "github.com/micro/go-micro/transport" "github.com/micro/go-micro/util/log" @@ -28,10 +29,20 @@ type session struct { recv chan *message // wait until we have a connection wait chan bool + // if the discovery worked + discovered bool + // if the session was accepted + accepted bool // outbound marks the session as outbound dialled connection outbound bool // lookback marks the session as a loopback on the inbound loopback bool + // if the session is multicast + multicast bool + // if the session is broadcast + broadcast bool + // the timeout + timeout time.Duration // the link on which this message was received link string // the error response @@ -52,6 +63,10 @@ type message struct { outbound bool // loopback marks the message intended for loopback loopback bool + // whether to send as multicast + multicast bool + // broadcast sets the broadcast type + broadcast bool // the link to send the message on link string // transport data @@ -76,10 +91,97 @@ func (s *session) Channel() string { return s.channel } +// Open will fire the open message for the session +func (s *session) Open() error { + msg := &message{ + typ: "open", + tunnel: s.tunnel, + channel: s.channel, + session: s.session, + outbound: s.outbound, + loopback: s.loopback, + multicast: s.multicast, + link: s.link, + errChan: s.errChan, + } + + // send open message + s.send <- msg + + // wait for an error response for send + select { + case err := <-msg.errChan: + if err != nil { + return err + } + case <-s.closed: + return io.EOF + } + + // we don't wait on multicast + if s.multicast { + s.accepted = true + return nil + } + + // now wait for the accept + select { + case msg = <-s.recv: + if msg.typ != "accept" { + log.Debugf("Received non accept message in Open %s", msg.typ) + return errors.New("failed to connect") + } + // set to accepted + s.accepted = true + // set link + s.link = msg.link + case <-time.After(s.timeout): + return ErrDialTimeout + case <-s.closed: + return io.EOF + } + + return nil +} + +func (s *session) Accept() error { + msg := &message{ + typ: "accept", + tunnel: s.tunnel, + channel: s.channel, + session: s.session, + outbound: s.outbound, + loopback: s.loopback, + multicast: s.multicast, + link: s.link, + errChan: s.errChan, + } + + // send the accept message + select { + case <-s.closed: + return io.EOF + case s.send <- msg: + return nil + } + + // wait for send response + select { + case err := <-s.errChan: + if err != nil { + return err + } + case <-s.closed: + return io.EOF + } + + return nil +} + func (s *session) Send(m *transport.Message) error { select { case <-s.closed: - return errors.New("session is closed") + return io.EOF default: // no op } @@ -96,19 +198,26 @@ func (s *session) Send(m *transport.Message) error { // append to backlog msg := &message{ - typ: "session", - tunnel: s.tunnel, - channel: s.channel, - session: s.session, - outbound: s.outbound, - loopback: s.loopback, - data: data, + typ: "session", + tunnel: s.tunnel, + channel: s.channel, + session: s.session, + outbound: s.outbound, + loopback: s.loopback, + multicast: s.multicast, + data: data, // specify the link on which to send this // it will be blank for dialled sessions link: s.link, // error chan errChan: s.errChan, } + + // if not multicast then set link + if !s.multicast { + msg.link = s.link + } + log.Debugf("Appending %+v to send backlog", msg) s.send <- msg @@ -154,6 +263,25 @@ func (s *session) Close() error { // no op default: close(s.closed) + + // append to backlog + msg := &message{ + typ: "close", + tunnel: s.tunnel, + channel: s.channel, + session: s.session, + outbound: s.outbound, + loopback: s.loopback, + multicast: s.multicast, + link: s.link, + } + + // send the close message + select { + case s.send <- msg: + default: + } } + return nil } diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 58c0ac27..a4ef7da2 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -2,6 +2,9 @@ package tunnel import ( + "errors" + "time" + "github.com/micro/go-micro/transport" ) @@ -18,7 +21,7 @@ type Tunnel interface { // Close closes the tunnel Close() error // Connect to a channel - Dial(channel string) (Session, error) + Dial(channel string, opts ...DialOption) (Session, error) // Accept connections on a channel Listen(channel string) (Listener, error) // Name of the tunnel implementation @@ -42,6 +45,12 @@ type Session interface { transport.Socket } +var ( + ErrDialTimeout = errors.New("dial timeout") + + DefaultDialTimeout = time.Second * 5 +) + // NewTunnel creates a new tunnel func NewTunnel(opts ...Option) Tunnel { return newTunnel(opts...)