micro/network/node.go
2020-01-15 21:38:37 +00:00

541 lines
11 KiB
Go

package network
import (
"container/list"
"errors"
"sync"
"time"
pb "github.com/micro/go-micro/network/service/proto"
)
var (
// MaxDepth defines max depth of peer topology
MaxDepth uint = 3
)
var (
// ErrPeerExists is returned when adding a peer which already exists
ErrPeerExists = errors.New("peer already exists")
// ErrPeerNotFound is returned when a peer could not be found in node topology
ErrPeerNotFound = errors.New("peer not found")
)
// nodeError tracks node errors
type nodeError struct {
sync.RWMutex
count int
msg error
}
// Increment increments node error count
func (e *nodeError) Update(err error) {
e.Lock()
defer e.Unlock()
e.count++
e.msg = err
}
// Count returns node error count
func (e *nodeError) Count() int {
e.RLock()
defer e.RUnlock()
return e.count
}
func (e *nodeError) Msg() string {
e.RLock()
defer e.RUnlock()
if e.msg != nil {
return e.msg.Error()
}
return ""
}
// status returns node status
type status struct {
sync.RWMutex
err *nodeError
}
// newStatus creates
func newStatus() *status {
return &status{
err: new(nodeError),
}
}
func newPeerStatus(peer *pb.Peer) *status {
status := &status{
err: new(nodeError),
}
// if Node.Status is nil, return empty status
if peer.Node.Status == nil {
return status
}
// if peer.Node.Status.Error is NOT nil, update status fields
if err := peer.Node.Status.GetError(); err != nil {
status.err.count = int(peer.Node.Status.Error.Count)
status.err.msg = errors.New(peer.Node.Status.Error.Msg)
}
return status
}
func (s *status) Error() Error {
s.RLock()
defer s.RUnlock()
return &nodeError{
count: s.err.count,
msg: s.err.msg,
}
}
// node is network node
type node struct {
sync.RWMutex
// id is node id
id string
// address is node address
address string
// link on which we communicate with the peer
link string
// peers are nodes with direct link to this node
peers map[string]*node
// network returns the node network
network Network
// lastSeen keeps track of node lifetime and updates
lastSeen time.Time
// lastSync keeps track of node last sync request
lastSync time.Time
// err tracks node status
status *status
}
// Id is node ide
func (n *node) Id() string {
return n.id
}
// Address returns node address
func (n *node) Address() string {
return n.address
}
// Network returns node network
func (n *node) Network() Network {
return n.network
}
// Status returns node status
func (n *node) Status() Status {
n.RLock()
defer n.RUnlock()
return &status{
err: &nodeError{
count: n.status.err.count,
msg: n.status.err.msg,
},
}
}
// walk walks the node graph until some condition is met
func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node {
// track the visited nodes
visited := make(map[string]*node)
// queue of the nodes to visit
queue := list.New()
// push node to the back of queue
queue.PushBack(n)
// mark the node as visited
visited[n.id] = n
// keep iterating over the queue until its empty
for queue.Len() > 0 {
// pop the node from the front of the queue
qnode := queue.Front()
if until(qnode.Value.(*node)) {
return visited
}
// iterate through all of the node peers
// mark the visited nodes; enqueue the non-visted
for id, peer := range qnode.Value.(*node).peers {
action(qnode.Value.(*node), peer)
if _, ok := visited[id]; !ok {
visited[id] = peer
queue.PushBack(peer)
}
}
// remove the node from the queue
queue.Remove(qnode)
}
return visited
}
// AddPeer adds a new peer to node topology
// It returns false if the peer already exists
func (n *node) AddPeer(peer *node) error {
n.Lock()
defer n.Unlock()
// get node topology: we need to check if the peer
// we are trying to add is already in our graph
top := n.getTopology(MaxDepth)
untilFoundPeer := func(n *node) bool {
return n.id == peer.id
}
justWalk := func(paent, node *node) {}
visited := top.walk(untilFoundPeer, justWalk)
peerNode, inTop := visited[peer.id]
if _, ok := n.peers[peer.id]; !ok {
if inTop {
// just create a new edge to the existing peer
// but make sure you update the peer link
peerNode.link = peer.link
n.peers[peer.id] = peerNode
return nil
}
n.peers[peer.id] = peer
return nil
}
return ErrPeerExists
}
// DeletePeer deletes a peer from node peers
// It returns true if the peer has been deleted
func (n *node) DeletePeer(id string) bool {
n.Lock()
defer n.Unlock()
delete(n.peers, id)
return true
}
// UpdatePeer updates a peer if it already exists
// It returns error if the peer does not exist
func (n *node) UpdatePeer(peer *node) error {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; ok {
n.peers[peer.id] = peer
return nil
}
return ErrPeerNotFound
}
// RefreshPeer updates node last seen timestamp
// It returns false if the peer has not been found.
func (n *node) RefreshPeer(id, link string, now time.Time) error {
n.Lock()
defer n.Unlock()
peer, ok := n.peers[id]
if !ok {
return ErrPeerNotFound
}
// set peer link
peer.link = link
// set last seen
peer.lastSeen = now
return nil
}
// RefreshSync refreshes nodes sync time
func (n *node) RefreshSync(now time.Time) error {
n.Lock()
defer n.Unlock()
n.lastSync = now
return nil
}
// Nodes returns a slice of all nodes in the whole node topology
func (n *node) Nodes() []Node {
// we need to freeze the network graph here
// otherwise we might get inconsisten results
n.RLock()
defer n.RUnlock()
// NOTE: this should never be true
untilNoMorePeers := func(node *node) bool {
return node == nil
}
justWalk := func(parent, node *node) {}
visited := n.walk(untilNoMorePeers, justWalk)
nodes := make([]Node, 0, len(visited))
// collect all the nodes and return them
for _, node := range visited {
nodes = append(nodes, node)
}
return nodes
}
// GetPeerNode returns a node from node MaxDepth topology
// It returns nil if the peer was not found
func (n *node) GetPeerNode(id string) *node {
// get node topology up to MaxDepth
top := n.Topology(MaxDepth)
untilFoundPeer := func(n *node) bool {
return n.id == id
}
justWalk := func(paent, node *node) {}
visited := top.walk(untilFoundPeer, justWalk)
peerNode, ok := visited[id]
if !ok {
return nil
}
return peerNode
}
// DeletePeerNode removes peer node from node topology
func (n *node) DeletePeerNode(id string) error {
n.Lock()
defer n.Unlock()
untilNoMorePeers := func(node *node) bool {
return node == nil
}
deleted := make(map[string]*node)
deletePeer := func(parent, node *node) {
if node.id != n.id && node.id == id {
delete(parent.peers, node.id)
deleted[node.id] = node
}
}
n.walk(untilNoMorePeers, deletePeer)
if _, ok := deleted[id]; !ok {
return ErrPeerNotFound
}
return nil
}
// PrunePeer prunes the peers with the given id
func (n *node) PrunePeer(id string) {
n.Lock()
defer n.Unlock()
untilNoMorePeers := func(node *node) bool {
return node == nil
}
prunePeer := func(parent, node *node) {
if node.id != n.id && node.id == id {
delete(parent.peers, node.id)
}
}
n.walk(untilNoMorePeers, prunePeer)
}
// PruneStalePeerNodes prunes the peers that have not been seen for longer than pruneTime
// It returns a map of the the nodes that got pruned
func (n *node) PruneStalePeers(pruneTime time.Duration) map[string]*node {
n.Lock()
defer n.Unlock()
untilNoMorePeers := func(node *node) bool {
return node == nil
}
pruned := make(map[string]*node)
pruneStalePeer := func(parent, node *node) {
if node.id != n.id && time.Since(node.lastSeen) > PruneTime {
delete(parent.peers, node.id)
pruned[node.id] = node
}
}
n.walk(untilNoMorePeers, pruneStalePeer)
return pruned
}
// getTopology traverses node graph and builds node topology
// NOTE: this function is not thread safe
func (n *node) getTopology(depth uint) *node {
// make a copy of yourself
node := &node{
id: n.id,
address: n.address,
peers: make(map[string]*node),
network: n.network,
status: n.status,
lastSeen: n.lastSeen,
}
// return if we reach requested depth or we have no more peers
if depth == 0 || len(n.peers) == 0 {
return node
}
// decrement the depth
depth--
// iterate through our peers and update the node peers
for _, peer := range n.peers {
nodePeer := peer.getTopology(depth)
if _, ok := node.peers[nodePeer.id]; !ok {
node.peers[nodePeer.id] = nodePeer
}
}
return node
}
// Topology returns a copy of the node topology down to given depth
// NOTE: the returned node is a node graph - not a single node
func (n *node) Topology(depth uint) *node {
n.RLock()
defer n.RUnlock()
return n.getTopology(depth)
}
// Peers returns node peers up to MaxDepth
func (n *node) Peers() []Node {
n.RLock()
defer n.RUnlock()
peers := make([]Node, 0, len(n.peers))
for _, nodePeer := range n.peers {
peer := nodePeer.getTopology(MaxDepth)
peers = append(peers, peer)
}
return peers
}
// UnpackPeerTopology unpacks pb.Peer into node topology of given depth
func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node {
peerNode := &node{
id: pbPeer.Node.Id,
address: pbPeer.Node.Address,
peers: make(map[string]*node),
status: newPeerStatus(pbPeer),
lastSeen: lastSeen,
}
// return if have either reached the depth or have no more peers
if depth == 0 || len(pbPeer.Peers) == 0 {
return peerNode
}
// decrement the depth
depth--
peers := make(map[string]*node)
for _, pbPeer := range pbPeer.Peers {
peer := UnpackPeerTopology(pbPeer, lastSeen, depth)
peers[pbPeer.Node.Id] = peer
}
peerNode.peers = peers
return peerNode
}
func peerProtoTopology(peer Node, depth uint) *pb.Peer {
node := &pb.Node{
Id: peer.Id(),
Address: peer.Address(),
Status: &pb.Status{
Error: &pb.Error{
Count: uint32(peer.Status().Error().Count()),
Msg: peer.Status().Error().Msg(),
},
},
}
// set the network name if network is not nil
if peer.Network() != nil {
node.Network = peer.Network().Name()
}
pbPeers := &pb.Peer{
Node: node,
Peers: make([]*pb.Peer, 0),
}
// return if we reached the end of topology or depth
if depth == 0 || len(peer.Peers()) == 0 {
return pbPeers
}
// decrement the depth
depth--
// iterate through peers of peers aka pops
for _, pop := range peer.Peers() {
peer := peerProtoTopology(pop, depth)
pbPeers.Peers = append(pbPeers.Peers, peer)
}
return pbPeers
}
// PeersToProto returns node peers graph encoded into protobuf
func PeersToProto(node Node, depth uint) *pb.Peer {
// network node aka root node
pbNode := &pb.Node{
Id: node.Id(),
Address: node.Address(),
Status: &pb.Status{
Error: &pb.Error{
Count: uint32(node.Status().Error().Count()),
Msg: node.Status().Error().Msg(),
},
},
}
// set the network name if network is not nil
if node.Network() != nil {
pbNode.Network = node.Network().Name()
}
// we will build proto topology into this
pbPeers := &pb.Peer{
Node: pbNode,
Peers: make([]*pb.Peer, 0),
}
for _, peer := range node.Peers() {
pbPeer := peerProtoTopology(peer, depth)
pbPeers.Peers = append(pbPeers.Peers, pbPeer)
}
return pbPeers
}