Send solicit message when new neighbour is discovered

This commit is contained in:
Milos Gajdos
2019-09-05 16:04:44 +01:00
parent 9161b20d6b
commit 2522d8cb96
6 changed files with 236 additions and 74 deletions

View File

@@ -275,12 +275,12 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message)
}
// processNetChan processes messages received on NetworkChannel
func (n *network) processNetChan(l tunnel.Listener) {
func (n *network) processNetChan(client transport.Client, listener tunnel.Listener) {
// receive network message queue
recv := make(chan *transport.Message, 128)
// accept NetworkChannel connections
go n.acceptNetConn(l, recv)
go n.acceptNetConn(listener, recv)
for {
select {
@@ -319,6 +319,14 @@ func (n *network) processNetChan(l tunnel.Listener) {
lastSeen: now,
}
n.Unlock()
// advertise the new neighbour to the network
if err := n.advertiseNeighbours(client); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
}
// advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
case "neighbour":
// mark the time the message has been received
now := time.Now()
@@ -341,6 +349,29 @@ func (n *network) processNetChan(l tunnel.Listener) {
neighbours: make(map[string]*node),
lastSeen: now,
}
// send a solicit message when discovering a new node
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetSolicit := &pbNet.Solicit{
Node: node,
}
if body, err := proto.Marshal(pbNetSolicit); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "solicit",
},
Body: body,
}
log.Debugf("Network sending solicit message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send solicit messsage: %v", err)
}
}
}
// update lastSeen timestamp
if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) {
@@ -383,6 +414,52 @@ func (n *network) processNetChan(l tunnel.Listener) {
}
}
// advertiseNeighbours sends a neighbour message to the network
func (n *network) advertiseNeighbours(client transport.Client) error {
n.RLock()
nodes := make([]*pbNet.Node, len(n.neighbours))
i := 0
for id, _ := range n.neighbours {
nodes[i] = &pbNet.Node{
Id: id,
Address: n.neighbours[id].address,
}
i++
}
n.RUnlock()
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetNeighbour := &pbNet.Neighbour{
Node: node,
Neighbours: nodes,
}
body, err := proto.Marshal(pbNetNeighbour)
if err != nil {
// TODO: should we bail here?
log.Debugf("Network failed to marshal neighbour message: %v", err)
return err
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "neighbour",
},
Body: body,
}
log.Debugf("Network sending neighbour message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send neighbour messsage: %v", err)
return err
}
return nil
}
// announce announces node neighbourhood to the network
func (n *network) announce(client transport.Client) {
announce := time.NewTicker(AnnounceTime)
@@ -393,44 +470,9 @@ func (n *network) announce(client transport.Client) {
case <-n.closed:
return
case <-announce.C:
n.RLock()
nodes := make([]*pbNet.Node, len(n.neighbours))
i := 0
for id, _ := range n.neighbours {
nodes[i] = &pbNet.Node{
Id: id,
Address: n.neighbours[id].address,
}
i++
}
n.RUnlock()
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetNeighbour := &pbNet.Neighbour{
Node: node,
Neighbours: nodes,
}
body, err := proto.Marshal(pbNetNeighbour)
if err != nil {
// TODO: should we bail here?
log.Debugf("Network failed to marshal neighbour message: %v", err)
continue
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "neighbour",
},
Body: body,
}
log.Debugf("Network sending neighbour message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send neighbour messsage: %v", err)
// advertise yourself to the network
if err := n.advertiseNeighbours(client); err != nil {
log.Debugf("Network failed to advertise neighbours: %v", err)
continue
}
}
@@ -565,12 +607,12 @@ func (n *network) setRouteMetric(route *router.Route) {
}
// processCtrlChan processes messages received on ControlChannel
func (n *network) processCtrlChan(l tunnel.Listener) {
func (n *network) processCtrlChan(client transport.Client, listener tunnel.Listener) {
// receive control message queue
recv := make(chan *transport.Message, 128)
// accept ControlChannel cconnections
go n.acceptCtrlConn(l, recv)
go n.acceptCtrlConn(listener, recv)
for {
select {
@@ -601,6 +643,29 @@ func (n *network) processCtrlChan(l tunnel.Listener) {
lastSeen: now,
}
n.neighbours[pbRtrAdvert.Id] = advertNode
// send a solicit message when discovering a new node
node := &pbNet.Node{
Id: n.options.Id,
Address: n.options.Address,
}
pbNetSolicit := &pbNet.Solicit{
Node: node,
}
if body, err := proto.Marshal(pbNetSolicit); err == nil {
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": "solicit",
},
Body: body,
}
log.Debugf("Network sending solicit message from: %s", node.Id)
if err := client.Send(&m); err != nil {
log.Debugf("Network failed to send solicit messsage: %v", err)
}
}
}
n.RUnlock()
@@ -657,6 +722,11 @@ func (n *network) processCtrlChan(l tunnel.Listener) {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
continue
}
case "solicit":
// advertise all the routes when a new node has connected
if err := n.Router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err)
}
}
case <-n.closed:
return
@@ -828,11 +898,11 @@ func (n *network) Connect() error {
// prune stale nodes
go n.prune()
// listen to network messages
go n.processNetChan(netListener)
go n.processNetChan(netClient, netListener)
// advertise service routes
go n.advertise(ctrlClient, advertChan)
// accept and process routes
go n.processCtrlChan(ctrlListener)
go n.processCtrlChan(ctrlClient, ctrlListener)
// set connected to true
n.connected = true

View File

@@ -305,6 +305,47 @@ func (m *Close) GetNode() *Node {
return nil
}
// Solicit is sent when requesting route advertisement from the network nodes
type Solicit struct {
// network node
Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Solicit) Reset() { *m = Solicit{} }
func (m *Solicit) String() string { return proto.CompactTextString(m) }
func (*Solicit) ProtoMessage() {}
func (*Solicit) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{7}
}
func (m *Solicit) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Solicit.Unmarshal(m, b)
}
func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Solicit.Marshal(b, m, deterministic)
}
func (m *Solicit) XXX_Merge(src proto.Message) {
xxx_messageInfo_Solicit.Merge(m, src)
}
func (m *Solicit) XXX_Size() int {
return xxx_messageInfo_Solicit.Size(m)
}
func (m *Solicit) XXX_DiscardUnknown() {
xxx_messageInfo_Solicit.DiscardUnknown(m)
}
var xxx_messageInfo_Solicit proto.InternalMessageInfo
func (m *Solicit) GetNode() *Node {
if m != nil {
return m.Node
}
return nil
}
// Neighbour is used to nnounce node neighbourhood
type Neighbour struct {
// network node
@@ -320,7 +361,7 @@ func (m *Neighbour) Reset() { *m = Neighbour{} }
func (m *Neighbour) String() string { return proto.CompactTextString(m) }
func (*Neighbour) ProtoMessage() {}
func (*Neighbour) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{7}
return fileDescriptor_8571034d60397816, []int{8}
}
func (m *Neighbour) XXX_Unmarshal(b []byte) error {
@@ -363,33 +404,35 @@ func init() {
proto.RegisterType((*Node)(nil), "go.micro.network.Node")
proto.RegisterType((*Connect)(nil), "go.micro.network.Connect")
proto.RegisterType((*Close)(nil), "go.micro.network.Close")
proto.RegisterType((*Solicit)(nil), "go.micro.network.Solicit")
proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour")
}
func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) }
var fileDescriptor_8571034d60397816 = []byte{
// 348 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x41, 0x4f, 0x32, 0x31,
0x10, 0xfd, 0x58, 0xe0, 0x23, 0x0c, 0x62, 0x4c, 0xa3, 0x66, 0xb3, 0x06, 0x43, 0x7a, 0x40, 0x62,
0x74, 0x31, 0x10, 0x3d, 0x79, 0x31, 0x1c, 0xbc, 0x10, 0x0e, 0x7b, 0xf4, 0xe6, 0xd2, 0x66, 0x69,
0x94, 0x1d, 0x6c, 0xbb, 0xf1, 0x0f, 0xf8, 0xc3, 0x4d, 0xbb, 0x05, 0x97, 0x45, 0x30, 0xdc, 0xda,
0x99, 0xf7, 0xe6, 0xcd, 0xe4, 0x3d, 0x68, 0xa7, 0x5c, 0x7f, 0xa2, 0x7c, 0x0b, 0x97, 0x12, 0x35,
0x92, 0x93, 0x04, 0xc3, 0x85, 0x98, 0x49, 0x0c, 0x5d, 0x3d, 0x18, 0x25, 0x42, 0xcf, 0xb3, 0x38,
0x9c, 0xe1, 0x62, 0x60, 0x3b, 0x83, 0x04, 0x6f, 0xf3, 0x87, 0xc4, 0x4c, 0x73, 0x39, 0xb0, 0x4c,
0xf7, 0xc9, 0xc7, 0xd0, 0x36, 0xb4, 0x26, 0x42, 0xe9, 0x88, 0x7f, 0x64, 0x5c, 0x69, 0xfa, 0x08,
0x47, 0xf9, 0x57, 0x2d, 0x31, 0x55, 0x9c, 0xdc, 0x40, 0x3d, 0x45, 0xc6, 0x95, 0x5f, 0xe9, 0x56,
0xfb, 0xad, 0xe1, 0x79, 0x58, 0x56, 0x0d, 0xa7, 0xc8, 0x78, 0x94, 0x83, 0x68, 0x0f, 0x4e, 0xa7,
0x5c, 0x24, 0xf3, 0x18, 0x33, 0x39, 0x47, 0x64, 0x6e, 0x2a, 0x39, 0x06, 0x4f, 0x30, 0xbf, 0xd2,
0xad, 0xf4, 0x9b, 0x91, 0x27, 0x18, 0x7d, 0x81, 0xb3, 0x12, 0xce, 0xc9, 0x3d, 0x99, 0x2b, 0x0b,
0x0d, 0xcb, 0x69, 0x0d, 0x2f, 0x7e, 0x91, 0x5d, 0xc1, 0xa2, 0x4d, 0x06, 0xbd, 0x83, 0x9a, 0x59,
0xa9, 0xac, 0x49, 0x7c, 0x68, 0xbc, 0x32, 0x26, 0xb9, 0x52, 0xbe, 0x67, 0x8b, 0xab, 0x2f, 0xbd,
0x87, 0xc6, 0x18, 0xd3, 0x94, 0xcf, 0x34, 0xb9, 0x86, 0x9a, 0xb9, 0xc4, 0xc9, 0xee, 0xba, 0xd6,
0x62, 0xe8, 0x08, 0xea, 0xe3, 0x77, 0x54, 0xfc, 0x20, 0x12, 0x42, 0x73, 0xbd, 0xf9, 0x21, 0x44,
0xf2, 0x00, 0xb0, 0xbe, 0x53, 0xf9, 0xd5, 0xbd, 0x6e, 0x14, 0x90, 0xc3, 0x2f, 0x0f, 0x1a, 0xd3,
0xbc, 0x49, 0x9e, 0x01, 0xac, 0xb9, 0xc6, 0x7f, 0x45, 0xfc, 0x1f, 0xb6, 0x4b, 0x84, 0xb3, 0x2b,
0xe8, 0x6c, 0x75, 0x8a, 0x99, 0xa0, 0xff, 0xc8, 0x04, 0x9a, 0xa6, 0x62, 0xc4, 0x14, 0xe9, 0x6c,
0x6f, 0x51, 0x48, 0x54, 0x70, 0xb9, 0xab, 0xbd, 0x9e, 0x16, 0x43, 0x7b, 0x23, 0x0d, 0xa4, 0xb7,
0xc7, 0xee, 0x42, 0xac, 0x82, 0xab, 0x3f, 0x71, 0x2b, 0x8d, 0xf8, 0xbf, 0x4d, 0xfb, 0xe8, 0x3b,
0x00, 0x00, 0xff, 0xff, 0xfb, 0xa1, 0x6b, 0xb0, 0x45, 0x03, 0x00, 0x00,
// 360 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x4f, 0xf2, 0x40,
0x10, 0xfd, 0x28, 0xf0, 0x35, 0x0c, 0x1f, 0x5f, 0xcc, 0x46, 0x4d, 0x53, 0x83, 0x21, 0x7b, 0x40,
0x62, 0xb4, 0x18, 0x08, 0x9e, 0xbc, 0x18, 0x0e, 0x5e, 0x08, 0x87, 0x7a, 0xf3, 0x66, 0xbb, 0x9b,
0xb2, 0x11, 0x3a, 0xb8, 0xbb, 0x8d, 0x7f, 0xc0, 0x1f, 0x6e, 0xba, 0x5d, 0xb0, 0x80, 0x60, 0xb8,
0x75, 0xe6, 0xbd, 0x37, 0x6f, 0xa7, 0xfb, 0x16, 0x5a, 0x29, 0xd7, 0x1f, 0x28, 0xdf, 0x82, 0xa5,
0x44, 0x8d, 0xe4, 0x24, 0xc1, 0x60, 0x21, 0x62, 0x89, 0x81, 0xed, 0xfb, 0xc3, 0x44, 0xe8, 0x59,
0x16, 0x05, 0x31, 0x2e, 0xfa, 0x06, 0xe9, 0x27, 0x78, 0x5b, 0x7c, 0x48, 0xcc, 0x34, 0x97, 0x7d,
0xa3, 0xb4, 0x45, 0x31, 0x86, 0xb6, 0xa0, 0x39, 0x11, 0x4a, 0x87, 0xfc, 0x3d, 0xe3, 0x4a, 0xd3,
0x07, 0xf8, 0x57, 0x94, 0x6a, 0x89, 0xa9, 0xe2, 0xe4, 0x06, 0xea, 0x29, 0x32, 0xae, 0xbc, 0x4a,
0xa7, 0xda, 0x6b, 0x0e, 0xce, 0x83, 0x6d, 0xd7, 0x60, 0x8a, 0x8c, 0x87, 0x05, 0x89, 0x76, 0xe1,
0x74, 0xca, 0x45, 0x32, 0x8b, 0x30, 0x93, 0x33, 0x44, 0x66, 0xa7, 0x92, 0xff, 0xe0, 0x08, 0xe6,
0x55, 0x3a, 0x95, 0x5e, 0x23, 0x74, 0x04, 0xa3, 0x2f, 0x70, 0xb6, 0xc5, 0xb3, 0x76, 0x8f, 0xf9,
0x96, 0x25, 0xc0, 0x68, 0x9a, 0x83, 0x8b, 0x1f, 0x6c, 0x57, 0xb4, 0x70, 0x53, 0x41, 0xef, 0xa0,
0x96, 0x1f, 0x69, 0xdb, 0x93, 0x78, 0xe0, 0xbe, 0x32, 0x26, 0xb9, 0x52, 0x9e, 0x63, 0x9a, 0xab,
0x92, 0x8e, 0xc0, 0x1d, 0x63, 0x9a, 0xf2, 0x58, 0x93, 0x6b, 0xa8, 0xe5, 0x9b, 0x58, 0xdb, 0x7d,
0xdb, 0x1a, 0x0e, 0x1d, 0x42, 0x7d, 0x3c, 0x47, 0xc5, 0x8f, 0x12, 0x8d, 0xc0, 0x7d, 0xc6, 0xb9,
0x88, 0xc5, 0x71, 0x5e, 0x08, 0x8d, 0xf5, 0xc2, 0xc7, 0x08, 0xc9, 0x3d, 0xc0, 0xfa, 0xf7, 0x28,
0xaf, 0x7a, 0xf0, 0x12, 0x4b, 0xcc, 0xc1, 0xa7, 0x03, 0xee, 0xb4, 0x00, 0xc9, 0x13, 0x80, 0xc9,
0x44, 0x1e, 0x1b, 0x45, 0xbc, 0x6f, 0xb5, 0x0d, 0x92, 0xbd, 0x65, 0xbf, 0xbd, 0x83, 0x94, 0xa3,
0x44, 0xff, 0x90, 0x09, 0x34, 0xf2, 0x4e, 0x6e, 0xa6, 0x48, 0x7b, 0xf7, 0x14, 0xa5, 0x20, 0xfa,
0x97, 0xfb, 0xe0, 0xf5, 0xb4, 0x08, 0x5a, 0x1b, 0x21, 0x22, 0xdd, 0x03, 0x29, 0x29, 0xa5, 0xd1,
0xbf, 0xfa, 0x95, 0xb7, 0xf2, 0x88, 0xfe, 0x9a, 0x47, 0x32, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff,
0x59, 0xcf, 0xab, 0xb5, 0x7c, 0x03, 0x00, 0x00,
}

View File

@@ -49,6 +49,12 @@ message Close {
Node node = 1;
}
// Solicit is sent when requesting route advertisement from the network nodes
message Solicit {
// network node
Node node = 1;
}
// Neighbour is used to nnounce node neighbourhood
message Neighbour {
// network node