Merge pull request #1104 from milosgajdos83/network-hackery
[WIP] Network hackery
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"hash/fnv"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
tun "github.com/micro/go-micro/tunnel/transport"
|
||||
"github.com/micro/go-micro/util/backoff"
|
||||
"github.com/micro/go-micro/util/log"
|
||||
pbUtil "github.com/micro/go-micro/util/proto"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -36,6 +38,8 @@ var (
|
||||
DefaultLink = "network"
|
||||
// MaxConnections is the max number of network client connections
|
||||
MaxConnections = 3
|
||||
// MaxPeerErrors is the max number of peer errors before we remove it from network graph
|
||||
MaxPeerErrors = 3
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -43,6 +47,8 @@ var (
|
||||
ErrClientNotFound = errors.New("client not found")
|
||||
// ErrPeerLinkNotFound is returned when peer link could not be found in tunnel Links
|
||||
ErrPeerLinkNotFound = errors.New("peer link not found")
|
||||
// ErrPeerMaxExceeded is returned when peer has reached its max error count limit
|
||||
ErrPeerMaxExceeded = errors.New("peer max errors exceeded")
|
||||
)
|
||||
|
||||
// network implements Network interface
|
||||
@@ -159,6 +165,7 @@ func newNetwork(opts ...Option) Network {
|
||||
id: options.Id,
|
||||
address: peerAddress,
|
||||
peers: make(map[string]*node),
|
||||
status: newStatus(),
|
||||
},
|
||||
options: options,
|
||||
router: options.Router,
|
||||
@@ -270,9 +277,9 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *message) {
|
||||
}
|
||||
}
|
||||
|
||||
// handleCtrlConn handles ControlChannel connections
|
||||
// advertise advertises routes to the network
|
||||
func (n *network) advertise(advertChan <-chan *router.Advert) {
|
||||
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
hasher := fnv.New64()
|
||||
for {
|
||||
select {
|
||||
@@ -322,11 +329,22 @@ func (n *network) advertise(advertChan <-chan *router.Advert) {
|
||||
Events: events,
|
||||
}
|
||||
|
||||
// send the advert to all on the control channel
|
||||
// since its not a solicitation
|
||||
// send the advert to a select number of random peers
|
||||
if advert.Type != router.Solicitation {
|
||||
if err := n.sendMsg("advert", ControlChannel, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise routes: %v", err)
|
||||
// get a list of node peers
|
||||
peers := n.Peers()
|
||||
|
||||
// advertise to max 3 peers
|
||||
max := len(peers)
|
||||
if max > 3 {
|
||||
max = 3
|
||||
}
|
||||
for i := 0; i < max; i++ {
|
||||
if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil {
|
||||
if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -338,8 +356,17 @@ func (n *network) advertise(advertChan <-chan *router.Advert) {
|
||||
// someone requested the route
|
||||
n.sendTo("advert", ControlChannel, peer, msg)
|
||||
default:
|
||||
if err := n.sendMsg("advert", ControlChannel, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise routes: %v", err)
|
||||
// get a list of node peers
|
||||
peers := n.Peers()
|
||||
// pick a random peer from the list of peers
|
||||
if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil {
|
||||
if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", peer.Id(), err)
|
||||
// send a multicast message if we fail to send Unicast message
|
||||
if err := n.sendMsg("advert", ControlChannel, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case <-n.closed:
|
||||
@@ -354,7 +381,7 @@ func (n *network) initNodes(startup bool) {
|
||||
// NOTE: this condition never fires
|
||||
// as resolveNodes() never returns error
|
||||
if err != nil && !startup {
|
||||
log.Debugf("Network failed to resolve nodes: %v", err)
|
||||
log.Debugf("Network failed to init nodes: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -460,6 +487,7 @@ func (n *network) handleNetConn(s tunnel.Session, msg chan *message) {
|
||||
}
|
||||
}
|
||||
|
||||
// handleCtrlConn handles ControlChannel connections
|
||||
func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) {
|
||||
for {
|
||||
m := new(transport.Message)
|
||||
@@ -493,10 +521,11 @@ func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) {
|
||||
}
|
||||
|
||||
// getHopCount queries network graph and returns hop count for given router
|
||||
// NOTE: this should be called getHopeMetric
|
||||
// - Routes for local services have hop count 1
|
||||
// - Routes with ID of adjacent nodes have hop count 2
|
||||
// - Routes by peers of the advertiser have hop count 3
|
||||
// - Routes beyond node neighbourhood have hop count 4
|
||||
// - Routes with ID of adjacent nodes have hop count 10
|
||||
// - Routes by peers of the advertiser have hop count 100
|
||||
// - Routes beyond node neighbourhood have hop count 1000
|
||||
func (n *network) getHopCount(rtr string) int {
|
||||
// make sure node.peers are not modified
|
||||
n.node.RLock()
|
||||
@@ -733,8 +762,8 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
case "connect":
|
||||
// mark the time the message has been received
|
||||
now := time.Now()
|
||||
pbNetConnect := &pbNet.Connect{}
|
||||
|
||||
pbNetConnect := &pbNet.Connect{}
|
||||
if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil {
|
||||
log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err)
|
||||
continue
|
||||
@@ -752,41 +781,73 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
address: pbNetConnect.Node.Address,
|
||||
link: m.msg.Header["Micro-Link"],
|
||||
peers: make(map[string]*node),
|
||||
status: newStatus(),
|
||||
lastSeen: now,
|
||||
}
|
||||
|
||||
// update peer links
|
||||
|
||||
// TODO: should we do this only if we manage to add a peer
|
||||
// What should we do if the peer links failed to be updated?
|
||||
if err := n.updatePeerLinks(peer); err != nil {
|
||||
log.Debugf("Network failed updating peer links: %s", err)
|
||||
}
|
||||
|
||||
// add peer to the list of node peers
|
||||
if err := n.node.AddPeer(peer); err == ErrPeerExists {
|
||||
if err := n.AddPeer(peer); err == ErrPeerExists {
|
||||
log.Tracef("Network peer exists, refreshing: %s", peer.id)
|
||||
// update lastSeen time for the existing node
|
||||
// update lastSeen time for the peer
|
||||
if err := n.RefreshPeer(peer.id, peer.link, now); err != nil {
|
||||
log.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// we send the peer message because someone has sent connect
|
||||
// and wants to know what's on the network. The faster we
|
||||
// respond the faster we start to converge
|
||||
|
||||
// get node peers down to MaxDepth encoded in protobuf
|
||||
msg := PeersToProto(n.node, MaxDepth)
|
||||
// we send the sync message because someone has sent connect
|
||||
// and wants to either connect or reconnect to the network
|
||||
// The faster it gets the network config (routes and peer graph)
|
||||
// the faster the network converges to a stable state
|
||||
|
||||
go func() {
|
||||
// advertise yourself to the new node
|
||||
if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise peers: %v", err)
|
||||
// get node peer graph to send back to the connecting node
|
||||
node := PeersToProto(n.node, MaxDepth)
|
||||
|
||||
msg := &pbNet.Sync{
|
||||
Peer: node,
|
||||
}
|
||||
|
||||
// get a list of all of our routes
|
||||
routes, err := n.options.Router.Table().List()
|
||||
switch err {
|
||||
case nil:
|
||||
// encode the routes to protobuf
|
||||
pbRoutes := make([]*pbRtr.Route, 0, len(routes))
|
||||
for _, route := range routes {
|
||||
pbRoute := pbUtil.RouteToProto(route)
|
||||
pbRoutes = append(pbRoutes, pbRoute)
|
||||
}
|
||||
// pack the routes into the sync message
|
||||
msg.Routes = pbRoutes
|
||||
default:
|
||||
// we can't list the routes
|
||||
log.Debugf("Network node %s failed listing routes: %v", n.id, err)
|
||||
}
|
||||
|
||||
// send sync message to the newly connected peer
|
||||
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to send sync message: %v", err)
|
||||
}
|
||||
// wait for a short period of time before sending a solicit message
|
||||
<-time.After(time.Millisecond * 100)
|
||||
|
||||
// send a solicit message when discovering new peer
|
||||
// this triggers the node to flush its routing table to the network
|
||||
// and leads to faster convergence of the network
|
||||
solicit := &pbRtr.Solicit{
|
||||
Id: n.options.Id,
|
||||
}
|
||||
|
||||
// ask for the new nodes routes
|
||||
if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil {
|
||||
if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil {
|
||||
log.Debugf("Network failed to send solicit message: %s", err)
|
||||
}
|
||||
|
||||
@@ -796,11 +857,6 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
default:
|
||||
// don't block
|
||||
}
|
||||
|
||||
// advertise all the routes when a new node has connected
|
||||
if err := n.router.Solicit(); err != nil {
|
||||
log.Debugf("Network failed to solicit routes: %s", err)
|
||||
}
|
||||
}()
|
||||
case "peer":
|
||||
// mark the time the message has been received
|
||||
@@ -824,32 +880,37 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
address: pbNetPeer.Node.Address,
|
||||
link: m.msg.Header["Micro-Link"],
|
||||
peers: make(map[string]*node),
|
||||
status: newPeerStatus(pbNetPeer),
|
||||
lastSeen: now,
|
||||
}
|
||||
|
||||
// update peer links
|
||||
|
||||
// TODO: should we do this only if we manage to add a peer
|
||||
// What should we do if the peer links failed to be updated?
|
||||
if err := n.updatePeerLinks(peer); err != nil {
|
||||
log.Debugf("Network failed updating peer links: %s", err)
|
||||
}
|
||||
|
||||
// if it's a new peer i.e. we do not have it in our graph, we solicit its routes
|
||||
if err := n.node.AddPeer(peer); err == nil {
|
||||
// send a solicit message when discovering new peer
|
||||
msg := &pbRtr.Solicit{
|
||||
Id: n.options.Id,
|
||||
}
|
||||
|
||||
go func() {
|
||||
msg := PeersToProto(n.node, MaxDepth)
|
||||
|
||||
// advertise yourself to the peer
|
||||
if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise peers: %v", err)
|
||||
}
|
||||
|
||||
// wait for a second
|
||||
<-time.After(time.Millisecond * 100)
|
||||
|
||||
// send a solicit message when discovering new peer
|
||||
solicit := &pbRtr.Solicit{
|
||||
Id: n.options.Id,
|
||||
}
|
||||
|
||||
// then solicit this peer
|
||||
if err := n.sendTo("solicit", ControlChannel, peer, msg); err != nil {
|
||||
if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil {
|
||||
log.Debugf("Network failed to send solicit message: %s", err)
|
||||
}
|
||||
|
||||
@@ -867,7 +928,7 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
}()
|
||||
|
||||
continue
|
||||
// we're expecting any error to be ErrPeerExists
|
||||
// if we already have the peer in our graph, skip further steps
|
||||
} else if err != ErrPeerExists {
|
||||
log.Debugf("Network got error adding peer %v", err)
|
||||
continue
|
||||
@@ -897,6 +958,75 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
default:
|
||||
// don't block here
|
||||
}
|
||||
case "sync":
|
||||
// record the timestamp of the message receipt
|
||||
now := time.Now()
|
||||
|
||||
pbNetSync := &pbNet.Sync{}
|
||||
if err := proto.Unmarshal(m.msg.Body, pbNetSync); err != nil {
|
||||
log.Debugf("Network tunnel [%s] sync unmarshal error: %v", NetworkChannel, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// don't process your own messages
|
||||
if pbNetSync.Peer.Node.Id == n.options.Id {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id)
|
||||
|
||||
peer := &node{
|
||||
id: pbNetSync.Peer.Node.Id,
|
||||
address: pbNetSync.Peer.Node.Address,
|
||||
link: m.msg.Header["Micro-Link"],
|
||||
peers: make(map[string]*node),
|
||||
status: newPeerStatus(pbNetSync.Peer),
|
||||
lastSeen: now,
|
||||
}
|
||||
|
||||
// update peer links
|
||||
|
||||
// TODO: should we do this only if we manage to add a peer
|
||||
// What should we do if the peer links failed to be updated?
|
||||
if err := n.updatePeerLinks(peer); err != nil {
|
||||
log.Debugf("Network failed updating peer links: %s", err)
|
||||
}
|
||||
|
||||
// add peer to the list of node peers
|
||||
if err := n.node.AddPeer(peer); err == ErrPeerExists {
|
||||
log.Tracef("Network peer exists, refreshing: %s", peer.id)
|
||||
// update lastSeen time for the existing node
|
||||
if err := n.RefreshPeer(peer.id, peer.link, now); err != nil {
|
||||
log.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// when we receive a sync message we update our routing table
|
||||
// and send a peer message back to the network to announce our presence
|
||||
|
||||
// add all the routes we have received in the sync message
|
||||
for _, pbRoute := range pbNetSync.Routes {
|
||||
route := pbUtil.ProtoToRoute(pbRoute)
|
||||
if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute {
|
||||
log.Debugf("Network node %s failed to add route: %v", n.id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// update your sync timestamp
|
||||
// NOTE: this might go away as we will be doing full table advert to random peer
|
||||
if err := n.RefreshSync(now); err != nil {
|
||||
log.Debugf("Network failed refreshing sync time: %v", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
// get node peer graph to send back to the syncing node
|
||||
msg := PeersToProto(n.node, MaxDepth)
|
||||
|
||||
// advertise yourself to the new node
|
||||
if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise peers: %v", err)
|
||||
}
|
||||
}()
|
||||
case "close":
|
||||
pbNetClose := &pbNet.Close{}
|
||||
if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil {
|
||||
@@ -924,6 +1054,9 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
||||
log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err)
|
||||
}
|
||||
|
||||
// NOTE: we should maybe advertise this to the network so we converge faster on closed nodes
|
||||
// as opposed to our waiting until the node eventually gets pruned; something to think about
|
||||
|
||||
// delete peer from the peerLinks
|
||||
n.Lock()
|
||||
delete(n.peerLinks, pbNetClose.Node.Address)
|
||||
@@ -976,12 +1109,15 @@ func (n *network) prunePeerRoutes(peer *node) error {
|
||||
// seen for a period of time. Also removes all the routes either originated by or routable
|
||||
// by the stale nodes. it also resolves nodes periodically and adds them to the tunnel
|
||||
func (n *network) manage() {
|
||||
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
announce := time.NewTicker(AnnounceTime)
|
||||
defer announce.Stop()
|
||||
prune := time.NewTicker(PruneTime)
|
||||
defer prune.Stop()
|
||||
resolve := time.NewTicker(ResolveTime)
|
||||
defer resolve.Stop()
|
||||
netsync := time.NewTicker(SyncTime)
|
||||
defer netsync.Stop()
|
||||
|
||||
// list of links we've sent to
|
||||
links := make(map[string]time.Time)
|
||||
@@ -1080,7 +1216,7 @@ func (n *network) manage() {
|
||||
|
||||
// unknown link and peer so lets do the connect flow
|
||||
if err := n.sendTo("connect", NetworkChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to advertise peer %s: %v", peer.id, err)
|
||||
log.Debugf("Network failed to connect %s: %v", peer.id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1129,6 +1265,46 @@ func (n *network) manage() {
|
||||
log.Debugf("Network failed deleting routes by %s: %v", route.Router, err)
|
||||
}
|
||||
}
|
||||
case <-netsync.C:
|
||||
// get a list of node peers
|
||||
peers := n.Peers()
|
||||
// pick a random peer from the list of peers and request full sync
|
||||
peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id())
|
||||
// skip if we can't find randmly selected peer
|
||||
if peer == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
// get node peer graph to send back to the connecting node
|
||||
node := PeersToProto(n.node, MaxDepth)
|
||||
|
||||
msg := &pbNet.Sync{
|
||||
Peer: node,
|
||||
}
|
||||
|
||||
// get a list of all of our routes
|
||||
routes, err := n.options.Router.Table().List()
|
||||
switch err {
|
||||
case nil:
|
||||
// encode the routes to protobuf
|
||||
pbRoutes := make([]*pbRtr.Route, 0, len(routes))
|
||||
for _, route := range routes {
|
||||
pbRoute := pbUtil.RouteToProto(route)
|
||||
pbRoutes = append(pbRoutes, pbRoute)
|
||||
}
|
||||
// pack the routes into the sync message
|
||||
msg.Routes = pbRoutes
|
||||
default:
|
||||
// we can't list the routes
|
||||
log.Debugf("Network node %s failed listing routes: %v", n.id, err)
|
||||
}
|
||||
|
||||
// send sync message to the newly connected peer
|
||||
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
|
||||
log.Debugf("Network failed to send sync message: %v", err)
|
||||
}
|
||||
}()
|
||||
case <-resolve.C:
|
||||
n.initNodes(false)
|
||||
}
|
||||
@@ -1160,9 +1336,20 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a unicast connection to the peer but don't do the open/accept flow
|
||||
c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link))
|
||||
if err != nil {
|
||||
if peerNode := n.GetPeerNode(peer.id); peerNode != nil {
|
||||
log.Debugf("Network found peer %s: %v", peer.id, peerNode)
|
||||
// update node status when error happens
|
||||
peerNode.status.err.Update(err)
|
||||
log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count())
|
||||
if count := peerNode.status.Error().Count(); count == MaxPeerErrors {
|
||||
log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count())
|
||||
n.PrunePeer(peerNode.id)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
@@ -1187,7 +1374,22 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message)
|
||||
tmsg.Header["Micro-Peer"] = peer.id
|
||||
}
|
||||
|
||||
return c.Send(tmsg)
|
||||
if err := c.Send(tmsg); err != nil {
|
||||
// TODO: Lookup peer in our graph
|
||||
if peerNode := n.GetPeerNode(peer.id); peerNode != nil {
|
||||
log.Debugf("Network found peer %s: %v", peer.id, peerNode)
|
||||
// update node status when error happens
|
||||
peerNode.status.err.Update(err)
|
||||
log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count())
|
||||
if count := peerNode.status.Error().Count(); count == MaxPeerErrors {
|
||||
log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count())
|
||||
n.PrunePeer(peerNode.id)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendMsg sends a message to the tunnel channel
|
||||
@@ -1318,6 +1520,9 @@ func (n *network) connect() {
|
||||
if !discovered {
|
||||
// recreate the clients because all the tunnel links are gone
|
||||
// so we haven't send discovery beneath
|
||||
// NOTE: when starting the tunnel for the first time we might be recreating potentially
|
||||
// well functioning tunnel clients as "discovered" will be false until the
|
||||
// n.discovered channel is read at some point later on.
|
||||
if err := n.createClients(); err != nil {
|
||||
log.Debugf("Failed to recreate network/control clients: %v", err)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user