diff --git a/network/default.go b/network/default.go index 23c924f6..605fa017 100644 --- a/network/default.go +++ b/network/default.go @@ -1,7 +1,6 @@ package network import ( - "container/list" "errors" "sync" "time" @@ -30,61 +29,10 @@ var ( ) var ( - // ErrMsgUnknown is returned when unknown message is attempted to send or receive - ErrMsgUnknown = errors.New("unknown message") // ErrClientNotFound is returned when client for tunnel channel could not be found ErrClientNotFound = errors.New("client not found") ) -// node is network node -type node struct { - sync.RWMutex - // id is node id - id string - // address is node address - address string - // neighbours maps the node neighbourhood - neighbours map[string]*node - // network returns the node network - network Network - // lastSeen stores the time the node has been seen last time - lastSeen time.Time -} - -// Id is node ide -func (n *node) Id() string { - return n.id -} - -// Address returns node address -func (n *node) Address() string { - return n.address -} - -// Network returns node network -func (n *node) Network() Network { - return n.network -} - -// Neighbourhood returns node neighbourhood -func (n *node) Neighbourhood() []Node { - var nodes []Node - n.RLock() - for _, neighbourNode := range n.neighbours { - // make a copy of the node - n := &node{ - id: neighbourNode.id, - address: neighbourNode.address, - network: neighbourNode.network, - } - // NOTE: we do not care about neighbour's neighbours - nodes = append(nodes, n) - } - n.RUnlock() - - return nodes -} - // network implements Network interface type network struct { // node is network node @@ -156,9 +104,9 @@ func newNetwork(opts ...Option) Network { network := &network{ node: &node{ - id: options.Id, - address: options.Address, - neighbours: make(map[string]*node), + id: options.Id, + address: options.Address, + peers: make(map[string]*node), }, options: options, Router: options.Router, @@ -176,9 +124,10 @@ func newNetwork(opts ...Option) Network { // Options returns network options func (n *network) Options() Options { - n.Lock() + n.RLock() + defer n.RUnlock() + options := n.options - n.Unlock() return options } @@ -248,7 +197,6 @@ func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message for { m := new(transport.Message) if err := sess.Recv(m); err != nil { - // TODO: should we bail here? log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) return } @@ -307,81 +255,70 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen if pbNetConnect.Node.Id == n.options.Id { continue } - n.Lock() log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) - // if the entry already exists skip adding it - if neighbour, ok := n.neighbours[pbNetConnect.Node.Id]; ok { - // update lastSeen timestamp - if n.neighbours[pbNetConnect.Node.Id].lastSeen.Before(now) { - neighbour.lastSeen = now + peer := &node{ + id: pbNetConnect.Node.Id, + address: pbNetConnect.Node.Address, + peers: make(map[string]*node), + lastSeen: now, + } + if ok := n.node.AddPeer(peer); !ok { + log.Debugf("Network peer exists, refreshing: %s", peer.id) + // update lastSeen time for the existing node + if ok := n.RefreshPeer(peer.id, now); !ok { + log.Debugf("Network failed refreshing peer: %s", peer.id) } - n.Unlock() continue } - // add a new neighbour - // NOTE: new node does not have any neighbours - n.neighbours[pbNetConnect.Node.Id] = &node{ - id: pbNetConnect.Node.Id, - address: pbNetConnect.Node.Address, - neighbours: make(map[string]*node), - lastSeen: now, - } - n.Unlock() + // get node peers down to MaxDepth encoded in protobuf + msg := PeersToProto(n.node, MaxDepth) // advertise yourself to the network - if err := n.sendMsg("neighbour", NetworkChannel); err != nil { - log.Debugf("Network failed to advertise neighbours: %v", err) + if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { + log.Debugf("Network failed to advertise peers: %v", err) } // advertise all the routes when a new node has connected if err := n.Router.Solicit(); err != nil { log.Debugf("Network failed to solicit routes: %s", err) } - case "neighbour": + case "peer": // mark the time the message has been received now := time.Now() - pbNetNeighbour := &pbNet.Neighbour{} - if err := proto.Unmarshal(m.Body, pbNetNeighbour); err != nil { - log.Debugf("Network tunnel [%s] neighbour unmarshal error: %v", NetworkChannel, err) + pbNetPeer := &pbNet.Peer{} + if err := proto.Unmarshal(m.Body, pbNetPeer); err != nil { + log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) continue } // don't process your own messages - if pbNetNeighbour.Node.Id == n.options.Id { + if pbNetPeer.Node.Id == n.options.Id { continue } - n.Lock() - log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id) - // only add the neighbour if it is NOT already in node's list of neighbours - _, exists := n.neighbours[pbNetNeighbour.Node.Id] - if !exists { - n.neighbours[pbNetNeighbour.Node.Id] = &node{ - id: pbNetNeighbour.Node.Id, - address: pbNetNeighbour.Node.Address, - neighbours: make(map[string]*node), - lastSeen: now, + log.Debugf("Network received peer message from: %s", pbNetPeer.Node.Id) + peer := &node{ + id: pbNetPeer.Node.Id, + address: pbNetPeer.Node.Address, + peers: make(map[string]*node), + lastSeen: now, + } + if ok := n.node.AddPeer(peer); ok { + // send a solicit message when discovering new peer + msg := &pbRtr.Solicit{ + Id: n.options.Id, } - } - // update lastSeen timestamp - if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) { - n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now - } - // update/store the neighbour node neighbours - // NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour - // * even though we are NOT interested in neighbours of neighbours here - // we still allocate the map of neighbours for each of them - for _, pbNeighbour := range pbNetNeighbour.Neighbours { - neighbourNode := &node{ - id: pbNeighbour.Id, - address: pbNeighbour.Address, - neighbours: make(map[string]*node), - } - n.neighbours[pbNetNeighbour.Node.Id].neighbours[pbNeighbour.Id] = neighbourNode - } - n.Unlock() - // send a solicit message when discovering a new node - // NOTE: we need to send the solicit message here after the Lock is released as sendMsg locks, too - if !exists { - if err := n.sendMsg("solicit", ControlChannel); err != nil { + if err := n.sendMsg("solicit", msg, ControlChannel); err != nil { log.Debugf("Network failed to send solicit message: %s", err) } + continue + } + log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) + // update lastSeen time for the peer + if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok { + log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id) + } + // NOTE: we don't unpack MaxDepth toplogy + peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1) + log.Debugf("Network updating topology of node: %s", n.node.id) + if ok := n.node.UpdatePeer(peer); !ok { + log.Debugf("Network failed to update peers") } case "close": pbNetClose := &pbNet.Close{} @@ -393,13 +330,11 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen if pbNetClose.Node.Id == n.options.Id { continue } - n.Lock() log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) if err := n.pruneNode(pbNetClose.Node.Id); err != nil { log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err) continue } - n.Unlock() } case <-n.closed: return @@ -408,68 +343,29 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen } // sendMsg sends a message to the tunnel channel -func (n *network) sendMsg(msgType string, channel string) error { - node := &pbNet.Node{ - Id: n.options.Id, - Address: n.options.Address, - } - - var protoMsg proto.Message - - switch msgType { - case "connect": - protoMsg = &pbNet.Connect{ - Node: node, - } - case "close": - protoMsg = &pbNet.Close{ - Node: node, - } - case "solicit": - protoMsg = &pbNet.Solicit{ - Node: node, - } - case "neighbour": - n.RLock() - nodes := make([]*pbNet.Node, len(n.neighbours)) - i := 0 - for id := range n.neighbours { - nodes[i] = &pbNet.Node{ - Id: id, - Address: n.neighbours[id].address, - } - i++ - } - n.RUnlock() - protoMsg = &pbNet.Neighbour{ - Node: node, - Neighbours: nodes, - } - default: - return ErrMsgUnknown - } - - body, err := proto.Marshal(protoMsg) +func (n *network) sendMsg(method string, msg proto.Message, channel string) error { + body, err := proto.Marshal(msg) if err != nil { return err } // create transport message and chuck it down the pipe m := transport.Message{ Header: map[string]string{ - "Micro-Method": msgType, + "Micro-Method": method, }, Body: body, } + // check if the channel client is initialized n.RLock() client, ok := n.tunClient[channel] - if !ok { + if !ok || client == nil { n.RUnlock() return ErrClientNotFound } n.RUnlock() - log.Debugf("Network sending %s message from: %s", msgType, node.Id) + log.Debugf("Network sending %s message from: %s", method, n.options.Id) if err := client.Send(&m); err != nil { return err } @@ -477,7 +373,7 @@ func (n *network) sendMsg(msgType string, channel string) error { return nil } -// announce announces node neighbourhood to the network +// announce announces node peers to the network func (n *network) announce(client transport.Client) { announce := time.NewTicker(AnnounceTime) defer announce.Stop() @@ -487,19 +383,20 @@ func (n *network) announce(client transport.Client) { case <-n.closed: return case <-announce.C: + msg := PeersToProto(n.node, MaxDepth) // advertise yourself to the network - if err := n.sendMsg("neighbour", NetworkChannel); err != nil { - log.Debugf("Network failed to advertise neighbours: %v", err) + if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { + log.Debugf("Network failed to advertise peers: %v", err) continue } } } } -// pruneNode removes a node with given id from the list of neighbours. It also removes all routes originted by this node. -// NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment +// pruneNode removes a node with given id from the list of peers. It also removes all routes originted by this node. func (n *network) pruneNode(id string) error { - delete(n.neighbours, id) + // DeletePeer serializes access + n.node.DeletePeer(id) // lookup all the routes originated at this node q := router.NewQuery( router.QueryRouter(id), @@ -531,7 +428,7 @@ func (n *network) prune() { return case <-prune.C: n.Lock() - for id, node := range n.neighbours { + for id, node := range n.peers { if id == n.options.Id { continue } @@ -589,8 +486,8 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message // setRouteMetric calculates metric of the route and updates it in place // - Local route metric is 1 -// - Routes with ID of adjacent neighbour are 10 -// - Routes of neighbours of the advertiser are 100 +// - Routes with ID of adjacent nodes are 10 +// - Routes by peers of the advertiser are 100 // - Routes beyond your neighbourhood are 1000 func (n *network) setRouteMetric(route *router.Route) { // we are the origin of the route @@ -599,25 +496,21 @@ func (n *network) setRouteMetric(route *router.Route) { return } - n.RLock() - // check if the route origin is our neighbour - if _, ok := n.neighbours[route.Router]; ok { + // check if the route origin is our peer + if _, ok := n.peers[route.Router]; ok { route.Metric = 10 - n.RUnlock() return } - // check if the route origin is the neighbour of our neighbour - for _, node := range n.neighbours { - for id := range node.neighbours { + // check if the route origin is the peer of our peer + for _, peer := range n.peers { + for id := range peer.peers { if route.Router == id { route.Metric = 100 - n.RUnlock() return } } } - n.RUnlock() // the origin of the route is beyond our neighbourhood route.Metric = 1000 @@ -637,7 +530,6 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste // switch on type of message and take action switch m.Header["Micro-Method"] { case "advert": - now := time.Now() pbRtrAdvert := &pbRtr.Advert{} if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { log.Debugf("Network fail to unmarshal advert message: %v", err) @@ -647,41 +539,23 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste if pbRtrAdvert.Id == n.options.Id { continue } - // loookup advertising node in our neighbourhood - n.RLock() - log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) - advertNode, ok := n.neighbours[pbRtrAdvert.Id] - if !ok { - // advertising node has not been registered as our neighbour, yet - // let's add it to the map of our neighbours - advertNode = &node{ - id: pbRtrAdvert.Id, - neighbours: make(map[string]*node), - lastSeen: now, - } - n.neighbours[pbRtrAdvert.Id] = advertNode - // send a solicit message when discovering a new node - if err := n.sendMsg("solicit", NetworkChannel); err != nil { - log.Debugf("Network failed to send solicit message: %s", err) - } + log.Debugf("Network received advert message with %d events from: %s", len(pbRtrAdvert.Events), pbRtrAdvert.Id) + // loookup advertising node in our peer topology + advertNode := n.node.GetPeerNode(pbRtrAdvert.Id) + if advertNode == nil { + // if we can't find the node in our topology (MaxDepth) we skipp prcessing adverts + log.Debugf("Network skipping advert message from unknown peer: %s", pbRtrAdvert.Id) + continue } - n.RUnlock() var events []*router.Event for _, event := range pbRtrAdvert.Events { - // set the address of the advertising node - // we know Route.Gateway is the address of advertNode - // NOTE: this is true only when advertNode had not been registered - // as our neighbour when we received the advert from it - if advertNode.address == "" { - advertNode.address = event.Route.Gateway - } - // if advertising node id is not the same as Route.Router // we know the advertising node is not the origin of the route - if advertNode.id != event.Route.Router { - // if the origin router is not in the advertising node neighbourhood + if pbRtrAdvert.Id != event.Route.Router { + // if the origin router is not the advertising node peer // we can't rule out potential routing loops so we bail here - if _, ok := advertNode.neighbours[event.Route.Router]; !ok { + if peer := advertNode.GetPeerNode(event.Route.Router); peer == nil { + log.Debugf("Network skipping advert message from peer: %s", pbRtrAdvert.Id) continue } } @@ -695,7 +569,9 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste Metric: int(event.Route.Metric), } // set the route metric + n.node.RLock() n.setRouteMetric(&route) + n.node.RUnlock() // throw away metric bigger than 1000 if route.Metric > 1000 { log.Debugf("Network route metric %d dropping node: %s", route.Metric, route.Router) @@ -709,6 +585,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste } events = append(events, e) } + // create an advert and process it advert := &router.Advert{ Id: pbRtrAdvert.Id, Type: router.AdvertType(pbRtrAdvert.Type), @@ -717,21 +594,22 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste Events: events, } + log.Debugf("Network router processing advert: %s", advert.Id) if err := n.Router.Process(advert); err != nil { log.Debugf("Network failed to process advert %s: %v", advert.Id, err) - continue } case "solicit": - pbNetSolicit := &pbNet.Solicit{} - if err := proto.Unmarshal(m.Body, pbNetSolicit); err != nil { + pbRtrSolicit := &pbRtr.Solicit{} + if err := proto.Unmarshal(m.Body, pbRtrSolicit); err != nil { log.Debugf("Network fail to unmarshal solicit message: %v", err) continue } - log.Debugf("Network received solicit message from: %s", pbNetSolicit.Node.Id) - // don't process your own messages - if pbNetSolicit.Node.Id == n.options.Id { + log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id) + // ignore solicitation when requested by you + if pbRtrSolicit.Id == n.options.Id { continue } + log.Debugf("Network router flushing routes for: %s", pbRtrSolicit.Id) // advertise all the routes when a new node has connected if err := n.Router.Solicit(); err != nil { log.Debugf("Network failed to solicit routes: %s", err) @@ -769,29 +647,14 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A } events = append(events, e) } - pbRtrAdvert := &pbRtr.Advert{ + msg := &pbRtr.Advert{ Id: advert.Id, Type: pbRtr.AdvertType(advert.Type), Timestamp: advert.Timestamp.UnixNano(), Events: events, } - body, err := proto.Marshal(pbRtrAdvert) - if err != nil { - // TODO: should we bail here? - log.Debugf("Network failed to marshal advert message: %v", err) - continue - } - // create transport message and chuck it down the pipe - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": "advert", - }, - Body: body, - } - - log.Debugf("Network sending advert message from: %s", pbRtrAdvert.Id) - if err := client.Send(&m); err != nil { - log.Debugf("Network failed to send advert %s: %v", pbRtrAdvert.Id, err) + if err := n.sendMsg("advert", msg, ControlChannel); err != nil { + log.Debugf("Network failed to advertise routes: %v", err) continue } case <-n.closed: @@ -805,6 +668,7 @@ func (n *network) Connect() error { n.Lock() // return if already connected if n.connected { + n.Unlock() return nil } @@ -816,6 +680,7 @@ func (n *network) Connect() error { // connect network tunnel if err := n.Tunnel.Connect(); err != nil { + n.Unlock() return err } @@ -827,6 +692,7 @@ func (n *network) Connect() error { // dial into ControlChannel to send route adverts ctrlClient, err := n.Tunnel.Dial(ControlChannel, tunnel.DialMulticast()) if err != nil { + n.Unlock() return err } @@ -835,12 +701,14 @@ func (n *network) Connect() error { // listen on ControlChannel ctrlListener, err := n.Tunnel.Listen(ControlChannel) if err != nil { + n.Unlock() return err } // dial into NetworkChannel to send network messages netClient, err := n.Tunnel.Dial(NetworkChannel, tunnel.DialMulticast()) if err != nil { + n.Unlock() return err } @@ -849,6 +717,7 @@ func (n *network) Connect() error { // listen on NetworkChannel netListener, err := n.Tunnel.Listen(NetworkChannel) if err != nil { + n.Unlock() return err } @@ -857,17 +726,20 @@ func (n *network) Connect() error { // start the router if err := n.options.Router.Start(); err != nil { + n.Unlock() return err } // start advertising routes advertChan, err := n.options.Router.Advertise() if err != nil { + n.Unlock() return err } // start the server if err := n.server.Start(); err != nil { + n.Unlock() return err } n.Unlock() @@ -876,13 +748,19 @@ func (n *network) Connect() error { // NOTE: in theory we could do this as soon as // Dial to NetworkChannel succeeds, but instead // we initialize all other node resources first - if err := n.sendMsg("connect", NetworkChannel); err != nil { + msg := &pbNet.Connect{ + Node: &pbNet.Node{ + Id: n.options.Id, + Address: n.options.Address, + }, + } + if err := n.sendMsg("connect", msg, NetworkChannel); err != nil { log.Debugf("Network failed to send connect message: %s", err) } // go resolving network nodes go n.resolve() - // broadcast neighbourhood + // broadcast peers go n.announce(netClient) // prune stale nodes go n.prune() @@ -900,48 +778,6 @@ func (n *network) Connect() error { return nil } -// Nodes returns a list of all network nodes -func (n *network) Nodes() []Node { - //track the visited nodes - visited := make(map[string]*node) - // queue of the nodes to visit - queue := list.New() - - // we need to freeze the network graph here - // otherwise we might get invalid results - n.RLock() - defer n.RUnlock() - - // push network node to the back of queue - queue.PushBack(n.node) - // mark the node as visited - visited[n.node.id] = n.node - - // keep iterating over the queue until its empty - for queue.Len() > 0 { - // pop the node from the front of the queue - qnode := queue.Front() - // iterate through all of its neighbours - // mark the visited nodes; enqueue the non-visted - for id, node := range qnode.Value.(*node).neighbours { - if _, ok := visited[id]; !ok { - visited[id] = node - queue.PushBack(node) - } - } - // remove the node from the queue - queue.Remove(qnode) - } - - var nodes []Node - // collect all the nodes and return them - for _, node := range visited { - nodes = append(nodes, node) - } - - return nodes -} - func (n *network) close() error { // stop the server if err := n.server.Stop(); err != nil { @@ -963,7 +799,6 @@ func (n *network) close() error { // Close closes network connection func (n *network) Close() error { - // lock this operation n.Lock() if !n.connected { @@ -984,9 +819,13 @@ func (n *network) Close() error { // unlock the lock otherwise we'll deadlock sending the close n.Unlock() - // send close message only if we managed to connect to NetworkChannel - log.Debugf("Sending close message from: %s", n.options.Id) - if err := n.sendMsg("close", NetworkChannel); err != nil { + msg := &pbNet.Close{ + Node: &pbNet.Node{ + Id: n.options.Id, + Address: n.options.Address, + }, + } + if err := n.sendMsg("close", msg, NetworkChannel); err != nil { log.Debugf("Network failed to send close message: %s", err) } } diff --git a/network/handler/handler.go b/network/handler/handler.go index 1d8d74a8..a0905936 100644 --- a/network/handler/handler.go +++ b/network/handler/handler.go @@ -3,7 +3,6 @@ package handler import ( "context" - "sort" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/network" @@ -16,6 +15,21 @@ type Network struct { Network network.Network } +// ListPeers returns a list of all the nodes the node has a direct link with +func (n *Network) ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *pbNet.PeerResponse) error { + depth := uint(req.Depth) + if depth <= 0 || depth > network.MaxDepth { + depth = network.MaxDepth + } + + // get peers encoded into protobuf + peers := network.PeersToProto(n.Network, depth) + + resp.Peers = peers + + return nil +} + // ListRoutes returns a list of routing table routes func (n *Network) ListRoutes(ctx context.Context, req *pbRtr.Request, resp *pbRtr.ListResponse) error { routes, err := n.Network.Options().Router.Table().List() @@ -41,71 +55,3 @@ func (n *Network) ListRoutes(ctx context.Context, req *pbRtr.Request, resp *pbRt return nil } - -// ListNodes returns a list of all accessible nodes in the network -func (n *Network) ListNodes(ctx context.Context, req *pbNet.ListRequest, resp *pbNet.ListResponse) error { - nodes := n.Network.Nodes() - - var respNodes []*pbNet.Node - for _, node := range nodes { - respNode := &pbNet.Node{ - Id: node.Id(), - Address: node.Address(), - } - respNodes = append(respNodes, respNode) - } - - resp.Nodes = respNodes - - return nil -} - -// Neighbourhood returns a list of immediate neighbours -func (n *Network) Neighbourhood(ctx context.Context, req *pbNet.NeighbourhoodRequest, resp *pbNet.NeighbourhoodResponse) error { - // extract the id of the node to query - id := req.Id - // if no id is passed, we assume local node - if id == "" { - id = n.Network.Id() - } - - // get all the nodes in the network - nodes := n.Network.Nodes() - - // sort the slice of nodes - sort.Slice(nodes, func(i, j int) bool { return nodes[i].Id() <= nodes[j].Id() }) - // find a node with a given id - i := sort.Search(len(nodes), func(j int) bool { return nodes[j].Id() >= id }) - - var neighbours []*pbNet.Node - // collect all the nodes in the neighbourhood of the found node - if i < len(nodes) && nodes[i].Id() == id { - for _, neighbour := range nodes[i].Neighbourhood() { - // don't return yourself in response - if neighbour.Id() == n.Network.Id() { - continue - } - pbNeighbour := &pbNet.Node{ - Id: neighbour.Id(), - Address: neighbour.Address(), - } - neighbours = append(neighbours, pbNeighbour) - } - } - - // requested neighbourhood node - node := &pbNet.Node{ - Id: nodes[i].Id(), - Address: nodes[i].Address(), - } - - // creaate neighbourhood answer - neighbourhood := &pbNet.Neighbour{ - Node: node, - Neighbours: neighbours, - } - - resp.Neighbourhood = neighbourhood - - return nil -} diff --git a/network/network.go b/network/network.go index 5a9b7f6a..ef303992 100644 --- a/network/network.go +++ b/network/network.go @@ -28,8 +28,8 @@ type Node interface { Id() string // Address is node bind address Address() string - // Neighbourhood is node neighbourhood - Neighbourhood() []Node + // Peers returns node peers + Peers() []Node // Network is the network node is in Network() Network } @@ -44,8 +44,6 @@ type Network interface { Name() string // Connect starts the resolver and tunnel server Connect() error - // Nodes returns list of network nodes - Nodes() []Node // Close stops the tunnel and resolving Close() error // Client is micro client diff --git a/network/node.go b/network/node.go new file mode 100644 index 00000000..b9c362bb --- /dev/null +++ b/network/node.go @@ -0,0 +1,310 @@ +package network + +import ( + "container/list" + "sync" + "time" + + pb "github.com/micro/go-micro/network/proto" +) + +var ( + // MaxDepth defines max depth of peer topology + MaxDepth uint = 3 +) + +// node is network node +type node struct { + sync.RWMutex + // id is node id + id string + // address is node address + address string + // peers are nodes with direct link to this node + peers map[string]*node + // network returns the node network + network Network + // lastSeen keeps track of node lifetime and updates + lastSeen time.Time +} + +// Id is node ide +func (n *node) Id() string { + return n.id +} + +// Address returns node address +func (n *node) Address() string { + return n.address +} + +// Network returns node network +func (n *node) Network() Network { + return n.network +} + +// AddPeer adds a new peer to node topology +// It returns false if the peer already exists +func (n *node) AddPeer(peer *node) bool { + n.Lock() + defer n.Unlock() + + if _, ok := n.peers[peer.id]; !ok { + n.peers[peer.id] = peer + return true + } + + return false +} + +// UpdatePeer updates a peer if it exists +// It returns false if the peer does not exist +func (n *node) UpdatePeer(peer *node) bool { + n.Lock() + defer n.Unlock() + + if _, ok := n.peers[peer.id]; ok { + n.peers[peer.id] = peer + return true + } + + return false +} + +// DeletePeer deletes a peer if it exists +func (n *node) DeletePeer(id string) { + n.Lock() + defer n.Unlock() + + delete(n.peers, id) +} + +// HasPeer returns true if node has peer with given id +func (n *node) HasPeer(id string) bool { + n.RLock() + defer n.RUnlock() + + _, ok := n.peers[id] + return ok +} + +// RefreshPeer updates node timestamp +// It returns false if the peer has not been found. +func (n *node) RefreshPeer(id string, now time.Time) bool { + n.Lock() + defer n.Unlock() + + peer, ok := n.peers[id] + if !ok { + return false + } + + if peer.lastSeen.Before(now) { + peer.lastSeen = now + } + + return true +} + +func (n *node) walk(until func(peer *node) bool) map[string]*node { + // track the visited nodes + visited := make(map[string]*node) + // queue of the nodes to visit + queue := list.New() + + // push node to the back of queue + queue.PushBack(n) + // mark the node as visited + visited[n.id] = n + + // keep iterating over the queue until its empty + for queue.Len() > 0 { + // pop the node from the front of the queue + qnode := queue.Front() + // iterate through all of the node peers + // mark the visited nodes; enqueue the non-visted + for id, node := range qnode.Value.(*node).peers { + if _, ok := visited[id]; !ok { + visited[id] = node + queue.PushBack(node) + } + if until(node) { + return visited + } + } + // remove the node from the queue + queue.Remove(qnode) + } + + return visited +} + +// Nodes returns a slice if all nodes in node topology +func (n *node) Nodes() []Node { + // we need to freeze the network graph here + // otherwise we might get inconsisten results + n.RLock() + defer n.RUnlock() + + // NOTE: this should never be true + untilNoMorePeers := func(n *node) bool { + return n == nil + } + + visited := n.walk(untilNoMorePeers) + + var nodes []Node + // collect all the nodes and return them + for _, node := range visited { + nodes = append(nodes, node) + } + + return nodes +} + +// GetPeerNode returns a peer from node topology i.e. up to MaxDepth +// It returns nil if the peer was not found in the node topology +func (n *node) GetPeerNode(id string) *node { + n.RLock() + defer n.RUnlock() + + // get node topology up to MaxDepth + top := n.Topology(MaxDepth) + + untilFoundPeer := func(n *node) bool { + return n.id == id + } + + visited := top.walk(untilFoundPeer) + + peerNode, ok := visited[id] + if !ok { + return nil + } + + return peerNode +} + +// Topology returns a copy of th node topology down to given depth +func (n *node) Topology(depth uint) *node { + n.RLock() + defer n.RUnlock() + + // make a copy of yourself + node := &node{ + id: n.id, + address: n.address, + peers: make(map[string]*node), + network: n.network, + lastSeen: n.lastSeen, + } + + // return if we reach requested depth or we have no more peers + if depth == 0 || len(n.peers) == 0 { + return node + } + + // decrement the depth + depth-- + + // iterate through our peers and update the node peers + for _, peer := range n.peers { + nodePeer := peer.Topology(depth) + if _, ok := node.peers[nodePeer.id]; !ok { + node.peers[nodePeer.id] = nodePeer + } + } + + return node +} + +// Peers returns node peers up to MaxDepth +func (n *node) Peers() []Node { + n.RLock() + defer n.RUnlock() + + var peers []Node + for _, nodePeer := range n.peers { + peer := nodePeer.Topology(MaxDepth) + peers = append(peers, peer) + } + + return peers +} + +// UnpackPeerTopology unpacks pb.Peer into node topology of given depth +func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node { + peerNode := &node{ + id: pbPeer.Node.Id, + address: pbPeer.Node.Address, + peers: make(map[string]*node), + lastSeen: lastSeen, + } + + // return if have either reached the depth or have no more peers + if depth == 0 || len(pbPeer.Peers) == 0 { + return peerNode + } + + // decrement the depth + depth-- + + peers := make(map[string]*node) + for _, pbPeer := range pbPeer.Peers { + peer := UnpackPeerTopology(pbPeer, lastSeen, depth) + peers[pbPeer.Node.Id] = peer + } + + peerNode.peers = peers + + return peerNode +} + +func peerProtoTopology(peer Node, depth uint) *pb.Peer { + node := &pb.Node{ + Id: peer.Id(), + Address: peer.Address(), + } + + pbPeers := &pb.Peer{ + Node: node, + Peers: make([]*pb.Peer, 0), + } + + // return if we reached the end of topology or depth + if depth == 0 || len(peer.Peers()) == 0 { + return pbPeers + } + + // decrement the depth + depth-- + + // iterate through peers of peers aka pops + for _, pop := range peer.Peers() { + peer := peerProtoTopology(pop, depth) + pbPeers.Peers = append(pbPeers.Peers, peer) + } + + return pbPeers +} + +// PeersToProto returns node peers graph encoded into protobuf +func PeersToProto(node Node, depth uint) *pb.Peer { + // network node aka root node + pbNode := &pb.Node{ + Id: node.Id(), + Address: node.Address(), + } + // we will build proto topology into this + pbPeers := &pb.Peer{ + Node: pbNode, + Peers: make([]*pb.Peer, 0), + } + + for _, peer := range node.Peers() { + pbPeer := peerProtoTopology(peer, depth) + pbPeers.Peers = append(pbPeers.Peers, pbPeer) + } + + return pbPeers +} diff --git a/network/node_test.go b/network/node_test.go new file mode 100644 index 00000000..10bc17bc --- /dev/null +++ b/network/node_test.go @@ -0,0 +1,279 @@ +package network + +import ( + "testing" + "time" + + pb "github.com/micro/go-micro/network/proto" +) + +var ( + testNodeId = "testNode" + testNodeAddress = "testAddress" + testNodeNetName = "testNetwork" + testNodePeerIds = []string{"peer1", "peer2", "peer3"} + testPeerOfPeerIds = []string{"peer11", "peer12"} +) + +func testSetup() *node { + testNode := &node{ + id: testNodeId, + address: testNodeAddress, + peers: make(map[string]*node), + network: newNetwork(Name(testNodeNetName)), + } + + // add some peers to the node + for _, id := range testNodePeerIds { + testNode.peers[id] = &node{ + id: id, + address: testNode.address + "-" + id, + peers: make(map[string]*node), + network: testNode.network, + } + } + + // add peers to peer1 + // NOTE: these are peers of peers! + for _, id := range testPeerOfPeerIds { + testNode.peers["peer1"].peers[id] = &node{ + id: id, + address: testNode.address + "-" + id, + peers: make(map[string]*node), + network: testNode.network, + } + } + + // connect peer1 with peer2 + testNode.peers["peer1"].peers["peer2"] = testNode.peers["peer2"] + // connect peer2 with peer3 + testNode.peers["peer2"].peers["peer3"] = testNode.peers["peer3"] + + return testNode +} + +func TestNodeId(t *testing.T) { + node := testSetup() + if node.Id() != testNodeId { + t.Errorf("Expected id: %s, found: %s", testNodeId, node.Id()) + } +} + +func TestNodeAddress(t *testing.T) { + node := testSetup() + if node.Address() != testNodeAddress { + t.Errorf("Expected address: %s, found: %s", testNodeAddress, node.Address()) + } +} +func TestNodeNetwork(t *testing.T) { + node := testSetup() + if node.Network().Name() != testNodeNetName { + t.Errorf("Expected network: %s, found: %s", testNodeNetName, node.Network().Name()) + } +} + +func TestNodes(t *testing.T) { + // single node + single := &node{ + id: testNodeId, + address: testNodeAddress, + peers: make(map[string]*node), + network: newNetwork(Name(testNodeNetName)), + } + // get all the nodes including yourself + nodes := single.Nodes() + nodeCount := 1 + + if len(nodes) != nodeCount { + t.Errorf("Expected to find %d nodes, found: %d", nodeCount, len(nodes)) + } + + // complicated node graph + node := testSetup() + // get all the nodes including yourself + nodes = node.Nodes() + + // compile a list of ids of all nodes in the network into map for easy indexing + nodeIds := make(map[string]bool) + // add yourself + nodeIds[node.id] = true + // add peer Ids + for _, id := range testNodePeerIds { + nodeIds[id] = true + } + // add peer1 peers i.e. peers of peer + for _, id := range testPeerOfPeerIds { + nodeIds[id] = true + } + + // we should return the correct number of nodes + if len(nodes) != len(nodeIds) { + t.Errorf("Expected %d nodes, found: %d", len(nodeIds), len(nodes)) + } + + // iterate through the list of nodes and makes sure all have been returned + for _, node := range nodes { + if _, ok := nodeIds[node.Id()]; !ok { + t.Errorf("Expected to find %s node", node.Id()) + } + } + + // this is a leaf node + id := "peer11" + if nodePeer := node.GetPeerNode(id); nodePeer == nil { + t.Errorf("Expected to find %s node", id) + } +} + +func collectPeerIds(peer Node, ids map[string]bool) map[string]bool { + if len(peer.Peers()) == 0 { + return ids + } + + // iterate through the whole graph + for _, peer := range peer.Peers() { + ids = collectPeerIds(peer, ids) + if _, ok := ids[peer.Id()]; !ok { + ids[peer.Id()] = true + } + } + + return ids +} + +func TestPeers(t *testing.T) { + // single node + single := &node{ + id: testNodeId, + address: testNodeAddress, + peers: make(map[string]*node), + network: newNetwork(Name(testNodeNetName)), + } + // get node peers + peers := single.Peers() + // there should be no peers + peerCount := 0 + + if len(peers) != peerCount { + t.Errorf("Expected to find %d nodes, found: %d", peerCount, len(peers)) + } + + // complicated node graph + node := testSetup() + // list of ids of nodes of MaxDepth + peerIds := make(map[string]bool) + // add peer Ids + for _, id := range testNodePeerIds { + peerIds[id] = true + } + // add peers of peers to peerIds + for _, id := range testPeerOfPeerIds { + peerIds[id] = true + } + // get node peers + peers = node.Peers() + + // we will collect all returned Peer Ids into this map + resPeerIds := make(map[string]bool) + for _, peer := range peers { + resPeerIds[peer.Id()] = true + resPeerIds = collectPeerIds(peer, resPeerIds) + } + + // if correct, we must collect all peerIds + if len(resPeerIds) != len(peerIds) { + t.Errorf("Expected to find %d peers, found: %d", len(peerIds), len(resPeerIds)) + } + + for id := range resPeerIds { + if _, ok := peerIds[id]; !ok { + t.Errorf("Expected to find %s peer", id) + } + } +} + +func TestUnpackPeerTopology(t *testing.T) { + pbPeer := &pb.Peer{ + Node: &pb.Node{ + Id: "newPeer", + Address: "newPeerAddress", + }, + Peers: make([]*pb.Peer, 0), + } + // it should add pbPeer to the single node peers + peer := UnpackPeerTopology(pbPeer, time.Now(), 5) + if peer.id != pbPeer.Node.Id { + t.Errorf("Expected peer id %s, found: %s", pbPeer.Node.Id, peer.id) + } + + node := testSetup() + // build a simple topology to update node peer1 + peer1 := node.peers["peer1"] + pbPeer1Node := &pb.Node{ + Id: peer1.id, + Address: peer1.address, + } + + pbPeer111 := &pb.Peer{ + Node: &pb.Node{ + Id: "peer111", + Address: "peer111Address", + }, + Peers: make([]*pb.Peer, 0), + } + + pbPeer121 := &pb.Peer{ + Node: &pb.Node{ + Id: "peer121", + Address: "peer121Address", + }, + Peers: make([]*pb.Peer, 0), + } + // topology to update + pbPeer1 := &pb.Peer{ + Node: pbPeer1Node, + Peers: []*pb.Peer{pbPeer111, pbPeer121}, + } + // unpack peer1 topology + peer = UnpackPeerTopology(pbPeer1, time.Now(), 5) + // make sure peer1 topology has been correctly updated + newPeerIds := []string{pbPeer111.Node.Id, pbPeer121.Node.Id} + for _, id := range newPeerIds { + if _, ok := peer.peers[id]; !ok { + t.Errorf("Expected %s to be a peer of %s", id, "peer1") + } + } +} + +func TestPeersToProto(t *testing.T) { + // single node + single := &node{ + id: testNodeId, + address: testNodeAddress, + peers: make(map[string]*node), + network: newNetwork(Name(testNodeNetName)), + } + topCount := 0 + + protoPeers := PeersToProto(single, 0) + + if len(protoPeers.Peers) != topCount { + t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers)) + } + + // complicated node graph + node := testSetup() + topCount = 3 + // list of ids of nodes of depth 1 i.e. node peers + peerIds := make(map[string]bool) + // add peer Ids + for _, id := range testNodePeerIds { + peerIds[id] = true + } + // depth 1 should give us immmediate neighbours only + protoPeers = PeersToProto(node, 1) + + if len(protoPeers.Peers) != topCount { + t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers)) + } +} diff --git a/network/proto/network.micro.go b/network/proto/network.micro.go index c114ee96..e1499490 100644 --- a/network/proto/network.micro.go +++ b/network/proto/network.micro.go @@ -35,9 +35,8 @@ var _ server.Option // Client API for Network service type NetworkService interface { + ListPeers(ctx context.Context, in *PeerRequest, opts ...client.CallOption) (*PeerResponse, error) ListRoutes(ctx context.Context, in *proto1.Request, opts ...client.CallOption) (*proto1.ListResponse, error) - ListNodes(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) - Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, opts ...client.CallOption) (*NeighbourhoodResponse, error) } type networkService struct { @@ -58,6 +57,16 @@ func NewNetworkService(name string, c client.Client) NetworkService { } } +func (c *networkService) ListPeers(ctx context.Context, in *PeerRequest, opts ...client.CallOption) (*PeerResponse, error) { + req := c.c.NewRequest(c.name, "Network.ListPeers", in) + out := new(PeerResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *networkService) ListRoutes(ctx context.Context, in *proto1.Request, opts ...client.CallOption) (*proto1.ListResponse, error) { req := c.c.NewRequest(c.name, "Network.ListRoutes", in) out := new(proto1.ListResponse) @@ -68,39 +77,17 @@ func (c *networkService) ListRoutes(ctx context.Context, in *proto1.Request, opt return out, nil } -func (c *networkService) ListNodes(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) { - req := c.c.NewRequest(c.name, "Network.ListNodes", in) - out := new(ListResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *networkService) Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, opts ...client.CallOption) (*NeighbourhoodResponse, error) { - req := c.c.NewRequest(c.name, "Network.Neighbourhood", in) - out := new(NeighbourhoodResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // Server API for Network service type NetworkHandler interface { + ListPeers(context.Context, *PeerRequest, *PeerResponse) error ListRoutes(context.Context, *proto1.Request, *proto1.ListResponse) error - ListNodes(context.Context, *ListRequest, *ListResponse) error - Neighbourhood(context.Context, *NeighbourhoodRequest, *NeighbourhoodResponse) error } func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error { type network interface { + ListPeers(ctx context.Context, in *PeerRequest, out *PeerResponse) error ListRoutes(ctx context.Context, in *proto1.Request, out *proto1.ListResponse) error - ListNodes(ctx context.Context, in *ListRequest, out *ListResponse) error - Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, out *NeighbourhoodResponse) error } type Network struct { network @@ -113,14 +100,10 @@ type networkHandler struct { NetworkHandler } +func (h *networkHandler) ListPeers(ctx context.Context, in *PeerRequest, out *PeerResponse) error { + return h.NetworkHandler.ListPeers(ctx, in, out) +} + func (h *networkHandler) ListRoutes(ctx context.Context, in *proto1.Request, out *proto1.ListResponse) error { return h.NetworkHandler.ListRoutes(ctx, in, out) } - -func (h *networkHandler) ListNodes(ctx context.Context, in *ListRequest, out *ListResponse) error { - return h.NetworkHandler.ListNodes(ctx, in, out) -} - -func (h *networkHandler) Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, out *NeighbourhoodResponse) error { - return h.NetworkHandler.Neighbourhood(ctx, in, out) -} diff --git a/network/proto/network.pb.go b/network/proto/network.pb.go index dc6c0d48..3cacfb00 100644 --- a/network/proto/network.pb.go +++ b/network/proto/network.pb.go @@ -21,161 +21,91 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package -// Empty request -type ListRequest struct { +// PeerRequest requests list of peers +type PeerRequest struct { + // node topology depth + Depth uint32 `protobuf:"varint,1,opt,name=depth,proto3" json:"depth,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *ListRequest) Reset() { *m = ListRequest{} } -func (m *ListRequest) String() string { return proto.CompactTextString(m) } -func (*ListRequest) ProtoMessage() {} -func (*ListRequest) Descriptor() ([]byte, []int) { +func (m *PeerRequest) Reset() { *m = PeerRequest{} } +func (m *PeerRequest) String() string { return proto.CompactTextString(m) } +func (*PeerRequest) ProtoMessage() {} +func (*PeerRequest) Descriptor() ([]byte, []int) { return fileDescriptor_8571034d60397816, []int{0} } -func (m *ListRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_ListRequest.Unmarshal(m, b) +func (m *PeerRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PeerRequest.Unmarshal(m, b) } -func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic) +func (m *PeerRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PeerRequest.Marshal(b, m, deterministic) } -func (m *ListRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ListRequest.Merge(m, src) +func (m *PeerRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PeerRequest.Merge(m, src) } -func (m *ListRequest) XXX_Size() int { - return xxx_messageInfo_ListRequest.Size(m) +func (m *PeerRequest) XXX_Size() int { + return xxx_messageInfo_PeerRequest.Size(m) } -func (m *ListRequest) XXX_DiscardUnknown() { - xxx_messageInfo_ListRequest.DiscardUnknown(m) +func (m *PeerRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PeerRequest.DiscardUnknown(m) } -var xxx_messageInfo_ListRequest proto.InternalMessageInfo +var xxx_messageInfo_PeerRequest proto.InternalMessageInfo -// ListResponse is returned by ListNodes and ListNeighbours -type ListResponse struct { - Nodes []*Node `protobuf:"bytes,1,rep,name=nodes,proto3" json:"nodes,omitempty"` +func (m *PeerRequest) GetDepth() uint32 { + if m != nil { + return m.Depth + } + return 0 +} + +// PeerResponse is returned by ListPeers +type PeerResponse struct { + // return peer topology + Peers *Peer `protobuf:"bytes,1,opt,name=peers,proto3" json:"peers,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *ListResponse) Reset() { *m = ListResponse{} } -func (m *ListResponse) String() string { return proto.CompactTextString(m) } -func (*ListResponse) ProtoMessage() {} -func (*ListResponse) Descriptor() ([]byte, []int) { +func (m *PeerResponse) Reset() { *m = PeerResponse{} } +func (m *PeerResponse) String() string { return proto.CompactTextString(m) } +func (*PeerResponse) ProtoMessage() {} +func (*PeerResponse) Descriptor() ([]byte, []int) { return fileDescriptor_8571034d60397816, []int{1} } -func (m *ListResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_ListResponse.Unmarshal(m, b) +func (m *PeerResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PeerResponse.Unmarshal(m, b) } -func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic) +func (m *PeerResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PeerResponse.Marshal(b, m, deterministic) } -func (m *ListResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_ListResponse.Merge(m, src) +func (m *PeerResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PeerResponse.Merge(m, src) } -func (m *ListResponse) XXX_Size() int { - return xxx_messageInfo_ListResponse.Size(m) +func (m *PeerResponse) XXX_Size() int { + return xxx_messageInfo_PeerResponse.Size(m) } -func (m *ListResponse) XXX_DiscardUnknown() { - xxx_messageInfo_ListResponse.DiscardUnknown(m) +func (m *PeerResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PeerResponse.DiscardUnknown(m) } -var xxx_messageInfo_ListResponse proto.InternalMessageInfo +var xxx_messageInfo_PeerResponse proto.InternalMessageInfo -func (m *ListResponse) GetNodes() []*Node { +func (m *PeerResponse) GetPeers() *Peer { if m != nil { - return m.Nodes - } - return nil -} - -// NeighbourhoodRequest is sent to query node neighbourhood -type NeighbourhoodRequest struct { - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *NeighbourhoodRequest) Reset() { *m = NeighbourhoodRequest{} } -func (m *NeighbourhoodRequest) String() string { return proto.CompactTextString(m) } -func (*NeighbourhoodRequest) ProtoMessage() {} -func (*NeighbourhoodRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{2} -} - -func (m *NeighbourhoodRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_NeighbourhoodRequest.Unmarshal(m, b) -} -func (m *NeighbourhoodRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_NeighbourhoodRequest.Marshal(b, m, deterministic) -} -func (m *NeighbourhoodRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_NeighbourhoodRequest.Merge(m, src) -} -func (m *NeighbourhoodRequest) XXX_Size() int { - return xxx_messageInfo_NeighbourhoodRequest.Size(m) -} -func (m *NeighbourhoodRequest) XXX_DiscardUnknown() { - xxx_messageInfo_NeighbourhoodRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_NeighbourhoodRequest proto.InternalMessageInfo - -func (m *NeighbourhoodRequest) GetId() string { - if m != nil { - return m.Id - } - return "" -} - -// NeighbourhoodResponse contains node neighbourhood hierarchy -type NeighbourhoodResponse struct { - Neighbourhood *Neighbour `protobuf:"bytes,1,opt,name=neighbourhood,proto3" json:"neighbourhood,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *NeighbourhoodResponse) Reset() { *m = NeighbourhoodResponse{} } -func (m *NeighbourhoodResponse) String() string { return proto.CompactTextString(m) } -func (*NeighbourhoodResponse) ProtoMessage() {} -func (*NeighbourhoodResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{3} -} - -func (m *NeighbourhoodResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_NeighbourhoodResponse.Unmarshal(m, b) -} -func (m *NeighbourhoodResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_NeighbourhoodResponse.Marshal(b, m, deterministic) -} -func (m *NeighbourhoodResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_NeighbourhoodResponse.Merge(m, src) -} -func (m *NeighbourhoodResponse) XXX_Size() int { - return xxx_messageInfo_NeighbourhoodResponse.Size(m) -} -func (m *NeighbourhoodResponse) XXX_DiscardUnknown() { - xxx_messageInfo_NeighbourhoodResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_NeighbourhoodResponse proto.InternalMessageInfo - -func (m *NeighbourhoodResponse) GetNeighbourhood() *Neighbour { - if m != nil { - return m.Neighbourhood + return m.Peers } return nil } // Node is network node type Node struct { - // node ide + // node id Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // node address Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` @@ -188,7 +118,7 @@ func (m *Node) Reset() { *m = Node{} } func (m *Node) String() string { return proto.CompactTextString(m) } func (*Node) ProtoMessage() {} func (*Node) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{4} + return fileDescriptor_8571034d60397816, []int{2} } func (m *Node) XXX_Unmarshal(b []byte) error { @@ -236,7 +166,7 @@ func (m *Connect) Reset() { *m = Connect{} } func (m *Connect) String() string { return proto.CompactTextString(m) } func (*Connect) ProtoMessage() {} func (*Connect) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{5} + return fileDescriptor_8571034d60397816, []int{3} } func (m *Connect) XXX_Unmarshal(b []byte) error { @@ -277,7 +207,7 @@ func (m *Close) Reset() { *m = Close{} } func (m *Close) String() string { return proto.CompactTextString(m) } func (*Close) ProtoMessage() {} func (*Close) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{6} + return fileDescriptor_8571034d60397816, []int{4} } func (m *Close) XXX_Unmarshal(b []byte) error { @@ -305,134 +235,86 @@ func (m *Close) GetNode() *Node { return nil } -// Solicit is sent when requesting route advertisement from the network nodes -type Solicit struct { - // network node - Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Solicit) Reset() { *m = Solicit{} } -func (m *Solicit) String() string { return proto.CompactTextString(m) } -func (*Solicit) ProtoMessage() {} -func (*Solicit) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{7} -} - -func (m *Solicit) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Solicit.Unmarshal(m, b) -} -func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Solicit.Marshal(b, m, deterministic) -} -func (m *Solicit) XXX_Merge(src proto.Message) { - xxx_messageInfo_Solicit.Merge(m, src) -} -func (m *Solicit) XXX_Size() int { - return xxx_messageInfo_Solicit.Size(m) -} -func (m *Solicit) XXX_DiscardUnknown() { - xxx_messageInfo_Solicit.DiscardUnknown(m) -} - -var xxx_messageInfo_Solicit proto.InternalMessageInfo - -func (m *Solicit) GetNode() *Node { - if m != nil { - return m.Node - } - return nil -} - -// Neighbour is used to nnounce node neighbourhood -type Neighbour struct { +// Peer is used to advertise node peers +type Peer struct { // network node Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` - // neighbours - Neighbours []*Node `protobuf:"bytes,3,rep,name=neighbours,proto3" json:"neighbours,omitempty"` + // node peers + Peers []*Peer `protobuf:"bytes,2,rep,name=peers,proto3" json:"peers,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *Neighbour) Reset() { *m = Neighbour{} } -func (m *Neighbour) String() string { return proto.CompactTextString(m) } -func (*Neighbour) ProtoMessage() {} -func (*Neighbour) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{8} +func (m *Peer) Reset() { *m = Peer{} } +func (m *Peer) String() string { return proto.CompactTextString(m) } +func (*Peer) ProtoMessage() {} +func (*Peer) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{5} } -func (m *Neighbour) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Neighbour.Unmarshal(m, b) +func (m *Peer) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Peer.Unmarshal(m, b) } -func (m *Neighbour) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Neighbour.Marshal(b, m, deterministic) +func (m *Peer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Peer.Marshal(b, m, deterministic) } -func (m *Neighbour) XXX_Merge(src proto.Message) { - xxx_messageInfo_Neighbour.Merge(m, src) +func (m *Peer) XXX_Merge(src proto.Message) { + xxx_messageInfo_Peer.Merge(m, src) } -func (m *Neighbour) XXX_Size() int { - return xxx_messageInfo_Neighbour.Size(m) +func (m *Peer) XXX_Size() int { + return xxx_messageInfo_Peer.Size(m) } -func (m *Neighbour) XXX_DiscardUnknown() { - xxx_messageInfo_Neighbour.DiscardUnknown(m) +func (m *Peer) XXX_DiscardUnknown() { + xxx_messageInfo_Peer.DiscardUnknown(m) } -var xxx_messageInfo_Neighbour proto.InternalMessageInfo +var xxx_messageInfo_Peer proto.InternalMessageInfo -func (m *Neighbour) GetNode() *Node { +func (m *Peer) GetNode() *Node { if m != nil { return m.Node } return nil } -func (m *Neighbour) GetNeighbours() []*Node { +func (m *Peer) GetPeers() []*Peer { if m != nil { - return m.Neighbours + return m.Peers } return nil } func init() { - proto.RegisterType((*ListRequest)(nil), "go.micro.network.ListRequest") - proto.RegisterType((*ListResponse)(nil), "go.micro.network.ListResponse") - proto.RegisterType((*NeighbourhoodRequest)(nil), "go.micro.network.NeighbourhoodRequest") - proto.RegisterType((*NeighbourhoodResponse)(nil), "go.micro.network.NeighbourhoodResponse") + proto.RegisterType((*PeerRequest)(nil), "go.micro.network.PeerRequest") + proto.RegisterType((*PeerResponse)(nil), "go.micro.network.PeerResponse") proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Close)(nil), "go.micro.network.Close") - proto.RegisterType((*Solicit)(nil), "go.micro.network.Solicit") - proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour") + proto.RegisterType((*Peer)(nil), "go.micro.network.Peer") } func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } var fileDescriptor_8571034d60397816 = []byte{ - // 360 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x4f, 0xf2, 0x40, - 0x10, 0xfd, 0x28, 0xf0, 0x35, 0x0c, 0x1f, 0x5f, 0xcc, 0x46, 0x4d, 0x53, 0x83, 0x21, 0x7b, 0x40, - 0x62, 0xb4, 0x18, 0x08, 0x9e, 0xbc, 0x18, 0x0e, 0x5e, 0x08, 0x87, 0x7a, 0xf3, 0x66, 0xbb, 0x9b, - 0xb2, 0x11, 0x3a, 0xb8, 0xbb, 0x8d, 0x7f, 0xc0, 0x1f, 0x6e, 0xba, 0x5d, 0xb0, 0x80, 0x60, 0xb8, - 0x75, 0xe6, 0xbd, 0x37, 0x6f, 0xa7, 0xfb, 0x16, 0x5a, 0x29, 0xd7, 0x1f, 0x28, 0xdf, 0x82, 0xa5, - 0x44, 0x8d, 0xe4, 0x24, 0xc1, 0x60, 0x21, 0x62, 0x89, 0x81, 0xed, 0xfb, 0xc3, 0x44, 0xe8, 0x59, - 0x16, 0x05, 0x31, 0x2e, 0xfa, 0x06, 0xe9, 0x27, 0x78, 0x5b, 0x7c, 0x48, 0xcc, 0x34, 0x97, 0x7d, - 0xa3, 0xb4, 0x45, 0x31, 0x86, 0xb6, 0xa0, 0x39, 0x11, 0x4a, 0x87, 0xfc, 0x3d, 0xe3, 0x4a, 0xd3, - 0x07, 0xf8, 0x57, 0x94, 0x6a, 0x89, 0xa9, 0xe2, 0xe4, 0x06, 0xea, 0x29, 0x32, 0xae, 0xbc, 0x4a, - 0xa7, 0xda, 0x6b, 0x0e, 0xce, 0x83, 0x6d, 0xd7, 0x60, 0x8a, 0x8c, 0x87, 0x05, 0x89, 0x76, 0xe1, - 0x74, 0xca, 0x45, 0x32, 0x8b, 0x30, 0x93, 0x33, 0x44, 0x66, 0xa7, 0x92, 0xff, 0xe0, 0x08, 0xe6, - 0x55, 0x3a, 0x95, 0x5e, 0x23, 0x74, 0x04, 0xa3, 0x2f, 0x70, 0xb6, 0xc5, 0xb3, 0x76, 0x8f, 0xf9, - 0x96, 0x25, 0xc0, 0x68, 0x9a, 0x83, 0x8b, 0x1f, 0x6c, 0x57, 0xb4, 0x70, 0x53, 0x41, 0xef, 0xa0, - 0x96, 0x1f, 0x69, 0xdb, 0x93, 0x78, 0xe0, 0xbe, 0x32, 0x26, 0xb9, 0x52, 0x9e, 0x63, 0x9a, 0xab, - 0x92, 0x8e, 0xc0, 0x1d, 0x63, 0x9a, 0xf2, 0x58, 0x93, 0x6b, 0xa8, 0xe5, 0x9b, 0x58, 0xdb, 0x7d, - 0xdb, 0x1a, 0x0e, 0x1d, 0x42, 0x7d, 0x3c, 0x47, 0xc5, 0x8f, 0x12, 0x8d, 0xc0, 0x7d, 0xc6, 0xb9, - 0x88, 0xc5, 0x71, 0x5e, 0x08, 0x8d, 0xf5, 0xc2, 0xc7, 0x08, 0xc9, 0x3d, 0xc0, 0xfa, 0xf7, 0x28, - 0xaf, 0x7a, 0xf0, 0x12, 0x4b, 0xcc, 0xc1, 0xa7, 0x03, 0xee, 0xb4, 0x00, 0xc9, 0x13, 0x80, 0xc9, - 0x44, 0x1e, 0x1b, 0x45, 0xbc, 0x6f, 0xb5, 0x0d, 0x92, 0xbd, 0x65, 0xbf, 0xbd, 0x83, 0x94, 0xa3, - 0x44, 0xff, 0x90, 0x09, 0x34, 0xf2, 0x4e, 0x6e, 0xa6, 0x48, 0x7b, 0xf7, 0x14, 0xa5, 0x20, 0xfa, - 0x97, 0xfb, 0xe0, 0xf5, 0xb4, 0x08, 0x5a, 0x1b, 0x21, 0x22, 0xdd, 0x03, 0x29, 0x29, 0xa5, 0xd1, - 0xbf, 0xfa, 0x95, 0xb7, 0xf2, 0x88, 0xfe, 0x9a, 0x47, 0x32, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, - 0x59, 0xcf, 0xab, 0xb5, 0x7c, 0x03, 0x00, 0x00, + // 292 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xcd, 0x4a, 0xc3, 0x40, + 0x10, 0x36, 0x31, 0x31, 0x74, 0x6a, 0x45, 0x16, 0x91, 0x50, 0xa8, 0x94, 0xf5, 0x22, 0xa2, 0x1b, + 0x69, 0xf0, 0xe6, 0xad, 0x07, 0x2f, 0xa5, 0x48, 0x9e, 0x40, 0x9b, 0x1d, 0xd2, 0xa0, 0xcd, 0xc4, + 0xdd, 0x0d, 0xbe, 0x8e, 0x8f, 0x2a, 0xd9, 0x4d, 0x2d, 0x28, 0xa1, 0xf4, 0x96, 0xf9, 0xfe, 0x32, + 0xc3, 0xb7, 0x30, 0xaa, 0xd0, 0x7c, 0x91, 0x7a, 0x17, 0xb5, 0x22, 0x43, 0xec, 0xbc, 0x20, 0xb1, + 0x29, 0x73, 0x45, 0xa2, 0xc3, 0xc7, 0x69, 0x51, 0x9a, 0x75, 0xb3, 0x12, 0x39, 0x6d, 0x12, 0xcb, + 0x24, 0x05, 0xdd, 0xbb, 0x0f, 0x45, 0x8d, 0x41, 0x95, 0x58, 0x67, 0x37, 0xb8, 0x18, 0x7e, 0x0d, + 0xc3, 0x17, 0x44, 0x95, 0xe1, 0x67, 0x83, 0xda, 0xb0, 0x0b, 0x08, 0x25, 0xd6, 0x66, 0x1d, 0x7b, + 0x53, 0xef, 0x66, 0x94, 0xb9, 0x81, 0x3f, 0xc1, 0xa9, 0x13, 0xe9, 0x9a, 0x2a, 0x8d, 0xec, 0x0e, + 0xc2, 0x1a, 0x51, 0x69, 0xab, 0x1a, 0xce, 0x2e, 0xc5, 0xdf, 0x5d, 0x84, 0x95, 0x3b, 0x11, 0x7f, + 0x80, 0x60, 0x49, 0x12, 0xd9, 0x19, 0xf8, 0xa5, 0xb4, 0x96, 0x41, 0xe6, 0x97, 0x92, 0xc5, 0x10, + 0xbd, 0x49, 0xa9, 0x50, 0xeb, 0xd8, 0xb7, 0xe0, 0x76, 0xe4, 0x8f, 0x10, 0xcd, 0xa9, 0xaa, 0x30, + 0x37, 0xec, 0x16, 0x82, 0x8a, 0x24, 0xf6, 0xff, 0xa9, 0x8d, 0xce, 0xac, 0x86, 0xa7, 0x10, 0xce, + 0x3f, 0x48, 0xe3, 0x41, 0xa6, 0x57, 0x08, 0xda, 0x65, 0x0f, 0xf1, 0xec, 0xee, 0xf7, 0xa7, 0xc7, + 0x7b, 0xef, 0x9f, 0x7d, 0x7b, 0x10, 0x2d, 0x1d, 0xce, 0x16, 0x30, 0x58, 0x94, 0xda, 0xb4, 0xb4, + 0x66, 0x93, 0x1e, 0x9f, 0xeb, 0x62, 0x7c, 0xd5, 0x47, 0xbb, 0x16, 0xf8, 0x11, 0x7b, 0x06, 0x68, + 0xd3, 0xb2, 0xb6, 0x50, 0xcd, 0xe2, 0x9d, 0xbe, 0xab, 0x78, 0x9b, 0x34, 0xf9, 0xc7, 0x58, 0xdb, + 0x6f, 0xd0, 0xea, 0xc4, 0x3e, 0x86, 0xf4, 0x27, 0x00, 0x00, 0xff, 0xff, 0x68, 0x11, 0x14, 0x79, + 0x64, 0x02, 0x00, 0x00, } diff --git a/network/proto/network.proto b/network/proto/network.proto index 6025d90b..2d95a455 100644 --- a/network/proto/network.proto +++ b/network/proto/network.proto @@ -6,32 +6,25 @@ import "github.com/micro/go-micro/router/proto/router.proto"; // Network service is usesd to gain visibility into networks service Network { + rpc ListPeers(PeerRequest) returns (PeerResponse) {}; rpc ListRoutes(go.micro.router.Request) returns (go.micro.router.ListResponse) {}; - rpc ListNodes(ListRequest) returns (ListResponse) {}; - rpc Neighbourhood(NeighbourhoodRequest) returns (NeighbourhoodResponse) {}; } -// Empty request -message ListRequest {} - -// ListResponse is returned by ListNodes and ListNeighbours -message ListResponse { - repeated Node nodes = 1; +// PeerRequest requests list of peers +message PeerRequest { + // node topology depth + uint32 depth = 1; } -// NeighbourhoodRequest is sent to query node neighbourhood -message NeighbourhoodRequest { - string id = 1; -} - -// NeighbourhoodResponse contains node neighbourhood hierarchy -message NeighbourhoodResponse { - Neighbour neighbourhood = 1; +// PeerResponse is returned by ListPeers +message PeerResponse { + // return peer topology + Peer peers = 1; } // Node is network node message Node { - // node ide + // node id string id = 1; // node address string address = 2; @@ -49,16 +42,10 @@ message Close { Node node = 1; } -// Solicit is sent when requesting route advertisement from the network nodes -message Solicit { +// Peer is used to advertise node peers +message Peer { // network node Node node = 1; -} - -// Neighbour is used to nnounce node neighbourhood -message Neighbour { - // network node - Node node = 1; - // neighbours - repeated Node neighbours = 3; + // node peers + repeated Peer peers = 2; } diff --git a/router/default.go b/router/default.go index cfc30548..0006df29 100644 --- a/router/default.go +++ b/router/default.go @@ -337,6 +337,7 @@ func (r *router) advertiseTable() error { // advertise all routes as Update events to subscribers if len(events) > 0 { + log.Debugf("Network router flushing table with %d events: %s", len(events), r.options.Id) r.advertWg.Add(1) go r.publishAdvert(RouteUpdate, events) } @@ -668,11 +669,13 @@ func (r *router) Process(a *Advert) error { for _, event := range events { // skip if the router is the origin of this route if event.Route.Router == r.options.Id { + log.Debugf("Network router skipping processing its own route: %s", r.options.Id) continue } // create a copy of the route route := event.Route action := event.Type + log.Debugf("Network router processing route action %s: %s", action, r.options.Id) if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil { return fmt.Errorf("failed applying action %s to routing table: %s", action, err) } diff --git a/router/handler/router.go b/router/handler/router.go index ac05f012..5e4bceb7 100644 --- a/router/handler/router.go +++ b/router/handler/router.go @@ -45,6 +45,16 @@ func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.Loo return nil } +// Solicit triggers full routing table advertisement +func (r *Router) Solicit(ctx context.Context, req *pb.Request, resp *pb.Response) error { + if err := r.Router.Solicit(); err != nil { + return err + } + + return nil +} + +// Advertise streams router advertisements func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Router_AdvertiseStream) error { advertChan, err := r.Router.Advertise() if err != nil { @@ -91,6 +101,7 @@ func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Route return nil } +// Process processes advertisements func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error { events := make([]*router.Event, len(req.Events)) for i, event := range req.Events { @@ -126,6 +137,7 @@ func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessRes return nil } +// Status returns router status func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error { status := r.Router.Status() diff --git a/router/proto/router.micro.go b/router/proto/router.micro.go index b55c3043..a27af9e6 100644 --- a/router/proto/router.micro.go +++ b/router/proto/router.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: micro/go-micro/router/proto/router.proto +// source: router.proto package go_micro_router @@ -37,6 +37,7 @@ type RouterService interface { Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error) + Solicit(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) } @@ -157,6 +158,16 @@ func (x *routerServiceAdvertise) Recv() (*Advert, error) { return m, nil } +func (c *routerService) Solicit(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) { + req := c.c.NewRequest(c.name, "Router.Solicit", in) + out := new(Response) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) { req := c.c.NewRequest(c.name, "Router.Process", in) out := new(ProcessResponse) @@ -183,6 +194,7 @@ type RouterHandler interface { Lookup(context.Context, *LookupRequest, *LookupResponse) error Watch(context.Context, *WatchRequest, Router_WatchStream) error Advertise(context.Context, *Request, Router_AdvertiseStream) error + Solicit(context.Context, *Request, *Response) error Process(context.Context, *Advert, *ProcessResponse) error Status(context.Context, *Request, *StatusResponse) error } @@ -192,6 +204,7 @@ func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.H Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error Watch(ctx context.Context, stream server.Stream) error Advertise(ctx context.Context, stream server.Stream) error + Solicit(ctx context.Context, in *Request, out *Response) error Process(ctx context.Context, in *Advert, out *ProcessResponse) error Status(ctx context.Context, in *Request, out *StatusResponse) error } @@ -280,6 +293,10 @@ func (x *routerAdvertiseStream) Send(m *Advert) error { return x.stream.Send(m) } +func (h *routerHandler) Solicit(ctx context.Context, in *Request, out *Response) error { + return h.RouterHandler.Solicit(ctx, in, out) +} + func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessResponse) error { return h.RouterHandler.Process(ctx, in, out) } diff --git a/router/proto/router.pb.go b/router/proto/router.pb.go index 6dedf13d..19813a34 100644 --- a/router/proto/router.pb.go +++ b/router/proto/router.pb.go @@ -1,13 +1,11 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: micro/go-micro/router/proto/router.proto +// source: router.proto package go_micro_router import ( - context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" math "math" ) @@ -45,7 +43,7 @@ func (x AdvertType) String() string { } func (AdvertType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{0} + return fileDescriptor_367072455c71aedc, []int{0} } // EventType defines the type of event @@ -74,7 +72,7 @@ func (x EventType) String() string { } func (EventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{1} + return fileDescriptor_367072455c71aedc, []int{1} } // Empty request @@ -88,7 +86,7 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} func (*Request) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{0} + return fileDescriptor_367072455c71aedc, []int{0} } func (m *Request) XXX_Unmarshal(b []byte) error { @@ -109,6 +107,38 @@ func (m *Request) XXX_DiscardUnknown() { var xxx_messageInfo_Request proto.InternalMessageInfo +// Empty response +type Response struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { + return fileDescriptor_367072455c71aedc, []int{1} +} + +func (m *Response) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Response.Unmarshal(m, b) +} +func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Response.Marshal(b, m, deterministic) +} +func (m *Response) XXX_Merge(src proto.Message) { + xxx_messageInfo_Response.Merge(m, src) +} +func (m *Response) XXX_Size() int { + return xxx_messageInfo_Response.Size(m) +} +func (m *Response) XXX_DiscardUnknown() { + xxx_messageInfo_Response.DiscardUnknown(m) +} + +var xxx_messageInfo_Response proto.InternalMessageInfo + // ListResponse is returned by List type ListResponse struct { Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"` @@ -121,7 +151,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (*ListResponse) ProtoMessage() {} func (*ListResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{1} + return fileDescriptor_367072455c71aedc, []int{2} } func (m *ListResponse) XXX_Unmarshal(b []byte) error { @@ -161,7 +191,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} } func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{2} + return fileDescriptor_367072455c71aedc, []int{3} } func (m *LookupRequest) XXX_Unmarshal(b []byte) error { @@ -201,7 +231,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} } func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{3} + return fileDescriptor_367072455c71aedc, []int{4} } func (m *LookupResponse) XXX_Unmarshal(b []byte) error { @@ -229,6 +259,7 @@ func (m *LookupResponse) GetRoutes() []*Route { return nil } +// QueryRequest queries Table for Routes type QueryRequest struct { Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -240,7 +271,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} } func (m *QueryRequest) String() string { return proto.CompactTextString(m) } func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{4} + return fileDescriptor_367072455c71aedc, []int{5} } func (m *QueryRequest) XXX_Unmarshal(b []byte) error { @@ -268,6 +299,7 @@ func (m *QueryRequest) GetQuery() *Query { return nil } +// QueryResponse is returned by Query type QueryResponse struct { Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -279,7 +311,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} } func (m *QueryResponse) String() string { return proto.CompactTextString(m) } func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{5} + return fileDescriptor_367072455c71aedc, []int{6} } func (m *QueryResponse) XXX_Unmarshal(b []byte) error { @@ -318,7 +350,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} } func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{6} + return fileDescriptor_367072455c71aedc, []int{7} } func (m *WatchRequest) XXX_Unmarshal(b []byte) error { @@ -360,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} } func (m *Advert) String() string { return proto.CompactTextString(m) } func (*Advert) ProtoMessage() {} func (*Advert) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{7} + return fileDescriptor_367072455c71aedc, []int{8} } func (m *Advert) XXX_Unmarshal(b []byte) error { @@ -416,6 +448,47 @@ func (m *Advert) GetEvents() []*Event { return nil } +// Solicit solicits routes +type Solicit struct { + // id of the soliciting router + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Solicit) Reset() { *m = Solicit{} } +func (m *Solicit) String() string { return proto.CompactTextString(m) } +func (*Solicit) ProtoMessage() {} +func (*Solicit) Descriptor() ([]byte, []int) { + return fileDescriptor_367072455c71aedc, []int{9} +} + +func (m *Solicit) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Solicit.Unmarshal(m, b) +} +func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Solicit.Marshal(b, m, deterministic) +} +func (m *Solicit) XXX_Merge(src proto.Message) { + xxx_messageInfo_Solicit.Merge(m, src) +} +func (m *Solicit) XXX_Size() int { + return xxx_messageInfo_Solicit.Size(m) +} +func (m *Solicit) XXX_DiscardUnknown() { + xxx_messageInfo_Solicit.DiscardUnknown(m) +} + +var xxx_messageInfo_Solicit proto.InternalMessageInfo + +func (m *Solicit) GetId() string { + if m != nil { + return m.Id + } + return "" +} + // ProcessResponse is returned by Process type ProcessResponse struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -427,7 +500,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} } func (m *ProcessResponse) String() string { return proto.CompactTextString(m) } func (*ProcessResponse) ProtoMessage() {} func (*ProcessResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{8} + return fileDescriptor_367072455c71aedc, []int{10} } func (m *ProcessResponse) XXX_Unmarshal(b []byte) error { @@ -459,7 +532,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} } func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{9} + return fileDescriptor_367072455c71aedc, []int{11} } func (m *CreateResponse) XXX_Unmarshal(b []byte) error { @@ -491,7 +564,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{10} + return fileDescriptor_367072455c71aedc, []int{12} } func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { @@ -523,7 +596,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{11} + return fileDescriptor_367072455c71aedc, []int{13} } func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { @@ -561,7 +634,7 @@ func (m *Event) Reset() { *m = Event{} } func (m *Event) String() string { return proto.CompactTextString(m) } func (*Event) ProtoMessage() {} func (*Event) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{12} + return fileDescriptor_367072455c71aedc, []int{14} } func (m *Event) XXX_Unmarshal(b []byte) error { @@ -620,7 +693,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{13} + return fileDescriptor_367072455c71aedc, []int{15} } func (m *Query) XXX_Unmarshal(b []byte) error { @@ -687,7 +760,7 @@ func (m *Route) Reset() { *m = Route{} } func (m *Route) String() string { return proto.CompactTextString(m) } func (*Route) ProtoMessage() {} func (*Route) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{14} + return fileDescriptor_367072455c71aedc, []int{16} } func (m *Route) XXX_Unmarshal(b []byte) error { @@ -769,7 +842,7 @@ func (m *Status) Reset() { *m = Status{} } func (m *Status) String() string { return proto.CompactTextString(m) } func (*Status) ProtoMessage() {} func (*Status) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{15} + return fileDescriptor_367072455c71aedc, []int{17} } func (m *Status) XXX_Unmarshal(b []byte) error { @@ -815,7 +888,7 @@ func (m *StatusResponse) Reset() { *m = StatusResponse{} } func (m *StatusResponse) String() string { return proto.CompactTextString(m) } func (*StatusResponse) ProtoMessage() {} func (*StatusResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6a36eee0b1adf739, []int{16} + return fileDescriptor_367072455c71aedc, []int{18} } func (m *StatusResponse) XXX_Unmarshal(b []byte) error { @@ -847,6 +920,7 @@ func init() { proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value) proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value) proto.RegisterType((*Request)(nil), "go.micro.router.Request") + proto.RegisterType((*Response)(nil), "go.micro.router.Response") proto.RegisterType((*ListResponse)(nil), "go.micro.router.ListResponse") proto.RegisterType((*LookupRequest)(nil), "go.micro.router.LookupRequest") proto.RegisterType((*LookupResponse)(nil), "go.micro.router.LookupResponse") @@ -854,6 +928,7 @@ func init() { proto.RegisterType((*QueryResponse)(nil), "go.micro.router.QueryResponse") proto.RegisterType((*WatchRequest)(nil), "go.micro.router.WatchRequest") proto.RegisterType((*Advert)(nil), "go.micro.router.Advert") + proto.RegisterType((*Solicit)(nil), "go.micro.router.Solicit") proto.RegisterType((*ProcessResponse)(nil), "go.micro.router.ProcessResponse") proto.RegisterType((*CreateResponse)(nil), "go.micro.router.CreateResponse") proto.RegisterType((*DeleteResponse)(nil), "go.micro.router.DeleteResponse") @@ -865,509 +940,53 @@ func init() { proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse") } -func init() { - proto.RegisterFile("micro/go-micro/router/proto/router.proto", fileDescriptor_6a36eee0b1adf739) -} - -var fileDescriptor_6a36eee0b1adf739 = []byte{ - // 699 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x4e, 0xdb, 0x40, - 0x10, 0xb6, 0x9d, 0xd8, 0x91, 0xa7, 0xc1, 0xb8, 0xa3, 0x0a, 0xac, 0xb4, 0x40, 0xe4, 0x53, 0x84, - 0xa8, 0x53, 0xa5, 0xd7, 0xfe, 0x05, 0x4a, 0x55, 0xa9, 0x1c, 0x5a, 0x17, 0xd4, 0xb3, 0xb1, 0x57, - 0xd4, 0x22, 0xb1, 0xcd, 0xee, 0x06, 0x94, 0x73, 0x9f, 0xa6, 0xe7, 0x3e, 0x52, 0xaf, 0x7d, 0x88, - 0xca, 0xbb, 0xeb, 0x10, 0x62, 0x8c, 0x44, 0x4e, 0xde, 0x99, 0xf9, 0xe6, 0x9b, 0x99, 0xdd, 0x99, - 0x31, 0x0c, 0xa6, 0x69, 0x4c, 0xf3, 0xe1, 0x45, 0xfe, 0x52, 0x1e, 0x68, 0x3e, 0xe3, 0x84, 0x0e, - 0x0b, 0x9a, 0xf3, 0x4a, 0x08, 0x84, 0x80, 0x9b, 0x17, 0x79, 0x20, 0x30, 0x81, 0x54, 0xfb, 0x36, - 0x74, 0x42, 0x72, 0x35, 0x23, 0x8c, 0xfb, 0xef, 0xa0, 0x7b, 0x92, 0x32, 0x1e, 0x12, 0x56, 0xe4, - 0x19, 0x23, 0x18, 0x80, 0x25, 0x40, 0xcc, 0xd3, 0xfb, 0xad, 0xc1, 0x93, 0xd1, 0x56, 0xb0, 0xe2, - 0x1c, 0x84, 0xe5, 0x27, 0x54, 0x28, 0xff, 0x2d, 0x6c, 0x9c, 0xe4, 0xf9, 0xe5, 0xac, 0x50, 0x84, - 0x78, 0x00, 0xe6, 0xd5, 0x8c, 0xd0, 0xb9, 0xa7, 0xf7, 0xf5, 0x7b, 0xfd, 0xbf, 0x95, 0xd6, 0x50, - 0x82, 0xfc, 0x0f, 0xe0, 0x54, 0xee, 0x6b, 0x26, 0xf0, 0x06, 0xba, 0x92, 0x71, 0xad, 0xf8, 0xef, - 0x61, 0x43, 0x79, 0xaf, 0x19, 0xde, 0x81, 0xee, 0x8f, 0x88, 0xc7, 0x3f, 0xab, 0xfb, 0xfc, 0xad, - 0x83, 0x35, 0x4e, 0xae, 0x09, 0xe5, 0xe8, 0x80, 0x91, 0x26, 0x22, 0x0d, 0x3b, 0x34, 0xd2, 0x04, - 0x87, 0xd0, 0xe6, 0xf3, 0x82, 0x78, 0x46, 0x5f, 0x1f, 0x38, 0xa3, 0xe7, 0x35, 0x62, 0xe9, 0x76, - 0x3a, 0x2f, 0x48, 0x28, 0x80, 0xf8, 0x02, 0x6c, 0x9e, 0x4e, 0x09, 0xe3, 0xd1, 0xb4, 0xf0, 0x5a, - 0x7d, 0x7d, 0xd0, 0x0a, 0x6f, 0x15, 0xe8, 0x42, 0x8b, 0xf3, 0x89, 0xd7, 0x16, 0xfa, 0xf2, 0x58, - 0xe6, 0x4e, 0xae, 0x49, 0xc6, 0x99, 0x67, 0x36, 0xe4, 0x7e, 0x5c, 0x9a, 0x43, 0x85, 0xf2, 0x9f, - 0xc2, 0xe6, 0x57, 0x9a, 0xc7, 0x84, 0xb1, 0xaa, 0x7c, 0xdf, 0x05, 0xe7, 0x88, 0x92, 0x88, 0x93, - 0x65, 0xcd, 0x47, 0x32, 0x21, 0x77, 0x35, 0x67, 0x45, 0xb2, 0x8c, 0xf9, 0xa5, 0x83, 0x29, 0xa8, - 0x31, 0x50, 0x35, 0xea, 0xa2, 0xc6, 0xde, 0xfd, 0x09, 0x34, 0x95, 0x68, 0xac, 0x96, 0x78, 0x00, - 0xa6, 0xf0, 0x13, 0xc5, 0x37, 0xbf, 0x85, 0x04, 0xf9, 0x67, 0x60, 0x8a, 0xb7, 0x44, 0x0f, 0x3a, - 0x8c, 0xd0, 0xeb, 0x34, 0x26, 0xea, 0xf6, 0x2b, 0xb1, 0xb4, 0x5c, 0x44, 0x9c, 0xdc, 0x44, 0x73, - 0x11, 0xcc, 0x0e, 0x2b, 0xb1, 0xb4, 0x64, 0x84, 0xdf, 0xe4, 0xf4, 0x52, 0x04, 0xb3, 0xc3, 0x4a, - 0xf4, 0xff, 0xe8, 0x60, 0x8a, 0x38, 0x0f, 0xf3, 0x46, 0x49, 0x42, 0x09, 0x63, 0x15, 0xaf, 0x12, - 0x97, 0x23, 0xb6, 0x1a, 0x23, 0xb6, 0xef, 0x44, 0xc4, 0x2d, 0xd5, 0x83, 0xd4, 0x33, 0x85, 0x41, - 0x49, 0x88, 0xd0, 0x9e, 0xa4, 0xd9, 0xa5, 0x67, 0x09, 0xad, 0x38, 0x97, 0xd8, 0x29, 0xe1, 0x34, - 0x8d, 0xbd, 0x8e, 0xb8, 0x3d, 0x25, 0xf9, 0x23, 0xb0, 0xbe, 0xf3, 0x88, 0xcf, 0x58, 0xe9, 0x15, - 0xe7, 0x49, 0x95, 0xb2, 0x38, 0xe3, 0x33, 0x30, 0x09, 0xa5, 0x39, 0x55, 0xd9, 0x4a, 0xc1, 0x1f, - 0x83, 0x23, 0x7d, 0x16, 0xd3, 0x30, 0x04, 0x8b, 0x09, 0x8d, 0x9a, 0xa6, 0xed, 0xda, 0x0b, 0x28, - 0x07, 0x05, 0xdb, 0x1f, 0x01, 0xdc, 0xb6, 0x31, 0x22, 0x38, 0x52, 0x1a, 0x67, 0x59, 0x3e, 0xcb, - 0x62, 0xe2, 0x6a, 0xe8, 0x42, 0x57, 0xea, 0x64, 0x0f, 0xb9, 0xfa, 0xfe, 0x10, 0xec, 0x45, 0x5b, - 0x20, 0x80, 0x25, 0x1b, 0xd0, 0xd5, 0xca, 0xb3, 0x6c, 0x3d, 0x57, 0x2f, 0xcf, 0xca, 0xc1, 0x18, - 0xfd, 0x33, 0xc0, 0x0a, 0xe5, 0x95, 0x7c, 0x01, 0x4b, 0xee, 0x0f, 0xdc, 0xad, 0xa5, 0x76, 0x67, - 0x2f, 0xf5, 0xf6, 0x1a, 0xed, 0xaa, 0x89, 0x35, 0x3c, 0x04, 0x53, 0xcc, 0x32, 0xee, 0xd4, 0xb0, - 0xcb, 0x33, 0xde, 0x6b, 0x98, 0x2b, 0x5f, 0x7b, 0xa5, 0xe3, 0x21, 0xd8, 0xb2, 0xbc, 0x94, 0x11, - 0xf4, 0xea, 0x0d, 0xab, 0x28, 0xb6, 0x1b, 0xa6, 0x5f, 0x70, 0x7c, 0x82, 0x8e, 0x9a, 0x4b, 0x6c, - 0xc2, 0xf5, 0xfa, 0x35, 0xc3, 0xea, 0x28, 0x6b, 0x78, 0xbc, 0xe8, 0x81, 0xe6, 0x44, 0xf6, 0x9a, - 0x5e, 0x74, 0x41, 0x33, 0xfa, 0x6b, 0x80, 0x79, 0x1a, 0x9d, 0x4f, 0x08, 0x1e, 0x55, 0x8f, 0x83, - 0x0d, 0xa3, 0x78, 0x0f, 0xdd, 0xca, 0x3a, 0xd1, 0x4a, 0x12, 0xf9, 0xaa, 0x8f, 0x20, 0x59, 0xd9, - 0x40, 0x82, 0x44, 0xb6, 0xc3, 0x23, 0x48, 0x56, 0x96, 0x96, 0x86, 0x63, 0x68, 0x97, 0xff, 0xbe, - 0x07, 0x6e, 0xa7, 0xde, 0x08, 0xcb, 0x3f, 0x4b, 0x5f, 0xc3, 0xcf, 0xd5, 0xce, 0xd9, 0x69, 0xf8, - 0xcf, 0x28, 0xa2, 0xdd, 0x26, 0x73, 0xc5, 0x74, 0x6e, 0x89, 0x7f, 0xf5, 0xeb, 0xff, 0x01, 0x00, - 0x00, 0xff, 0xff, 0xe2, 0xe9, 0xe2, 0x3b, 0xd7, 0x07, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// RouterClient is the client API for Router service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type RouterClient interface { - Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) - Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error) - Advertise(ctx context.Context, in *Request, opts ...grpc.CallOption) (Router_AdvertiseClient, error) - Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error) - Status(ctx context.Context, in *Request, opts ...grpc.CallOption) (*StatusResponse, error) -} - -type routerClient struct { - cc *grpc.ClientConn -} - -func NewRouterClient(cc *grpc.ClientConn) RouterClient { - return &routerClient{cc} -} - -func (c *routerClient) Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) { - out := new(LookupResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Router/Lookup", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *routerClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error) { - stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[0], "/go.micro.router.Router/Watch", opts...) - if err != nil { - return nil, err - } - x := &routerWatchClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Router_WatchClient interface { - Recv() (*Event, error) - grpc.ClientStream -} - -type routerWatchClient struct { - grpc.ClientStream -} - -func (x *routerWatchClient) Recv() (*Event, error) { - m := new(Event) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *routerClient) Advertise(ctx context.Context, in *Request, opts ...grpc.CallOption) (Router_AdvertiseClient, error) { - stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[1], "/go.micro.router.Router/Advertise", opts...) - if err != nil { - return nil, err - } - x := &routerAdvertiseClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Router_AdvertiseClient interface { - Recv() (*Advert, error) - grpc.ClientStream -} - -type routerAdvertiseClient struct { - grpc.ClientStream -} - -func (x *routerAdvertiseClient) Recv() (*Advert, error) { - m := new(Advert) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *routerClient) Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error) { - out := new(ProcessResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Router/Process", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *routerClient) Status(ctx context.Context, in *Request, opts ...grpc.CallOption) (*StatusResponse, error) { - out := new(StatusResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Router/Status", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// RouterServer is the server API for Router service. -type RouterServer interface { - Lookup(context.Context, *LookupRequest) (*LookupResponse, error) - Watch(*WatchRequest, Router_WatchServer) error - Advertise(*Request, Router_AdvertiseServer) error - Process(context.Context, *Advert) (*ProcessResponse, error) - Status(context.Context, *Request) (*StatusResponse, error) -} - -func RegisterRouterServer(s *grpc.Server, srv RouterServer) { - s.RegisterService(&_Router_serviceDesc, srv) -} - -func _Router_Lookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(LookupRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(RouterServer).Lookup(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Router/Lookup", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(RouterServer).Lookup(ctx, req.(*LookupRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Router_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(WatchRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(RouterServer).Watch(m, &routerWatchServer{stream}) -} - -type Router_WatchServer interface { - Send(*Event) error - grpc.ServerStream -} - -type routerWatchServer struct { - grpc.ServerStream -} - -func (x *routerWatchServer) Send(m *Event) error { - return x.ServerStream.SendMsg(m) -} - -func _Router_Advertise_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(Request) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(RouterServer).Advertise(m, &routerAdvertiseServer{stream}) -} - -type Router_AdvertiseServer interface { - Send(*Advert) error - grpc.ServerStream -} - -type routerAdvertiseServer struct { - grpc.ServerStream -} - -func (x *routerAdvertiseServer) Send(m *Advert) error { - return x.ServerStream.SendMsg(m) -} - -func _Router_Process_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Advert) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(RouterServer).Process(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Router/Process", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(RouterServer).Process(ctx, req.(*Advert)) - } - return interceptor(ctx, in, info, handler) -} - -func _Router_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(RouterServer).Status(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Router/Status", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(RouterServer).Status(ctx, req.(*Request)) - } - return interceptor(ctx, in, info, handler) -} - -var _Router_serviceDesc = grpc.ServiceDesc{ - ServiceName: "go.micro.router.Router", - HandlerType: (*RouterServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Lookup", - Handler: _Router_Lookup_Handler, - }, - { - MethodName: "Process", - Handler: _Router_Process_Handler, - }, - { - MethodName: "Status", - Handler: _Router_Status_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Watch", - Handler: _Router_Watch_Handler, - ServerStreams: true, - }, - { - StreamName: "Advertise", - Handler: _Router_Advertise_Handler, - ServerStreams: true, - }, - }, - Metadata: "micro/go-micro/router/proto/router.proto", -} - -// TableClient is the client API for Table service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type TableClient interface { - Create(ctx context.Context, in *Route, opts ...grpc.CallOption) (*CreateResponse, error) - Delete(ctx context.Context, in *Route, opts ...grpc.CallOption) (*DeleteResponse, error) - Update(ctx context.Context, in *Route, opts ...grpc.CallOption) (*UpdateResponse, error) - List(ctx context.Context, in *Request, opts ...grpc.CallOption) (*ListResponse, error) - Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) -} - -type tableClient struct { - cc *grpc.ClientConn -} - -func NewTableClient(cc *grpc.ClientConn) TableClient { - return &tableClient{cc} -} - -func (c *tableClient) Create(ctx context.Context, in *Route, opts ...grpc.CallOption) (*CreateResponse, error) { - out := new(CreateResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Table/Create", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *tableClient) Delete(ctx context.Context, in *Route, opts ...grpc.CallOption) (*DeleteResponse, error) { - out := new(DeleteResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Table/Delete", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *tableClient) Update(ctx context.Context, in *Route, opts ...grpc.CallOption) (*UpdateResponse, error) { - out := new(UpdateResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Table/Update", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *tableClient) List(ctx context.Context, in *Request, opts ...grpc.CallOption) (*ListResponse, error) { - out := new(ListResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Table/List", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *tableClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) { - out := new(QueryResponse) - err := c.cc.Invoke(ctx, "/go.micro.router.Table/Query", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// TableServer is the server API for Table service. -type TableServer interface { - Create(context.Context, *Route) (*CreateResponse, error) - Delete(context.Context, *Route) (*DeleteResponse, error) - Update(context.Context, *Route) (*UpdateResponse, error) - List(context.Context, *Request) (*ListResponse, error) - Query(context.Context, *QueryRequest) (*QueryResponse, error) -} - -func RegisterTableServer(s *grpc.Server, srv TableServer) { - s.RegisterService(&_Table_serviceDesc, srv) -} - -func _Table_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Route) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TableServer).Create(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Table/Create", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TableServer).Create(ctx, req.(*Route)) - } - return interceptor(ctx, in, info, handler) -} - -func _Table_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Route) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TableServer).Delete(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Table/Delete", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TableServer).Delete(ctx, req.(*Route)) - } - return interceptor(ctx, in, info, handler) -} - -func _Table_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Route) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TableServer).Update(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Table/Update", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TableServer).Update(ctx, req.(*Route)) - } - return interceptor(ctx, in, info, handler) -} - -func _Table_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TableServer).List(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Table/List", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TableServer).List(ctx, req.(*Request)) - } - return interceptor(ctx, in, info, handler) -} - -func _Table_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(QueryRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TableServer).Query(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.router.Table/Query", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TableServer).Query(ctx, req.(*QueryRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Table_serviceDesc = grpc.ServiceDesc{ - ServiceName: "go.micro.router.Table", - HandlerType: (*TableServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Create", - Handler: _Table_Create_Handler, - }, - { - MethodName: "Delete", - Handler: _Table_Delete_Handler, - }, - { - MethodName: "Update", - Handler: _Table_Update_Handler, - }, - { - MethodName: "List", - Handler: _Table_List_Handler, - }, - { - MethodName: "Query", - Handler: _Table_Query_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "micro/go-micro/router/proto/router.proto", +func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } + +var fileDescriptor_367072455c71aedc = []byte{ + // 714 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x4e, 0xdb, 0x40, + 0x10, 0xb6, 0x93, 0xd8, 0x69, 0xa6, 0x21, 0xa4, 0xa3, 0x0a, 0x4c, 0x5a, 0x20, 0xf2, 0x09, 0x21, + 0x64, 0xaa, 0xf4, 0xda, 0x1f, 0x02, 0xa5, 0xaa, 0x54, 0x0e, 0xad, 0x01, 0xf5, 0x6c, 0xec, 0x15, + 0xb5, 0x48, 0xbc, 0x66, 0x77, 0x03, 0xca, 0xb9, 0x4f, 0xd3, 0x4b, 0x2f, 0x7d, 0xa4, 0xbe, 0x48, + 0xe5, 0xdd, 0x75, 0x08, 0x71, 0x16, 0x09, 0x4e, 0xd9, 0xf9, 0xfb, 0x66, 0x66, 0xf7, 0x9b, 0x71, + 0xa0, 0xcd, 0xe8, 0x44, 0x10, 0x16, 0xe4, 0x8c, 0x0a, 0x8a, 0xab, 0x97, 0x34, 0x18, 0xa7, 0x31, + 0xa3, 0x81, 0x52, 0xfb, 0x2d, 0x68, 0x86, 0xe4, 0x7a, 0x42, 0xb8, 0xf0, 0x01, 0x9e, 0x85, 0x84, + 0xe7, 0x34, 0xe3, 0xc4, 0xff, 0x00, 0xed, 0x93, 0x94, 0x8b, 0x52, 0xc6, 0x00, 0x5c, 0x19, 0xc0, + 0x3d, 0xbb, 0x5f, 0xdf, 0x79, 0x3e, 0x58, 0x0b, 0x16, 0x80, 0x82, 0xb0, 0xf8, 0x09, 0xb5, 0x97, + 0xff, 0x1e, 0x56, 0x4e, 0x28, 0xbd, 0x9a, 0xe4, 0x1a, 0x1c, 0xf7, 0xc0, 0xb9, 0x9e, 0x10, 0x36, + 0xf5, 0xec, 0xbe, 0xbd, 0x34, 0xfe, 0x7b, 0x61, 0x0d, 0x95, 0x93, 0x7f, 0x00, 0x9d, 0x32, 0xfc, + 0x89, 0x05, 0xbc, 0x83, 0xb6, 0x42, 0x7c, 0x52, 0xfe, 0x8f, 0xb0, 0xa2, 0xa3, 0x9f, 0x98, 0xbe, + 0x03, 0xed, 0x1f, 0x91, 0x88, 0x7f, 0x96, 0x77, 0xfb, 0xdb, 0x06, 0x77, 0x98, 0xdc, 0x10, 0x26, + 0xb0, 0x03, 0xb5, 0x34, 0x91, 0x65, 0xb4, 0xc2, 0x5a, 0x9a, 0xe0, 0x3e, 0x34, 0xc4, 0x34, 0x27, + 0x5e, 0xad, 0x6f, 0xef, 0x74, 0x06, 0xaf, 0x2a, 0xc0, 0x2a, 0xec, 0x6c, 0x9a, 0x93, 0x50, 0x3a, + 0xe2, 0x6b, 0x68, 0x89, 0x74, 0x4c, 0xb8, 0x88, 0xc6, 0xb9, 0x57, 0xef, 0xdb, 0x3b, 0xf5, 0xf0, + 0x4e, 0x81, 0x5d, 0xa8, 0x0b, 0x31, 0xf2, 0x1a, 0x52, 0x5f, 0x1c, 0x8b, 0xda, 0xc9, 0x0d, 0xc9, + 0x04, 0xf7, 0x1c, 0x43, 0xed, 0xc7, 0x85, 0x39, 0xd4, 0x5e, 0xfe, 0x06, 0x34, 0x4f, 0xe9, 0x28, + 0x8d, 0xd3, 0x4a, 0xad, 0xfe, 0x0b, 0x58, 0xfd, 0xc6, 0x68, 0x4c, 0x38, 0x9f, 0x31, 0xa5, 0x0b, + 0x9d, 0x23, 0x46, 0x22, 0x41, 0xe6, 0x35, 0x9f, 0xc8, 0x88, 0xdc, 0xd7, 0x9c, 0xe7, 0xc9, 0xbc, + 0xcf, 0x2f, 0x1b, 0x1c, 0x99, 0x15, 0x03, 0xdd, 0xbe, 0x2d, 0xdb, 0xef, 0x2d, 0xaf, 0xcd, 0xd4, + 0x7d, 0x6d, 0xb1, 0xfb, 0x3d, 0x70, 0x64, 0x9c, 0xbc, 0x17, 0xf3, 0x33, 0x29, 0x27, 0xff, 0x1c, + 0x1c, 0xf9, 0xcc, 0xe8, 0x41, 0x93, 0x13, 0x76, 0x93, 0xc6, 0x44, 0x37, 0x5b, 0x8a, 0x85, 0xe5, + 0x32, 0x12, 0xe4, 0x36, 0x9a, 0xca, 0x64, 0xad, 0xb0, 0x14, 0x0b, 0x4b, 0x46, 0xc4, 0x2d, 0x65, + 0x57, 0x32, 0x59, 0x2b, 0x2c, 0x45, 0xff, 0xaf, 0x0d, 0x8e, 0xcc, 0xf3, 0x30, 0x6e, 0x94, 0x24, + 0x8c, 0x70, 0x5e, 0xe2, 0x6a, 0x71, 0x3e, 0x63, 0xdd, 0x98, 0xb1, 0x71, 0x2f, 0x23, 0xae, 0x69, + 0x7a, 0x32, 0xcf, 0x91, 0x06, 0x2d, 0x21, 0x42, 0x63, 0x94, 0x66, 0x57, 0x9e, 0x2b, 0xb5, 0xf2, + 0x5c, 0xf8, 0x8e, 0x89, 0x60, 0x69, 0xec, 0x35, 0xe5, 0xed, 0x69, 0xc9, 0x1f, 0x80, 0x7b, 0x2a, + 0x22, 0x31, 0xe1, 0x45, 0x54, 0x4c, 0x93, 0xb2, 0x64, 0x79, 0xc6, 0x97, 0xe0, 0x10, 0xc6, 0x28, + 0xd3, 0xd5, 0x2a, 0xc1, 0x1f, 0x42, 0x47, 0xc5, 0xcc, 0x06, 0x65, 0x1f, 0x5c, 0x2e, 0x35, 0x7a, + 0xd0, 0xd6, 0x2b, 0x2f, 0xa0, 0x03, 0xb4, 0xdb, 0xee, 0x00, 0xe0, 0x8e, 0xe1, 0x88, 0xd0, 0x51, + 0xd2, 0x30, 0xcb, 0xe8, 0x24, 0x8b, 0x49, 0xd7, 0xc2, 0x2e, 0xb4, 0x95, 0x4e, 0x71, 0xa8, 0x6b, + 0xef, 0xee, 0x43, 0x6b, 0x46, 0x0b, 0x04, 0x70, 0x15, 0x01, 0xbb, 0x56, 0x71, 0x56, 0xd4, 0xeb, + 0xda, 0xc5, 0x59, 0x07, 0xd4, 0x06, 0x7f, 0xea, 0xe0, 0x86, 0xea, 0x4a, 0xbe, 0x82, 0xab, 0x56, + 0x0b, 0x6e, 0x55, 0x4a, 0xbb, 0xb7, 0xb2, 0x7a, 0xdb, 0x46, 0xbb, 0x26, 0xb1, 0x85, 0x87, 0xe0, + 0xc8, 0x31, 0xc7, 0xcd, 0x8a, 0xef, 0xfc, 0xf8, 0xf7, 0x0c, 0x23, 0xe7, 0x5b, 0x6f, 0x6c, 0x3c, + 0x84, 0x96, 0x6a, 0x2f, 0xe5, 0x04, 0xbd, 0x2a, 0x61, 0x35, 0xc4, 0xba, 0x61, 0x31, 0x48, 0x8c, + 0x83, 0xbb, 0x91, 0x35, 0x23, 0x6c, 0x2c, 0xb1, 0xcc, 0x3a, 0xf9, 0x0c, 0x4d, 0x3d, 0xd9, 0x68, + 0xca, 0xd4, 0xeb, 0x57, 0x0c, 0x8b, 0xcb, 0xc0, 0xc2, 0xe3, 0x19, 0x8b, 0xcc, 0x85, 0x6c, 0x9b, + 0x38, 0x31, 0x83, 0x19, 0xfc, 0xab, 0x81, 0x73, 0x16, 0x5d, 0x8c, 0x08, 0x1e, 0x95, 0xcf, 0x8b, + 0x86, 0x61, 0x5e, 0x02, 0xb7, 0xb0, 0x90, 0xac, 0x02, 0x44, 0xf1, 0xe2, 0x11, 0x20, 0x0b, 0x3b, + 0x4c, 0x82, 0x28, 0x42, 0x3d, 0x02, 0x64, 0x61, 0xed, 0x59, 0x38, 0x84, 0x46, 0xf1, 0x61, 0x7d, + 0xe0, 0x76, 0xaa, 0x54, 0x9a, 0xff, 0x12, 0xfb, 0x16, 0x7e, 0x29, 0xb7, 0xd6, 0xa6, 0xe1, 0x23, + 0xa6, 0x81, 0xb6, 0x4c, 0xe6, 0x12, 0xe9, 0xc2, 0x95, 0x7f, 0x0a, 0xde, 0xfe, 0x0f, 0x00, 0x00, + 0xff, 0xff, 0xb7, 0x25, 0xac, 0xac, 0x24, 0x08, 0x00, 0x00, } diff --git a/router/proto/router.proto b/router/proto/router.proto index 9c2ebed4..0ae180c9 100644 --- a/router/proto/router.proto +++ b/router/proto/router.proto @@ -74,6 +74,12 @@ message Advert { repeated Event events = 5; } +// Solicit solicits routes +message Solicit { + // id of the soliciting router + string id = 1; +} + // ProcessResponse is returned by Process message ProcessResponse {} diff --git a/router/table.go b/router/table.go index c61b560c..1db4c265 100644 --- a/router/table.go +++ b/router/table.go @@ -110,8 +110,12 @@ func (t *table) Update(r Route) error { if _, ok := t.routes[service][sum]; !ok { t.routes[service][sum] = r go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) + return nil } + t.routes[service][sum] = r + go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) + return nil }