From 1d8c66780e25ecb5b8e5e2bdb013937d71ec1755 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 6 Dec 2019 00:18:40 +0000 Subject: [PATCH] save working solution --- network/default.go | 82 ++++++++++++++++++++++++++++++++-------------- tunnel/default.go | 6 ++-- tunnel/listener.go | 6 +++- tunnel/session.go | 10 +++--- 4 files changed, 71 insertions(+), 33 deletions(-) diff --git a/network/default.go b/network/default.go index ef859d51..4e5d96d7 100644 --- a/network/default.go +++ b/network/default.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "hash/fnv" + "io" "math" "sync" "time" @@ -70,6 +71,9 @@ type network struct { connected bool // closed closes the network closed chan bool + // whether we've announced the first connect successfully + // and received back some sort of peer message + announced chan bool } // newNetwork returns a new network node @@ -267,10 +271,11 @@ func (n *network) handleNetConn(s tunnel.Session, msg chan *message) { m := new(transport.Message) if err := s.Recv(m); err != nil { log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) - if sessionErr := s.Close(); sessionErr != nil { - log.Debugf("Network tunnel [%s] closing connection error: %v", NetworkChannel, sessionErr) + if err == io.EOF { + s.Close() + return } - return + continue } select { @@ -452,6 +457,14 @@ func (n *network) processNetChan(listener tunnel.Listener) { if err := n.node.UpdatePeer(peer); err != nil { log.Debugf("Network failed to update peers: %v", err) } + + // check if we announced and were discovered + select { + case <-n.announced: + // we've sent the connect and received this response + default: + close(n.announced) + } case "close": pbNetClose := &pbNet.Close{} if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil { @@ -622,10 +635,11 @@ func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { m := new(transport.Message) if err := s.Recv(m); err != nil { log.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err) - if sessionErr := s.Close(); sessionErr != nil { - log.Debugf("Network tunnel [%s] closing connection error: %v", ControlChannel, sessionErr) + if err == io.EOF { + s.Close() + return } - return + continue } select { @@ -930,6 +944,40 @@ func (n *network) sendConnect() { } } +// connect will wait for a link to be established +func (n *network) connect() { + // wait for connected state + var connected bool + + for { + // check the links + for _, link := range n.tunnel.Links() { + if link.State() == "connected" { + connected = true + break + } + } + + // if we're not conencted wait + if !connected { + time.Sleep(time.Second) + continue + } + + // send the connect message + n.sendConnect() + + // check the announce channel + select { + case <-n.announced: + return + default: + time.Sleep(time.Second) + // we have to go again + } + } +} + // Connect connects the network func (n *network) Connect() error { n.Lock() @@ -1000,6 +1048,8 @@ func (n *network) Connect() error { // create closed channel n.closed = make(chan bool) + // create new announced channel + n.announced = make(chan bool) // start the router if err := n.options.Router.Start(); err != nil { @@ -1022,25 +1072,7 @@ func (n *network) Connect() error { n.Unlock() // send connect after there's a link established - go func() { - // wait for 30 ticks e.g 30 seconds - for i := 0; i < 30; i++ { - // get the current links - links := n.tunnel.Links() - - // if there are no links wait until we have one - if len(links) == 0 { - time.Sleep(time.Second) - continue - } - - // send the connect message - n.sendConnect() - // most importantly - break - } - }() - + go n.connect() // go resolving network nodes go n.resolve() // broadcast peers diff --git a/tunnel/default.go b/tunnel/default.go index e64359a6..6b1b9151 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -629,7 +629,7 @@ func (t *tun) listen(link *link) { s, exists = t.getSession(channel, "listener") // only return accept to the session case mtype == "accept": - log.Debugf("Received accept message for %s %s", channel, sessionId) + log.Debugf("Received accept message for channel: %s session: %s", channel, sessionId) s, exists = t.getSession(channel, sessionId) if exists && s.accepted { continue @@ -649,7 +649,7 @@ func (t *tun) listen(link *link) { // bail if no session or listener has been found if !exists { - log.Debugf("Tunnel skipping no session %s %s exists", channel, sessionId) + log.Debugf("Tunnel skipping no channel: %s session: %s exists", channel, sessionId) // drop it, we don't care about // messages we don't know about continue @@ -665,7 +665,7 @@ func (t *tun) listen(link *link) { // otherwise process } - log.Debugf("Tunnel using channel %s session %s type %s", s.channel, s.session, mtype) + log.Debugf("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype) // construct a new transport message tmsg := &transport.Message{ diff --git a/tunnel/listener.go b/tunnel/listener.go index 36d70e96..ec435b03 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -71,7 +71,7 @@ func (t *tunListener) process() { switch m.mode { case Multicast, Broadcast: // use channel name if multicast/broadcast - sessionId = m.channel + sessionId = "multicast" log.Tracef("Tunnel listener using session %s for real session %s", sessionId, m.session) default: // use session id if unicast @@ -198,6 +198,10 @@ func (t *tunListener) Accept() (Session, error) { if !ok { return nil, io.EOF } + // return without accept + if c.mode != Unicast { + return c, nil + } // send back the accept if err := c.Accept(); err != nil { return nil, err diff --git a/tunnel/session.go b/tunnel/session.go index 6c4ad69a..3cb37c61 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -2,7 +2,6 @@ package tunnel import ( "encoding/hex" - "errors" "io" "time" @@ -344,7 +343,7 @@ func (s *session) Recv(m *transport.Message) error { select { case <-s.closed: - return errors.New("session is closed") + return io.EOF // recv from backlog case msg = <-s.recv: } @@ -360,7 +359,10 @@ func (s *session) Recv(m *transport.Message) error { log.Debugf("Received %+v from recv backlog", msg) // decrypt the received payload using the token - body, err := Decrypt(msg.data.Body, s.token+s.channel+s.session) + // we have to used msg.session because multicast has a shared + // session id of "multicast" in this session struct on + // the listener side + body, err := Decrypt(msg.data.Body, s.token+s.channel+msg.session) if err != nil { log.Debugf("failed to decrypt message body: %v", err) return err @@ -376,7 +378,7 @@ func (s *session) Recv(m *transport.Message) error { return err } // encrypt the transport message payload - val, err := Decrypt([]byte(h), s.token+s.channel+s.session) + val, err := Decrypt([]byte(h), s.token+s.channel+msg.session) if err != nil { log.Debugf("failed to decrypt message header %s: %v", k, err) return err