From 6353b2b89499a66624e8f3893390f769ab5fa02f Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 22 Oct 2019 20:48:51 +0100 Subject: [PATCH] Keep track of peer links --- network/default.go | 96 ++++++++++++++++++++++++++++++++++++---------- network/network.go | 10 +++++ 2 files changed, 86 insertions(+), 20 deletions(-) diff --git a/network/default.go b/network/default.go index 36f80da3..782c0b55 100644 --- a/network/default.go +++ b/network/default.go @@ -37,6 +37,8 @@ var ( var ( // ErrClientNotFound is returned when client for tunnel channel could not be found ErrClientNotFound = errors.New("client not found") + // ErrPeerLinkNotFound is returned when peer link could not be found in tunnel Links + ErrPeerLinkNotFound = errors.New("peer link not found") ) // network implements Network interface @@ -58,6 +60,8 @@ type network struct { // tunClient is a map of tunnel clients keyed over tunnel channel names tunClient map[string]transport.Client + // peerLinks is a map of links for each peer + peerLinks map[string]tunnel.Link sync.RWMutex // connected marks the network as connected @@ -138,6 +142,7 @@ func newNetwork(opts ...Option) Network { server: server, client: client, tunClient: make(map[string]transport.Client), + peerLinks: make(map[string]tunnel.Link), } network.node.network = network @@ -246,19 +251,22 @@ func (n *network) resolve() { } // handleNetConn handles network announcement messages -func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message) { +func (n *network) handleNetConn(s tunnel.Session, msg chan *Message) { for { m := new(transport.Message) - if err := sess.Recv(m); err != nil { + if err := s.Recv(m); err != nil { log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) - if sessErr := sess.Close(); sessErr != nil { - log.Debugf("Network tunnel [%s] closing connection error: %v", sessErr) + if sessionErr := s.Close(); sessionErr != nil { + log.Debugf("Network tunnel [%s] closing connection error: %v", NetworkChannel, sessionErr) } return } select { - case msg <- m: + case msg <- &Message{ + msg: m, + session: s, + }: case <-n.closed: return } @@ -266,7 +274,7 @@ func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message } // acceptNetConn accepts connections from NetworkChannel -func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) { +func (n *network) acceptNetConn(l tunnel.Listener, recv chan *Message) { var i int for { // accept a connection @@ -295,10 +303,40 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) } } +func (n *network) updatePeerLinks(peerAddr string, linkId string) error { + n.Lock() + defer n.Unlock() + log.Debugf("Network looking up link %s in the peer links", linkId) + // lookup the peer link + var peerLink tunnel.Link + for _, link := range n.tunnel.Links() { + if link.Id() == linkId { + peerLink = link + break + } + } + if peerLink == nil { + return ErrPeerLinkNotFound + } + // if the peerLink is found in the returned links update peerLinks + log.Debugf("Network updating peer links for peer %s", peerAddr) + // add peerLink to the peerLinks map + if link, ok := n.peerLinks[peerAddr]; ok { + // if the existing has better Length then the new, replace it + if link.Length() < peerLink.Length() { + n.peerLinks[peerAddr] = peerLink + } + } else { + n.peerLinks[peerAddr] = peerLink + } + + return nil +} + // processNetChan processes messages received on NetworkChannel func (n *network) processNetChan(listener tunnel.Listener) { // receive network message queue - recv := make(chan *transport.Message, 128) + recv := make(chan *Message, 128) // accept NetworkChannel connections go n.acceptNetConn(listener, recv) @@ -307,12 +345,12 @@ func (n *network) processNetChan(listener tunnel.Listener) { select { case m := <-recv: // switch on type of message and take action - switch m.Header["Micro-Method"] { + switch m.msg.Header["Micro-Method"] { case "connect": // mark the time the message has been received now := time.Now() pbNetConnect := &pbNet.Connect{} - if err := proto.Unmarshal(m.Body, pbNetConnect); err != nil { + if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil { log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) continue } @@ -327,6 +365,10 @@ func (n *network) processNetChan(listener tunnel.Listener) { peers: make(map[string]*node), lastSeen: now, } + // update peer links + if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil { + log.Debugf("Network failed updating peer links: %s", err) + } if err := n.node.AddPeer(peer); err == ErrPeerExists { log.Debugf("Network peer exists, refreshing: %s", peer.id) // update lastSeen time for the existing node @@ -349,7 +391,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { // mark the time the message has been received now := time.Now() pbNetPeer := &pbNet.Peer{} - if err := proto.Unmarshal(m.Body, pbNetPeer); err != nil { + if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil { log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) continue } @@ -364,6 +406,10 @@ func (n *network) processNetChan(listener tunnel.Listener) { peers: make(map[string]*node), lastSeen: now, } + // update peer links + if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil { + log.Debugf("Network failed updating peer links: %s", err) + } if err := n.node.AddPeer(peer); err == nil { // send a solicit message when discovering new peer msg := &pbRtr.Solicit{ @@ -393,7 +439,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { } case "close": pbNetClose := &pbNet.Close{} - if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { + if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil { log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) continue } @@ -412,6 +458,10 @@ func (n *network) processNetChan(listener tunnel.Listener) { if err := n.prunePeerRoutes(peer); err != nil { log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) } + // deelete peer from the peerLinks + n.Lock() + delete(n.peerLinks, pbNetClose.Node.Address) + n.Unlock() } case <-n.closed: return @@ -549,16 +599,22 @@ func (n *network) prune() { } // handleCtrlConn handles ControlChannel connections -func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) { +func (n *network) handleCtrlConn(s tunnel.Session, msg chan *Message) { for { m := new(transport.Message) - if err := sess.Recv(m); err != nil { - log.Debugf("Network tunnel advert receive error: %v", err) + if err := s.Recv(m); err != nil { + log.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err) + if sessionErr := s.Close(); sessionErr != nil { + log.Debugf("Network tunnel [%s] closing connection error: %v", ControlChannel, sessionErr) + } return } select { - case msg <- m: + case msg <- &Message{ + msg: m, + session: s, + }: case <-n.closed: return } @@ -566,7 +622,7 @@ func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Messag } // acceptCtrlConn accepts connections from ControlChannel -func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message) { +func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *Message) { var i int for { // accept a connection @@ -631,7 +687,7 @@ func (n *network) setRouteMetric(route *router.Route) { // processCtrlChan processes messages received on ControlChannel func (n *network) processCtrlChan(listener tunnel.Listener) { // receive control message queue - recv := make(chan *transport.Message, 128) + recv := make(chan *Message, 128) // accept ControlChannel cconnections go n.acceptCtrlConn(listener, recv) @@ -640,10 +696,10 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { select { case m := <-recv: // switch on type of message and take action - switch m.Header["Micro-Method"] { + switch m.msg.Header["Micro-Method"] { case "advert": pbRtrAdvert := &pbRtr.Advert{} - if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { + if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil { log.Debugf("Network fail to unmarshal advert message: %v", err) continue } @@ -717,7 +773,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { } case "solicit": pbRtrSolicit := &pbRtr.Solicit{} - if err := proto.Unmarshal(m.Body, pbRtrSolicit); err != nil { + if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil { log.Debugf("Network fail to unmarshal solicit message: %v", err) continue } diff --git a/network/network.go b/network/network.go index e927241b..44e205e6 100644 --- a/network/network.go +++ b/network/network.go @@ -6,6 +6,8 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/server" + "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/tunnel" ) var ( @@ -54,6 +56,14 @@ type Network interface { Server() server.Server } +// Message is network message +type Message struct { + // msg is transport message + msg *transport.Message + // session is tunnel session + session tunnel.Session +} + // NewNetwork returns a new network interface func NewNetwork(opts ...Option) Network { return newNetwork(opts...)