Added more node tests. Small refactoring of Netowkr and handler.

This commit is contained in:
Milos Gajdos 2019-09-11 00:23:37 +01:00
parent 16fcf1fbda
commit 2dfbe93d65
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
7 changed files with 145 additions and 159 deletions

View File

@ -838,6 +838,16 @@ func (n *network) Connect() error {
return nil
}
// Nodes returns a list of all network nodes
func (n *network) Nodes() []Node {
return n.node.Nodes()
}
// Topology returns network topology
func (n *network) Topology() Node {
return n.node.Topology(MaxDepth)
}
func (n *network) close() error {
// stop the server
if err := n.server.Stop(); err != nil {

View File

@ -51,37 +51,46 @@ func (n *Network) ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *p
return nil
}
// Topology returns a list of nodes in node topology i.e. it returns all (in)directly reachable nodes from this node
func (n *Network) Topology(ctx context.Context, req *pbNet.TopologyRequest, resp *pbNet.TopologyResponse) error {
// NOTE: we are downcasting here
depth := uint(req.Depth)
if depth <= 0 {
depth = network.MaxDepth
// toplogyToProto recursively traverses node topology and returns it
func toplogyToProto(node network.Node, pbPeer *pbNet.Peer) *pbNet.Peer {
// return if we reached the end of topology
if len(node.Peers()) == 0 {
return pbPeer
}
// get topology
topNodes := n.Network.Topology(depth)
var nodes []*pbNet.Node
for _, topNode := range topNodes {
// creaate peers answer
for _, topNode := range node.Peers() {
pbNode := &pbNet.Node{
Id: topNode.Id(),
Address: topNode.Address(),
}
nodes = append(nodes, pbNode)
pbPeer := &pbNet.Peer{
Node: pbNode,
Peers: make([]*pbNet.Peer, 0),
}
peer := toplogyToProto(topNode, pbPeer)
pbPeer.Peers = append(pbPeer.Peers, peer)
}
// network node
return pbPeer
}
// Topology returns a list of nodes in node topology i.e. it returns all (in)directly reachable nodes from this node
func (n *Network) Topology(ctx context.Context, req *pbNet.TopologyRequest, resp *pbNet.TopologyResponse) error {
// get node topology
topNode := n.Network.Topology()
// network node aka root node
node := &pbNet.Node{
Id: n.Network.Id(),
Address: n.Network.Address(),
}
topology := &pbNet.Topology{
// we will build proto topology into this
pbPeer := &pbNet.Peer{
Node: node,
Nodes: nodes,
Peers: make([]*pbNet.Peer, 0),
}
// return topology encoded into protobuf
topology := toplogyToProto(topNode, pbPeer)
resp.Topology = topology

View File

@ -46,8 +46,8 @@ type Network interface {
Connect() error
// Nodes returns list of network nodes
Nodes() []Node
// Topology returns a list of all reachable nodes up to depth
Topology(depth uint) []Node
// Topology returns node topology up to MaxDepth hops
Topology() Node
// Close stops the tunnel and resolving
Close() error
// Client is micro client

View File

@ -3,7 +3,6 @@ package network
import (
"container/list"
"errors"
"sort"
"sync"
"time"
@ -96,10 +95,13 @@ func (n *node) Peers() []Node {
p := &node{
id: peer.id,
address: peer.address,
peers: make(map[string]*node),
network: peer.network,
}
// NOTE: we do not care about peer's peers
// we only collect the node's peers i.e. its adjacent nodes
// collect peer's peers aka pop (peer of peer)
for id, pop := range peer.peers {
p.peers[id] = pop
}
peers = append(peers, p)
}
n.RUnlock()
@ -107,36 +109,31 @@ func (n *node) Peers() []Node {
return peers
}
// Topology returns a slice of all nodes in reachable by node up to given depth
func (n *node) Topology(depth uint) []Node {
// get all the nodes
nodes := n.Nodes()
// topology returns network topology up to MaxDepth
func (n *node) Topology(depth uint) *node {
n.RLock()
// sort the slice of nodes
sort.Slice(nodes, func(i, j int) bool { return nodes[i].Id() <= nodes[j].Id() })
// find the node with our id
i := sort.Search(len(nodes), func(j int) bool { return nodes[j].Id() >= n.id })
// make a copy of yourself
node := &node{
id: n.id,
address: n.address,
peers: make(map[string]*node),
network: n.network,
}
// TODO: finish implementing this
var topology []Node
// collect all the reachable nodes into slice
if i < len(nodes) && nodes[i].Id() == n.id {
for _, peer := range nodes[i].Peers() {
// don't return yourself
if peer.Id() == n.id {
continue
}
topNode := &node{
id: peer.Id(),
address: peer.Address(),
}
topology = append(topology, topNode)
}
// return if we reach requested depth or we have no more peers
if depth == 0 || len(n.peers) == 0 {
return node
}
depth--
for _, peer := range n.peers {
nodePeer := peer.Topology(depth)
node.peers[nodePeer.id] = nodePeer
}
n.RUnlock()
return topology
return node
}
// getProtoTopology returns node peers up to given depth encoded in protobufs

View File

@ -1,6 +1,8 @@
package network
import "testing"
import (
"testing"
)
var (
testNodeId = "testNode"
@ -150,7 +152,7 @@ func TestPeers(t *testing.T) {
// iterate through the list of peers and makes sure all have been returned
for _, peer := range peers {
if _, ok := peerIds[peer.Id()]; !ok {
t.Errorf("Expected to find %s peer", node.Id())
t.Errorf("Expected to find %s peer", peer.Id())
}
}
}
@ -168,7 +170,47 @@ func TestTopology(t *testing.T) {
// you should not be in your topology
topCount := 0
if len(topology) != topCount {
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(topology))
if len(topology.peers) != topCount {
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(topology.peers))
}
// complicated node graph
node := testSetup()
// list of ids of nodes of depth 1 i.e. node peers
peerIds := make(map[string]bool)
// add peer Ids
for _, id := range testNodePeerIds {
peerIds[id] = true
}
topology = node.Topology(1)
// depth 1 should return only immediate peers
if len(topology.peers) != len(peerIds) {
t.Errorf("Expected to find %d nodes, found: %d", len(peerIds), len(topology.peers))
}
for id := range topology.peers {
if _, ok := peerIds[id]; !ok {
t.Errorf("Expected to find %s peer", id)
}
}
// add peers of peers to peerIds
for _, id := range testPeerOfPeerIds {
peerIds[id] = true
}
topology = node.Topology(2)
// iterate through the whole graph
// NOTE: this is a manual iteration as we know the size of the graph
for id, peer := range topology.peers {
if _, ok := peerIds[id]; !ok {
t.Errorf("Expected to find %s peer", peer.Id())
}
// peers of peers
for id := range peer.peers {
if _, ok := peerIds[id]; !ok {
t.Errorf("Expected to find %s peer", peer.Id())
}
}
}
}

View File

@ -178,9 +178,7 @@ func (m *PeerResponse) GetPeers() []*Node {
// TopologyRequest list node topology
type TopologyRequest struct {
// node id
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// topology depth
Depth uint64 `protobuf:"varint,2,opt,name=depth,proto3" json:"depth,omitempty"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -218,19 +216,12 @@ func (m *TopologyRequest) GetId() string {
return ""
}
func (m *TopologyRequest) GetDepth() uint64 {
if m != nil {
return m.Depth
}
return 0
}
// TopologyResponse is returned by Topology
type TopologyResponse struct {
Topology *Topology `protobuf:"bytes,1,opt,name=topology,proto3" json:"topology,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Topology *Peer `protobuf:"bytes,1,opt,name=topology,proto3" json:"topology,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TopologyResponse) Reset() { *m = TopologyResponse{} }
@ -258,7 +249,7 @@ func (m *TopologyResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_TopologyResponse proto.InternalMessageInfo
func (m *TopologyResponse) GetTopology() *Topology {
func (m *TopologyResponse) GetTopology() *Peer {
if m != nil {
return m.Topology
}
@ -315,56 +306,6 @@ func (m *Node) GetAddress() string {
return ""
}
// Topology is used to nnounce node neighbourhood
type Topology struct {
// network node
Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"`
// neighbours
Nodes []*Node `protobuf:"bytes,2,rep,name=nodes,proto3" json:"nodes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Topology) Reset() { *m = Topology{} }
func (m *Topology) String() string { return proto.CompactTextString(m) }
func (*Topology) ProtoMessage() {}
func (*Topology) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{7}
}
func (m *Topology) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Topology.Unmarshal(m, b)
}
func (m *Topology) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Topology.Marshal(b, m, deterministic)
}
func (m *Topology) XXX_Merge(src proto.Message) {
xxx_messageInfo_Topology.Merge(m, src)
}
func (m *Topology) XXX_Size() int {
return xxx_messageInfo_Topology.Size(m)
}
func (m *Topology) XXX_DiscardUnknown() {
xxx_messageInfo_Topology.DiscardUnknown(m)
}
var xxx_messageInfo_Topology proto.InternalMessageInfo
func (m *Topology) GetNode() *Node {
if m != nil {
return m.Node
}
return nil
}
func (m *Topology) GetNodes() []*Node {
if m != nil {
return m.Nodes
}
return nil
}
// Connect is sent when the node connects to the network
type Connect struct {
// network mode
@ -378,7 +319,7 @@ func (m *Connect) Reset() { *m = Connect{} }
func (m *Connect) String() string { return proto.CompactTextString(m) }
func (*Connect) ProtoMessage() {}
func (*Connect) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{8}
return fileDescriptor_8571034d60397816, []int{7}
}
func (m *Connect) XXX_Unmarshal(b []byte) error {
@ -419,7 +360,7 @@ func (m *Close) Reset() { *m = Close{} }
func (m *Close) String() string { return proto.CompactTextString(m) }
func (*Close) ProtoMessage() {}
func (*Close) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{9}
return fileDescriptor_8571034d60397816, []int{8}
}
func (m *Close) XXX_Unmarshal(b []byte) error {
@ -460,7 +401,7 @@ func (m *Solicit) Reset() { *m = Solicit{} }
func (m *Solicit) String() string { return proto.CompactTextString(m) }
func (*Solicit) ProtoMessage() {}
func (*Solicit) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{10}
return fileDescriptor_8571034d60397816, []int{9}
}
func (m *Solicit) XXX_Unmarshal(b []byte) error {
@ -503,7 +444,7 @@ func (m *Peer) Reset() { *m = Peer{} }
func (m *Peer) String() string { return proto.CompactTextString(m) }
func (*Peer) ProtoMessage() {}
func (*Peer) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{11}
return fileDescriptor_8571034d60397816, []int{10}
}
func (m *Peer) XXX_Unmarshal(b []byte) error {
@ -546,7 +487,6 @@ func init() {
proto.RegisterType((*TopologyRequest)(nil), "go.micro.network.TopologyRequest")
proto.RegisterType((*TopologyResponse)(nil), "go.micro.network.TopologyResponse")
proto.RegisterType((*Node)(nil), "go.micro.network.Node")
proto.RegisterType((*Topology)(nil), "go.micro.network.Topology")
proto.RegisterType((*Connect)(nil), "go.micro.network.Connect")
proto.RegisterType((*Close)(nil), "go.micro.network.Close")
proto.RegisterType((*Solicit)(nil), "go.micro.network.Solicit")
@ -556,31 +496,29 @@ func init() {
func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) }
var fileDescriptor_8571034d60397816 = []byte{
// 412 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x94, 0xdf, 0x8a, 0xd3, 0x40,
0x14, 0xc6, 0x37, 0x31, 0x35, 0xdb, 0x13, 0x57, 0x97, 0x41, 0x24, 0x14, 0x2a, 0x75, 0xae, 0x8a,
0x68, 0x2a, 0x2d, 0xab, 0x37, 0xde, 0xed, 0x85, 0x20, 0xcb, 0x22, 0xa9, 0x0f, 0xa0, 0xcd, 0x0c,
0x69, 0xb0, 0xcd, 0x89, 0x33, 0x53, 0xc4, 0xe7, 0xf3, 0xc5, 0x64, 0xfe, 0xa4, 0x89, 0xa6, 0xa9,
0xe6, 0xae, 0x67, 0xce, 0xf9, 0x7e, 0x5f, 0x72, 0xfa, 0x4d, 0xe0, 0xaa, 0xe4, 0xea, 0x07, 0x8a,
0x6f, 0x49, 0x25, 0x50, 0x21, 0xb9, 0xce, 0x31, 0xd9, 0x17, 0x99, 0xc0, 0xc4, 0x9d, 0x4f, 0x56,
0x79, 0xa1, 0xb6, 0x87, 0x4d, 0x92, 0xe1, 0x7e, 0x61, 0x3a, 0x8b, 0x1c, 0x5f, 0xdb, 0x1f, 0x02,
0x0f, 0x8a, 0x8b, 0x85, 0x51, 0xba, 0xc2, 0x62, 0xe8, 0x15, 0x44, 0x77, 0x85, 0x54, 0x29, 0xff,
0x7e, 0xe0, 0x52, 0xd1, 0xf7, 0xf0, 0xc8, 0x96, 0xb2, 0xc2, 0x52, 0x72, 0xf2, 0x0a, 0x46, 0x25,
0x32, 0x2e, 0x63, 0x6f, 0xf6, 0x60, 0x1e, 0x2d, 0x9f, 0x25, 0x7f, 0xbb, 0x26, 0xf7, 0xc8, 0x78,
0x6a, 0x87, 0xe8, 0x14, 0xa2, 0x4f, 0x9c, 0x0b, 0x07, 0x23, 0x8f, 0xc1, 0x2f, 0x58, 0xec, 0xcd,
0xbc, 0xf9, 0x38, 0xf5, 0x0b, 0xa6, 0xe1, 0xb6, 0xdd, 0xc0, 0x2b, 0xce, 0xc5, 0x3f, 0xe1, 0x66,
0x88, 0xbe, 0x83, 0x27, 0x9f, 0xb1, 0xc2, 0x1d, 0xe6, 0x3f, 0x7b, 0x0c, 0xc8, 0x53, 0x18, 0x31,
0x5e, 0xa9, 0x6d, 0xec, 0xcf, 0xbc, 0x79, 0x90, 0xda, 0x82, 0x7e, 0x84, 0xeb, 0x46, 0xe8, 0xac,
0xdf, 0xc2, 0xa5, 0x72, 0x67, 0x46, 0x1f, 0x2d, 0x27, 0x5d, 0xf7, 0xa3, 0xea, 0x38, 0x4b, 0xdf,
0x40, 0xa0, 0x9f, 0xa9, 0xe3, 0x1c, 0x43, 0xf8, 0x95, 0x31, 0xc1, 0xa5, 0x34, 0xde, 0xe3, 0xb4,
0x2e, 0x29, 0x83, 0xcb, 0x9a, 0x43, 0x5e, 0x42, 0xa0, 0x17, 0xe5, 0x1c, 0xfb, 0xde, 0xd7, 0xcc,
0x34, 0x9b, 0xf7, 0xff, 0x67, 0xf3, 0x37, 0x10, 0xde, 0x62, 0x59, 0xf2, 0x4c, 0x0d, 0x31, 0xa1,
0x2b, 0x18, 0xdd, 0xee, 0x50, 0xf2, 0x41, 0xa2, 0x1b, 0x08, 0xd7, 0xb8, 0x2b, 0xb2, 0x62, 0x98,
0xd7, 0x17, 0x08, 0xf4, 0xbf, 0x3f, 0x74, 0x09, 0x36, 0x21, 0xbd, 0x4b, 0x30, 0x81, 0xb2, 0x43,
0xcb, 0x5f, 0x3e, 0x84, 0xf7, 0xf6, 0x9c, 0xdc, 0xc1, 0x58, 0x07, 0x59, 0xb3, 0x24, 0x99, 0x76,
0x75, 0xad, 0xd0, 0x4f, 0x9e, 0xf7, 0xb5, 0x6d, 0x58, 0xe8, 0x45, 0x4d, 0xd3, 0x66, 0x27, 0x69,
0xad, 0xd4, 0x9f, 0xa2, 0xb5, 0x53, 0x4f, 0x2f, 0xc8, 0xba, 0x15, 0x89, 0x17, 0x67, 0x62, 0xe7,
0x80, 0xf4, 0xdc, 0xc8, 0x11, 0xfa, 0x01, 0xc0, 0x3c, 0xb4, 0xbe, 0xdc, 0x92, 0xc4, 0x8d, 0xc6,
0x5d, 0xf7, 0x9a, 0x36, 0xed, 0x74, 0xfe, 0x7c, 0xd7, 0xcd, 0x43, 0xf3, 0x61, 0x58, 0xfd, 0x0e,
0x00, 0x00, 0xff, 0xff, 0xdc, 0xcf, 0x08, 0x89, 0x70, 0x04, 0x00, 0x00,
// 382 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0x51, 0x4f, 0xea, 0x30,
0x14, 0xc7, 0x61, 0x17, 0xee, 0xe0, 0x70, 0xb9, 0x92, 0x3e, 0x98, 0x85, 0x04, 0x03, 0x7d, 0x22,
0x46, 0x87, 0x81, 0xf0, 0xe6, 0x1b, 0x89, 0xbe, 0x10, 0x62, 0xc0, 0x0f, 0xa0, 0x6c, 0xcd, 0x6c,
0x84, 0x9d, 0xd9, 0x96, 0x18, 0x3f, 0x9f, 0x5f, 0xcc, 0xb4, 0xdd, 0x18, 0x02, 0xd3, 0xf0, 0xb6,
0x9e, 0xf3, 0x3f, 0xbf, 0xff, 0xda, 0xfe, 0x0b, 0xcd, 0x98, 0xa9, 0x77, 0x14, 0xaf, 0x7e, 0x22,
0x50, 0x21, 0x69, 0x45, 0xe8, 0xaf, 0x79, 0x20, 0xd0, 0x4f, 0xeb, 0xed, 0x51, 0xc4, 0xd5, 0xcb,
0x66, 0xe9, 0x07, 0xb8, 0x1e, 0x98, 0xce, 0x20, 0xc2, 0x6b, 0xfb, 0x21, 0x70, 0xa3, 0x98, 0x18,
0x98, 0xc9, 0x74, 0x61, 0x31, 0xb4, 0x09, 0x8d, 0x29, 0x97, 0x6a, 0xce, 0xde, 0x36, 0x4c, 0x2a,
0x7a, 0x0b, 0xff, 0xec, 0x52, 0x26, 0x18, 0x4b, 0x46, 0xae, 0xa0, 0x1a, 0x63, 0xc8, 0xa4, 0x57,
0xee, 0xfe, 0xe9, 0x37, 0x86, 0xe7, 0xfe, 0xbe, 0xab, 0x3f, 0xc3, 0x90, 0xcd, 0xad, 0x88, 0x76,
0xa0, 0xf1, 0xc0, 0x98, 0x48, 0x61, 0xe4, 0x3f, 0x38, 0x3c, 0xf4, 0xca, 0xdd, 0x72, 0xbf, 0x3e,
0x77, 0x78, 0xa8, 0xe1, 0xb6, 0x9d, 0xc3, 0x13, 0xc6, 0xc4, 0xaf, 0x70, 0x23, 0xa2, 0x3d, 0x38,
0x7b, 0xc4, 0x04, 0x57, 0x18, 0x7d, 0x14, 0x19, 0xdc, 0x41, 0x2b, 0x97, 0xa4, 0x26, 0x43, 0xa8,
0xa9, 0xb4, 0x66, 0x94, 0x47, 0x7d, 0xcc, 0x6f, 0x6d, 0x75, 0xf4, 0x06, 0x2a, 0xda, 0x79, 0x9f,
0x4f, 0x3c, 0x70, 0x9f, 0xc3, 0x50, 0x30, 0x29, 0x3d, 0xc7, 0x14, 0xb3, 0x25, 0x1d, 0x83, 0x3b,
0xc1, 0x38, 0x66, 0x81, 0x22, 0x97, 0x50, 0xd1, 0xa7, 0x51, 0x6c, 0x66, 0x36, 0x65, 0x34, 0x74,
0x04, 0xd5, 0xc9, 0x0a, 0x25, 0x3b, 0x69, 0x68, 0x0c, 0xee, 0x02, 0x57, 0x3c, 0xe0, 0xa7, 0x79,
0x3d, 0x41, 0x45, 0x6f, 0xf3, 0x94, 0x99, 0xfc, 0x86, 0x9c, 0xa2, 0x1b, 0x32, 0x27, 0x67, 0x45,
0xc3, 0x4f, 0x07, 0xdc, 0x99, 0xad, 0x93, 0x29, 0xd4, 0x75, 0x90, 0x34, 0x4b, 0x92, 0xce, 0xe1,
0xdc, 0x4e, 0xe8, 0xda, 0x17, 0x45, 0x6d, 0x7b, 0x85, 0xb4, 0x94, 0xd1, 0xb4, 0xd9, 0x51, 0xda,
0x4e, 0xea, 0x8e, 0xd1, 0x76, 0x53, 0x47, 0x4b, 0x64, 0x01, 0xb5, 0x2c, 0x26, 0xa4, 0x77, 0xa8,
0xde, 0x4b, 0x59, 0x9b, 0xfe, 0x24, 0xd9, 0x42, 0xef, 0x01, 0xcc, 0x4f, 0xeb, 0xc7, 0x25, 0x89,
0x97, 0xcf, 0xa4, 0xcf, 0x2d, 0xa3, 0x75, 0x0e, 0x3a, 0xdf, 0xf7, 0xba, 0xfc, 0x6b, 0x1e, 0xe6,
0xe8, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x0d, 0xd5, 0xbf, 0xdc, 0xf0, 0x03, 0x00, 0x00,
}

View File

@ -36,13 +36,11 @@ message PeerResponse {
message TopologyRequest {
// node id
string id = 1;
// topology depth
uint64 depth = 2;
}
// TopologyResponse is returned by Topology
message TopologyResponse {
Topology topology = 1;
Peer topology = 1;
}
// Node is network node
@ -53,14 +51,6 @@ message Node {
string address = 2;
}
// Topology is used to nnounce node neighbourhood
message Topology {
// network node
Node node = 1;
// neighbours
repeated Node nodes = 2;
}
// Connect is sent when the node connects to the network
message Connect {
// network mode