diff --git a/network/default.go b/network/default.go index 2da925eb..1a314cee 100644 --- a/network/default.go +++ b/network/default.go @@ -124,9 +124,10 @@ func newNetwork(opts ...Option) Network { // Options returns network options func (n *network) Options() Options { - n.Lock() + n.RLock() + defer n.RUnlock() + options := n.options - n.Unlock() return options } @@ -196,7 +197,6 @@ func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message for { m := new(transport.Message) if err := sess.Recv(m); err != nil { - // TODO: should we bail here? log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) return } @@ -255,28 +255,23 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen if pbNetConnect.Node.Id == n.options.Id { continue } - n.Lock() log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) - // if the entry already exists skip adding it - if peer, ok := n.peers[pbNetConnect.Node.Id]; ok { - // update lastSeen timestamp - if n.peers[pbNetConnect.Node.Id].lastSeen.Before(now) { - peer.lastSeen = now - } - n.Unlock() - continue - } - // add a new peer to the node peers - // NOTE: new node does not have any peers, yet - n.peers[pbNetConnect.Node.Id] = &node{ + peer := &node{ id: pbNetConnect.Node.Id, address: pbNetConnect.Node.Address, peers: make(map[string]*node), lastSeen: now, } - n.Unlock() - // get all the node peers down to MaxDepth encoded in protobuf - msg := PeersToProto(n.node, n.Peers(), MaxDepth) + if ok := n.node.AddPeer(peer); !ok { + log.Debugf("Network peer exists, refreshing: %s", peer.id) + // update lastSeen time for the peer + if ok := n.RefreshPeer(peer.id, now); !ok { + log.Debugf("Network failed refreshing peer: %s", peer.id) + } + continue + } + // get node peers down to MaxDepth encoded in protobuf + msg := PeersToProto(n.node, MaxDepth) // advertise yourself to the network if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { 
log.Debugf("Network failed to advertise peers: %v", err) @@ -297,18 +292,14 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen if pbNetPeer.Node.Id == n.options.Id { continue } - n.Lock() log.Debugf("Network received peer message from: %s", pbNetPeer.Node.Id) - // only add the peer if it is NOT already in node's list of peers - _, exists := n.peers[pbNetPeer.Node.Id] - if !exists { - n.peers[pbNetPeer.Node.Id] = &node{ - id: pbNetPeer.Node.Id, - address: pbNetPeer.Node.Address, - peers: make(map[string]*node), - lastSeen: now, - } - n.Unlock() + peer := &node{ + id: pbNetPeer.Node.Id, + address: pbNetPeer.Node.Address, + peers: make(map[string]*node), + lastSeen: now, + } + if ok := n.node.AddPeer(peer); ok { // send a solicit message when discovering new peer msg := &pbRtr.Solicit{ Id: n.options.Id, @@ -316,18 +307,19 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen if err := n.sendMsg("solicit", msg, ControlChannel); err != nil { log.Debugf("Network failed to send solicit message: %s", err) } - // after adding new peer go to the next step continue } - // NOTE: we don't update MaxDepth toplogy as we dont update this node only its peers - if err := n.node.updatePeerTopology(pbNetPeer, MaxDepth-1); err != nil { + log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) + // update lastSeen time for the peer + if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok { + log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id) + } + // NOTE: we don't unpack MaxDepth topology + peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1) + log.Debugf("Network updating topology of node: %s", n.node.id) + if ok := n.node.UpdatePeer(peer); !ok { log.Debugf("Network failed to update peers") } - // update lastSeen timestamp if outdated - if n.peers[pbNetPeer.Node.Id].lastSeen.Before(now) { - n.peers[pbNetPeer.Node.Id].lastSeen = now - } - n.Unlock() case "close": pbNetClose := &pbNet.Close{} if 
err := proto.Unmarshal(m.Body, pbNetClose); err != nil { @@ -338,13 +330,11 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen if pbNetClose.Node.Id == n.options.Id { continue } - n.Lock() log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) if err := n.pruneNode(pbNetClose.Node.Id); err != nil { log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err) continue } - n.Unlock() } case <-n.closed: return @@ -393,7 +383,7 @@ func (n *network) announce(client transport.Client) { case <-n.closed: return case <-announce.C: - msg := PeersToProto(n.node, n.Peers(), MaxDepth) + msg := PeersToProto(n.node, MaxDepth) // advertise yourself to the network if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { log.Debugf("Network failed to advertise peers: %v", err) @@ -404,9 +394,9 @@ func (n *network) announce(client transport.Client) { } // pruneNode removes a node with given id from the list of peers. It also removes all routes originted by this node. 
-// NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment func (n *network) pruneNode(id string) error { - delete(n.peers, id) + // DeletePeer serializes access + n.node.DeletePeer(id) // lookup all the routes originated at this node q := router.NewQuery( router.QueryRouter(id), @@ -506,11 +496,9 @@ func (n *network) setRouteMetric(route *router.Route) { return } - n.RLock() // check if the route origin is our peer if _, ok := n.peers[route.Router]; ok { route.Metric = 10 - n.RUnlock() return } @@ -519,12 +507,10 @@ func (n *network) setRouteMetric(route *router.Route) { for id := range peer.peers { if route.Router == id { route.Metric = 100 - n.RUnlock() return } } } - n.RUnlock() // the origin of the route is beyond our neighbourhood route.Metric = 1000 @@ -544,7 +530,6 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste // switch on type of message and take action switch m.Header["Micro-Method"] { case "advert": - now := time.Now() pbRtrAdvert := &pbRtr.Advert{} if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { log.Debugf("Network fail to unmarshal advert message: %v", err) @@ -554,27 +539,13 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste if pbRtrAdvert.Id == n.options.Id { continue } - // loookup advertising node in our peers - n.Lock() log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) - advertNode, ok := n.peers[pbRtrAdvert.Id] - if !ok { - // advertising node has not been registered as our peer, yet - // let's add it to our peers - advertNode = &node{ - id: pbRtrAdvert.Id, - peers: make(map[string]*node), - lastSeen: now, - } - n.peers[pbRtrAdvert.Id] = advertNode - n.Unlock() - // send a solicit message when discovering a new node - msg := &pbRtr.Solicit{ - Id: n.options.Id, - } - if err := n.sendMsg("solicit", msg, ControlChannel); err != nil { - log.Debugf("Network failed to send solicit message: %s", err) - } 
+ // loookup advertising node in our peers + advertNode := n.node.GetPeer(pbRtrAdvert.Id) + // if we dont recognize the node as our peer we skip processing its adverts + if advertNode == nil { + log.Debugf("Network skipping advert message from unknown peer: %s", pbRtrAdvert.Id) + continue } var events []*router.Event @@ -585,13 +556,17 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste // as our peer when we received the advert from it if advertNode.address == "" { advertNode.address = event.Route.Gateway + if ok := n.node.UpdatePeer(advertNode); !ok { + log.Debugf("Network failed to update peer: %s", advertNode.id) + continue + } } // if advertising node id is not the same as Route.Router // we know the advertising node is not the origin of the route if advertNode.id != event.Route.Router { // if the origin router is not the advertising node peer // we can't rule out potential routing loops so we bail here - if _, ok := advertNode.peers[event.Route.Router]; !ok { + if peer := advertNode.GetPeer(event.Route.Router); peer == nil { continue } } @@ -605,7 +580,9 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste Metric: int(event.Route.Metric), } // set the route metric + n.node.RLock() n.setRouteMetric(&route) + n.node.RUnlock() // throw away metric bigger than 1000 if route.Metric > 1000 { log.Debugf("Network route metric %d dropping node: %s", route.Metric, route.Router) @@ -619,6 +596,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste } events = append(events, e) } + // create an advert and process it advert := &router.Advert{ Id: pbRtrAdvert.Id, Type: router.AdvertType(pbRtrAdvert.Type), @@ -627,9 +605,9 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste Events: events, } + log.Debugf("Network router processing advert: %s", advert.Id) if err := n.Router.Process(advert); err != nil { log.Debugf("Network failed to process advert %s: 
%v", advert.Id, err) - continue } case "solicit": pbRtrSolicit := &pbRtr.Solicit{} @@ -642,6 +620,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste if pbRtrSolicit.Id == n.options.Id { continue } + log.Debugf("Network router flushing routes for: %s", pbRtrSolicit.Id) // advertise all the routes when a new node has connected if err := n.Router.Solicit(); err != nil { log.Debugf("Network failed to solicit routes: %s", err) @@ -685,7 +664,7 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A Timestamp: advert.Timestamp.UnixNano(), Events: events, } - if err := n.sendMsg("advert", msg, NetworkChannel); err != nil { + if err := n.sendMsg("advert", msg, ControlChannel); err != nil { log.Debugf("Network failed to advertise routes: %v", err) continue } @@ -823,16 +802,27 @@ func (n *network) close() error { // Close closes network connection func (n *network) Close() error { n.Lock() - defer n.Unlock() if !n.connected { + n.Unlock() return nil } select { case <-n.closed: + n.Unlock() return nil default: + // TODO: send close message to the network channel + close(n.closed) + // set connected to false + n.connected = false + + // unlock the lock otherwise we'll deadlock sending the close + n.Unlock() + + // send close message only if we managed to connect to NetworkChannel + log.Debugf("Sending close message from: %s", n.options.Id) msg := &pbNet.Close{ Node: &pbNet.Node{ Id: n.options.Id, @@ -842,10 +832,6 @@ func (n *network) Close() error { if err := n.sendMsg("close", msg, NetworkChannel); err != nil { log.Debugf("Network failed to send close message: %s", err) } - // TODO: send close message to the network channel - close(n.closed) - // set connected to false - n.connected = false } return n.close() diff --git a/network/handler/handler.go b/network/handler/handler.go index 759e21a4..a0905936 100644 --- a/network/handler/handler.go +++ b/network/handler/handler.go @@ -22,11 +22,8 @@ func (n *Network) 
ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *p depth = network.MaxDepth } - // get node peers - nodePeers := n.Network.Peers() - // get peers encoded into protobuf - peers := network.PeersToProto(n.Network, nodePeers, depth) + peers := network.PeersToProto(n.Network, depth) resp.Peers = peers diff --git a/network/node.go b/network/node.go index b5a318ee..cbd19d10 100644 --- a/network/node.go +++ b/network/node.go @@ -2,7 +2,6 @@ package network import ( "container/list" - "errors" "sync" "time" @@ -44,6 +43,86 @@ func (n *node) Network() Network { return n.network } +// AddPeer adds a new peer to node +// It returns false if the peer already exists +func (n *node) AddPeer(peer *node) bool { + n.Lock() + defer n.Unlock() + + if _, ok := n.peers[peer.id]; !ok { + n.peers[peer.id] = peer + return true + } + + return false +} + +// GetPeer returns a peer if it exists +// It returns nil if the peer was not found +func (n *node) GetPeer(id string) *node { + n.RLock() + defer n.RUnlock() + + p, ok := n.peers[id] + if !ok { + return nil + } + + peer := &node{ + id: p.id, + address: p.address, + peers: make(map[string]*node), + network: p.network, + lastSeen: p.lastSeen, + } + + // TODO: recursively retrieve all of its peers + for id, pop := range p.peers { + peer.peers[id] = pop + } + + return peer +} + +// UpdatePeer updates a peer if it exists +// It returns false if the peer does not exist +func (n *node) UpdatePeer(peer *node) bool { + n.Lock() + defer n.Unlock() + + if _, ok := n.peers[peer.id]; ok { + n.peers[peer.id] = peer + return true + } + + return false +} + +// DeletePeer deletes a peer if it exists +func (n *node) DeletePeer(id string) { + n.Lock() + defer n.Unlock() + + delete(n.peers, id) +} + +// RefreshPeer updates the lastSeen timestamp of the peer with the given id; it returns false if the peer does not exist +func (n *node) RefreshPeer(id string, now time.Time) bool { + n.Lock() + defer n.Unlock() + + peer, ok := n.peers[id] + if !ok { + return false + } + + if peer.lastSeen.Before(now) { + peer.lastSeen = now + } + + 
return true +} + // Nodes returns a slice if all nodes in node topology func (n *node) Nodes() []Node { // we need to freeze the network graph here @@ -90,10 +169,11 @@ func (n *node) Nodes() []Node { func (n *node) topology(depth uint) *node { // make a copy of yourself node := &node{ - id: n.id, - address: n.address, - peers: make(map[string]*node), - network: n.network, + id: n.id, + address: n.address, + peers: make(map[string]*node), + network: n.network, + lastSeen: n.lastSeen, } // return if we reach requested depth or we have no more peers @@ -128,31 +208,13 @@ func (n *node) Peers() []Node { return peers } -// updateTopology updates node peer topology down to given depth -func (n *node) updatePeerTopology(pbPeer *pb.Peer, depth uint) error { - n.Lock() - defer n.Unlock() - - if pbPeer == nil { - return errors.New("peer not initialized") - } - - // unpack Peer topology into *node - peer := unpackPeer(pbPeer, depth) - - // update node peers with new topology - n.peers[pbPeer.Node.Id] = peer - - return nil -} - -// unpackPeer unpacks pb.Peer into node topology of given depth -// NOTE: this function is not thread-safe -func unpackPeer(pbPeer *pb.Peer, depth uint) *node { +// UnpackPeerTopology unpacks pb.Peer into node topology of given depth +func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node { peerNode := &node{ - id: pbPeer.Node.Id, - address: pbPeer.Node.Address, - peers: make(map[string]*node), + id: pbPeer.Node.Id, + address: pbPeer.Node.Address, + peers: make(map[string]*node), + lastSeen: lastSeen, } // return if have either reached the depth or have no more peers @@ -165,7 +227,7 @@ func unpackPeer(pbPeer *pb.Peer, depth uint) *node { peers := make(map[string]*node) for _, pbPeer := range pbPeer.Peers { - peer := unpackPeer(pbPeer, depth) + peer := UnpackPeerTopology(pbPeer, lastSeen, depth) peers[pbPeer.Node.Id] = peer } @@ -203,19 +265,19 @@ func peerTopology(peer Node, depth uint) *pb.Peer { } // PeersToProto returns 
node peers graph encoded into protobuf -func PeersToProto(root Node, peers []Node, depth uint) *pb.Peer { +func PeersToProto(node Node, depth uint) *pb.Peer { // network node aka root node - node := &pb.Node{ - Id: root.Id(), - Address: root.Address(), + pbNode := &pb.Node{ + Id: node.Id(), + Address: node.Address(), } // we will build proto topology into this pbPeers := &pb.Peer{ - Node: node, + Node: pbNode, Peers: make([]*pb.Peer, 0), } - for _, peer := range peers { + for _, peer := range node.Peers() { pbPeer := peerTopology(peer, depth) pbPeers.Peers = append(pbPeers.Peers, pbPeer) } diff --git a/network/node_test.go b/network/node_test.go index f5941c1a..82959693 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -2,6 +2,7 @@ package network import ( "testing" + "time" pb "github.com/micro/go-micro/network/proto" ) @@ -185,20 +186,7 @@ func TestPeers(t *testing.T) { } } -func TestUpdatePeerTopology(t *testing.T) { - // single node - single := &node{ - id: testNodeId, - address: testNodeAddress, - peers: make(map[string]*node), - network: newNetwork(Name(testNodeNetName)), - } - // nil peer should return error - if err := single.updatePeerTopology(nil, 5); err == nil { - t.Errorf("Expected error, got %s", err) - } - - // update with peer that is not yet in the peer map +func TestUnpackPeerTopology(t *testing.T) { pbPeer := &pb.Peer{ Node: &pb.Node{ Id: "newPeer", @@ -207,14 +195,11 @@ func TestUpdatePeerTopology(t *testing.T) { Peers: make([]*pb.Peer, 0), } // it should add pbPeer to the single node peers - if err := single.updatePeerTopology(pbPeer, 5); err != nil { - t.Errorf("Error updating topology: %s", err) - } - if _, ok := single.peers[pbPeer.Node.Id]; !ok { - t.Errorf("Expected %s to be added to %s peers", pbPeer.Node.Id, single.id) + peer := UnpackPeerTopology(pbPeer, time.Now(), 5) + if peer.id != pbPeer.Node.Id { + t.Errorf("Expected peer id %s, found: %s", pbPeer.Node.Id, peer.id) } - // complicated node graph node := testSetup() // 
build a simple topology to update node peer1 peer1 := node.peers["peer1"] @@ -243,14 +228,12 @@ func TestUpdatePeerTopology(t *testing.T) { Node: pbPeer1Node, Peers: []*pb.Peer{pbPeer111, pbPeer121}, } - // update peer1 topology - if err := node.updatePeerTopology(pbPeer1, 5); err != nil { - t.Errorf("Error updating topology: %s", err) - } + // unpack peer1 topology + peer = UnpackPeerTopology(pbPeer1, time.Now(), 5) // make sure peer1 topology has been correctly updated newPeerIds := []string{pbPeer111.Node.Id, pbPeer121.Node.Id} for _, id := range newPeerIds { - if _, ok := node.peers["peer1"].peers[id]; !ok { + if _, ok := peer.peers[id]; !ok { t.Errorf("Expected %s to be a peer of %s", id, "peer1") } } @@ -266,7 +249,7 @@ func TestPeersToProto(t *testing.T) { } topCount := 0 - protoPeers := PeersToProto(single, single.Peers(), 0) + protoPeers := PeersToProto(single, 0) if len(protoPeers.Peers) != topCount { t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers)) @@ -282,7 +265,7 @@ func TestPeersToProto(t *testing.T) { peerIds[id] = true } // depth 1 should give us immmediate neighbours only - protoPeers = PeersToProto(node, node.Peers(), 1) + protoPeers = PeersToProto(node, 1) if len(protoPeers.Peers) != topCount { t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers))