Merge pull request #736 from milosgajdos83/solicit-routes

Solicit routes when new node is discovered
This commit is contained in:
Asim Aslam 2019-09-05 18:08:49 +01:00 committed by GitHub
commit 1d9298ae2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 281 additions and 134 deletions

View File

@ -2,6 +2,7 @@ package network
import ( import (
"container/list" "container/list"
"errors"
"sync" "sync"
"time" "time"
@ -28,6 +29,13 @@ var (
DefaultLink = "network" DefaultLink = "network"
) )
var (
// ErrMsgUnknown is returned when unknown message is attempted to send or receive
ErrMsgUnknown = errors.New("unknown message")
// ErrClientNotFound is returned when client for tunnel channel could not be found
ErrClientNotFound = errors.New("client not found")
)
// node is network node // node is network node
type node struct { type node struct {
sync.RWMutex sync.RWMutex
@ -275,12 +283,12 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message)
} }
// processNetChan processes messages received on NetworkChannel // processNetChan processes messages received on NetworkChannel
func (n *network) processNetChan(l tunnel.Listener) { func (n *network) processNetChan(client transport.Client, listener tunnel.Listener) {
// receive network message queue // receive network message queue
recv := make(chan *transport.Message, 128) recv := make(chan *transport.Message, 128)
// accept NetworkChannel connections // accept NetworkChannel connections
go n.acceptNetConn(l, recv) go n.acceptNetConn(listener, recv)
for { for {
select { select {
@ -319,6 +327,14 @@ func (n *network) processNetChan(l tunnel.Listener) {
lastSeen: now, lastSeen: now,
} }
n.Unlock() n.Unlock()
// advertise yourself to the network
if err := n.sendMsg("neighbour", NetworkChannel); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
}
// advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
case "neighbour": case "neighbour":
// mark the time the message has been received // mark the time the message has been received
now := time.Now() now := time.Now()
@ -334,7 +350,8 @@ func (n *network) processNetChan(l tunnel.Listener) {
n.Lock() n.Lock()
log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id) log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id)
// only add the neighbour if it is NOT already in node's list of neighbours // only add the neighbour if it is NOT already in node's list of neighbours
if _, ok := n.neighbours[pbNetNeighbour.Node.Id]; !ok { _, exists := n.neighbours[pbNetNeighbour.Node.Id]
if !exists {
n.neighbours[pbNetNeighbour.Node.Id] = &node{ n.neighbours[pbNetNeighbour.Node.Id] = &node{
id: pbNetNeighbour.Node.Id, id: pbNetNeighbour.Node.Id,
address: pbNetNeighbour.Node.Address, address: pbNetNeighbour.Node.Address,
@ -347,7 +364,7 @@ func (n *network) processNetChan(l tunnel.Listener) {
n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now
} }
// update/store the neighbour node neighbours // update/store the neighbour node neighbours
// NOTE: * we dont update lastSeen time for the neighbours of the neighbour // NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour
// * even though we are NOT interested in neighbours of neighbours here // * even though we are NOT interested in neighbours of neighbours here
// we still allocate the map of neighbours for each of them // we still allocate the map of neighbours for each of them
for _, pbNeighbour := range pbNetNeighbour.Neighbours { for _, pbNeighbour := range pbNetNeighbour.Neighbours {
@ -359,6 +376,13 @@ func (n *network) processNetChan(l tunnel.Listener) {
n.neighbours[pbNetNeighbour.Node.Id].neighbours[neighbourNode.id] = neighbourNode n.neighbours[pbNetNeighbour.Node.Id].neighbours[neighbourNode.id] = neighbourNode
} }
n.Unlock() n.Unlock()
// send a solicit message when discovering a new node
// NOTE: we need to send the solicit message here after the Lock is released as sendMsg locks, too
if !exists {
if err := n.sendMsg("solicit", NetworkChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
}
case "close": case "close":
pbNetClose := &pbNet.Close{} pbNetClose := &pbNet.Close{}
if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { if err := proto.Unmarshal(m.Body, pbNetClose); err != nil {
@ -383,6 +407,76 @@ func (n *network) processNetChan(l tunnel.Listener) {
} }
} }
// sendMsg sends a message to the tunnel channel
func (n *network) sendMsg(msgType string, channel string) error {
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
var protoMsg proto.Message
switch msgType {
case "connect":
protoMsg = &pbNet.Connect{
Node: node,
}
case "close":
protoMsg = &pbNet.Close{
Node: node,
}
case "solicit":
protoMsg = &pbNet.Solicit{
Node: node,
}
case "neighbour":
n.RLock()
nodes := make([]*pbNet.Node, len(n.neighbours))
i := 0
for id := range n.neighbours {
nodes[i] = &pbNet.Node{
Id: id,
Address: n.neighbours[id].address,
}
i++
}
n.RUnlock()
protoMsg = &pbNet.Neighbour{
Node: node,
Neighbours: nodes,
}
default:
return ErrMsgUnknown
}
body, err := proto.Marshal(protoMsg)
if err != nil {
return err
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": msgType,
},
Body: body,
}
n.RLock()
client, ok := n.tunClient[channel]
if !ok {
n.RUnlock()
return ErrClientNotFound
}
n.RUnlock()
log.Debugf("Network sending %s message from: %s", msgType, node.Id)
if err := client.Send(&m); err != nil {
return err
}
return nil
}
// announce announces node neighbourhood to the network // announce announces node neighbourhood to the network
func (n *network) announce(client transport.Client) { func (n *network) announce(client transport.Client) {
announce := time.NewTicker(AnnounceTime) announce := time.NewTicker(AnnounceTime)
@ -393,44 +487,9 @@ func (n *network) announce(client transport.Client) {
case <-n.closed: case <-n.closed:
return return
case <-announce.C: case <-announce.C:
n.RLock() // advertise yourself to the network
nodes := make([]*pbNet.Node, len(n.neighbours)) if err := n.sendMsg("neighbour", NetworkChannel); err != nil {
i := 0 log.Debugf("Network failed to advertise neighbours: %v", err)
for id, _ := range n.neighbours {
nodes[i] = &pbNet.Node{
Id: id,
Address: n.neighbours[id].address,
}
i++
}
n.RUnlock()
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetNeighbour := &pbNet.Neighbour{
Node: node,
Neighbours: nodes,
}
body, err := proto.Marshal(pbNetNeighbour)
if err != nil {
// TODO: should we bail here?
log.Debugf("Network failed to marshal neighbour message: %v", err)
continue
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "neighbour",
},
Body: body,
}
log.Debugf("Network sending neighbour message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send neighbour messsage: %v", err)
continue continue
} }
} }
@ -550,7 +609,7 @@ func (n *network) setRouteMetric(route *router.Route) {
// check if the route origin is the neighbour of our neighbour // check if the route origin is the neighbour of our neighbour
for _, node := range n.neighbours { for _, node := range n.neighbours {
for id, _ := range node.neighbours { for id := range node.neighbours {
if route.Router == id { if route.Router == id {
route.Metric = 100 route.Metric = 100
n.RUnlock() n.RUnlock()
@ -565,12 +624,12 @@ func (n *network) setRouteMetric(route *router.Route) {
} }
// processCtrlChan processes messages received on ControlChannel // processCtrlChan processes messages received on ControlChannel
func (n *network) processCtrlChan(l tunnel.Listener) { func (n *network) processCtrlChan(client transport.Client, listener tunnel.Listener) {
// receive control message queue // receive control message queue
recv := make(chan *transport.Message, 128) recv := make(chan *transport.Message, 128)
// accept ControlChannel cconnections // accept ControlChannel cconnections
go n.acceptCtrlConn(l, recv) go n.acceptCtrlConn(listener, recv)
for { for {
select { select {
@ -601,6 +660,10 @@ func (n *network) processCtrlChan(l tunnel.Listener) {
lastSeen: now, lastSeen: now,
} }
n.neighbours[pbRtrAdvert.Id] = advertNode n.neighbours[pbRtrAdvert.Id] = advertNode
// send a solicit message when discovering a new node
if err := n.sendMsg("solicit", NetworkChannel); err != nil {
log.Debugf("Network failed to send solicit message: %s", err)
}
} }
n.RUnlock() n.RUnlock()
@ -657,6 +720,11 @@ func (n *network) processCtrlChan(l tunnel.Listener) {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err) log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
continue continue
} }
case "solicit":
// advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
} }
case <-n.closed: case <-n.closed:
return return
@ -724,8 +792,6 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
// Connect connects the network // Connect connects the network
func (n *network) Connect() error { func (n *network) Connect() error {
n.Lock() n.Lock()
defer n.Unlock()
// return if already connected // return if already connected
if n.connected { if n.connected {
return nil return nil
@ -793,32 +859,14 @@ func (n *network) Connect() error {
if err := n.server.Start(); err != nil { if err := n.server.Start(); err != nil {
return err return err
} }
n.Unlock()
// send connect message to NetworkChannel // send connect message to NetworkChannel
// NOTE: in theory we could do this as soon as // NOTE: in theory we could do this as soon as
// Dial to NetworkChannel succeeds, but instead // Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first // we initialize all other node resources first
node := &pbNet.Node{ if err := n.sendMsg("connect", NetworkChannel); err != nil {
Id: n.options.Id, log.Debugf("Network failed to send connect message: %s", err)
Address: n.options.Address,
}
pbNetConnect := &pbNet.Connect{
Node: node,
}
// only proceed with sending to NetworkChannel if marshal succeeds
if body, err := proto.Marshal(pbNetConnect); err == nil {
m := transport.Message{
Header: map[string]string{
"Micro-Method": "connect",
},
Body: body,
}
log.Debugf("Network sending connect message: %s", node.Id)
if err := netClient.Send(&m); err != nil {
log.Debugf("Network failed to send connect messsage: %v", err)
}
} }
// go resolving network nodes // go resolving network nodes
@ -828,14 +876,15 @@ func (n *network) Connect() error {
// prune stale nodes // prune stale nodes
go n.prune() go n.prune()
// listen to network messages // listen to network messages
go n.processNetChan(netListener) go n.processNetChan(netClient, netListener)
// advertise service routes // advertise service routes
go n.advertise(ctrlClient, advertChan) go n.advertise(ctrlClient, advertChan)
// accept and process routes // accept and process routes
go n.processCtrlChan(ctrlListener) go n.processCtrlChan(ctrlClient, ctrlListener)
// set connected to true n.Lock()
n.connected = true n.connected = true
n.Unlock()
return nil return nil
} }
@ -912,31 +961,9 @@ func (n *network) Close() error {
return nil return nil
default: default:
// send close message only if we managed to connect to NetworkChannel // send close message only if we managed to connect to NetworkChannel
if netClient, ok := n.tunClient[NetworkChannel]; ok { log.Debugf("Sending close message from: %s", n.options.Id)
// send connect message to NetworkChannel if err := n.sendMsg("close", NetworkChannel); err != nil {
node := &pbNet.Node{ log.Debugf("Network failed to send close message: %s", err)
Id: n.options.Id,
Address: n.options.Address,
}
pbNetClose := &pbNet.Close{
Node: node,
}
// only proceed with sending to NetworkChannel if marshal succeeds
if body, err := proto.Marshal(pbNetClose); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "close",
},
Body: body,
}
log.Debugf("Network sending close message from: %s", node.Id)
if err := netClient.Send(&m); err != nil {
log.Debugf("Network failed to send close messsage: %v", err)
}
}
} }
// TODO: send close message to the network channel // TODO: send close message to the network channel
close(n.closed) close(n.closed)

View File

@ -305,6 +305,47 @@ func (m *Close) GetNode() *Node {
return nil return nil
} }
// Solicit is sent when requesting route advertisement from the network nodes
type Solicit struct {
// network node
Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Solicit) Reset() { *m = Solicit{} }
func (m *Solicit) String() string { return proto.CompactTextString(m) }
func (*Solicit) ProtoMessage() {}
func (*Solicit) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{7}
}
func (m *Solicit) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Solicit.Unmarshal(m, b)
}
func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Solicit.Marshal(b, m, deterministic)
}
func (m *Solicit) XXX_Merge(src proto.Message) {
xxx_messageInfo_Solicit.Merge(m, src)
}
func (m *Solicit) XXX_Size() int {
return xxx_messageInfo_Solicit.Size(m)
}
func (m *Solicit) XXX_DiscardUnknown() {
xxx_messageInfo_Solicit.DiscardUnknown(m)
}
var xxx_messageInfo_Solicit proto.InternalMessageInfo
func (m *Solicit) GetNode() *Node {
if m != nil {
return m.Node
}
return nil
}
// Neighbour is used to nnounce node neighbourhood // Neighbour is used to nnounce node neighbourhood
type Neighbour struct { type Neighbour struct {
// network node // network node
@ -320,7 +361,7 @@ func (m *Neighbour) Reset() { *m = Neighbour{} }
func (m *Neighbour) String() string { return proto.CompactTextString(m) } func (m *Neighbour) String() string { return proto.CompactTextString(m) }
func (*Neighbour) ProtoMessage() {} func (*Neighbour) ProtoMessage() {}
func (*Neighbour) Descriptor() ([]byte, []int) { func (*Neighbour) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{7} return fileDescriptor_8571034d60397816, []int{8}
} }
func (m *Neighbour) XXX_Unmarshal(b []byte) error { func (m *Neighbour) XXX_Unmarshal(b []byte) error {
@ -363,33 +404,35 @@ func init() {
proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterType((*Node)(nil), "go.micro.network.Node")
proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect")
proto.RegisterType((*Close)(nil), "go.micro.network.Close") proto.RegisterType((*Close)(nil), "go.micro.network.Close")
proto.RegisterType((*Solicit)(nil), "go.micro.network.Solicit")
proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour") proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour")
} }
func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) }
var fileDescriptor_8571034d60397816 = []byte{ var fileDescriptor_8571034d60397816 = []byte{
// 348 bytes of a gzipped FileDescriptorProto // 360 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x41, 0x4f, 0x32, 0x31, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x4f, 0xf2, 0x40,
0x10, 0xfd, 0x58, 0xe0, 0x23, 0x0c, 0x62, 0x4c, 0xa3, 0x66, 0xb3, 0x06, 0x43, 0x7a, 0x40, 0x62, 0x10, 0xfd, 0x28, 0xf0, 0x35, 0x0c, 0x1f, 0x5f, 0xcc, 0x46, 0x4d, 0x53, 0x83, 0x21, 0x7b, 0x40,
0x74, 0x31, 0x10, 0x3d, 0x79, 0x31, 0x1c, 0xbc, 0x10, 0x0e, 0x7b, 0xf4, 0xe6, 0xd2, 0x66, 0x69, 0x62, 0xb4, 0x18, 0x08, 0x9e, 0xbc, 0x18, 0x0e, 0x5e, 0x08, 0x87, 0x7a, 0xf3, 0x66, 0xbb, 0x9b,
0x94, 0x1d, 0x6c, 0xbb, 0xf1, 0x0f, 0xf8, 0xc3, 0x4d, 0xbb, 0x05, 0x97, 0x45, 0x30, 0xdc, 0xda, 0xb2, 0x11, 0x3a, 0xb8, 0xbb, 0x8d, 0x7f, 0xc0, 0x1f, 0x6e, 0xba, 0x5d, 0xb0, 0x80, 0x60, 0xb8,
0x99, 0xf7, 0xe6, 0xcd, 0xe4, 0x3d, 0x68, 0xa7, 0x5c, 0x7f, 0xa2, 0x7c, 0x0b, 0x97, 0x12, 0x35, 0x75, 0xe6, 0xbd, 0x37, 0x6f, 0xa7, 0xfb, 0x16, 0x5a, 0x29, 0xd7, 0x1f, 0x28, 0xdf, 0x82, 0xa5,
0x92, 0x93, 0x04, 0xc3, 0x85, 0x98, 0x49, 0x0c, 0x5d, 0x3d, 0x18, 0x25, 0x42, 0xcf, 0xb3, 0x38, 0x44, 0x8d, 0xe4, 0x24, 0xc1, 0x60, 0x21, 0x62, 0x89, 0x81, 0xed, 0xfb, 0xc3, 0x44, 0xe8, 0x59,
0x9c, 0xe1, 0x62, 0x60, 0x3b, 0x83, 0x04, 0x6f, 0xf3, 0x87, 0xc4, 0x4c, 0x73, 0x39, 0xb0, 0x4c, 0x16, 0x05, 0x31, 0x2e, 0xfa, 0x06, 0xe9, 0x27, 0x78, 0x5b, 0x7c, 0x48, 0xcc, 0x34, 0x97, 0x7d,
0xf7, 0xc9, 0xc7, 0xd0, 0x36, 0xb4, 0x26, 0x42, 0xe9, 0x88, 0x7f, 0x64, 0x5c, 0x69, 0xfa, 0x08, 0xa3, 0xb4, 0x45, 0x31, 0x86, 0xb6, 0xa0, 0x39, 0x11, 0x4a, 0x87, 0xfc, 0x3d, 0xe3, 0x4a, 0xd3,
0x47, 0xf9, 0x57, 0x2d, 0x31, 0x55, 0x9c, 0xdc, 0x40, 0x3d, 0x45, 0xc6, 0x95, 0x5f, 0xe9, 0x56, 0x07, 0xf8, 0x57, 0x94, 0x6a, 0x89, 0xa9, 0xe2, 0xe4, 0x06, 0xea, 0x29, 0x32, 0xae, 0xbc, 0x4a,
0xfb, 0xad, 0xe1, 0x79, 0x58, 0x56, 0x0d, 0xa7, 0xc8, 0x78, 0x94, 0x83, 0x68, 0x0f, 0x4e, 0xa7, 0xa7, 0xda, 0x6b, 0x0e, 0xce, 0x83, 0x6d, 0xd7, 0x60, 0x8a, 0x8c, 0x87, 0x05, 0x89, 0x76, 0xe1,
0x5c, 0x24, 0xf3, 0x18, 0x33, 0x39, 0x47, 0x64, 0x6e, 0x2a, 0x39, 0x06, 0x4f, 0x30, 0xbf, 0xd2, 0x74, 0xca, 0x45, 0x32, 0x8b, 0x30, 0x93, 0x33, 0x44, 0x66, 0xa7, 0x92, 0xff, 0xe0, 0x08, 0xe6,
0xad, 0xf4, 0x9b, 0x91, 0x27, 0x18, 0x7d, 0x81, 0xb3, 0x12, 0xce, 0xc9, 0x3d, 0x99, 0x2b, 0x0b, 0x55, 0x3a, 0x95, 0x5e, 0x23, 0x74, 0x04, 0xa3, 0x2f, 0x70, 0xb6, 0xc5, 0xb3, 0x76, 0x8f, 0xf9,
0x0d, 0xcb, 0x69, 0x0d, 0x2f, 0x7e, 0x91, 0x5d, 0xc1, 0xa2, 0x4d, 0x06, 0xbd, 0x83, 0x9a, 0x59, 0x96, 0x25, 0xc0, 0x68, 0x9a, 0x83, 0x8b, 0x1f, 0x6c, 0x57, 0xb4, 0x70, 0x53, 0x41, 0xef, 0xa0,
0xa9, 0xac, 0x49, 0x7c, 0x68, 0xbc, 0x32, 0x26, 0xb9, 0x52, 0xbe, 0x67, 0x8b, 0xab, 0x2f, 0xbd, 0x96, 0x1f, 0x69, 0xdb, 0x93, 0x78, 0xe0, 0xbe, 0x32, 0x26, 0xb9, 0x52, 0x9e, 0x63, 0x9a, 0xab,
0x87, 0xc6, 0x18, 0xd3, 0x94, 0xcf, 0x34, 0xb9, 0x86, 0x9a, 0xb9, 0xc4, 0xc9, 0xee, 0xba, 0xd6, 0x92, 0x8e, 0xc0, 0x1d, 0x63, 0x9a, 0xf2, 0x58, 0x93, 0x6b, 0xa8, 0xe5, 0x9b, 0x58, 0xdb, 0x7d,
0x62, 0xe8, 0x08, 0xea, 0xe3, 0x77, 0x54, 0xfc, 0x20, 0x12, 0x42, 0x73, 0xbd, 0xf9, 0x21, 0x44, 0xdb, 0x1a, 0x0e, 0x1d, 0x42, 0x7d, 0x3c, 0x47, 0xc5, 0x8f, 0x12, 0x8d, 0xc0, 0x7d, 0xc6, 0xb9,
0xf2, 0x00, 0xb0, 0xbe, 0x53, 0xf9, 0xd5, 0xbd, 0x6e, 0x14, 0x90, 0xc3, 0x2f, 0x0f, 0x1a, 0xd3, 0x88, 0xc5, 0x71, 0x5e, 0x08, 0x8d, 0xf5, 0xc2, 0xc7, 0x08, 0xc9, 0x3d, 0xc0, 0xfa, 0xf7, 0x28,
0xbc, 0x49, 0x9e, 0x01, 0xac, 0xb9, 0xc6, 0x7f, 0x45, 0xfc, 0x1f, 0xb6, 0x4b, 0x84, 0xb3, 0x2b, 0xaf, 0x7a, 0xf0, 0x12, 0x4b, 0xcc, 0xc1, 0xa7, 0x03, 0xee, 0xb4, 0x00, 0xc9, 0x13, 0x80, 0xc9,
0xe8, 0x6c, 0x75, 0x8a, 0x99, 0xa0, 0xff, 0xc8, 0x04, 0x9a, 0xa6, 0x62, 0xc4, 0x14, 0xe9, 0x6c, 0x44, 0x1e, 0x1b, 0x45, 0xbc, 0x6f, 0xb5, 0x0d, 0x92, 0xbd, 0x65, 0xbf, 0xbd, 0x83, 0x94, 0xa3,
0x6f, 0x51, 0x48, 0x54, 0x70, 0xb9, 0xab, 0xbd, 0x9e, 0x16, 0x43, 0x7b, 0x23, 0x0d, 0xa4, 0xb7, 0x44, 0xff, 0x90, 0x09, 0x34, 0xf2, 0x4e, 0x6e, 0xa6, 0x48, 0x7b, 0xf7, 0x14, 0xa5, 0x20, 0xfa,
0xc7, 0xee, 0x42, 0xac, 0x82, 0xab, 0x3f, 0x71, 0x2b, 0x8d, 0xf8, 0xbf, 0x4d, 0xfb, 0xe8, 0x3b, 0x97, 0xfb, 0xe0, 0xf5, 0xb4, 0x08, 0x5a, 0x1b, 0x21, 0x22, 0xdd, 0x03, 0x29, 0x29, 0xa5, 0xd1,
0x00, 0x00, 0xff, 0xff, 0xfb, 0xa1, 0x6b, 0xb0, 0x45, 0x03, 0x00, 0x00, 0xbf, 0xfa, 0x95, 0xb7, 0xf2, 0x88, 0xfe, 0x9a, 0x47, 0x32, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff,
0x59, 0xcf, 0xab, 0xb5, 0x7c, 0x03, 0x00, 0x00,
} }

View File

@ -49,6 +49,12 @@ message Close {
Node node = 1; Node node = 1;
} }
// Solicit is sent when requesting route advertisement from the network nodes
message Solicit {
// network node
Node node = 1;
}
// Neighbour is used to nnounce node neighbourhood // Neighbour is used to nnounce node neighbourhood
message Neighbour { message Neighbour {
// network node // network node

View File

@ -120,6 +120,9 @@ func (r *router) manageRoute(route Route, action string) error {
if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound { if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
} }
case "solicit":
// nothing to do here
return nil
default: default:
return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action) return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action)
} }
@ -602,21 +605,10 @@ func (r *router) Advertise() (<-chan *Advert, error) {
r.subscribers[uuid.New().String()] = advertChan r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil return advertChan, nil
case Running: case Running:
// list routing table routes to announce // list all the routes and pack them into even slice to advertise
routes, err := r.table.List() events, err := r.flushRouteEvents(Create)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed listing routes: %s", err) return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*Event, len(routes))
for i, route := range routes {
event := &Event{
Type: Create,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
} }
// create event channels // create event channels
@ -687,6 +679,43 @@ func (r *router) Process(a *Advert) error {
return nil return nil
} }
// flushRouteEvents returns a slice of events, one per each route in the routing table
func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) {
// list all routes
routes, err := r.table.List()
if err != nil {
return nil, fmt.Errorf("failed listing routes: %s", err)
}
// build a list of events to advertise
events := make([]*Event, len(routes))
for i, route := range routes {
event := &Event{
Type: evType,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
}
return events, nil
}
// Solicit advertises all of its routes to the network
// It returns error if the router fails to list the routes
func (r *router) Solicit() error {
events, err := r.flushRouteEvents(Update)
if err != nil {
return fmt.Errorf("failed solicit routes: %s", err)
}
// advertise the routes
r.advertWg.Add(1)
go r.publishAdvert(RouteUpdate, events)
return nil
}
// Lookup routes in the routing table // Lookup routes in the routing table
func (r *router) Lookup(q Query) ([]Route, error) { func (r *router) Lookup(q Query) ([]Route, error) {
return r.table.Query(q) return r.table.Query(q)

View File

@ -7,6 +7,7 @@ service Router {
rpc Lookup(LookupRequest) returns (LookupResponse) {}; rpc Lookup(LookupRequest) returns (LookupResponse) {};
rpc Watch(WatchRequest) returns (stream Event) {}; rpc Watch(WatchRequest) returns (stream Event) {};
rpc Advertise(Request) returns (stream Advert) {}; rpc Advertise(Request) returns (stream Advert) {};
rpc Solicit(Request) returns (Response) {};
rpc Process(Advert) returns (ProcessResponse) {}; rpc Process(Advert) returns (ProcessResponse) {};
rpc Status(Request) returns (StatusResponse) {}; rpc Status(Request) returns (StatusResponse) {};
} }
@ -22,6 +23,9 @@ service Table {
// Empty request // Empty request
message Request {} message Request {}
// Empty response
message Response {}
// ListResponse is returned by List // ListResponse is returned by List
message ListResponse { message ListResponse {
repeated Route routes = 1; repeated Route routes = 1;

View File

@ -28,6 +28,8 @@ type Router interface {
Advertise() (<-chan *Advert, error) Advertise() (<-chan *Advert, error)
// Process processes incoming adverts // Process processes incoming adverts
Process(*Advert) error Process(*Advert) error
// Solicit advertises the whole routing table to the network
Solicit() error
// Lookup queries routes in the routing table // Lookup queries routes in the routing table
Lookup(Query) ([]Route, error) Lookup(Query) ([]Route, error)
// Watch returns a watcher which tracks updates to the routing table // Watch returns a watcher which tracks updates to the routing table

View File

@ -220,6 +220,42 @@ func (s *svc) Process(advert *router.Advert) error {
return nil return nil
} }
// Solicit advertise all routes
func (s *svc) Solicit() error {
// list all the routes
routes, err := s.table.List()
if err != nil {
return err
}
// build events to advertise
events := make([]*router.Event, len(routes))
for i, _ := range events {
events[i] = &router.Event{
Type: router.Update,
Timestamp: time.Now(),
Route: routes[i],
}
}
advert := &router.Advert{
Id: s.opts.Id,
Type: router.RouteUpdate,
Timestamp: time.Now(),
TTL: time.Duration(router.DefaultAdvertTTL),
Events: events,
}
select {
case s.advertChan <- advert:
case <-s.exit:
close(s.advertChan)
return nil
}
return nil
}
// Status returns router status // Status returns router status
func (s *svc) Status() router.Status { func (s *svc) Status() router.Status {
s.Lock() s.Lock()