Moved node implementation into dedicated source file

This commit is contained in:
Milos Gajdos 2019-09-10 00:01:41 +01:00
parent eec780aaa7
commit f91d0408ab
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
2 changed files with 158 additions and 157 deletions

View File

@ -27,8 +27,6 @@ var (
ControlChannel = "control" ControlChannel = "control"
// DefaultLink is default network link // DefaultLink is default network link
DefaultLink = "network" DefaultLink = "network"
// MaxDepth defines max depth of neighbourhood topology
MaxDepth = 3
) )
var ( var (
@ -38,149 +36,6 @@ var (
ErrClientNotFound = errors.New("client not found") ErrClientNotFound = errors.New("client not found")
) )
// node is network node
type node struct {
sync.RWMutex
// id is node id
id string
// address is node address
address string
// neighbours maps the node neighbourhood
neighbours map[string]*node
// network returns the node network
network Network
// lastSeen stores the time the node has been seen last time
lastSeen time.Time
}
// Id is node ide
func (n *node) Id() string {
return n.id
}
// Address returns node address
func (n *node) Address() string {
return n.address
}
// Network returns node network
func (n *node) Network() Network {
return n.network
}
// Neighbourhood returns node neighbourhood
func (n *node) Neighbourhood() []Node {
var nodes []Node
n.RLock()
for _, neighbourNode := range n.neighbours {
// make a copy of the node
n := &node{
id: neighbourNode.id,
address: neighbourNode.address,
network: neighbourNode.network,
}
// NOTE: we do not care about neighbour's neighbours
nodes = append(nodes, n)
}
n.RUnlock()
return nodes
}
// getNeighbours collects node neighbours up to given depth into pbNeighbours
// NOTE: this method is not thread safe, so make sure you serialize access to it
// NOTE: we should be able to read-Lock this, even though it's recursive
// TODO: we should rework this so it returns pbNeighbours along with error
func (n *node) getNeighbours(pbNeighbours *pbNet.Neighbour, depth int) error {
if pbNeighbours == nil {
return errors.New("neighbours not initialized")
}
// return if have either reached the depth or have no more neighbours
if depth == 0 || len(n.neighbours) == 0 {
return nil
}
// decrement the depth
depth--
var neighbours []*pbNet.Neighbour
for _, neighbour := range n.neighbours {
// node
node := &pbNet.Node{
Id: neighbour.id,
Address: neighbour.address,
}
// create new neighbour
pbNodeNeighbour := &pbNet.Neighbour{
Node: node,
Neighbours: make([]*pbNet.Neighbour, 0),
}
// get neighbours of the neighbour
// NOTE: this is [not] a recursive call
if err := neighbour.getNeighbours(pbNodeNeighbour, depth); err != nil {
return err
}
// add current neighbour to explored neighbours
neighbours = append(neighbours, pbNodeNeighbour)
}
// add neighbours to the parent topology
pbNeighbours.Neighbours = neighbours
return nil
}
// unpackNeighbour unpacks pbNet.Neighbour into node of given depth
func unpackNeighbour(pbNeighbour *pbNet.Neighbour, depth int) (*node, error) {
if pbNeighbour == nil {
return nil, errors.New("neighbour not initialized")
}
neighbourNode := &node{
id: pbNeighbour.Node.Id,
address: pbNeighbour.Node.Address,
neighbours: make(map[string]*node),
}
// return if have either reached the depth or have no more neighbours
if depth == 0 || len(pbNeighbour.Neighbours) == 0 {
return neighbourNode, nil
}
// decrement the depth
depth--
neighbours := make(map[string]*node)
for _, pbNode := range pbNeighbour.Neighbours {
node, err := unpackNeighbour(pbNode, depth)
if err != nil {
return nil, err
}
neighbours[pbNode.Node.Id] = node
}
neighbourNode.neighbours = neighbours
return neighbourNode, nil
}
// updateNeighbour updates node neighbour up to given depth
func (n *node) updateNeighbour(neighbour *pbNet.Neighbour, depth int) error {
// unpack neighbour into topology of size MaxDepth-1
// NOTE: we need MaxDepth-1 because node n is the parent adding which
// gives us the max neighbour topology we maintain and propagate
node, err := unpackNeighbour(neighbour, MaxDepth-1)
if err != nil {
return err
}
// update node neighbours with new topology
n.neighbours[neighbour.Node.Id] = node
return nil
}
// network implements Network interface // network implements Network interface
type network struct { type network struct {
// node is network node // node is network node
@ -520,21 +375,13 @@ func (n *network) sendMsg(msgType string, channel string) error {
} }
case "neighbour": case "neighbour":
n.RLock() n.RLock()
node := &pbNet.Node{ var err error
Id: n.node.id, // get all the neighbours down to MaxDepth
Address: n.node.address, protoMsg, err = n.node.getNeighbours(MaxDepth)
} if err != nil {
nodeNeighbour := &pbNet.Neighbour{
Node: node,
Neighbours: make([]*pbNet.Neighbour, 0),
}
// get all the neighbours down to MaxNeighbourDepth
if err := n.node.getNeighbours(nodeNeighbour, MaxDepth); err != nil {
log.Debugf("Network unable to retrieve node neighbours: %s", err) log.Debugf("Network unable to retrieve node neighbours: %s", err)
return err return err
} }
// set protoMsg for serialization
protoMsg = nodeNeighbour
n.RUnlock() n.RUnlock()
default: default:
return ErrMsgUnknown return ErrMsgUnknown

154
network/node.go Normal file
View File

@ -0,0 +1,154 @@
package network
import (
"errors"
"sync"
"time"
pbNet "github.com/micro/go-micro/network/proto"
)
var (
// MaxDepth defines max depth of peer topology
MaxDepth = 3
)
// node is network node
type node struct {
sync.RWMutex
// id is node id
id string
// address is node address
address string
// neighbours maps the node neighbourhood
neighbours map[string]*node
// network returns the node network
network Network
// lastSeen stores the time the node has been seen last time
lastSeen time.Time
}
// Id is node ide
func (n *node) Id() string {
return n.id
}
// Address returns node address
func (n *node) Address() string {
return n.address
}
// Network returns node network
func (n *node) Network() Network {
return n.network
}
// Neighbourhood returns node neighbourhood
func (n *node) Neighbourhood() []Node {
var nodes []Node
n.RLock()
for _, neighbourNode := range n.neighbours {
// make a copy of the node
n := &node{
id: neighbourNode.id,
address: neighbourNode.address,
network: neighbourNode.network,
}
// NOTE: we do not care about neighbour's neighbours
nodes = append(nodes, n)
}
n.RUnlock()
return nodes
}
// getNeighbours collects node neighbours up to given depth into pbNeighbours
// NOTE: this method is not thread safe, so make sure you serialize access to it
// NOTE: we should be able to read-Lock this, even though it's recursive
func (n *node) getNeighbours(depth int) (*pbNet.Neighbour, error) {
node := &pbNet.Node{
Id: n.id,
Address: n.address,
}
pbNeighbours := &pbNet.Neighbour{
Node: node,
Neighbours: make([]*pbNet.Neighbour, 0),
}
// return if have either reached the depth or have no more neighbours
if depth == 0 || len(n.neighbours) == 0 {
return pbNeighbours, nil
}
// decrement the depth
depth--
var neighbours []*pbNet.Neighbour
for _, neighbour := range n.neighbours {
// get neighbours of the neighbour
// NOTE: this is [not] a recursive call
pbNodeNeighbour, err := neighbour.getNeighbours(depth)
if err != nil {
return nil, err
}
// add current neighbour to explored neighbours
neighbours = append(neighbours, pbNodeNeighbour)
}
// add neighbours to the parent topology
pbNeighbours.Neighbours = neighbours
return pbNeighbours, nil
}
// unpackNeighbour unpacks pbNet.Neighbour into node of given depth
// NOTE: this method is not thread safe, so make sure you serialize access to it
func unpackNeighbour(pbNeighbour *pbNet.Neighbour, depth int) (*node, error) {
if pbNeighbour == nil {
return nil, errors.New("neighbour not initialized")
}
neighbourNode := &node{
id: pbNeighbour.Node.Id,
address: pbNeighbour.Node.Address,
neighbours: make(map[string]*node),
}
// return if have either reached the depth or have no more neighbours
if depth == 0 || len(pbNeighbour.Neighbours) == 0 {
return neighbourNode, nil
}
// decrement the depth
depth--
neighbours := make(map[string]*node)
for _, pbNode := range pbNeighbour.Neighbours {
node, err := unpackNeighbour(pbNode, depth)
if err != nil {
return nil, err
}
neighbours[pbNode.Node.Id] = node
}
neighbourNode.neighbours = neighbours
return neighbourNode, nil
}
// updateNeighbour updates node neighbour up to given depth
// NOTE: this method is not thread safe, so make sure you serialize access to it
func (n *node) updateNeighbour(neighbour *pbNet.Neighbour, depth int) error {
// unpack neighbour into topology of size MaxDepth-1
// NOTE: we need MaxDepth-1 because node n is the parent adding which
// gives us the max neighbour topology we maintain and propagate
node, err := unpackNeighbour(neighbour, MaxDepth-1)
if err != nil {
return err
}
// update node neighbours with new topology
n.neighbours[neighbour.Node.Id] = node
return nil
}