Keep track of peer links

This commit is contained in:
Milos Gajdos 2019-10-22 20:48:51 +01:00
parent caca93f65b
commit 6353b2b894
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
2 changed files with 86 additions and 20 deletions

View File

@ -37,6 +37,8 @@ var (
var ( var (
// ErrClientNotFound is returned when client for tunnel channel could not be found // ErrClientNotFound is returned when client for tunnel channel could not be found
ErrClientNotFound = errors.New("client not found") ErrClientNotFound = errors.New("client not found")
// ErrPeerLinkNotFound is returned when peer link could not be found in tunnel Links
ErrPeerLinkNotFound = errors.New("peer link not found")
) )
// network implements Network interface // network implements Network interface
@ -58,6 +60,8 @@ type network struct {
// tunClient is a map of tunnel clients keyed over tunnel channel names // tunClient is a map of tunnel clients keyed over tunnel channel names
tunClient map[string]transport.Client tunClient map[string]transport.Client
// peerLinks is a map of links for each peer
peerLinks map[string]tunnel.Link
sync.RWMutex sync.RWMutex
// connected marks the network as connected // connected marks the network as connected
@ -138,6 +142,7 @@ func newNetwork(opts ...Option) Network {
server: server, server: server,
client: client, client: client,
tunClient: make(map[string]transport.Client), tunClient: make(map[string]transport.Client),
peerLinks: make(map[string]tunnel.Link),
} }
network.node.network = network network.node.network = network
@ -246,19 +251,22 @@ func (n *network) resolve() {
} }
// handleNetConn handles network announcement messages // handleNetConn handles network announcement messages
func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message) { func (n *network) handleNetConn(s tunnel.Session, msg chan *Message) {
for { for {
m := new(transport.Message) m := new(transport.Message)
if err := sess.Recv(m); err != nil { if err := s.Recv(m); err != nil {
log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err)
if sessErr := sess.Close(); sessErr != nil { if sessionErr := s.Close(); sessionErr != nil {
log.Debugf("Network tunnel [%s] closing connection error: %v", sessErr) log.Debugf("Network tunnel [%s] closing connection error: %v", NetworkChannel, sessionErr)
} }
return return
} }
select { select {
case msg <- m: case msg <- &Message{
msg: m,
session: s,
}:
case <-n.closed: case <-n.closed:
return return
} }
@ -266,7 +274,7 @@ func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message
} }
// acceptNetConn accepts connections from NetworkChannel // acceptNetConn accepts connections from NetworkChannel
func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) { func (n *network) acceptNetConn(l tunnel.Listener, recv chan *Message) {
var i int var i int
for { for {
// accept a connection // accept a connection
@ -295,10 +303,40 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message)
} }
} }
func (n *network) updatePeerLinks(peerAddr string, linkId string) error {
n.Lock()
defer n.Unlock()
log.Debugf("Network looking up link %s in the peer links", linkId)
// lookup the peer link
var peerLink tunnel.Link
for _, link := range n.tunnel.Links() {
if link.Id() == linkId {
peerLink = link
break
}
}
if peerLink == nil {
return ErrPeerLinkNotFound
}
// if the peerLink is found in the returned links update peerLinks
log.Debugf("Network updating peer links for peer %s", peerAddr)
// add peerLink to the peerLinks map
if link, ok := n.peerLinks[peerAddr]; ok {
// if the existing has better Length then the new, replace it
if link.Length() < peerLink.Length() {
n.peerLinks[peerAddr] = peerLink
}
} else {
n.peerLinks[peerAddr] = peerLink
}
return nil
}
// processNetChan processes messages received on NetworkChannel // processNetChan processes messages received on NetworkChannel
func (n *network) processNetChan(listener tunnel.Listener) { func (n *network) processNetChan(listener tunnel.Listener) {
// receive network message queue // receive network message queue
recv := make(chan *transport.Message, 128) recv := make(chan *Message, 128)
// accept NetworkChannel connections // accept NetworkChannel connections
go n.acceptNetConn(listener, recv) go n.acceptNetConn(listener, recv)
@ -307,12 +345,12 @@ func (n *network) processNetChan(listener tunnel.Listener) {
select { select {
case m := <-recv: case m := <-recv:
// switch on type of message and take action // switch on type of message and take action
switch m.Header["Micro-Method"] { switch m.msg.Header["Micro-Method"] {
case "connect": case "connect":
// mark the time the message has been received // mark the time the message has been received
now := time.Now() now := time.Now()
pbNetConnect := &pbNet.Connect{} pbNetConnect := &pbNet.Connect{}
if err := proto.Unmarshal(m.Body, pbNetConnect); err != nil { if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil {
log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err)
continue continue
} }
@ -327,6 +365,10 @@ func (n *network) processNetChan(listener tunnel.Listener) {
peers: make(map[string]*node), peers: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
// update peer links
if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil {
log.Debugf("Network failed updating peer links: %s", err)
}
if err := n.node.AddPeer(peer); err == ErrPeerExists { if err := n.node.AddPeer(peer); err == ErrPeerExists {
log.Debugf("Network peer exists, refreshing: %s", peer.id) log.Debugf("Network peer exists, refreshing: %s", peer.id)
// update lastSeen time for the existing node // update lastSeen time for the existing node
@ -349,7 +391,7 @@ func (n *network) processNetChan(listener tunnel.Listener) {
// mark the time the message has been received // mark the time the message has been received
now := time.Now() now := time.Now()
pbNetPeer := &pbNet.Peer{} pbNetPeer := &pbNet.Peer{}
if err := proto.Unmarshal(m.Body, pbNetPeer); err != nil { if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil {
log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err)
continue continue
} }
@ -364,6 +406,10 @@ func (n *network) processNetChan(listener tunnel.Listener) {
peers: make(map[string]*node), peers: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
// update peer links
if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil {
log.Debugf("Network failed updating peer links: %s", err)
}
if err := n.node.AddPeer(peer); err == nil { if err := n.node.AddPeer(peer); err == nil {
// send a solicit message when discovering new peer // send a solicit message when discovering new peer
msg := &pbRtr.Solicit{ msg := &pbRtr.Solicit{
@ -393,7 +439,7 @@ func (n *network) processNetChan(listener tunnel.Listener) {
} }
case "close": case "close":
pbNetClose := &pbNet.Close{} pbNetClose := &pbNet.Close{}
if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil {
log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err)
continue continue
} }
@ -412,6 +458,10 @@ func (n *network) processNetChan(listener tunnel.Listener) {
if err := n.prunePeerRoutes(peer); err != nil { if err := n.prunePeerRoutes(peer); err != nil {
log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err)
} }
// deelete peer from the peerLinks
n.Lock()
delete(n.peerLinks, pbNetClose.Node.Address)
n.Unlock()
} }
case <-n.closed: case <-n.closed:
return return
@ -549,16 +599,22 @@ func (n *network) prune() {
} }
// handleCtrlConn handles ControlChannel connections // handleCtrlConn handles ControlChannel connections
func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) { func (n *network) handleCtrlConn(s tunnel.Session, msg chan *Message) {
for { for {
m := new(transport.Message) m := new(transport.Message)
if err := sess.Recv(m); err != nil { if err := s.Recv(m); err != nil {
log.Debugf("Network tunnel advert receive error: %v", err) log.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err)
if sessionErr := s.Close(); sessionErr != nil {
log.Debugf("Network tunnel [%s] closing connection error: %v", ControlChannel, sessionErr)
}
return return
} }
select { select {
case msg <- m: case msg <- &Message{
msg: m,
session: s,
}:
case <-n.closed: case <-n.closed:
return return
} }
@ -566,7 +622,7 @@ func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Messag
} }
// acceptCtrlConn accepts connections from ControlChannel // acceptCtrlConn accepts connections from ControlChannel
func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message) { func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *Message) {
var i int var i int
for { for {
// accept a connection // accept a connection
@ -631,7 +687,7 @@ func (n *network) setRouteMetric(route *router.Route) {
// processCtrlChan processes messages received on ControlChannel // processCtrlChan processes messages received on ControlChannel
func (n *network) processCtrlChan(listener tunnel.Listener) { func (n *network) processCtrlChan(listener tunnel.Listener) {
// receive control message queue // receive control message queue
recv := make(chan *transport.Message, 128) recv := make(chan *Message, 128)
// accept ControlChannel cconnections // accept ControlChannel cconnections
go n.acceptCtrlConn(listener, recv) go n.acceptCtrlConn(listener, recv)
@ -640,10 +696,10 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
select { select {
case m := <-recv: case m := <-recv:
// switch on type of message and take action // switch on type of message and take action
switch m.Header["Micro-Method"] { switch m.msg.Header["Micro-Method"] {
case "advert": case "advert":
pbRtrAdvert := &pbRtr.Advert{} pbRtrAdvert := &pbRtr.Advert{}
if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil {
log.Debugf("Network fail to unmarshal advert message: %v", err) log.Debugf("Network fail to unmarshal advert message: %v", err)
continue continue
} }
@ -717,7 +773,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
} }
case "solicit": case "solicit":
pbRtrSolicit := &pbRtr.Solicit{} pbRtrSolicit := &pbRtr.Solicit{}
if err := proto.Unmarshal(m.Body, pbRtrSolicit); err != nil { if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil {
log.Debugf("Network fail to unmarshal solicit message: %v", err) log.Debugf("Network fail to unmarshal solicit message: %v", err)
continue continue
} }

View File

@ -6,6 +6,8 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/tunnel"
) )
var ( var (
@ -54,6 +56,14 @@ type Network interface {
Server() server.Server Server() server.Server
} }
// Message is network message
type Message struct {
// msg is transport message
msg *transport.Message
// session is tunnel session
session tunnel.Session
}
// NewNetwork returns a new network interface // NewNetwork returns a new network interface
func NewNetwork(opts ...Option) Network { func NewNetwork(opts ...Option) Network {
return newNetwork(opts...) return newNetwork(opts...)