Merge pull request #750 from milosgajdos83/node-peers

[WIP] Neighbour is now Peer. Peer is a node in network topology.
This commit is contained in:
Asim Aslam 2019-09-13 12:00:16 -07:00 committed by GitHub
commit ef86c9625b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1042 additions and 1157 deletions

View File

@ -1,7 +1,6 @@
package network package network
import ( import (
"container/list"
"errors" "errors"
"sync" "sync"
"time" "time"
@ -30,61 +29,10 @@ var (
) )
var ( var (
// ErrMsgUnknown is returned when unknown message is attempted to send or receive
ErrMsgUnknown = errors.New("unknown message")
// ErrClientNotFound is returned when client for tunnel channel could not be found // ErrClientNotFound is returned when client for tunnel channel could not be found
ErrClientNotFound = errors.New("client not found") ErrClientNotFound = errors.New("client not found")
) )
// node is network node
type node struct {
sync.RWMutex
// id is node id
id string
// address is node address
address string
// neighbours maps the node neighbourhood
neighbours map[string]*node
// network returns the node network
network Network
// lastSeen stores the time the node has been seen last time
lastSeen time.Time
}
// Id is node ide
func (n *node) Id() string {
return n.id
}
// Address returns node address
func (n *node) Address() string {
return n.address
}
// Network returns node network
func (n *node) Network() Network {
return n.network
}
// Neighbourhood returns node neighbourhood
func (n *node) Neighbourhood() []Node {
var nodes []Node
n.RLock()
for _, neighbourNode := range n.neighbours {
// make a copy of the node
n := &node{
id: neighbourNode.id,
address: neighbourNode.address,
network: neighbourNode.network,
}
// NOTE: we do not care about neighbour's neighbours
nodes = append(nodes, n)
}
n.RUnlock()
return nodes
}
// network implements Network interface // network implements Network interface
type network struct { type network struct {
// node is network node // node is network node
@ -158,7 +106,7 @@ func newNetwork(opts ...Option) Network {
node: &node{ node: &node{
id: options.Id, id: options.Id,
address: options.Address, address: options.Address,
neighbours: make(map[string]*node), peers: make(map[string]*node),
}, },
options: options, options: options,
Router: options.Router, Router: options.Router,
@ -176,9 +124,10 @@ func newNetwork(opts ...Option) Network {
// Options returns network options // Options returns network options
func (n *network) Options() Options { func (n *network) Options() Options {
n.Lock() n.RLock()
defer n.RUnlock()
options := n.options options := n.options
n.Unlock()
return options return options
} }
@ -248,7 +197,6 @@ func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message
for { for {
m := new(transport.Message) m := new(transport.Message)
if err := sess.Recv(m); err != nil { if err := sess.Recv(m); err != nil {
// TODO: should we bail here?
log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err)
return return
} }
@ -307,81 +255,70 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
if pbNetConnect.Node.Id == n.options.Id { if pbNetConnect.Node.Id == n.options.Id {
continue continue
} }
n.Lock()
log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id)
// if the entry already exists skip adding it peer := &node{
if neighbour, ok := n.neighbours[pbNetConnect.Node.Id]; ok {
// update lastSeen timestamp
if n.neighbours[pbNetConnect.Node.Id].lastSeen.Before(now) {
neighbour.lastSeen = now
}
n.Unlock()
continue
}
// add a new neighbour
// NOTE: new node does not have any neighbours
n.neighbours[pbNetConnect.Node.Id] = &node{
id: pbNetConnect.Node.Id, id: pbNetConnect.Node.Id,
address: pbNetConnect.Node.Address, address: pbNetConnect.Node.Address,
neighbours: make(map[string]*node), peers: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
n.Unlock() if ok := n.node.AddPeer(peer); !ok {
log.Debugf("Network peer exists, refreshing: %s", peer.id)
// update lastSeen time for the existing node
if ok := n.RefreshPeer(peer.id, now); !ok {
log.Debugf("Network failed refreshing peer: %s", peer.id)
}
continue
}
// get node peers down to MaxDepth encoded in protobuf
msg := PeersToProto(n.node, MaxDepth)
// advertise yourself to the network // advertise yourself to the network
if err := n.sendMsg("neighbour", NetworkChannel); err != nil { if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err) log.Debugf("Network failed to advertise peers: %v", err)
} }
// advertise all the routes when a new node has connected // advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil { if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err) log.Debugf("Network failed to solicit routes: %s", err)
} }
case "neighbour": case "peer":
// mark the time the message has been received // mark the time the message has been received
now := time.Now() now := time.Now()
pbNetNeighbour := &pbNet.Neighbour{} pbNetPeer := &pbNet.Peer{}
if err := proto.Unmarshal(m.Body, pbNetNeighbour); err != nil { if err := proto.Unmarshal(m.Body, pbNetPeer); err != nil {
log.Debugf("Network tunnel [%s] neighbour unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err)
continue continue
} }
// don't process your own messages // don't process your own messages
if pbNetNeighbour.Node.Id == n.options.Id { if pbNetPeer.Node.Id == n.options.Id {
continue continue
} }
n.Lock() log.Debugf("Network received peer message from: %s", pbNetPeer.Node.Id)
log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id) peer := &node{
// only add the neighbour if it is NOT already in node's list of neighbours id: pbNetPeer.Node.Id,
_, exists := n.neighbours[pbNetNeighbour.Node.Id] address: pbNetPeer.Node.Address,
if !exists { peers: make(map[string]*node),
n.neighbours[pbNetNeighbour.Node.Id] = &node{
id: pbNetNeighbour.Node.Id,
address: pbNetNeighbour.Node.Address,
neighbours: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
if ok := n.node.AddPeer(peer); ok {
// send a solicit message when discovering new peer
msg := &pbRtr.Solicit{
Id: n.options.Id,
} }
// update lastSeen timestamp if err := n.sendMsg("solicit", msg, ControlChannel); err != nil {
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) {
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
}
// update/store the neighbour node neighbours
// NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour
// * even though we are NOT interested in neighbours of neighbours here
// we still allocate the map of neighbours for each of them
for _, pbNeighbour := range pbNetNeighbour.Neighbours {
neighbourNode := &node{
id: pbNeighbour.Id,
address: pbNeighbour.Address,
neighbours: make(map[string]*node),
}
n.neighbours[pbNetNeighbour.Node.Id].neighbours[pbNeighbour.Id] = neighbourNode
}
n.Unlock()
// send a solicit message when discovering a new node
// NOTE: we need to send the solicit message here after the Lock is released as sendMsg locks, too
if !exists {
if err := n.sendMsg("solicit", ControlChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err) log.Debugf("Network failed to send solicit message: %s", err)
} }
continue
}
log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id)
// update lastSeen time for the peer
if ok := n.RefreshPeer(pbNetPeer.Node.Id, now); !ok {
log.Debugf("Network failed refreshing peer: %s", pbNetPeer.Node.Id)
}
// NOTE: we don't unpack MaxDepth toplogy
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
log.Debugf("Network updating topology of node: %s", n.node.id)
if ok := n.node.UpdatePeer(peer); !ok {
log.Debugf("Network failed to update peers")
} }
case "close": case "close":
pbNetClose := &pbNet.Close{} pbNetClose := &pbNet.Close{}
@ -393,13 +330,11 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
if pbNetClose.Node.Id == n.options.Id { if pbNetClose.Node.Id == n.options.Id {
continue continue
} }
n.Lock()
log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) log.Debugf("Network received close message from: %s", pbNetClose.Node.Id)
if err := n.pruneNode(pbNetClose.Node.Id); err != nil { if err := n.pruneNode(pbNetClose.Node.Id); err != nil {
log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err) log.Debugf("Network failed to prune the node %s: %v", pbNetClose.Node.Id, err)
continue continue
} }
n.Unlock()
} }
case <-n.closed: case <-n.closed:
return return
@ -408,68 +343,29 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
} }
// sendMsg sends a message to the tunnel channel // sendMsg sends a message to the tunnel channel
func (n *network) sendMsg(msgType string, channel string) error { func (n *network) sendMsg(method string, msg proto.Message, channel string) error {
node := &pbNet.Node{ body, err := proto.Marshal(msg)
Id: n.options.Id,
Address: n.options.Address,
}
var protoMsg proto.Message
switch msgType {
case "connect":
protoMsg = &pbNet.Connect{
Node: node,
}
case "close":
protoMsg = &pbNet.Close{
Node: node,
}
case "solicit":
protoMsg = &pbNet.Solicit{
Node: node,
}
case "neighbour":
n.RLock()
nodes := make([]*pbNet.Node, len(n.neighbours))
i := 0
for id := range n.neighbours {
nodes[i] = &pbNet.Node{
Id: id,
Address: n.neighbours[id].address,
}
i++
}
n.RUnlock()
protoMsg = &pbNet.Neighbour{
Node: node,
Neighbours: nodes,
}
default:
return ErrMsgUnknown
}
body, err := proto.Marshal(protoMsg)
if err != nil { if err != nil {
return err return err
} }
// create transport message and chuck it down the pipe // create transport message and chuck it down the pipe
m := transport.Message{ m := transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Method": msgType, "Micro-Method": method,
}, },
Body: body, Body: body,
} }
// check if the channel client is initialized
n.RLock() n.RLock()
client, ok := n.tunClient[channel] client, ok := n.tunClient[channel]
if !ok { if !ok || client == nil {
n.RUnlock() n.RUnlock()
return ErrClientNotFound return ErrClientNotFound
} }
n.RUnlock() n.RUnlock()
log.Debugf("Network sending %s message from: %s", msgType, node.Id) log.Debugf("Network sending %s message from: %s", method, n.options.Id)
if err := client.Send(&m); err != nil { if err := client.Send(&m); err != nil {
return err return err
} }
@ -477,7 +373,7 @@ func (n *network) sendMsg(msgType string, channel string) error {
return nil return nil
} }
// announce announces node neighbourhood to the network // announce announces node peers to the network
func (n *network) announce(client transport.Client) { func (n *network) announce(client transport.Client) {
announce := time.NewTicker(AnnounceTime) announce := time.NewTicker(AnnounceTime)
defer announce.Stop() defer announce.Stop()
@ -487,19 +383,20 @@ func (n *network) announce(client transport.Client) {
case <-n.closed: case <-n.closed:
return return
case <-announce.C: case <-announce.C:
msg := PeersToProto(n.node, MaxDepth)
// advertise yourself to the network // advertise yourself to the network
if err := n.sendMsg("neighbour", NetworkChannel); err != nil { if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err) log.Debugf("Network failed to advertise peers: %v", err)
continue continue
} }
} }
} }
} }
// pruneNode removes a node with given id from the list of neighbours. It also removes all routes originted by this node. // pruneNode removes a node with given id from the list of peers. It also removes all routes originted by this node.
// NOTE: this method is not thread-safe; when calling it make sure you lock the particular code segment
func (n *network) pruneNode(id string) error { func (n *network) pruneNode(id string) error {
delete(n.neighbours, id) // DeletePeer serializes access
n.node.DeletePeer(id)
// lookup all the routes originated at this node // lookup all the routes originated at this node
q := router.NewQuery( q := router.NewQuery(
router.QueryRouter(id), router.QueryRouter(id),
@ -531,7 +428,7 @@ func (n *network) prune() {
return return
case <-prune.C: case <-prune.C:
n.Lock() n.Lock()
for id, node := range n.neighbours { for id, node := range n.peers {
if id == n.options.Id { if id == n.options.Id {
continue continue
} }
@ -589,8 +486,8 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message
// setRouteMetric calculates metric of the route and updates it in place // setRouteMetric calculates metric of the route and updates it in place
// - Local route metric is 1 // - Local route metric is 1
// - Routes with ID of adjacent neighbour are 10 // - Routes with ID of adjacent nodes are 10
// - Routes of neighbours of the advertiser are 100 // - Routes by peers of the advertiser are 100
// - Routes beyond your neighbourhood are 1000 // - Routes beyond your neighbourhood are 1000
func (n *network) setRouteMetric(route *router.Route) { func (n *network) setRouteMetric(route *router.Route) {
// we are the origin of the route // we are the origin of the route
@ -599,25 +496,21 @@ func (n *network) setRouteMetric(route *router.Route) {
return return
} }
n.RLock() // check if the route origin is our peer
// check if the route origin is our neighbour if _, ok := n.peers[route.Router]; ok {
if _, ok := n.neighbours[route.Router]; ok {
route.Metric = 10 route.Metric = 10
n.RUnlock()
return return
} }
// check if the route origin is the neighbour of our neighbour // check if the route origin is the peer of our peer
for _, node := range n.neighbours { for _, peer := range n.peers {
for id := range node.neighbours { for id := range peer.peers {
if route.Router == id { if route.Router == id {
route.Metric = 100 route.Metric = 100
n.RUnlock()
return return
} }
} }
} }
n.RUnlock()
// the origin of the route is beyond our neighbourhood // the origin of the route is beyond our neighbourhood
route.Metric = 1000 route.Metric = 1000
@ -637,7 +530,6 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
// switch on type of message and take action // switch on type of message and take action
switch m.Header["Micro-Method"] { switch m.Header["Micro-Method"] {
case "advert": case "advert":
now := time.Now()
pbRtrAdvert := &pbRtr.Advert{} pbRtrAdvert := &pbRtr.Advert{}
if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil {
log.Debugf("Network fail to unmarshal advert message: %v", err) log.Debugf("Network fail to unmarshal advert message: %v", err)
@ -647,41 +539,23 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
if pbRtrAdvert.Id == n.options.Id { if pbRtrAdvert.Id == n.options.Id {
continue continue
} }
// loookup advertising node in our neighbourhood log.Debugf("Network received advert message with %d events from: %s", len(pbRtrAdvert.Events), pbRtrAdvert.Id)
n.RLock() // loookup advertising node in our peer topology
log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) advertNode := n.node.GetPeerNode(pbRtrAdvert.Id)
advertNode, ok := n.neighbours[pbRtrAdvert.Id] if advertNode == nil {
if !ok { // if we can't find the node in our topology (MaxDepth) we skipp prcessing adverts
// advertising node has not been registered as our neighbour, yet log.Debugf("Network skipping advert message from unknown peer: %s", pbRtrAdvert.Id)
// let's add it to the map of our neighbours continue
advertNode = &node{
id: pbRtrAdvert.Id,
neighbours: make(map[string]*node),
lastSeen: now,
} }
n.neighbours[pbRtrAdvert.Id] = advertNode
// send a solicit message when discovering a new node
if err := n.sendMsg("solicit", NetworkChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
}
n.RUnlock()
var events []*router.Event var events []*router.Event
for _, event := range pbRtrAdvert.Events { for _, event := range pbRtrAdvert.Events {
// set the address of the advertising node
// we know Route.Gateway is the address of advertNode
// NOTE: this is true only when advertNode had not been registered
// as our neighbour when we received the advert from it
if advertNode.address == "" {
advertNode.address = event.Route.Gateway
}
// if advertising node id is not the same as Route.Router
// we know the advertising node is not the origin of the route // we know the advertising node is not the origin of the route
if advertNode.id != event.Route.Router { if pbRtrAdvert.Id != event.Route.Router {
// if the origin router is not in the advertising node neighbourhood // if the origin router is not the advertising node peer
// we can't rule out potential routing loops so we bail here // we can't rule out potential routing loops so we bail here
if _, ok := advertNode.neighbours[event.Route.Router]; !ok { if peer := advertNode.GetPeerNode(event.Route.Router); peer == nil {
log.Debugf("Network skipping advert message from peer: %s", pbRtrAdvert.Id)
continue continue
} }
} }
@ -695,7 +569,9 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
Metric: int(event.Route.Metric), Metric: int(event.Route.Metric),
} }
// set the route metric // set the route metric
n.node.RLock()
n.setRouteMetric(&route) n.setRouteMetric(&route)
n.node.RUnlock()
// throw away metric bigger than 1000 // throw away metric bigger than 1000
if route.Metric > 1000 { if route.Metric > 1000 {
log.Debugf("Network route metric %d dropping node: %s", route.Metric, route.Router) log.Debugf("Network route metric %d dropping node: %s", route.Metric, route.Router)
@ -709,6 +585,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
} }
events = append(events, e) events = append(events, e)
} }
// create an advert and process it
advert := &router.Advert{ advert := &router.Advert{
Id: pbRtrAdvert.Id, Id: pbRtrAdvert.Id,
Type: router.AdvertType(pbRtrAdvert.Type), Type: router.AdvertType(pbRtrAdvert.Type),
@ -717,21 +594,22 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
Events: events, Events: events,
} }
log.Debugf("Network router processing advert: %s", advert.Id)
if err := n.Router.Process(advert); err != nil { if err := n.Router.Process(advert); err != nil {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err) log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
continue
} }
case "solicit": case "solicit":
pbNetSolicit := &pbNet.Solicit{} pbRtrSolicit := &pbRtr.Solicit{}
if err := proto.Unmarshal(m.Body, pbNetSolicit); err != nil { if err := proto.Unmarshal(m.Body, pbRtrSolicit); err != nil {
log.Debugf("Network fail to unmarshal solicit message: %v", err) log.Debugf("Network fail to unmarshal solicit message: %v", err)
continue continue
} }
log.Debugf("Network received solicit message from: %s", pbNetSolicit.Node.Id) log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id)
// don't process your own messages // ignore solicitation when requested by you
if pbNetSolicit.Node.Id == n.options.Id { if pbRtrSolicit.Id == n.options.Id {
continue continue
} }
log.Debugf("Network router flushing routes for: %s", pbRtrSolicit.Id)
// advertise all the routes when a new node has connected // advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil { if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err) log.Debugf("Network failed to solicit routes: %s", err)
@ -769,29 +647,14 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
} }
events = append(events, e) events = append(events, e)
} }
pbRtrAdvert := &pbRtr.Advert{ msg := &pbRtr.Advert{
Id: advert.Id, Id: advert.Id,
Type: pbRtr.AdvertType(advert.Type), Type: pbRtr.AdvertType(advert.Type),
Timestamp: advert.Timestamp.UnixNano(), Timestamp: advert.Timestamp.UnixNano(),
Events: events, Events: events,
} }
body, err := proto.Marshal(pbRtrAdvert) if err := n.sendMsg("advert", msg, ControlChannel); err != nil {
if err != nil { log.Debugf("Network failed to advertise routes: %v", err)
// TODO: should we bail here?
log.Debugf("Network failed to marshal advert message: %v", err)
continue
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "advert",
},
Body: body,
}
log.Debugf("Network sending advert message from: %s", pbRtrAdvert.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send advert %s: %v", pbRtrAdvert.Id, err)
continue continue
} }
case <-n.closed: case <-n.closed:
@ -805,6 +668,7 @@ func (n *network) Connect() error {
n.Lock() n.Lock()
// return if already connected // return if already connected
if n.connected { if n.connected {
n.Unlock()
return nil return nil
} }
@ -816,6 +680,7 @@ func (n *network) Connect() error {
// connect network tunnel // connect network tunnel
if err := n.Tunnel.Connect(); err != nil { if err := n.Tunnel.Connect(); err != nil {
n.Unlock()
return err return err
} }
@ -827,6 +692,7 @@ func (n *network) Connect() error {
// dial into ControlChannel to send route adverts // dial into ControlChannel to send route adverts
ctrlClient, err := n.Tunnel.Dial(ControlChannel, tunnel.DialMulticast()) ctrlClient, err := n.Tunnel.Dial(ControlChannel, tunnel.DialMulticast())
if err != nil { if err != nil {
n.Unlock()
return err return err
} }
@ -835,12 +701,14 @@ func (n *network) Connect() error {
// listen on ControlChannel // listen on ControlChannel
ctrlListener, err := n.Tunnel.Listen(ControlChannel) ctrlListener, err := n.Tunnel.Listen(ControlChannel)
if err != nil { if err != nil {
n.Unlock()
return err return err
} }
// dial into NetworkChannel to send network messages // dial into NetworkChannel to send network messages
netClient, err := n.Tunnel.Dial(NetworkChannel, tunnel.DialMulticast()) netClient, err := n.Tunnel.Dial(NetworkChannel, tunnel.DialMulticast())
if err != nil { if err != nil {
n.Unlock()
return err return err
} }
@ -849,6 +717,7 @@ func (n *network) Connect() error {
// listen on NetworkChannel // listen on NetworkChannel
netListener, err := n.Tunnel.Listen(NetworkChannel) netListener, err := n.Tunnel.Listen(NetworkChannel)
if err != nil { if err != nil {
n.Unlock()
return err return err
} }
@ -857,17 +726,20 @@ func (n *network) Connect() error {
// start the router // start the router
if err := n.options.Router.Start(); err != nil { if err := n.options.Router.Start(); err != nil {
n.Unlock()
return err return err
} }
// start advertising routes // start advertising routes
advertChan, err := n.options.Router.Advertise() advertChan, err := n.options.Router.Advertise()
if err != nil { if err != nil {
n.Unlock()
return err return err
} }
// start the server // start the server
if err := n.server.Start(); err != nil { if err := n.server.Start(); err != nil {
n.Unlock()
return err return err
} }
n.Unlock() n.Unlock()
@ -876,13 +748,19 @@ func (n *network) Connect() error {
// NOTE: in theory we could do this as soon as // NOTE: in theory we could do this as soon as
// Dial to NetworkChannel succeeds, but instead // Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first // we initialize all other node resources first
if err := n.sendMsg("connect", NetworkChannel); err != nil { msg := &pbNet.Connect{
Node: &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
},
}
if err := n.sendMsg("connect", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to send connect message: %s", err) log.Debugf("Network failed to send connect message: %s", err)
} }
// go resolving network nodes // go resolving network nodes
go n.resolve() go n.resolve()
// broadcast neighbourhood // broadcast peers
go n.announce(netClient) go n.announce(netClient)
// prune stale nodes // prune stale nodes
go n.prune() go n.prune()
@ -900,48 +778,6 @@ func (n *network) Connect() error {
return nil return nil
} }
// Nodes returns a list of all network nodes
func (n *network) Nodes() []Node {
//track the visited nodes
visited := make(map[string]*node)
// queue of the nodes to visit
queue := list.New()
// we need to freeze the network graph here
// otherwise we might get invalid results
n.RLock()
defer n.RUnlock()
// push network node to the back of queue
queue.PushBack(n.node)
// mark the node as visited
visited[n.node.id] = n.node
// keep iterating over the queue until its empty
for queue.Len() > 0 {
// pop the node from the front of the queue
qnode := queue.Front()
// iterate through all of its neighbours
// mark the visited nodes; enqueue the non-visted
for id, node := range qnode.Value.(*node).neighbours {
if _, ok := visited[id]; !ok {
visited[id] = node
queue.PushBack(node)
}
}
// remove the node from the queue
queue.Remove(qnode)
}
var nodes []Node
// collect all the nodes and return them
for _, node := range visited {
nodes = append(nodes, node)
}
return nodes
}
func (n *network) close() error { func (n *network) close() error {
// stop the server // stop the server
if err := n.server.Stop(); err != nil { if err := n.server.Stop(); err != nil {
@ -963,7 +799,6 @@ func (n *network) close() error {
// Close closes network connection // Close closes network connection
func (n *network) Close() error { func (n *network) Close() error {
// lock this operation
n.Lock() n.Lock()
if !n.connected { if !n.connected {
@ -984,9 +819,13 @@ func (n *network) Close() error {
// unlock the lock otherwise we'll deadlock sending the close // unlock the lock otherwise we'll deadlock sending the close
n.Unlock() n.Unlock()
// send close message only if we managed to connect to NetworkChannel msg := &pbNet.Close{
log.Debugf("Sending close message from: %s", n.options.Id) Node: &pbNet.Node{
if err := n.sendMsg("close", NetworkChannel); err != nil { Id: n.options.Id,
Address: n.options.Address,
},
}
if err := n.sendMsg("close", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to send close message: %s", err) log.Debugf("Network failed to send close message: %s", err)
} }
} }

View File

@ -3,7 +3,6 @@ package handler
import ( import (
"context" "context"
"sort"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/network" "github.com/micro/go-micro/network"
@ -16,6 +15,21 @@ type Network struct {
Network network.Network Network network.Network
} }
// ListPeers returns a list of all the nodes the node has a direct link with
func (n *Network) ListPeers(ctx context.Context, req *pbNet.PeerRequest, resp *pbNet.PeerResponse) error {
depth := uint(req.Depth)
if depth <= 0 || depth > network.MaxDepth {
depth = network.MaxDepth
}
// get peers encoded into protobuf
peers := network.PeersToProto(n.Network, depth)
resp.Peers = peers
return nil
}
// ListRoutes returns a list of routing table routes // ListRoutes returns a list of routing table routes
func (n *Network) ListRoutes(ctx context.Context, req *pbRtr.Request, resp *pbRtr.ListResponse) error { func (n *Network) ListRoutes(ctx context.Context, req *pbRtr.Request, resp *pbRtr.ListResponse) error {
routes, err := n.Network.Options().Router.Table().List() routes, err := n.Network.Options().Router.Table().List()
@ -41,71 +55,3 @@ func (n *Network) ListRoutes(ctx context.Context, req *pbRtr.Request, resp *pbRt
return nil return nil
} }
// ListNodes returns a list of all accessible nodes in the network
func (n *Network) ListNodes(ctx context.Context, req *pbNet.ListRequest, resp *pbNet.ListResponse) error {
nodes := n.Network.Nodes()
var respNodes []*pbNet.Node
for _, node := range nodes {
respNode := &pbNet.Node{
Id: node.Id(),
Address: node.Address(),
}
respNodes = append(respNodes, respNode)
}
resp.Nodes = respNodes
return nil
}
// Neighbourhood returns a list of immediate neighbours
func (n *Network) Neighbourhood(ctx context.Context, req *pbNet.NeighbourhoodRequest, resp *pbNet.NeighbourhoodResponse) error {
// extract the id of the node to query
id := req.Id
// if no id is passed, we assume local node
if id == "" {
id = n.Network.Id()
}
// get all the nodes in the network
nodes := n.Network.Nodes()
// sort the slice of nodes
sort.Slice(nodes, func(i, j int) bool { return nodes[i].Id() <= nodes[j].Id() })
// find a node with a given id
i := sort.Search(len(nodes), func(j int) bool { return nodes[j].Id() >= id })
var neighbours []*pbNet.Node
// collect all the nodes in the neighbourhood of the found node
if i < len(nodes) && nodes[i].Id() == id {
for _, neighbour := range nodes[i].Neighbourhood() {
// don't return yourself in response
if neighbour.Id() == n.Network.Id() {
continue
}
pbNeighbour := &pbNet.Node{
Id: neighbour.Id(),
Address: neighbour.Address(),
}
neighbours = append(neighbours, pbNeighbour)
}
}
// requested neighbourhood node
node := &pbNet.Node{
Id: nodes[i].Id(),
Address: nodes[i].Address(),
}
// creaate neighbourhood answer
neighbourhood := &pbNet.Neighbour{
Node: node,
Neighbours: neighbours,
}
resp.Neighbourhood = neighbourhood
return nil
}

View File

@ -28,8 +28,8 @@ type Node interface {
Id() string Id() string
// Address is node bind address // Address is node bind address
Address() string Address() string
// Neighbourhood is node neighbourhood // Peers returns node peers
Neighbourhood() []Node Peers() []Node
// Network is the network node is in // Network is the network node is in
Network() Network Network() Network
} }
@ -44,8 +44,6 @@ type Network interface {
Name() string Name() string
// Connect starts the resolver and tunnel server // Connect starts the resolver and tunnel server
Connect() error Connect() error
// Nodes returns list of network nodes
Nodes() []Node
// Close stops the tunnel and resolving // Close stops the tunnel and resolving
Close() error Close() error
// Client is micro client // Client is micro client

310
network/node.go Normal file
View File

@ -0,0 +1,310 @@
package network
import (
"container/list"
"sync"
"time"
pb "github.com/micro/go-micro/network/proto"
)
var (
// MaxDepth defines max depth of peer topology
MaxDepth uint = 3
)
// node is network node
type node struct {
sync.RWMutex
// id is node id
id string
// address is node address
address string
// peers are nodes with direct link to this node
peers map[string]*node
// network returns the node network
network Network
// lastSeen keeps track of node lifetime and updates
lastSeen time.Time
}
// Id is node ide
func (n *node) Id() string {
return n.id
}
// Address returns node address
func (n *node) Address() string {
return n.address
}
// Network returns node network
func (n *node) Network() Network {
return n.network
}
// AddPeer adds a new peer to node topology
// It returns false if the peer already exists
func (n *node) AddPeer(peer *node) bool {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; !ok {
n.peers[peer.id] = peer
return true
}
return false
}
// UpdatePeer updates a peer if it exists
// It returns false if the peer does not exist
func (n *node) UpdatePeer(peer *node) bool {
n.Lock()
defer n.Unlock()
if _, ok := n.peers[peer.id]; ok {
n.peers[peer.id] = peer
return true
}
return false
}
// DeletePeer deletes a peer if it exists
func (n *node) DeletePeer(id string) {
n.Lock()
defer n.Unlock()
delete(n.peers, id)
}
// HasPeer returns true if node has peer with given id
func (n *node) HasPeer(id string) bool {
n.RLock()
defer n.RUnlock()
_, ok := n.peers[id]
return ok
}
// RefreshPeer updates node timestamp
// It returns false if the peer has not been found.
func (n *node) RefreshPeer(id string, now time.Time) bool {
n.Lock()
defer n.Unlock()
peer, ok := n.peers[id]
if !ok {
return false
}
if peer.lastSeen.Before(now) {
peer.lastSeen = now
}
return true
}
func (n *node) walk(until func(peer *node) bool) map[string]*node {
// track the visited nodes
visited := make(map[string]*node)
// queue of the nodes to visit
queue := list.New()
// push node to the back of queue
queue.PushBack(n)
// mark the node as visited
visited[n.id] = n
// keep iterating over the queue until its empty
for queue.Len() > 0 {
// pop the node from the front of the queue
qnode := queue.Front()
// iterate through all of the node peers
// mark the visited nodes; enqueue the non-visted
for id, node := range qnode.Value.(*node).peers {
if _, ok := visited[id]; !ok {
visited[id] = node
queue.PushBack(node)
}
if until(node) {
return visited
}
}
// remove the node from the queue
queue.Remove(qnode)
}
return visited
}
// Nodes returns a slice if all nodes in node topology
func (n *node) Nodes() []Node {
// we need to freeze the network graph here
// otherwise we might get inconsisten results
n.RLock()
defer n.RUnlock()
// NOTE: this should never be true
untilNoMorePeers := func(n *node) bool {
return n == nil
}
visited := n.walk(untilNoMorePeers)
var nodes []Node
// collect all the nodes and return them
for _, node := range visited {
nodes = append(nodes, node)
}
return nodes
}
// GetPeerNode returns a peer from node topology i.e. up to MaxDepth
// It returns nil if the peer was not found in the node topology
func (n *node) GetPeerNode(id string) *node {
n.RLock()
defer n.RUnlock()
// get node topology up to MaxDepth
top := n.Topology(MaxDepth)
untilFoundPeer := func(n *node) bool {
return n.id == id
}
visited := top.walk(untilFoundPeer)
peerNode, ok := visited[id]
if !ok {
return nil
}
return peerNode
}
// Topology returns a copy of th node topology down to given depth
func (n *node) Topology(depth uint) *node {
n.RLock()
defer n.RUnlock()
// make a copy of yourself
node := &node{
id: n.id,
address: n.address,
peers: make(map[string]*node),
network: n.network,
lastSeen: n.lastSeen,
}
// return if we reach requested depth or we have no more peers
if depth == 0 || len(n.peers) == 0 {
return node
}
// decrement the depth
depth--
// iterate through our peers and update the node peers
for _, peer := range n.peers {
nodePeer := peer.Topology(depth)
if _, ok := node.peers[nodePeer.id]; !ok {
node.peers[nodePeer.id] = nodePeer
}
}
return node
}
// Peers returns node peers up to MaxDepth
func (n *node) Peers() []Node {
n.RLock()
defer n.RUnlock()
var peers []Node
for _, nodePeer := range n.peers {
peer := nodePeer.Topology(MaxDepth)
peers = append(peers, peer)
}
return peers
}
// UnpackPeerTopology unpacks pb.Peer into node topology of given depth
func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node {
peerNode := &node{
id: pbPeer.Node.Id,
address: pbPeer.Node.Address,
peers: make(map[string]*node),
lastSeen: lastSeen,
}
// return if have either reached the depth or have no more peers
if depth == 0 || len(pbPeer.Peers) == 0 {
return peerNode
}
// decrement the depth
depth--
peers := make(map[string]*node)
for _, pbPeer := range pbPeer.Peers {
peer := UnpackPeerTopology(pbPeer, lastSeen, depth)
peers[pbPeer.Node.Id] = peer
}
peerNode.peers = peers
return peerNode
}
func peerProtoTopology(peer Node, depth uint) *pb.Peer {
node := &pb.Node{
Id: peer.Id(),
Address: peer.Address(),
}
pbPeers := &pb.Peer{
Node: node,
Peers: make([]*pb.Peer, 0),
}
// return if we reached the end of topology or depth
if depth == 0 || len(peer.Peers()) == 0 {
return pbPeers
}
// decrement the depth
depth--
// iterate through peers of peers aka pops
for _, pop := range peer.Peers() {
peer := peerProtoTopology(pop, depth)
pbPeers.Peers = append(pbPeers.Peers, peer)
}
return pbPeers
}
// PeersToProto returns node peers graph encoded into protobuf
func PeersToProto(node Node, depth uint) *pb.Peer {
// network node aka root node
pbNode := &pb.Node{
Id: node.Id(),
Address: node.Address(),
}
// we will build proto topology into this
pbPeers := &pb.Peer{
Node: pbNode,
Peers: make([]*pb.Peer, 0),
}
for _, peer := range node.Peers() {
pbPeer := peerProtoTopology(peer, depth)
pbPeers.Peers = append(pbPeers.Peers, pbPeer)
}
return pbPeers
}

279
network/node_test.go Normal file
View File

@ -0,0 +1,279 @@
package network
import (
"testing"
"time"
pb "github.com/micro/go-micro/network/proto"
)
var (
testNodeId = "testNode"
testNodeAddress = "testAddress"
testNodeNetName = "testNetwork"
testNodePeerIds = []string{"peer1", "peer2", "peer3"}
testPeerOfPeerIds = []string{"peer11", "peer12"}
)
func testSetup() *node {
testNode := &node{
id: testNodeId,
address: testNodeAddress,
peers: make(map[string]*node),
network: newNetwork(Name(testNodeNetName)),
}
// add some peers to the node
for _, id := range testNodePeerIds {
testNode.peers[id] = &node{
id: id,
address: testNode.address + "-" + id,
peers: make(map[string]*node),
network: testNode.network,
}
}
// add peers to peer1
// NOTE: these are peers of peers!
for _, id := range testPeerOfPeerIds {
testNode.peers["peer1"].peers[id] = &node{
id: id,
address: testNode.address + "-" + id,
peers: make(map[string]*node),
network: testNode.network,
}
}
// connect peer1 with peer2
testNode.peers["peer1"].peers["peer2"] = testNode.peers["peer2"]
// connect peer2 with peer3
testNode.peers["peer2"].peers["peer3"] = testNode.peers["peer3"]
return testNode
}
func TestNodeId(t *testing.T) {
node := testSetup()
if node.Id() != testNodeId {
t.Errorf("Expected id: %s, found: %s", testNodeId, node.Id())
}
}
func TestNodeAddress(t *testing.T) {
node := testSetup()
if node.Address() != testNodeAddress {
t.Errorf("Expected address: %s, found: %s", testNodeAddress, node.Address())
}
}
func TestNodeNetwork(t *testing.T) {
node := testSetup()
if node.Network().Name() != testNodeNetName {
t.Errorf("Expected network: %s, found: %s", testNodeNetName, node.Network().Name())
}
}
func TestNodes(t *testing.T) {
// single node
single := &node{
id: testNodeId,
address: testNodeAddress,
peers: make(map[string]*node),
network: newNetwork(Name(testNodeNetName)),
}
// get all the nodes including yourself
nodes := single.Nodes()
nodeCount := 1
if len(nodes) != nodeCount {
t.Errorf("Expected to find %d nodes, found: %d", nodeCount, len(nodes))
}
// complicated node graph
node := testSetup()
// get all the nodes including yourself
nodes = node.Nodes()
// compile a list of ids of all nodes in the network into map for easy indexing
nodeIds := make(map[string]bool)
// add yourself
nodeIds[node.id] = true
// add peer Ids
for _, id := range testNodePeerIds {
nodeIds[id] = true
}
// add peer1 peers i.e. peers of peer
for _, id := range testPeerOfPeerIds {
nodeIds[id] = true
}
// we should return the correct number of nodes
if len(nodes) != len(nodeIds) {
t.Errorf("Expected %d nodes, found: %d", len(nodeIds), len(nodes))
}
// iterate through the list of nodes and makes sure all have been returned
for _, node := range nodes {
if _, ok := nodeIds[node.Id()]; !ok {
t.Errorf("Expected to find %s node", node.Id())
}
}
// this is a leaf node
id := "peer11"
if nodePeer := node.GetPeerNode(id); nodePeer == nil {
t.Errorf("Expected to find %s node", id)
}
}
func collectPeerIds(peer Node, ids map[string]bool) map[string]bool {
if len(peer.Peers()) == 0 {
return ids
}
// iterate through the whole graph
for _, peer := range peer.Peers() {
ids = collectPeerIds(peer, ids)
if _, ok := ids[peer.Id()]; !ok {
ids[peer.Id()] = true
}
}
return ids
}
func TestPeers(t *testing.T) {
// single node
single := &node{
id: testNodeId,
address: testNodeAddress,
peers: make(map[string]*node),
network: newNetwork(Name(testNodeNetName)),
}
// get node peers
peers := single.Peers()
// there should be no peers
peerCount := 0
if len(peers) != peerCount {
t.Errorf("Expected to find %d nodes, found: %d", peerCount, len(peers))
}
// complicated node graph
node := testSetup()
// list of ids of nodes of MaxDepth
peerIds := make(map[string]bool)
// add peer Ids
for _, id := range testNodePeerIds {
peerIds[id] = true
}
// add peers of peers to peerIds
for _, id := range testPeerOfPeerIds {
peerIds[id] = true
}
// get node peers
peers = node.Peers()
// we will collect all returned Peer Ids into this map
resPeerIds := make(map[string]bool)
for _, peer := range peers {
resPeerIds[peer.Id()] = true
resPeerIds = collectPeerIds(peer, resPeerIds)
}
// if correct, we must collect all peerIds
if len(resPeerIds) != len(peerIds) {
t.Errorf("Expected to find %d peers, found: %d", len(peerIds), len(resPeerIds))
}
for id := range resPeerIds {
if _, ok := peerIds[id]; !ok {
t.Errorf("Expected to find %s peer", id)
}
}
}
func TestUnpackPeerTopology(t *testing.T) {
pbPeer := &pb.Peer{
Node: &pb.Node{
Id: "newPeer",
Address: "newPeerAddress",
},
Peers: make([]*pb.Peer, 0),
}
// it should add pbPeer to the single node peers
peer := UnpackPeerTopology(pbPeer, time.Now(), 5)
if peer.id != pbPeer.Node.Id {
t.Errorf("Expected peer id %s, found: %s", pbPeer.Node.Id, peer.id)
}
node := testSetup()
// build a simple topology to update node peer1
peer1 := node.peers["peer1"]
pbPeer1Node := &pb.Node{
Id: peer1.id,
Address: peer1.address,
}
pbPeer111 := &pb.Peer{
Node: &pb.Node{
Id: "peer111",
Address: "peer111Address",
},
Peers: make([]*pb.Peer, 0),
}
pbPeer121 := &pb.Peer{
Node: &pb.Node{
Id: "peer121",
Address: "peer121Address",
},
Peers: make([]*pb.Peer, 0),
}
// topology to update
pbPeer1 := &pb.Peer{
Node: pbPeer1Node,
Peers: []*pb.Peer{pbPeer111, pbPeer121},
}
// unpack peer1 topology
peer = UnpackPeerTopology(pbPeer1, time.Now(), 5)
// make sure peer1 topology has been correctly updated
newPeerIds := []string{pbPeer111.Node.Id, pbPeer121.Node.Id}
for _, id := range newPeerIds {
if _, ok := peer.peers[id]; !ok {
t.Errorf("Expected %s to be a peer of %s", id, "peer1")
}
}
}
func TestPeersToProto(t *testing.T) {
// single node
single := &node{
id: testNodeId,
address: testNodeAddress,
peers: make(map[string]*node),
network: newNetwork(Name(testNodeNetName)),
}
topCount := 0
protoPeers := PeersToProto(single, 0)
if len(protoPeers.Peers) != topCount {
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers))
}
// complicated node graph
node := testSetup()
topCount = 3
// list of ids of nodes of depth 1 i.e. node peers
peerIds := make(map[string]bool)
// add peer Ids
for _, id := range testNodePeerIds {
peerIds[id] = true
}
// depth 1 should give us immmediate neighbours only
protoPeers = PeersToProto(node, 1)
if len(protoPeers.Peers) != topCount {
t.Errorf("Expected to find %d nodes, found: %d", topCount, len(protoPeers.Peers))
}
}

View File

@ -35,9 +35,8 @@ var _ server.Option
// Client API for Network service // Client API for Network service
type NetworkService interface { type NetworkService interface {
ListPeers(ctx context.Context, in *PeerRequest, opts ...client.CallOption) (*PeerResponse, error)
ListRoutes(ctx context.Context, in *proto1.Request, opts ...client.CallOption) (*proto1.ListResponse, error) ListRoutes(ctx context.Context, in *proto1.Request, opts ...client.CallOption) (*proto1.ListResponse, error)
ListNodes(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error)
Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, opts ...client.CallOption) (*NeighbourhoodResponse, error)
} }
type networkService struct { type networkService struct {
@ -58,6 +57,16 @@ func NewNetworkService(name string, c client.Client) NetworkService {
} }
} }
func (c *networkService) ListPeers(ctx context.Context, in *PeerRequest, opts ...client.CallOption) (*PeerResponse, error) {
req := c.c.NewRequest(c.name, "Network.ListPeers", in)
out := new(PeerResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *networkService) ListRoutes(ctx context.Context, in *proto1.Request, opts ...client.CallOption) (*proto1.ListResponse, error) { func (c *networkService) ListRoutes(ctx context.Context, in *proto1.Request, opts ...client.CallOption) (*proto1.ListResponse, error) {
req := c.c.NewRequest(c.name, "Network.ListRoutes", in) req := c.c.NewRequest(c.name, "Network.ListRoutes", in)
out := new(proto1.ListResponse) out := new(proto1.ListResponse)
@ -68,39 +77,17 @@ func (c *networkService) ListRoutes(ctx context.Context, in *proto1.Request, opt
return out, nil return out, nil
} }
func (c *networkService) ListNodes(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) {
req := c.c.NewRequest(c.name, "Network.ListNodes", in)
out := new(ListResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *networkService) Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, opts ...client.CallOption) (*NeighbourhoodResponse, error) {
req := c.c.NewRequest(c.name, "Network.Neighbourhood", in)
out := new(NeighbourhoodResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Network service // Server API for Network service
type NetworkHandler interface { type NetworkHandler interface {
ListPeers(context.Context, *PeerRequest, *PeerResponse) error
ListRoutes(context.Context, *proto1.Request, *proto1.ListResponse) error ListRoutes(context.Context, *proto1.Request, *proto1.ListResponse) error
ListNodes(context.Context, *ListRequest, *ListResponse) error
Neighbourhood(context.Context, *NeighbourhoodRequest, *NeighbourhoodResponse) error
} }
func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error { func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error {
type network interface { type network interface {
ListPeers(ctx context.Context, in *PeerRequest, out *PeerResponse) error
ListRoutes(ctx context.Context, in *proto1.Request, out *proto1.ListResponse) error ListRoutes(ctx context.Context, in *proto1.Request, out *proto1.ListResponse) error
ListNodes(ctx context.Context, in *ListRequest, out *ListResponse) error
Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, out *NeighbourhoodResponse) error
} }
type Network struct { type Network struct {
network network
@ -113,14 +100,10 @@ type networkHandler struct {
NetworkHandler NetworkHandler
} }
func (h *networkHandler) ListPeers(ctx context.Context, in *PeerRequest, out *PeerResponse) error {
return h.NetworkHandler.ListPeers(ctx, in, out)
}
func (h *networkHandler) ListRoutes(ctx context.Context, in *proto1.Request, out *proto1.ListResponse) error { func (h *networkHandler) ListRoutes(ctx context.Context, in *proto1.Request, out *proto1.ListResponse) error {
return h.NetworkHandler.ListRoutes(ctx, in, out) return h.NetworkHandler.ListRoutes(ctx, in, out)
} }
func (h *networkHandler) ListNodes(ctx context.Context, in *ListRequest, out *ListResponse) error {
return h.NetworkHandler.ListNodes(ctx, in, out)
}
func (h *networkHandler) Neighbourhood(ctx context.Context, in *NeighbourhoodRequest, out *NeighbourhoodResponse) error {
return h.NetworkHandler.Neighbourhood(ctx, in, out)
}

View File

@ -21,161 +21,91 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Empty request // PeerRequest requests list of peers
type ListRequest struct { type PeerRequest struct {
// node topology depth
Depth uint32 `protobuf:"varint,1,opt,name=depth,proto3" json:"depth,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *ListRequest) Reset() { *m = ListRequest{} } func (m *PeerRequest) Reset() { *m = PeerRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) } func (m *PeerRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {} func (*PeerRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) { func (*PeerRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{0} return fileDescriptor_8571034d60397816, []int{0}
} }
func (m *ListRequest) XXX_Unmarshal(b []byte) error { func (m *PeerRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListRequest.Unmarshal(m, b) return xxx_messageInfo_PeerRequest.Unmarshal(m, b)
} }
func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *PeerRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic) return xxx_messageInfo_PeerRequest.Marshal(b, m, deterministic)
} }
func (m *ListRequest) XXX_Merge(src proto.Message) { func (m *PeerRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListRequest.Merge(m, src) xxx_messageInfo_PeerRequest.Merge(m, src)
} }
func (m *ListRequest) XXX_Size() int { func (m *PeerRequest) XXX_Size() int {
return xxx_messageInfo_ListRequest.Size(m) return xxx_messageInfo_PeerRequest.Size(m)
} }
func (m *ListRequest) XXX_DiscardUnknown() { func (m *PeerRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ListRequest.DiscardUnknown(m) xxx_messageInfo_PeerRequest.DiscardUnknown(m)
} }
var xxx_messageInfo_ListRequest proto.InternalMessageInfo var xxx_messageInfo_PeerRequest proto.InternalMessageInfo
// ListResponse is returned by ListNodes and ListNeighbours func (m *PeerRequest) GetDepth() uint32 {
type ListResponse struct { if m != nil {
Nodes []*Node `protobuf:"bytes,1,rep,name=nodes,proto3" json:"nodes,omitempty"` return m.Depth
}
return 0
}
// PeerResponse is returned by ListPeers
type PeerResponse struct {
// return peer topology
Peers *Peer `protobuf:"bytes,1,opt,name=peers,proto3" json:"peers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *PeerResponse) Reset() { *m = PeerResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (m *PeerResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {} func (*PeerResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) { func (*PeerResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{1} return fileDescriptor_8571034d60397816, []int{1}
} }
func (m *ListResponse) XXX_Unmarshal(b []byte) error { func (m *PeerResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListResponse.Unmarshal(m, b) return xxx_messageInfo_PeerResponse.Unmarshal(m, b)
} }
func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *PeerResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic) return xxx_messageInfo_PeerResponse.Marshal(b, m, deterministic)
} }
func (m *ListResponse) XXX_Merge(src proto.Message) { func (m *PeerResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListResponse.Merge(m, src) xxx_messageInfo_PeerResponse.Merge(m, src)
} }
func (m *ListResponse) XXX_Size() int { func (m *PeerResponse) XXX_Size() int {
return xxx_messageInfo_ListResponse.Size(m) return xxx_messageInfo_PeerResponse.Size(m)
} }
func (m *ListResponse) XXX_DiscardUnknown() { func (m *PeerResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ListResponse.DiscardUnknown(m) xxx_messageInfo_PeerResponse.DiscardUnknown(m)
} }
var xxx_messageInfo_ListResponse proto.InternalMessageInfo var xxx_messageInfo_PeerResponse proto.InternalMessageInfo
func (m *ListResponse) GetNodes() []*Node { func (m *PeerResponse) GetPeers() *Peer {
if m != nil { if m != nil {
return m.Nodes return m.Peers
}
return nil
}
// NeighbourhoodRequest is sent to query node neighbourhood
type NeighbourhoodRequest struct {
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *NeighbourhoodRequest) Reset() { *m = NeighbourhoodRequest{} }
func (m *NeighbourhoodRequest) String() string { return proto.CompactTextString(m) }
func (*NeighbourhoodRequest) ProtoMessage() {}
func (*NeighbourhoodRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{2}
}
func (m *NeighbourhoodRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NeighbourhoodRequest.Unmarshal(m, b)
}
func (m *NeighbourhoodRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NeighbourhoodRequest.Marshal(b, m, deterministic)
}
func (m *NeighbourhoodRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_NeighbourhoodRequest.Merge(m, src)
}
func (m *NeighbourhoodRequest) XXX_Size() int {
return xxx_messageInfo_NeighbourhoodRequest.Size(m)
}
func (m *NeighbourhoodRequest) XXX_DiscardUnknown() {
xxx_messageInfo_NeighbourhoodRequest.DiscardUnknown(m)
}
var xxx_messageInfo_NeighbourhoodRequest proto.InternalMessageInfo
func (m *NeighbourhoodRequest) GetId() string {
if m != nil {
return m.Id
}
return ""
}
// NeighbourhoodResponse contains node neighbourhood hierarchy
type NeighbourhoodResponse struct {
Neighbourhood *Neighbour `protobuf:"bytes,1,opt,name=neighbourhood,proto3" json:"neighbourhood,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *NeighbourhoodResponse) Reset() { *m = NeighbourhoodResponse{} }
func (m *NeighbourhoodResponse) String() string { return proto.CompactTextString(m) }
func (*NeighbourhoodResponse) ProtoMessage() {}
func (*NeighbourhoodResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{3}
}
func (m *NeighbourhoodResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NeighbourhoodResponse.Unmarshal(m, b)
}
func (m *NeighbourhoodResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NeighbourhoodResponse.Marshal(b, m, deterministic)
}
func (m *NeighbourhoodResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_NeighbourhoodResponse.Merge(m, src)
}
func (m *NeighbourhoodResponse) XXX_Size() int {
return xxx_messageInfo_NeighbourhoodResponse.Size(m)
}
func (m *NeighbourhoodResponse) XXX_DiscardUnknown() {
xxx_messageInfo_NeighbourhoodResponse.DiscardUnknown(m)
}
var xxx_messageInfo_NeighbourhoodResponse proto.InternalMessageInfo
func (m *NeighbourhoodResponse) GetNeighbourhood() *Neighbour {
if m != nil {
return m.Neighbourhood
} }
return nil return nil
} }
// Node is network node // Node is network node
type Node struct { type Node struct {
// node ide // node id
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// node address // node address
Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
@ -188,7 +118,7 @@ func (m *Node) Reset() { *m = Node{} }
func (m *Node) String() string { return proto.CompactTextString(m) } func (m *Node) String() string { return proto.CompactTextString(m) }
func (*Node) ProtoMessage() {} func (*Node) ProtoMessage() {}
func (*Node) Descriptor() ([]byte, []int) { func (*Node) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{4} return fileDescriptor_8571034d60397816, []int{2}
} }
func (m *Node) XXX_Unmarshal(b []byte) error { func (m *Node) XXX_Unmarshal(b []byte) error {
@ -236,7 +166,7 @@ func (m *Connect) Reset() { *m = Connect{} }
func (m *Connect) String() string { return proto.CompactTextString(m) } func (m *Connect) String() string { return proto.CompactTextString(m) }
func (*Connect) ProtoMessage() {} func (*Connect) ProtoMessage() {}
func (*Connect) Descriptor() ([]byte, []int) { func (*Connect) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{5} return fileDescriptor_8571034d60397816, []int{3}
} }
func (m *Connect) XXX_Unmarshal(b []byte) error { func (m *Connect) XXX_Unmarshal(b []byte) error {
@ -277,7 +207,7 @@ func (m *Close) Reset() { *m = Close{} }
func (m *Close) String() string { return proto.CompactTextString(m) } func (m *Close) String() string { return proto.CompactTextString(m) }
func (*Close) ProtoMessage() {} func (*Close) ProtoMessage() {}
func (*Close) Descriptor() ([]byte, []int) { func (*Close) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{6} return fileDescriptor_8571034d60397816, []int{4}
} }
func (m *Close) XXX_Unmarshal(b []byte) error { func (m *Close) XXX_Unmarshal(b []byte) error {
@ -305,134 +235,86 @@ func (m *Close) GetNode() *Node {
return nil return nil
} }
// Solicit is sent when requesting route advertisement from the network nodes // Peer is used to advertise node peers
type Solicit struct { type Peer struct {
// network node // network node
Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"`
// node peers
Peers []*Peer `protobuf:"bytes,2,rep,name=peers,proto3" json:"peers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *Solicit) Reset() { *m = Solicit{} } func (m *Peer) Reset() { *m = Peer{} }
func (m *Solicit) String() string { return proto.CompactTextString(m) } func (m *Peer) String() string { return proto.CompactTextString(m) }
func (*Solicit) ProtoMessage() {} func (*Peer) ProtoMessage() {}
func (*Solicit) Descriptor() ([]byte, []int) { func (*Peer) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{7} return fileDescriptor_8571034d60397816, []int{5}
} }
func (m *Solicit) XXX_Unmarshal(b []byte) error { func (m *Peer) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Solicit.Unmarshal(m, b) return xxx_messageInfo_Peer.Unmarshal(m, b)
} }
func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *Peer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Solicit.Marshal(b, m, deterministic) return xxx_messageInfo_Peer.Marshal(b, m, deterministic)
} }
func (m *Solicit) XXX_Merge(src proto.Message) { func (m *Peer) XXX_Merge(src proto.Message) {
xxx_messageInfo_Solicit.Merge(m, src) xxx_messageInfo_Peer.Merge(m, src)
} }
func (m *Solicit) XXX_Size() int { func (m *Peer) XXX_Size() int {
return xxx_messageInfo_Solicit.Size(m) return xxx_messageInfo_Peer.Size(m)
} }
func (m *Solicit) XXX_DiscardUnknown() { func (m *Peer) XXX_DiscardUnknown() {
xxx_messageInfo_Solicit.DiscardUnknown(m) xxx_messageInfo_Peer.DiscardUnknown(m)
} }
var xxx_messageInfo_Solicit proto.InternalMessageInfo var xxx_messageInfo_Peer proto.InternalMessageInfo
func (m *Solicit) GetNode() *Node { func (m *Peer) GetNode() *Node {
if m != nil { if m != nil {
return m.Node return m.Node
} }
return nil return nil
} }
// Neighbour is used to nnounce node neighbourhood func (m *Peer) GetPeers() []*Peer {
type Neighbour struct {
// network node
Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"`
// neighbours
Neighbours []*Node `protobuf:"bytes,3,rep,name=neighbours,proto3" json:"neighbours,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Neighbour) Reset() { *m = Neighbour{} }
func (m *Neighbour) String() string { return proto.CompactTextString(m) }
func (*Neighbour) ProtoMessage() {}
func (*Neighbour) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{8}
}
func (m *Neighbour) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Neighbour.Unmarshal(m, b)
}
func (m *Neighbour) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Neighbour.Marshal(b, m, deterministic)
}
func (m *Neighbour) XXX_Merge(src proto.Message) {
xxx_messageInfo_Neighbour.Merge(m, src)
}
func (m *Neighbour) XXX_Size() int {
return xxx_messageInfo_Neighbour.Size(m)
}
func (m *Neighbour) XXX_DiscardUnknown() {
xxx_messageInfo_Neighbour.DiscardUnknown(m)
}
var xxx_messageInfo_Neighbour proto.InternalMessageInfo
func (m *Neighbour) GetNode() *Node {
if m != nil { if m != nil {
return m.Node return m.Peers
}
return nil
}
func (m *Neighbour) GetNeighbours() []*Node {
if m != nil {
return m.Neighbours
} }
return nil return nil
} }
func init() { func init() {
proto.RegisterType((*ListRequest)(nil), "go.micro.network.ListRequest") proto.RegisterType((*PeerRequest)(nil), "go.micro.network.PeerRequest")
proto.RegisterType((*ListResponse)(nil), "go.micro.network.ListResponse") proto.RegisterType((*PeerResponse)(nil), "go.micro.network.PeerResponse")
proto.RegisterType((*NeighbourhoodRequest)(nil), "go.micro.network.NeighbourhoodRequest")
proto.RegisterType((*NeighbourhoodResponse)(nil), "go.micro.network.NeighbourhoodResponse")
proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterType((*Node)(nil), "go.micro.network.Node")
proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect")
proto.RegisterType((*Close)(nil), "go.micro.network.Close") proto.RegisterType((*Close)(nil), "go.micro.network.Close")
proto.RegisterType((*Solicit)(nil), "go.micro.network.Solicit") proto.RegisterType((*Peer)(nil), "go.micro.network.Peer")
proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour")
} }
func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) }
var fileDescriptor_8571034d60397816 = []byte{ var fileDescriptor_8571034d60397816 = []byte{
// 360 bytes of a gzipped FileDescriptorProto // 292 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x4f, 0xf2, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xcd, 0x4a, 0xc3, 0x40,
0x10, 0xfd, 0x28, 0xf0, 0x35, 0x0c, 0x1f, 0x5f, 0xcc, 0x46, 0x4d, 0x53, 0x83, 0x21, 0x7b, 0x40, 0x10, 0x36, 0x31, 0x31, 0x74, 0x6a, 0x45, 0x16, 0x91, 0x50, 0xa8, 0x94, 0xf5, 0x22, 0xa2, 0x1b,
0x62, 0xb4, 0x18, 0x08, 0x9e, 0xbc, 0x18, 0x0e, 0x5e, 0x08, 0x87, 0x7a, 0xf3, 0x66, 0xbb, 0x9b, 0x69, 0xf0, 0xe6, 0xad, 0x07, 0x2f, 0xa5, 0x48, 0x9e, 0x40, 0x9b, 0x1d, 0xd2, 0xa0, 0xcd, 0xc4,
0xb2, 0x11, 0x3a, 0xb8, 0xbb, 0x8d, 0x7f, 0xc0, 0x1f, 0x6e, 0xba, 0x5d, 0xb0, 0x80, 0x60, 0xb8, 0xdd, 0x0d, 0xbe, 0x8e, 0x8f, 0x2a, 0xd9, 0x4d, 0x2d, 0x28, 0xa1, 0xf4, 0x96, 0xf9, 0xfe, 0x32,
0x75, 0xe6, 0xbd, 0x37, 0x6f, 0xa7, 0xfb, 0x16, 0x5a, 0x29, 0xd7, 0x1f, 0x28, 0xdf, 0x82, 0xa5, 0xc3, 0xb7, 0x30, 0xaa, 0xd0, 0x7c, 0x91, 0x7a, 0x17, 0xb5, 0x22, 0x43, 0xec, 0xbc, 0x20, 0xb1,
0x44, 0x8d, 0xe4, 0x24, 0xc1, 0x60, 0x21, 0x62, 0x89, 0x81, 0xed, 0xfb, 0xc3, 0x44, 0xe8, 0x59, 0x29, 0x73, 0x45, 0xa2, 0xc3, 0xc7, 0x69, 0x51, 0x9a, 0x75, 0xb3, 0x12, 0x39, 0x6d, 0x12, 0xcb,
0x16, 0x05, 0x31, 0x2e, 0xfa, 0x06, 0xe9, 0x27, 0x78, 0x5b, 0x7c, 0x48, 0xcc, 0x34, 0x97, 0x7d, 0x24, 0x05, 0xdd, 0xbb, 0x0f, 0x45, 0x8d, 0x41, 0x95, 0x58, 0x67, 0x37, 0xb8, 0x18, 0x7e, 0x0d,
0xa3, 0xb4, 0x45, 0x31, 0x86, 0xb6, 0xa0, 0x39, 0x11, 0x4a, 0x87, 0xfc, 0x3d, 0xe3, 0x4a, 0xd3, 0xc3, 0x17, 0x44, 0x95, 0xe1, 0x67, 0x83, 0xda, 0xb0, 0x0b, 0x08, 0x25, 0xd6, 0x66, 0x1d, 0x7b,
0x07, 0xf8, 0x57, 0x94, 0x6a, 0x89, 0xa9, 0xe2, 0xe4, 0x06, 0xea, 0x29, 0x32, 0xae, 0xbc, 0x4a, 0x53, 0xef, 0x66, 0x94, 0xb9, 0x81, 0x3f, 0xc1, 0xa9, 0x13, 0xe9, 0x9a, 0x2a, 0x8d, 0xec, 0x0e,
0xa7, 0xda, 0x6b, 0x0e, 0xce, 0x83, 0x6d, 0xd7, 0x60, 0x8a, 0x8c, 0x87, 0x05, 0x89, 0x76, 0xe1, 0xc2, 0x1a, 0x51, 0x69, 0xab, 0x1a, 0xce, 0x2e, 0xc5, 0xdf, 0x5d, 0x84, 0x95, 0x3b, 0x11, 0x7f,
0x74, 0xca, 0x45, 0x32, 0x8b, 0x30, 0x93, 0x33, 0x44, 0x66, 0xa7, 0x92, 0xff, 0xe0, 0x08, 0xe6, 0x80, 0x60, 0x49, 0x12, 0xd9, 0x19, 0xf8, 0xa5, 0xb4, 0x96, 0x41, 0xe6, 0x97, 0x92, 0xc5, 0x10,
0x55, 0x3a, 0x95, 0x5e, 0x23, 0x74, 0x04, 0xa3, 0x2f, 0x70, 0xb6, 0xc5, 0xb3, 0x76, 0x8f, 0xf9, 0xbd, 0x49, 0xa9, 0x50, 0xeb, 0xd8, 0xb7, 0xe0, 0x76, 0xe4, 0x8f, 0x10, 0xcd, 0xa9, 0xaa, 0x30,
0x96, 0x25, 0xc0, 0x68, 0x9a, 0x83, 0x8b, 0x1f, 0x6c, 0x57, 0xb4, 0x70, 0x53, 0x41, 0xef, 0xa0, 0x37, 0xec, 0x16, 0x82, 0x8a, 0x24, 0xf6, 0xff, 0xa9, 0x8d, 0xce, 0xac, 0x86, 0xa7, 0x10, 0xce,
0x96, 0x1f, 0x69, 0xdb, 0x93, 0x78, 0xe0, 0xbe, 0x32, 0x26, 0xb9, 0x52, 0x9e, 0x63, 0x9a, 0xab, 0x3f, 0x48, 0xe3, 0x41, 0xa6, 0x57, 0x08, 0xda, 0x65, 0x0f, 0xf1, 0xec, 0xee, 0xf7, 0xa7, 0xc7,
0x92, 0x8e, 0xc0, 0x1d, 0x63, 0x9a, 0xf2, 0x58, 0x93, 0x6b, 0xa8, 0xe5, 0x9b, 0x58, 0xdb, 0x7d, 0x7b, 0xef, 0x9f, 0x7d, 0x7b, 0x10, 0x2d, 0x1d, 0xce, 0x16, 0x30, 0x58, 0x94, 0xda, 0xb4, 0xb4,
0xdb, 0x1a, 0x0e, 0x1d, 0x42, 0x7d, 0x3c, 0x47, 0xc5, 0x8f, 0x12, 0x8d, 0xc0, 0x7d, 0xc6, 0xb9, 0x66, 0x93, 0x1e, 0x9f, 0xeb, 0x62, 0x7c, 0xd5, 0x47, 0xbb, 0x16, 0xf8, 0x11, 0x7b, 0x06, 0x68,
0x88, 0xc5, 0x71, 0x5e, 0x08, 0x8d, 0xf5, 0xc2, 0xc7, 0x08, 0xc9, 0x3d, 0xc0, 0xfa, 0xf7, 0x28, 0xd3, 0xb2, 0xb6, 0x50, 0xcd, 0xe2, 0x9d, 0xbe, 0xab, 0x78, 0x9b, 0x34, 0xf9, 0xc7, 0x58, 0xdb,
0xaf, 0x7a, 0xf0, 0x12, 0x4b, 0xcc, 0xc1, 0xa7, 0x03, 0xee, 0xb4, 0x00, 0xc9, 0x13, 0x80, 0xc9, 0x6f, 0xd0, 0xea, 0xc4, 0x3e, 0x86, 0xf4, 0x27, 0x00, 0x00, 0xff, 0xff, 0x68, 0x11, 0x14, 0x79,
0x44, 0x1e, 0x1b, 0x45, 0xbc, 0x6f, 0xb5, 0x0d, 0x92, 0xbd, 0x65, 0xbf, 0xbd, 0x83, 0x94, 0xa3, 0x64, 0x02, 0x00, 0x00,
0x44, 0xff, 0x90, 0x09, 0x34, 0xf2, 0x4e, 0x6e, 0xa6, 0x48, 0x7b, 0xf7, 0x14, 0xa5, 0x20, 0xfa,
0x97, 0xfb, 0xe0, 0xf5, 0xb4, 0x08, 0x5a, 0x1b, 0x21, 0x22, 0xdd, 0x03, 0x29, 0x29, 0xa5, 0xd1,
0xbf, 0xfa, 0x95, 0xb7, 0xf2, 0x88, 0xfe, 0x9a, 0x47, 0x32, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff,
0x59, 0xcf, 0xab, 0xb5, 0x7c, 0x03, 0x00, 0x00,
} }

View File

@ -6,32 +6,25 @@ import "github.com/micro/go-micro/router/proto/router.proto";
// Network service is usesd to gain visibility into networks // Network service is usesd to gain visibility into networks
service Network { service Network {
rpc ListPeers(PeerRequest) returns (PeerResponse) {};
rpc ListRoutes(go.micro.router.Request) returns (go.micro.router.ListResponse) {}; rpc ListRoutes(go.micro.router.Request) returns (go.micro.router.ListResponse) {};
rpc ListNodes(ListRequest) returns (ListResponse) {};
rpc Neighbourhood(NeighbourhoodRequest) returns (NeighbourhoodResponse) {};
} }
// Empty request // PeerRequest requests list of peers
message ListRequest {} message PeerRequest {
// node topology depth
// ListResponse is returned by ListNodes and ListNeighbours uint32 depth = 1;
message ListResponse {
repeated Node nodes = 1;
} }
// NeighbourhoodRequest is sent to query node neighbourhood // PeerResponse is returned by ListPeers
message NeighbourhoodRequest { message PeerResponse {
string id = 1; // return peer topology
} Peer peers = 1;
// NeighbourhoodResponse contains node neighbourhood hierarchy
message NeighbourhoodResponse {
Neighbour neighbourhood = 1;
} }
// Node is network node // Node is network node
message Node { message Node {
// node ide // node id
string id = 1; string id = 1;
// node address // node address
string address = 2; string address = 2;
@ -49,16 +42,10 @@ message Close {
Node node = 1; Node node = 1;
} }
// Solicit is sent when requesting route advertisement from the network nodes // Peer is used to advertise node peers
message Solicit { message Peer {
// network node // network node
Node node = 1; Node node = 1;
} // node peers
repeated Peer peers = 2;
// Neighbour is used to nnounce node neighbourhood
message Neighbour {
// network node
Node node = 1;
// neighbours
repeated Node neighbours = 3;
} }

View File

@ -337,6 +337,7 @@ func (r *router) advertiseTable() error {
// advertise all routes as Update events to subscribers // advertise all routes as Update events to subscribers
if len(events) > 0 { if len(events) > 0 {
log.Debugf("Network router flushing table with %d events: %s", len(events), r.options.Id)
r.advertWg.Add(1) r.advertWg.Add(1)
go r.publishAdvert(RouteUpdate, events) go r.publishAdvert(RouteUpdate, events)
} }
@ -668,11 +669,13 @@ func (r *router) Process(a *Advert) error {
for _, event := range events { for _, event := range events {
// skip if the router is the origin of this route // skip if the router is the origin of this route
if event.Route.Router == r.options.Id { if event.Route.Router == r.options.Id {
log.Debugf("Network router skipping processing its own route: %s", r.options.Id)
continue continue
} }
// create a copy of the route // create a copy of the route
route := event.Route route := event.Route
action := event.Type action := event.Type
log.Debugf("Network router processing route action %s: %s", action, r.options.Id)
if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil { if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil {
return fmt.Errorf("failed applying action %s to routing table: %s", action, err) return fmt.Errorf("failed applying action %s to routing table: %s", action, err)
} }

View File

@ -45,6 +45,16 @@ func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.Loo
return nil return nil
} }
// Solicit triggers full routing table advertisement
func (r *Router) Solicit(ctx context.Context, req *pb.Request, resp *pb.Response) error {
if err := r.Router.Solicit(); err != nil {
return err
}
return nil
}
// Advertise streams router advertisements
func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Router_AdvertiseStream) error { func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Router_AdvertiseStream) error {
advertChan, err := r.Router.Advertise() advertChan, err := r.Router.Advertise()
if err != nil { if err != nil {
@ -91,6 +101,7 @@ func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Route
return nil return nil
} }
// Process processes advertisements
func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error { func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error {
events := make([]*router.Event, len(req.Events)) events := make([]*router.Event, len(req.Events))
for i, event := range req.Events { for i, event := range req.Events {
@ -126,6 +137,7 @@ func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessRes
return nil return nil
} }
// Status returns router status
func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error { func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error {
status := r.Router.Status() status := r.Router.Status()

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT. // Code generated by protoc-gen-micro. DO NOT EDIT.
// source: micro/go-micro/router/proto/router.proto // source: router.proto
package go_micro_router package go_micro_router
@ -37,6 +37,7 @@ type RouterService interface {
Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error)
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error) Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error)
Solicit(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error)
Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error)
} }
@ -157,6 +158,16 @@ func (x *routerServiceAdvertise) Recv() (*Advert, error) {
return m, nil return m, nil
} }
func (c *routerService) Solicit(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
req := c.c.NewRequest(c.name, "Router.Solicit", in)
out := new(Response)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) { func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) {
req := c.c.NewRequest(c.name, "Router.Process", in) req := c.c.NewRequest(c.name, "Router.Process", in)
out := new(ProcessResponse) out := new(ProcessResponse)
@ -183,6 +194,7 @@ type RouterHandler interface {
Lookup(context.Context, *LookupRequest, *LookupResponse) error Lookup(context.Context, *LookupRequest, *LookupResponse) error
Watch(context.Context, *WatchRequest, Router_WatchStream) error Watch(context.Context, *WatchRequest, Router_WatchStream) error
Advertise(context.Context, *Request, Router_AdvertiseStream) error Advertise(context.Context, *Request, Router_AdvertiseStream) error
Solicit(context.Context, *Request, *Response) error
Process(context.Context, *Advert, *ProcessResponse) error Process(context.Context, *Advert, *ProcessResponse) error
Status(context.Context, *Request, *StatusResponse) error Status(context.Context, *Request, *StatusResponse) error
} }
@ -192,6 +204,7 @@ func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.H
Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error
Watch(ctx context.Context, stream server.Stream) error Watch(ctx context.Context, stream server.Stream) error
Advertise(ctx context.Context, stream server.Stream) error Advertise(ctx context.Context, stream server.Stream) error
Solicit(ctx context.Context, in *Request, out *Response) error
Process(ctx context.Context, in *Advert, out *ProcessResponse) error Process(ctx context.Context, in *Advert, out *ProcessResponse) error
Status(ctx context.Context, in *Request, out *StatusResponse) error Status(ctx context.Context, in *Request, out *StatusResponse) error
} }
@ -280,6 +293,10 @@ func (x *routerAdvertiseStream) Send(m *Advert) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (h *routerHandler) Solicit(ctx context.Context, in *Request, out *Response) error {
return h.RouterHandler.Solicit(ctx, in, out)
}
func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessResponse) error { func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessResponse) error {
return h.RouterHandler.Process(ctx, in, out) return h.RouterHandler.Process(ctx, in, out)
} }

View File

@ -1,13 +1,11 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// source: micro/go-micro/router/proto/router.proto // source: router.proto
package go_micro_router package go_micro_router
import ( import (
context "context"
fmt "fmt" fmt "fmt"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math" math "math"
) )
@ -45,7 +43,7 @@ func (x AdvertType) String() string {
} }
func (AdvertType) EnumDescriptor() ([]byte, []int) { func (AdvertType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{0} return fileDescriptor_367072455c71aedc, []int{0}
} }
// EventType defines the type of event // EventType defines the type of event
@ -74,7 +72,7 @@ func (x EventType) String() string {
} }
func (EventType) EnumDescriptor() ([]byte, []int) { func (EventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{1} return fileDescriptor_367072455c71aedc, []int{1}
} }
// Empty request // Empty request
@ -88,7 +86,7 @@ func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) } func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {} func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) { func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{0} return fileDescriptor_367072455c71aedc, []int{0}
} }
func (m *Request) XXX_Unmarshal(b []byte) error { func (m *Request) XXX_Unmarshal(b []byte) error {
@ -109,6 +107,38 @@ func (m *Request) XXX_DiscardUnknown() {
var xxx_messageInfo_Request proto.InternalMessageInfo var xxx_messageInfo_Request proto.InternalMessageInfo
// Empty response
type Response struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (*Response) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{1}
}
func (m *Response) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Response.Unmarshal(m, b)
}
func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Response.Marshal(b, m, deterministic)
}
func (m *Response) XXX_Merge(src proto.Message) {
xxx_messageInfo_Response.Merge(m, src)
}
func (m *Response) XXX_Size() int {
return xxx_messageInfo_Response.Size(m)
}
func (m *Response) XXX_DiscardUnknown() {
xxx_messageInfo_Response.DiscardUnknown(m)
}
var xxx_messageInfo_Response proto.InternalMessageInfo
// ListResponse is returned by List // ListResponse is returned by List
type ListResponse struct { type ListResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"` Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
@ -121,7 +151,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {} func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) { func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{1} return fileDescriptor_367072455c71aedc, []int{2}
} }
func (m *ListResponse) XXX_Unmarshal(b []byte) error { func (m *ListResponse) XXX_Unmarshal(b []byte) error {
@ -161,7 +191,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} }
func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (m *LookupRequest) String() string { return proto.CompactTextString(m) }
func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) ProtoMessage() {}
func (*LookupRequest) Descriptor() ([]byte, []int) { func (*LookupRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{2} return fileDescriptor_367072455c71aedc, []int{3}
} }
func (m *LookupRequest) XXX_Unmarshal(b []byte) error { func (m *LookupRequest) XXX_Unmarshal(b []byte) error {
@ -201,7 +231,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} }
func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (m *LookupResponse) String() string { return proto.CompactTextString(m) }
func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) ProtoMessage() {}
func (*LookupResponse) Descriptor() ([]byte, []int) { func (*LookupResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{3} return fileDescriptor_367072455c71aedc, []int{4}
} }
func (m *LookupResponse) XXX_Unmarshal(b []byte) error { func (m *LookupResponse) XXX_Unmarshal(b []byte) error {
@ -229,6 +259,7 @@ func (m *LookupResponse) GetRoutes() []*Route {
return nil return nil
} }
// QueryRequest queries Table for Routes
type QueryRequest struct { type QueryRequest struct {
Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -240,7 +271,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} }
func (m *QueryRequest) String() string { return proto.CompactTextString(m) } func (m *QueryRequest) String() string { return proto.CompactTextString(m) }
func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) ProtoMessage() {}
func (*QueryRequest) Descriptor() ([]byte, []int) { func (*QueryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{4} return fileDescriptor_367072455c71aedc, []int{5}
} }
func (m *QueryRequest) XXX_Unmarshal(b []byte) error { func (m *QueryRequest) XXX_Unmarshal(b []byte) error {
@ -268,6 +299,7 @@ func (m *QueryRequest) GetQuery() *Query {
return nil return nil
} }
// QueryResponse is returned by Query
type QueryResponse struct { type QueryResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"` Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -279,7 +311,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} }
func (m *QueryResponse) String() string { return proto.CompactTextString(m) } func (m *QueryResponse) String() string { return proto.CompactTextString(m) }
func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) ProtoMessage() {}
func (*QueryResponse) Descriptor() ([]byte, []int) { func (*QueryResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{5} return fileDescriptor_367072455c71aedc, []int{6}
} }
func (m *QueryResponse) XXX_Unmarshal(b []byte) error { func (m *QueryResponse) XXX_Unmarshal(b []byte) error {
@ -318,7 +350,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) { func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{6} return fileDescriptor_367072455c71aedc, []int{7}
} }
func (m *WatchRequest) XXX_Unmarshal(b []byte) error { func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
@ -360,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} }
func (m *Advert) String() string { return proto.CompactTextString(m) } func (m *Advert) String() string { return proto.CompactTextString(m) }
func (*Advert) ProtoMessage() {} func (*Advert) ProtoMessage() {}
func (*Advert) Descriptor() ([]byte, []int) { func (*Advert) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{7} return fileDescriptor_367072455c71aedc, []int{8}
} }
func (m *Advert) XXX_Unmarshal(b []byte) error { func (m *Advert) XXX_Unmarshal(b []byte) error {
@ -416,6 +448,47 @@ func (m *Advert) GetEvents() []*Event {
return nil return nil
} }
// Solicit solicits routes
type Solicit struct {
// id of the soliciting router
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Solicit) Reset() { *m = Solicit{} }
func (m *Solicit) String() string { return proto.CompactTextString(m) }
func (*Solicit) ProtoMessage() {}
func (*Solicit) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{9}
}
func (m *Solicit) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Solicit.Unmarshal(m, b)
}
func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Solicit.Marshal(b, m, deterministic)
}
func (m *Solicit) XXX_Merge(src proto.Message) {
xxx_messageInfo_Solicit.Merge(m, src)
}
func (m *Solicit) XXX_Size() int {
return xxx_messageInfo_Solicit.Size(m)
}
func (m *Solicit) XXX_DiscardUnknown() {
xxx_messageInfo_Solicit.DiscardUnknown(m)
}
var xxx_messageInfo_Solicit proto.InternalMessageInfo
func (m *Solicit) GetId() string {
if m != nil {
return m.Id
}
return ""
}
// ProcessResponse is returned by Process // ProcessResponse is returned by Process
type ProcessResponse struct { type ProcessResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -427,7 +500,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} }
func (m *ProcessResponse) String() string { return proto.CompactTextString(m) } func (m *ProcessResponse) String() string { return proto.CompactTextString(m) }
func (*ProcessResponse) ProtoMessage() {} func (*ProcessResponse) ProtoMessage() {}
func (*ProcessResponse) Descriptor() ([]byte, []int) { func (*ProcessResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{8} return fileDescriptor_367072455c71aedc, []int{10}
} }
func (m *ProcessResponse) XXX_Unmarshal(b []byte) error { func (m *ProcessResponse) XXX_Unmarshal(b []byte) error {
@ -459,7 +532,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} }
func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (m *CreateResponse) String() string { return proto.CompactTextString(m) }
func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) ProtoMessage() {}
func (*CreateResponse) Descriptor() ([]byte, []int) { func (*CreateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{9} return fileDescriptor_367072455c71aedc, []int{11}
} }
func (m *CreateResponse) XXX_Unmarshal(b []byte) error { func (m *CreateResponse) XXX_Unmarshal(b []byte) error {
@ -491,7 +564,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) { func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{10} return fileDescriptor_367072455c71aedc, []int{12}
} }
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
@ -523,7 +596,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} }
func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) }
func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) ProtoMessage() {}
func (*UpdateResponse) Descriptor() ([]byte, []int) { func (*UpdateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{11} return fileDescriptor_367072455c71aedc, []int{13}
} }
func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { func (m *UpdateResponse) XXX_Unmarshal(b []byte) error {
@ -561,7 +634,7 @@ func (m *Event) Reset() { *m = Event{} }
func (m *Event) String() string { return proto.CompactTextString(m) } func (m *Event) String() string { return proto.CompactTextString(m) }
func (*Event) ProtoMessage() {} func (*Event) ProtoMessage() {}
func (*Event) Descriptor() ([]byte, []int) { func (*Event) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{12} return fileDescriptor_367072455c71aedc, []int{14}
} }
func (m *Event) XXX_Unmarshal(b []byte) error { func (m *Event) XXX_Unmarshal(b []byte) error {
@ -620,7 +693,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{13} return fileDescriptor_367072455c71aedc, []int{15}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
@ -687,7 +760,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) } func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {} func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) { func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{14} return fileDescriptor_367072455c71aedc, []int{16}
} }
func (m *Route) XXX_Unmarshal(b []byte) error { func (m *Route) XXX_Unmarshal(b []byte) error {
@ -769,7 +842,7 @@ func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) } func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {} func (*Status) ProtoMessage() {}
func (*Status) Descriptor() ([]byte, []int) { func (*Status) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{15} return fileDescriptor_367072455c71aedc, []int{17}
} }
func (m *Status) XXX_Unmarshal(b []byte) error { func (m *Status) XXX_Unmarshal(b []byte) error {
@ -815,7 +888,7 @@ func (m *StatusResponse) Reset() { *m = StatusResponse{} }
func (m *StatusResponse) String() string { return proto.CompactTextString(m) } func (m *StatusResponse) String() string { return proto.CompactTextString(m) }
func (*StatusResponse) ProtoMessage() {} func (*StatusResponse) ProtoMessage() {}
func (*StatusResponse) Descriptor() ([]byte, []int) { func (*StatusResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a36eee0b1adf739, []int{16} return fileDescriptor_367072455c71aedc, []int{18}
} }
func (m *StatusResponse) XXX_Unmarshal(b []byte) error { func (m *StatusResponse) XXX_Unmarshal(b []byte) error {
@ -847,6 +920,7 @@ func init() {
proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value) proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value)
proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value) proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value)
proto.RegisterType((*Request)(nil), "go.micro.router.Request") proto.RegisterType((*Request)(nil), "go.micro.router.Request")
proto.RegisterType((*Response)(nil), "go.micro.router.Response")
proto.RegisterType((*ListResponse)(nil), "go.micro.router.ListResponse") proto.RegisterType((*ListResponse)(nil), "go.micro.router.ListResponse")
proto.RegisterType((*LookupRequest)(nil), "go.micro.router.LookupRequest") proto.RegisterType((*LookupRequest)(nil), "go.micro.router.LookupRequest")
proto.RegisterType((*LookupResponse)(nil), "go.micro.router.LookupResponse") proto.RegisterType((*LookupResponse)(nil), "go.micro.router.LookupResponse")
@ -854,6 +928,7 @@ func init() {
proto.RegisterType((*QueryResponse)(nil), "go.micro.router.QueryResponse") proto.RegisterType((*QueryResponse)(nil), "go.micro.router.QueryResponse")
proto.RegisterType((*WatchRequest)(nil), "go.micro.router.WatchRequest") proto.RegisterType((*WatchRequest)(nil), "go.micro.router.WatchRequest")
proto.RegisterType((*Advert)(nil), "go.micro.router.Advert") proto.RegisterType((*Advert)(nil), "go.micro.router.Advert")
proto.RegisterType((*Solicit)(nil), "go.micro.router.Solicit")
proto.RegisterType((*ProcessResponse)(nil), "go.micro.router.ProcessResponse") proto.RegisterType((*ProcessResponse)(nil), "go.micro.router.ProcessResponse")
proto.RegisterType((*CreateResponse)(nil), "go.micro.router.CreateResponse") proto.RegisterType((*CreateResponse)(nil), "go.micro.router.CreateResponse")
proto.RegisterType((*DeleteResponse)(nil), "go.micro.router.DeleteResponse") proto.RegisterType((*DeleteResponse)(nil), "go.micro.router.DeleteResponse")
@ -865,509 +940,53 @@ func init() {
proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse") proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse")
} }
func init() { func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) }
proto.RegisterFile("micro/go-micro/router/proto/router.proto", fileDescriptor_6a36eee0b1adf739)
}
var fileDescriptor_6a36eee0b1adf739 = []byte{ var fileDescriptor_367072455c71aedc = []byte{
// 699 bytes of a gzipped FileDescriptorProto // 714 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x4e, 0xdb, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x4e, 0xdb, 0x40,
0x10, 0xb6, 0x9d, 0xd8, 0x91, 0xa7, 0xc1, 0xb8, 0xa3, 0x0a, 0xac, 0xb4, 0x40, 0xe4, 0x53, 0x84, 0x10, 0xb6, 0x93, 0xd8, 0x69, 0xa6, 0x21, 0xa4, 0xa3, 0x0a, 0x4c, 0x5a, 0x20, 0xf2, 0x09, 0x21,
0xa8, 0x53, 0xa5, 0xd7, 0xfe, 0x05, 0x4a, 0x55, 0xa9, 0x1c, 0x5a, 0x17, 0xd4, 0xb3, 0xb1, 0x57, 0x64, 0xaa, 0xf4, 0xda, 0x1f, 0x02, 0xa5, 0xaa, 0x54, 0x0e, 0xad, 0x01, 0xf5, 0x6c, 0xec, 0x15,
0xd4, 0x22, 0xb1, 0xcd, 0xee, 0x06, 0x94, 0x73, 0x9f, 0xa6, 0xe7, 0x3e, 0x52, 0xaf, 0x7d, 0x88, 0xb5, 0x48, 0xbc, 0x66, 0x77, 0x03, 0xca, 0xb9, 0x4f, 0xd3, 0x4b, 0x2f, 0x7d, 0xa4, 0xbe, 0x48,
0xca, 0xbb, 0xeb, 0x10, 0x62, 0x8c, 0x44, 0x4e, 0xde, 0x99, 0xf9, 0xe6, 0x9b, 0x99, 0xdd, 0x99, 0xe5, 0xdd, 0x75, 0x08, 0x71, 0x16, 0x09, 0x4e, 0xd9, 0xf9, 0xfb, 0x66, 0x66, 0xf7, 0x9b, 0x71,
0x31, 0x0c, 0xa6, 0x69, 0x4c, 0xf3, 0xe1, 0x45, 0xfe, 0x52, 0x1e, 0x68, 0x3e, 0xe3, 0x84, 0x0e, 0xa0, 0xcd, 0xe8, 0x44, 0x10, 0x16, 0xe4, 0x8c, 0x0a, 0x8a, 0xab, 0x97, 0x34, 0x18, 0xa7, 0x31,
0x0b, 0x9a, 0xf3, 0x4a, 0x08, 0x84, 0x80, 0x9b, 0x17, 0x79, 0x20, 0x30, 0x81, 0x54, 0xfb, 0x36, 0xa3, 0x81, 0x52, 0xfb, 0x2d, 0x68, 0x86, 0xe4, 0x7a, 0x42, 0xb8, 0xf0, 0x01, 0x9e, 0x85, 0x84,
0x74, 0x42, 0x72, 0x35, 0x23, 0x8c, 0xfb, 0xef, 0xa0, 0x7b, 0x92, 0x32, 0x1e, 0x12, 0x56, 0xe4, 0xe7, 0x34, 0xe3, 0xc4, 0xff, 0x00, 0xed, 0x93, 0x94, 0x8b, 0x52, 0xc6, 0x00, 0x5c, 0x19, 0xc0,
0x19, 0x23, 0x18, 0x80, 0x25, 0x40, 0xcc, 0xd3, 0xfb, 0xad, 0xc1, 0x93, 0xd1, 0x56, 0xb0, 0xe2, 0x3d, 0xbb, 0x5f, 0xdf, 0x79, 0x3e, 0x58, 0x0b, 0x16, 0x80, 0x82, 0xb0, 0xf8, 0x09, 0xb5, 0x97,
0x1c, 0x84, 0xe5, 0x27, 0x54, 0x28, 0xff, 0x2d, 0x6c, 0x9c, 0xe4, 0xf9, 0xe5, 0xac, 0x50, 0x84, 0xff, 0x1e, 0x56, 0x4e, 0x28, 0xbd, 0x9a, 0xe4, 0x1a, 0x1c, 0xf7, 0xc0, 0xb9, 0x9e, 0x10, 0x36,
0x78, 0x00, 0xe6, 0xd5, 0x8c, 0xd0, 0xb9, 0xa7, 0xf7, 0xf5, 0x7b, 0xfd, 0xbf, 0x95, 0xd6, 0x50, 0xf5, 0xec, 0xbe, 0xbd, 0x34, 0xfe, 0x7b, 0x61, 0x0d, 0x95, 0x93, 0x7f, 0x00, 0x9d, 0x32, 0xfc,
0x82, 0xfc, 0x0f, 0xe0, 0x54, 0xee, 0x6b, 0x26, 0xf0, 0x06, 0xba, 0x92, 0x71, 0xad, 0xf8, 0xef, 0x89, 0x05, 0xbc, 0x83, 0xb6, 0x42, 0x7c, 0x52, 0xfe, 0x8f, 0xb0, 0xa2, 0xa3, 0x9f, 0x98, 0xbe,
0x61, 0x43, 0x79, 0xaf, 0x19, 0xde, 0x81, 0xee, 0x8f, 0x88, 0xc7, 0x3f, 0xab, 0xfb, 0xfc, 0xad, 0x03, 0xed, 0x1f, 0x91, 0x88, 0x7f, 0x96, 0x77, 0xfb, 0xdb, 0x06, 0x77, 0x98, 0xdc, 0x10, 0x26,
0x83, 0x35, 0x4e, 0xae, 0x09, 0xe5, 0xe8, 0x80, 0x91, 0x26, 0x22, 0x0d, 0x3b, 0x34, 0xd2, 0x04, 0xb0, 0x03, 0xb5, 0x34, 0x91, 0x65, 0xb4, 0xc2, 0x5a, 0x9a, 0xe0, 0x3e, 0x34, 0xc4, 0x34, 0x27,
0x87, 0xd0, 0xe6, 0xf3, 0x82, 0x78, 0x46, 0x5f, 0x1f, 0x38, 0xa3, 0xe7, 0x35, 0x62, 0xe9, 0x76, 0x5e, 0xad, 0x6f, 0xef, 0x74, 0x06, 0xaf, 0x2a, 0xc0, 0x2a, 0xec, 0x6c, 0x9a, 0x93, 0x50, 0x3a,
0x3a, 0x2f, 0x48, 0x28, 0x80, 0xf8, 0x02, 0x6c, 0x9e, 0x4e, 0x09, 0xe3, 0xd1, 0xb4, 0xf0, 0x5a, 0xe2, 0x6b, 0x68, 0x89, 0x74, 0x4c, 0xb8, 0x88, 0xc6, 0xb9, 0x57, 0xef, 0xdb, 0x3b, 0xf5, 0xf0,
0x7d, 0x7d, 0xd0, 0x0a, 0x6f, 0x15, 0xe8, 0x42, 0x8b, 0xf3, 0x89, 0xd7, 0x16, 0xfa, 0xf2, 0x58, 0x4e, 0x81, 0x5d, 0xa8, 0x0b, 0x31, 0xf2, 0x1a, 0x52, 0x5f, 0x1c, 0x8b, 0xda, 0xc9, 0x0d, 0xc9,
0xe6, 0x4e, 0xae, 0x49, 0xc6, 0x99, 0x67, 0x36, 0xe4, 0x7e, 0x5c, 0x9a, 0x43, 0x85, 0xf2, 0x9f, 0x04, 0xf7, 0x1c, 0x43, 0xed, 0xc7, 0x85, 0x39, 0xd4, 0x5e, 0xfe, 0x06, 0x34, 0x4f, 0xe9, 0x28,
0xc2, 0xe6, 0x57, 0x9a, 0xc7, 0x84, 0xb1, 0xaa, 0x7c, 0xdf, 0x05, 0xe7, 0x88, 0x92, 0x88, 0x93, 0x8d, 0xd3, 0x4a, 0xad, 0xfe, 0x0b, 0x58, 0xfd, 0xc6, 0x68, 0x4c, 0x38, 0x9f, 0x31, 0xa5, 0x0b,
0x65, 0xcd, 0x47, 0x32, 0x21, 0x77, 0x35, 0x67, 0x45, 0xb2, 0x8c, 0xf9, 0xa5, 0x83, 0x29, 0xa8, 0x9d, 0x23, 0x46, 0x22, 0x41, 0xe6, 0x35, 0x9f, 0xc8, 0x88, 0xdc, 0xd7, 0x9c, 0xe7, 0xc9, 0xbc,
0x31, 0x50, 0x35, 0xea, 0xa2, 0xc6, 0xde, 0xfd, 0x09, 0x34, 0x95, 0x68, 0xac, 0x96, 0x78, 0x00, 0xcf, 0x2f, 0x1b, 0x1c, 0x99, 0x15, 0x03, 0xdd, 0xbe, 0x2d, 0xdb, 0xef, 0x2d, 0xaf, 0xcd, 0xd4,
0xa6, 0xf0, 0x13, 0xc5, 0x37, 0xbf, 0x85, 0x04, 0xf9, 0x67, 0x60, 0x8a, 0xb7, 0x44, 0x0f, 0x3a, 0x7d, 0x6d, 0xb1, 0xfb, 0x3d, 0x70, 0x64, 0x9c, 0xbc, 0x17, 0xf3, 0x33, 0x29, 0x27, 0xff, 0x1c,
0x8c, 0xd0, 0xeb, 0x34, 0x26, 0xea, 0xf6, 0x2b, 0xb1, 0xb4, 0x5c, 0x44, 0x9c, 0xdc, 0x44, 0x73, 0x1c, 0xf9, 0xcc, 0xe8, 0x41, 0x93, 0x13, 0x76, 0x93, 0xc6, 0x44, 0x37, 0x5b, 0x8a, 0x85, 0xe5,
0x11, 0xcc, 0x0e, 0x2b, 0xb1, 0xb4, 0x64, 0x84, 0xdf, 0xe4, 0xf4, 0x52, 0x04, 0xb3, 0xc3, 0x4a, 0x32, 0x12, 0xe4, 0x36, 0x9a, 0xca, 0x64, 0xad, 0xb0, 0x14, 0x0b, 0x4b, 0x46, 0xc4, 0x2d, 0x65,
0xf4, 0xff, 0xe8, 0x60, 0x8a, 0x38, 0x0f, 0xf3, 0x46, 0x49, 0x42, 0x09, 0x63, 0x15, 0xaf, 0x12, 0x57, 0x32, 0x59, 0x2b, 0x2c, 0x45, 0xff, 0xaf, 0x0d, 0x8e, 0xcc, 0xf3, 0x30, 0x6e, 0x94, 0x24,
0x97, 0x23, 0xb6, 0x1a, 0x23, 0xb6, 0xef, 0x44, 0xc4, 0x2d, 0xd5, 0x83, 0xd4, 0x33, 0x85, 0x41, 0x8c, 0x70, 0x5e, 0xe2, 0x6a, 0x71, 0x3e, 0x63, 0xdd, 0x98, 0xb1, 0x71, 0x2f, 0x23, 0xae, 0x69,
0x49, 0x88, 0xd0, 0x9e, 0xa4, 0xd9, 0xa5, 0x67, 0x09, 0xad, 0x38, 0x97, 0xd8, 0x29, 0xe1, 0x34, 0x7a, 0x32, 0xcf, 0x91, 0x06, 0x2d, 0x21, 0x42, 0x63, 0x94, 0x66, 0x57, 0x9e, 0x2b, 0xb5, 0xf2,
0x8d, 0xbd, 0x8e, 0xb8, 0x3d, 0x25, 0xf9, 0x23, 0xb0, 0xbe, 0xf3, 0x88, 0xcf, 0x58, 0xe9, 0x15, 0x5c, 0xf8, 0x8e, 0x89, 0x60, 0x69, 0xec, 0x35, 0xe5, 0xed, 0x69, 0xc9, 0x1f, 0x80, 0x7b, 0x2a,
0xe7, 0x49, 0x95, 0xb2, 0x38, 0xe3, 0x33, 0x30, 0x09, 0xa5, 0x39, 0x55, 0xd9, 0x4a, 0xc1, 0x1f, 0x22, 0x31, 0xe1, 0x45, 0x54, 0x4c, 0x93, 0xb2, 0x64, 0x79, 0xc6, 0x97, 0xe0, 0x10, 0xc6, 0x28,
0x83, 0x23, 0x7d, 0x16, 0xd3, 0x30, 0x04, 0x8b, 0x09, 0x8d, 0x9a, 0xa6, 0xed, 0xda, 0x0b, 0x28, 0xd3, 0xd5, 0x2a, 0xc1, 0x1f, 0x42, 0x47, 0xc5, 0xcc, 0x06, 0x65, 0x1f, 0x5c, 0x2e, 0x35, 0x7a,
0x07, 0x05, 0xdb, 0x1f, 0x01, 0xdc, 0xb6, 0x31, 0x22, 0x38, 0x52, 0x1a, 0x67, 0x59, 0x3e, 0xcb, 0xd0, 0xd6, 0x2b, 0x2f, 0xa0, 0x03, 0xb4, 0xdb, 0xee, 0x00, 0xe0, 0x8e, 0xe1, 0x88, 0xd0, 0x51,
0x62, 0xe2, 0x6a, 0xe8, 0x42, 0x57, 0xea, 0x64, 0x0f, 0xb9, 0xfa, 0xfe, 0x10, 0xec, 0x45, 0x5b, 0xd2, 0x30, 0xcb, 0xe8, 0x24, 0x8b, 0x49, 0xd7, 0xc2, 0x2e, 0xb4, 0x95, 0x4e, 0x71, 0xa8, 0x6b,
0x20, 0x80, 0x25, 0x1b, 0xd0, 0xd5, 0xca, 0xb3, 0x6c, 0x3d, 0x57, 0x2f, 0xcf, 0xca, 0xc1, 0x18, 0xef, 0xee, 0x43, 0x6b, 0x46, 0x0b, 0x04, 0x70, 0x15, 0x01, 0xbb, 0x56, 0x71, 0x56, 0xd4, 0xeb,
0xfd, 0x33, 0xc0, 0x0a, 0xe5, 0x95, 0x7c, 0x01, 0x4b, 0xee, 0x0f, 0xdc, 0xad, 0xa5, 0x76, 0x67, 0xda, 0xc5, 0x59, 0x07, 0xd4, 0x06, 0x7f, 0xea, 0xe0, 0x86, 0xea, 0x4a, 0xbe, 0x82, 0xab, 0x56,
0x2f, 0xf5, 0xf6, 0x1a, 0xed, 0xaa, 0x89, 0x35, 0x3c, 0x04, 0x53, 0xcc, 0x32, 0xee, 0xd4, 0xb0, 0x0b, 0x6e, 0x55, 0x4a, 0xbb, 0xb7, 0xb2, 0x7a, 0xdb, 0x46, 0xbb, 0x26, 0xb1, 0x85, 0x87, 0xe0,
0xcb, 0x33, 0xde, 0x6b, 0x98, 0x2b, 0x5f, 0x7b, 0xa5, 0xe3, 0x21, 0xd8, 0xb2, 0xbc, 0x94, 0x11, 0xc8, 0x31, 0xc7, 0xcd, 0x8a, 0xef, 0xfc, 0xf8, 0xf7, 0x0c, 0x23, 0xe7, 0x5b, 0x6f, 0x6c, 0x3c,
0xf4, 0xea, 0x0d, 0xab, 0x28, 0xb6, 0x1b, 0xa6, 0x5f, 0x70, 0x7c, 0x82, 0x8e, 0x9a, 0x4b, 0x6c, 0x84, 0x96, 0x6a, 0x2f, 0xe5, 0x04, 0xbd, 0x2a, 0x61, 0x35, 0xc4, 0xba, 0x61, 0x31, 0x48, 0x8c,
0xc2, 0xf5, 0xfa, 0x35, 0xc3, 0xea, 0x28, 0x6b, 0x78, 0xbc, 0xe8, 0x81, 0xe6, 0x44, 0xf6, 0x9a, 0x83, 0xbb, 0x91, 0x35, 0x23, 0x6c, 0x2c, 0xb1, 0xcc, 0x3a, 0xf9, 0x0c, 0x4d, 0x3d, 0xd9, 0x68,
0x5e, 0x74, 0x41, 0x33, 0xfa, 0x6b, 0x80, 0x79, 0x1a, 0x9d, 0x4f, 0x08, 0x1e, 0x55, 0x8f, 0x83, 0xca, 0xd4, 0xeb, 0x57, 0x0c, 0x8b, 0xcb, 0xc0, 0xc2, 0xe3, 0x19, 0x8b, 0xcc, 0x85, 0x6c, 0x9b,
0x0d, 0xa3, 0x78, 0x0f, 0xdd, 0xca, 0x3a, 0xd1, 0x4a, 0x12, 0xf9, 0xaa, 0x8f, 0x20, 0x59, 0xd9, 0x38, 0x31, 0x83, 0x19, 0xfc, 0xab, 0x81, 0x73, 0x16, 0x5d, 0x8c, 0x08, 0x1e, 0x95, 0xcf, 0x8b,
0x40, 0x82, 0x44, 0xb6, 0xc3, 0x23, 0x48, 0x56, 0x96, 0x96, 0x86, 0x63, 0x68, 0x97, 0xff, 0xbe, 0x86, 0x61, 0x5e, 0x02, 0xb7, 0xb0, 0x90, 0xac, 0x02, 0x44, 0xf1, 0xe2, 0x11, 0x20, 0x0b, 0x3b,
0x07, 0x6e, 0xa7, 0xde, 0x08, 0xcb, 0x3f, 0x4b, 0x5f, 0xc3, 0xcf, 0xd5, 0xce, 0xd9, 0x69, 0xf8, 0x4c, 0x82, 0x28, 0x42, 0x3d, 0x02, 0x64, 0x61, 0xed, 0x59, 0x38, 0x84, 0x46, 0xf1, 0x61, 0x7d,
0xcf, 0x28, 0xa2, 0xdd, 0x26, 0x73, 0xc5, 0x74, 0x6e, 0x89, 0x7f, 0xf5, 0xeb, 0xff, 0x01, 0x00, 0xe0, 0x76, 0xaa, 0x54, 0x9a, 0xff, 0x12, 0xfb, 0x16, 0x7e, 0x29, 0xb7, 0xd6, 0xa6, 0xe1, 0x23,
0x00, 0xff, 0xff, 0xe2, 0xe9, 0xe2, 0x3b, 0xd7, 0x07, 0x00, 0x00, 0xa6, 0x81, 0xb6, 0x4c, 0xe6, 0x12, 0xe9, 0xc2, 0x95, 0x7f, 0x0a, 0xde, 0xfe, 0x0f, 0x00, 0x00,
} 0xff, 0xff, 0xb7, 0x25, 0xac, 0xac, 0x24, 0x08, 0x00, 0x00,
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// RouterClient is the client API for Router service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RouterClient interface {
Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error)
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error)
Advertise(ctx context.Context, in *Request, opts ...grpc.CallOption) (Router_AdvertiseClient, error)
Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error)
Status(ctx context.Context, in *Request, opts ...grpc.CallOption) (*StatusResponse, error)
}
type routerClient struct {
cc *grpc.ClientConn
}
func NewRouterClient(cc *grpc.ClientConn) RouterClient {
return &routerClient{cc}
}
func (c *routerClient) Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) {
out := new(LookupResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Router/Lookup", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error) {
stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[0], "/go.micro.router.Router/Watch", opts...)
if err != nil {
return nil, err
}
x := &routerWatchClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Router_WatchClient interface {
Recv() (*Event, error)
grpc.ClientStream
}
type routerWatchClient struct {
grpc.ClientStream
}
func (x *routerWatchClient) Recv() (*Event, error) {
m := new(Event)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routerClient) Advertise(ctx context.Context, in *Request, opts ...grpc.CallOption) (Router_AdvertiseClient, error) {
stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[1], "/go.micro.router.Router/Advertise", opts...)
if err != nil {
return nil, err
}
x := &routerAdvertiseClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Router_AdvertiseClient interface {
Recv() (*Advert, error)
grpc.ClientStream
}
type routerAdvertiseClient struct {
grpc.ClientStream
}
func (x *routerAdvertiseClient) Recv() (*Advert, error) {
m := new(Advert)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routerClient) Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error) {
out := new(ProcessResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Router/Process", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerClient) Status(ctx context.Context, in *Request, opts ...grpc.CallOption) (*StatusResponse, error) {
out := new(StatusResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Router/Status", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RouterServer is the server API for Router service.
type RouterServer interface {
Lookup(context.Context, *LookupRequest) (*LookupResponse, error)
Watch(*WatchRequest, Router_WatchServer) error
Advertise(*Request, Router_AdvertiseServer) error
Process(context.Context, *Advert) (*ProcessResponse, error)
Status(context.Context, *Request) (*StatusResponse, error)
}
func RegisterRouterServer(s *grpc.Server, srv RouterServer) {
s.RegisterService(&_Router_serviceDesc, srv)
}
func _Router_Lookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LookupRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).Lookup(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Router/Lookup",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).Lookup(ctx, req.(*LookupRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Router_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WatchRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RouterServer).Watch(m, &routerWatchServer{stream})
}
type Router_WatchServer interface {
Send(*Event) error
grpc.ServerStream
}
type routerWatchServer struct {
grpc.ServerStream
}
func (x *routerWatchServer) Send(m *Event) error {
return x.ServerStream.SendMsg(m)
}
func _Router_Advertise_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Request)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RouterServer).Advertise(m, &routerAdvertiseServer{stream})
}
type Router_AdvertiseServer interface {
Send(*Advert) error
grpc.ServerStream
}
type routerAdvertiseServer struct {
grpc.ServerStream
}
func (x *routerAdvertiseServer) Send(m *Advert) error {
return x.ServerStream.SendMsg(m)
}
func _Router_Process_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Advert)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).Process(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Router/Process",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).Process(ctx, req.(*Advert))
}
return interceptor(ctx, in, info, handler)
}
func _Router_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Request)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).Status(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Router/Status",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).Status(ctx, req.(*Request))
}
return interceptor(ctx, in, info, handler)
}
var _Router_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.router.Router",
HandlerType: (*RouterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Lookup",
Handler: _Router_Lookup_Handler,
},
{
MethodName: "Process",
Handler: _Router_Process_Handler,
},
{
MethodName: "Status",
Handler: _Router_Status_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _Router_Watch_Handler,
ServerStreams: true,
},
{
StreamName: "Advertise",
Handler: _Router_Advertise_Handler,
ServerStreams: true,
},
},
Metadata: "micro/go-micro/router/proto/router.proto",
}
// TableClient is the client API for Table service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type TableClient interface {
Create(ctx context.Context, in *Route, opts ...grpc.CallOption) (*CreateResponse, error)
Delete(ctx context.Context, in *Route, opts ...grpc.CallOption) (*DeleteResponse, error)
Update(ctx context.Context, in *Route, opts ...grpc.CallOption) (*UpdateResponse, error)
List(ctx context.Context, in *Request, opts ...grpc.CallOption) (*ListResponse, error)
Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error)
}
type tableClient struct {
cc *grpc.ClientConn
}
func NewTableClient(cc *grpc.ClientConn) TableClient {
return &tableClient{cc}
}
func (c *tableClient) Create(ctx context.Context, in *Route, opts ...grpc.CallOption) (*CreateResponse, error) {
out := new(CreateResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Table/Create", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *tableClient) Delete(ctx context.Context, in *Route, opts ...grpc.CallOption) (*DeleteResponse, error) {
out := new(DeleteResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Table/Delete", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *tableClient) Update(ctx context.Context, in *Route, opts ...grpc.CallOption) (*UpdateResponse, error) {
out := new(UpdateResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Table/Update", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *tableClient) List(ctx context.Context, in *Request, opts ...grpc.CallOption) (*ListResponse, error) {
out := new(ListResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Table/List", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *tableClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) {
out := new(QueryResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Table/Query", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// TableServer is the server API for Table service.
type TableServer interface {
Create(context.Context, *Route) (*CreateResponse, error)
Delete(context.Context, *Route) (*DeleteResponse, error)
Update(context.Context, *Route) (*UpdateResponse, error)
List(context.Context, *Request) (*ListResponse, error)
Query(context.Context, *QueryRequest) (*QueryResponse, error)
}
func RegisterTableServer(s *grpc.Server, srv TableServer) {
s.RegisterService(&_Table_serviceDesc, srv)
}
func _Table_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Route)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TableServer).Create(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Table/Create",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TableServer).Create(ctx, req.(*Route))
}
return interceptor(ctx, in, info, handler)
}
func _Table_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Route)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TableServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Table/Delete",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TableServer).Delete(ctx, req.(*Route))
}
return interceptor(ctx, in, info, handler)
}
func _Table_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Route)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TableServer).Update(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Table/Update",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TableServer).Update(ctx, req.(*Route))
}
return interceptor(ctx, in, info, handler)
}
func _Table_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Request)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TableServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Table/List",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TableServer).List(ctx, req.(*Request))
}
return interceptor(ctx, in, info, handler)
}
func _Table_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(QueryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TableServer).Query(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Table/Query",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TableServer).Query(ctx, req.(*QueryRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Table_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.router.Table",
HandlerType: (*TableServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Create",
Handler: _Table_Create_Handler,
},
{
MethodName: "Delete",
Handler: _Table_Delete_Handler,
},
{
MethodName: "Update",
Handler: _Table_Update_Handler,
},
{
MethodName: "List",
Handler: _Table_List_Handler,
},
{
MethodName: "Query",
Handler: _Table_Query_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "micro/go-micro/router/proto/router.proto",
} }

View File

@ -74,6 +74,12 @@ message Advert {
repeated Event events = 5; repeated Event events = 5;
} }
// Solicit solicits routes
message Solicit {
// id of the soliciting router
string id = 1;
}
// ProcessResponse is returned by Process // ProcessResponse is returned by Process
message ProcessResponse {} message ProcessResponse {}

View File

@ -110,8 +110,12 @@ func (t *table) Update(r Route) error {
if _, ok := t.routes[service][sum]; !ok { if _, ok := t.routes[service][sum]; !ok {
t.routes[service][sum] = r t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil
} }
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil return nil
} }