Fixed some races. Added more tests.
This commit is contained in:
parent
d58eb51976
commit
588484c3bf
@ -275,10 +275,10 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
|||||||
lastSeen: now,
|
lastSeen: now,
|
||||||
}
|
}
|
||||||
n.Unlock()
|
n.Unlock()
|
||||||
// get all the node peers down to MaxDepth encoded in protobuf message
|
// get all the node peers down to MaxDepth encoded in protobuf
|
||||||
msg, err := n.node.getProtoTopology(MaxDepth)
|
msg, err := n.node.getProtoTopology(MaxDepth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("Network unable to retrieve node peers: %s", err)
|
log.Debugf("Network unable to retrieve node topology: %s", err)
|
||||||
}
|
}
|
||||||
// advertise yourself to the network
|
// advertise yourself to the network
|
||||||
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
|
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
|
||||||
@ -322,7 +322,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
|
|||||||
// after adding new peer go to the next step
|
// after adding new peer go to the next step
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// NOTE: we don't update max topology depth as we dont include this network node
|
// NOTE: we don't update MaxDepth toplogy as we dont update this node only its peers
|
||||||
if err := n.node.updatePeerTopology(pbNetPeer, MaxDepth-1); err != nil {
|
if err := n.node.updatePeerTopology(pbNetPeer, MaxDepth-1); err != nil {
|
||||||
log.Debugf("Network failed to update peers")
|
log.Debugf("Network failed to update peers")
|
||||||
}
|
}
|
||||||
@ -396,14 +396,12 @@ func (n *network) announce(client transport.Client) {
|
|||||||
case <-n.closed:
|
case <-n.closed:
|
||||||
return
|
return
|
||||||
case <-announce.C:
|
case <-announce.C:
|
||||||
n.RLock()
|
|
||||||
msg, err := n.node.getProtoTopology(MaxDepth)
|
msg, err := n.node.getProtoTopology(MaxDepth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("Network unable to retrieve node peers: %s", err)
|
log.Debugf("Network unable to retrieve node topology: %s", err)
|
||||||
n.RUnlock()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
n.RUnlock()
|
n.node.RUnlock()
|
||||||
// advertise yourself to the network
|
// advertise yourself to the network
|
||||||
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
|
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
|
||||||
log.Debugf("Network failed to advertise peers: %v", err)
|
log.Debugf("Network failed to advertise peers: %v", err)
|
||||||
|
@ -136,9 +136,11 @@ func (n *node) Topology(depth uint) *node {
|
|||||||
return node
|
return node
|
||||||
}
|
}
|
||||||
|
|
||||||
// getProtoTopology returns node peers up to given depth encoded in protobufs
|
// getProtoTopology returns node topology down to the given depth encoded in protobuf
|
||||||
// NOTE: this method is NOT thread-safe, so make sure you serialize access to it
|
|
||||||
func (n *node) getProtoTopology(depth uint) (*pb.Peer, error) {
|
func (n *node) getProtoTopology(depth uint) (*pb.Peer, error) {
|
||||||
|
n.RLock()
|
||||||
|
defer n.RUnlock()
|
||||||
|
|
||||||
node := &pb.Node{
|
node := &pb.Node{
|
||||||
Id: n.id,
|
Id: n.id,
|
||||||
Address: n.address,
|
Address: n.address,
|
||||||
@ -176,7 +178,7 @@ func (n *node) getProtoTopology(depth uint) (*pb.Peer, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// unpackPeer unpacks pb.Peer into node topology of given depth
|
// unpackPeer unpacks pb.Peer into node topology of given depth
|
||||||
// NOTE: this method is NOT thread-safe, so make sure you serialize access to it
|
// NOTE: this function is not thread-safe
|
||||||
func unpackPeer(pbPeer *pb.Peer, depth uint) *node {
|
func unpackPeer(pbPeer *pb.Peer, depth uint) *node {
|
||||||
peerNode := &node{
|
peerNode := &node{
|
||||||
id: pbPeer.Node.Id,
|
id: pbPeer.Node.Id,
|
||||||
@ -203,16 +205,17 @@ func unpackPeer(pbPeer *pb.Peer, depth uint) *node {
|
|||||||
return peerNode
|
return peerNode
|
||||||
}
|
}
|
||||||
|
|
||||||
// updatePeer updates node peer up to given depth
|
// updateTopology updates node peer topology down to given depth
|
||||||
// NOTE: this method is not thread safe, so make sure you serialize access to it
|
|
||||||
func (n *node) updatePeerTopology(pbPeer *pb.Peer, depth uint) error {
|
func (n *node) updatePeerTopology(pbPeer *pb.Peer, depth uint) error {
|
||||||
|
n.Lock()
|
||||||
|
defer n.Unlock()
|
||||||
|
|
||||||
if pbPeer == nil {
|
if pbPeer == nil {
|
||||||
return errors.New("peer not initialized")
|
return errors.New("peer not initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: we need MaxDepth-1 because node n is the parent adding which
|
// unpack Peer topology into *node
|
||||||
// gives us the max peer topology we maintain and propagate
|
peer := unpackPeer(pbPeer, depth)
|
||||||
peer := unpackPeer(pbPeer, MaxDepth-1)
|
|
||||||
|
|
||||||
// update node peers with new topology
|
// update node peers with new topology
|
||||||
n.peers[pbPeer.Node.Id] = peer
|
n.peers[pbPeer.Node.Id] = peer
|
||||||
|
@ -2,6 +2,8 @@ package network
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
pb "github.com/micro/go-micro/network/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -229,3 +231,111 @@ func TestTopology(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdatePeerTopology(t *testing.T) {
|
||||||
|
// single node
|
||||||
|
single := &node{
|
||||||
|
id: testNodeId,
|
||||||
|
address: testNodeAddress,
|
||||||
|
peers: make(map[string]*node),
|
||||||
|
network: newNetwork(Name(testNodeNetName)),
|
||||||
|
}
|
||||||
|
// nil peer should return error
|
||||||
|
if err := single.updatePeerTopology(nil, 5); err == nil {
|
||||||
|
t.Errorf("Expected error, got %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// update with peer that is not yet in the peer map
|
||||||
|
pbPeer := &pb.Peer{
|
||||||
|
Node: &pb.Node{
|
||||||
|
Id: "newPeer",
|
||||||
|
Address: "newPeerAddress",
|
||||||
|
},
|
||||||
|
Peers: make([]*pb.Peer, 0),
|
||||||
|
}
|
||||||
|
// it should add pbPeer to the single node peers
|
||||||
|
if err := single.updatePeerTopology(pbPeer, 5); err != nil {
|
||||||
|
t.Errorf("Error updating topology: %s", err)
|
||||||
|
}
|
||||||
|
if _, ok := single.peers[pbPeer.Node.Id]; !ok {
|
||||||
|
t.Errorf("Expected %s to be added to %s peers", pbPeer.Node.Id, single.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// complicated node graph
|
||||||
|
node := testSetup()
|
||||||
|
// build a simple topology to update node peer1
|
||||||
|
peer1 := node.peers["peer1"]
|
||||||
|
pbPeer1Node := &pb.Node{
|
||||||
|
Id: peer1.id,
|
||||||
|
Address: peer1.address,
|
||||||
|
}
|
||||||
|
|
||||||
|
pbPeer111 := &pb.Peer{
|
||||||
|
Node: &pb.Node{
|
||||||
|
Id: "peer111",
|
||||||
|
Address: "peer111Address",
|
||||||
|
},
|
||||||
|
Peers: make([]*pb.Peer, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
pbPeer121 := &pb.Peer{
|
||||||
|
Node: &pb.Node{
|
||||||
|
Id: "peer121",
|
||||||
|
Address: "peer121Address",
|
||||||
|
},
|
||||||
|
Peers: make([]*pb.Peer, 0),
|
||||||
|
}
|
||||||
|
// topology to update
|
||||||
|
pbPeer1 := &pb.Peer{
|
||||||
|
Node: pbPeer1Node,
|
||||||
|
Peers: []*pb.Peer{pbPeer111, pbPeer121},
|
||||||
|
}
|
||||||
|
// update peer1 topology
|
||||||
|
if err := node.updatePeerTopology(pbPeer1, 5); err != nil {
|
||||||
|
t.Errorf("Error updating topology: %s", err)
|
||||||
|
}
|
||||||
|
// make sure peer1 topology has been correctly updated
|
||||||
|
newPeerIds := []string{pbPeer111.Node.Id, pbPeer121.Node.Id}
|
||||||
|
for _, id := range newPeerIds {
|
||||||
|
if _, ok := node.peers["peer1"].peers[id]; !ok {
|
||||||
|
t.Errorf("Expected %s to be a peer of %s", id, "peer1")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetProtoTopology(t *testing.T) {
|
||||||
|
// single node
|
||||||
|
single := &node{
|
||||||
|
id: testNodeId,
|
||||||
|
address: testNodeAddress,
|
||||||
|
peers: make(map[string]*node),
|
||||||
|
network: newNetwork(Name(testNodeNetName)),
|
||||||
|
}
|
||||||
|
topCount := 0
|
||||||
|
|
||||||
|
protoTop, err := single.getProtoTopology(10)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error getting proto topology: %s", err)
|
||||||
|
}
|
||||||
|
if len(protoTop.Peers) != topCount {
|
||||||
|
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoTop.Peers))
|
||||||
|
}
|
||||||
|
|
||||||
|
// complicated node graph
|
||||||
|
node := testSetup()
|
||||||
|
topCount = 3
|
||||||
|
// list of ids of nodes of depth 1 i.e. node peers
|
||||||
|
peerIds := make(map[string]bool)
|
||||||
|
// add peer Ids
|
||||||
|
for _, id := range testNodePeerIds {
|
||||||
|
peerIds[id] = true
|
||||||
|
}
|
||||||
|
// depth 1 should give us immmediate neighbours only
|
||||||
|
protoTop, err = node.getProtoTopology(1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error getting proto topology: %s", err)
|
||||||
|
}
|
||||||
|
if len(protoTop.Peers) != topCount {
|
||||||
|
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoTop.Peers))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user