From 283c85d2568441a29da825c18fae7538c3cd3f51 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 8 Dec 2019 00:53:55 +0000 Subject: [PATCH] done --- network/default.go | 10 +++++----- tunnel/default.go | 5 +++-- tunnel/listener.go | 20 ++++++++++++++++---- tunnel/session.go | 6 ++++++ 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/network/default.go b/network/default.go index 83557592..b037e0aa 100644 --- a/network/default.go +++ b/network/default.go @@ -379,9 +379,8 @@ func (n *network) processNetChan(listener tunnel.Listener) { } // update peer links - log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetConnect.Node.Address) - if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil { + if err := n.updatePeerLinks(pbNetConnect.Node.Address, m); err != nil { log.Debugf("Network failed updating peer links: %s", err) } @@ -443,9 +442,8 @@ func (n *network) processNetChan(listener tunnel.Listener) { } // update peer links - log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetPeer.Node.Address) - if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil { + if err := n.updatePeerLinks(pbNetPeer.Node.Address, m); err != nil { log.Debugf("Network failed updating peer links: %s", err) } @@ -680,10 +678,12 @@ func (n *network) sendMsg(method, channel string, msg proto.Message) error { } // updatePeerLinks updates link for a given peer -func (n *network) updatePeerLinks(peerAddr string, linkId string) error { +func (n *network) updatePeerLinks(peerAddr string, m *message) error { n.Lock() defer n.Unlock() + linkId := m.msg.Header["Micro-Link"] + log.Tracef("Network looking up link %s in the peer links", linkId) // lookup the peer link diff --git a/tunnel/default.go b/tunnel/default.go index 15909654..985912d0 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -266,7 +266,7 @@ func (t *tun) manageLinks() { for _, node := range connect { wg.Add(1) - go func() { + go func(node string) { defer wg.Done() // create new link @@ -280,7 +280,7 @@ func (t *tun) manageLinks() { t.Lock() t.links[node] = link t.Unlock() - }() + }(node) } // wait for all threads to finish @@ -801,6 +801,7 @@ func (t *tun) setupLink(node string) (*link, error) { "Micro-Tunnel-Id": t.id, }, }); err != nil { + link.Close() return nil, err } diff --git a/tunnel/listener.go b/tunnel/listener.go index 2417edd6..6dbec5c8 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -65,12 +65,24 @@ func (t *tunListener) process() { return // receive a new message case m := <-t.session.recv: - // session id - sessionId := m.session + var sessionId string + var linkId string + + switch m.mode { + case Multicast: + sessionId = "multicast" + linkId = "multicast" + case Broadcast: + sessionId = "broadcast" + linkId = "broadcast" + default: + sessionId = m.session + linkId = m.link + } // get a session sess, ok := conns[sessionId] - log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok) + log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, m.session, m.typ, ok) if !ok { // we only process open and session types switch m.typ { @@ -92,7 +104,7 @@ func (t *tunListener) process() { // is loopback conn loopback: m.loopback, // the link the message was received on - link: m.link, + link: linkId, // set the connection mode mode: m.mode, // close chan diff --git a/tunnel/session.go b/tunnel/session.go index ebb179df..b43f3e75 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -400,6 +400,12 @@ func (s *session) Recv(m *transport.Message) error { msg.data.Header[k] = string(val) } + // set the link + // TODO: decruft, this is only for multicast + // since the session is now a single session + // likely provide as part of message.Link() + msg.data.Header["Micro-Link"] = msg.link + // set message *m = *msg.data // return nil