From 9161b20d6b211ddbbfeead8272fc556a5f66c76e Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 5 Sep 2019 13:23:33 +0100 Subject: [PATCH 1/6] Add Solicit method to router interface When calling Solicit, router lists all the routes and advertise them straight away --- router/default.go | 54 +++++++++++++++++++++++++++++++++++------------ router/router.go | 2 ++ 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/router/default.go b/router/default.go index 05e033ce..1fa8161a 100644 --- a/router/default.go +++ b/router/default.go @@ -602,21 +602,10 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.subscribers[uuid.New().String()] = advertChan return advertChan, nil case Running: - // list routing table routes to announce - routes, err := r.table.List() + // list all the routes and pack them into even slice to advertise + events, err := r.flushRouteEvents() if err != nil { - return nil, fmt.Errorf("failed listing routes: %s", err) - } - - // collect all the added routes before we attempt to add default gateway - events := make([]*Event, len(routes)) - for i, route := range routes { - event := &Event{ - Type: Create, - Timestamp: time.Now(), - Route: route, - } - events[i] = event + return nil, fmt.Errorf("failed to advertise routes: %s", err) } // create event channels @@ -687,6 +676,43 @@ func (r *router) Process(a *Advert) error { return nil } +// flushRouteEvents lists all the routes and builds a slice of events +func (r *router) flushRouteEvents() ([]*Event, error) { + // list all routes + routes, err := r.table.List() + if err != nil { + return nil, fmt.Errorf("failed listing routes: %s", err) + } + + // build a list of events to advertise + events := make([]*Event, len(routes)) + for i, route := range routes { + event := &Event{ + Type: Create, + Timestamp: time.Now(), + Route: route, + } + events[i] = event + } + + return events, nil +} + +// Solicit advertises all of its routes to the network +// It returns error if the router fails to list the routes +func (r *router) Solicit() error { + events, err := r.flushRouteEvents() + if err != nil { + return fmt.Errorf("failed solicit routes: %s", err) + } + + // advertise the routes + r.advertWg.Add(1) + go r.publishAdvert(RouteUpdate, events) + + return nil +} + // Lookup routes in the routing table func (r *router) Lookup(q Query) ([]Route, error) { return r.table.Query(q) diff --git a/router/router.go b/router/router.go index 8b6618d1..224ef124 100644 --- a/router/router.go +++ b/router/router.go @@ -28,6 +28,8 @@ type Router interface { Advertise() (<-chan *Advert, error) // Process processes incoming adverts Process(*Advert) error + // Solicit advertises the whole routing table ot the network + Solicit() error // Lookup queries routes in the routing table Lookup(Query) ([]Route, error) // Watch returns a watcher which tracks updates to the routing table From 2522d8cb961ce34e3ca04fa9f484ef6556b14364 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 5 Sep 2019 16:04:44 +0100 Subject: [PATCH 2/6] Send solicit message when new neighbour is discovered --- network/default.go | 158 ++++++++++++++++++++++++++---------- network/proto/network.pb.go | 91 +++++++++++++++------ network/proto/network.proto | 6 ++ router/default.go | 15 ++-- router/proto/router.proto | 4 + router/service/service.go | 36 ++++++++ 6 files changed, 236 insertions(+), 74 deletions(-) diff --git a/network/default.go b/network/default.go index 94ee1d25..75731c43 100644 --- a/network/default.go +++ b/network/default.go @@ -275,12 +275,12 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) } // processNetChan processes messages received on NetworkChannel -func (n *network) processNetChan(l tunnel.Listener) { +func (n *network) processNetChan(client transport.Client, listener tunnel.Listener) { // receive network message queue recv := make(chan *transport.Message, 128) // accept NetworkChannel connections - go n.acceptNetConn(l, recv) + go n.acceptNetConn(listener, recv) for { select { @@ -319,6 +319,14 @@ func (n *network) processNetChan(l tunnel.Listener) { lastSeen: now, } n.Unlock() + // advertise the new neighbour to the network + if err := n.advertiseNeighbours(client); err != nil { + log.Debugf("Network failed to advertise neighbours: %v", err) + } + // advertise all the routes when a new node has connected + if err := n.Router.Solicit(); err != nil { + log.Debugf("Network failed to solicit routes: %s", err) + } case "neighbour": // mark the time the message has been received now := time.Now() @@ -341,6 +349,29 @@ func (n *network) processNetChan(l tunnel.Listener) { neighbours: make(map[string]*node), lastSeen: now, } + // send a solicit message when discovering a new node + node := &pbNet.Node{ + Id: n.options.Id, + Address: n.options.Address, + } + pbNetSolicit := &pbNet.Solicit{ + Node: node, + } + + if body, err := proto.Marshal(pbNetSolicit); err == nil { + // create transport message and chuck it down the pipe + m := transport.Message{ + Header: map[string]string{ + "Micro-Method": "solicit", + }, + Body: body, + } + + log.Debugf("Network sending solicit message from: %s", node.Id) + if err := client.Send(&m); err != nil { + log.Debugf("Network failed to send solicit messsage: %v", err) + } + } } // update lastSeen timestamp if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) { @@ -383,6 +414,52 @@ func (n *network) processNetChan(l tunnel.Listener) { } } +// advertiseNeighbours sends a neighbour message to the network +func (n *network) advertiseNeighbours(client transport.Client) error { + n.RLock() + nodes := make([]*pbNet.Node, len(n.neighbours)) + i := 0 + for id, _ := range n.neighbours { + nodes[i] = &pbNet.Node{ + Id: id, + Address: n.neighbours[id].address, + } + i++ + } + n.RUnlock() + + node := &pbNet.Node{ + Id: n.options.Id, + Address: n.options.Address, + } + pbNetNeighbour := &pbNet.Neighbour{ + Node: node, + Neighbours: nodes, + } + + body, err := proto.Marshal(pbNetNeighbour) + if err != nil { + // TODO: should we bail here? + log.Debugf("Network failed to marshal neighbour message: %v", err) + return err + } + // create transport message and chuck it down the pipe + m := transport.Message{ + Header: map[string]string{ + "Micro-Method": "neighbour", + }, + Body: body, + } + + log.Debugf("Network sending neighbour message from: %s", node.Id) + if err := client.Send(&m); err != nil { + log.Debugf("Network failed to send neighbour messsage: %v", err) + return err + } + + return nil +} + // announce announces node neighbourhood to the network func (n *network) announce(client transport.Client) { announce := time.NewTicker(AnnounceTime) @@ -393,44 +470,9 @@ func (n *network) announce(client transport.Client) { case <-n.closed: return case <-announce.C: - n.RLock() - nodes := make([]*pbNet.Node, len(n.neighbours)) - i := 0 - for id, _ := range n.neighbours { - nodes[i] = &pbNet.Node{ - Id: id, - Address: n.neighbours[id].address, - } - i++ - } - n.RUnlock() - - node := &pbNet.Node{ - Id: n.options.Id, - Address: n.options.Address, - } - pbNetNeighbour := &pbNet.Neighbour{ - Node: node, - Neighbours: nodes, - } - - body, err := proto.Marshal(pbNetNeighbour) - if err != nil { - // TODO: should we bail here? - log.Debugf("Network failed to marshal neighbour message: %v", err) - continue - } - // create transport message and chuck it down the pipe - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": "neighbour", - }, - Body: body, - } - - log.Debugf("Network sending neighbour message from: %s", node.Id) - if err := client.Send(&m); err != nil { - log.Debugf("Network failed to send neighbour messsage: %v", err) + // advertise yourself to the network + if err := n.advertiseNeighbours(client); err != nil { + log.Debugf("Network failed to advertise neighbours: %v", err) continue } } @@ -565,12 +607,12 @@ func (n *network) setRouteMetric(route *router.Route) { } // processCtrlChan processes messages received on ControlChannel -func (n *network) processCtrlChan(l tunnel.Listener) { +func (n *network) processCtrlChan(client transport.Client, listener tunnel.Listener) { // receive control message queue recv := make(chan *transport.Message, 128) // accept ControlChannel cconnections - go n.acceptCtrlConn(l, recv) + go n.acceptCtrlConn(listener, recv) for { select { @@ -601,6 +643,29 @@ func (n *network) processCtrlChan(l tunnel.Listener) { lastSeen: now, } n.neighbours[pbRtrAdvert.Id] = advertNode + // send a solicit message when discovering a new node + node := &pbNet.Node{ + Id: n.options.Id, + Address: n.options.Address, + } + pbNetSolicit := &pbNet.Solicit{ + Node: node, + } + + if body, err := proto.Marshal(pbNetSolicit); err == nil { + // create transport message and chuck it down the pipe + m := transport.Message{ + Header: map[string]string{ + "Micro-Method": "solicit", + }, + Body: body, + } + + log.Debugf("Network sending solicit message from: %s", node.Id) + if err := client.Send(&m); err != nil { + log.Debugf("Network failed to send solicit messsage: %v", err) + } + } } n.RUnlock() @@ -657,6 +722,11 @@ func (n *network) processCtrlChan(l tunnel.Listener) { log.Debugf("Network failed to process advert %s: %v", advert.Id, err) continue } + case "solicit": + // advertise all the routes when a new node has connected + if err := n.Router.Solicit(); err != nil { + log.Debugf("Network failed to solicit routes: %s", err) + } } case <-n.closed: return @@ -828,11 +898,11 @@ func (n *network) Connect() error { // prune stale nodes go n.prune() // listen to network messages - go n.processNetChan(netListener) + go n.processNetChan(netClient, netListener) // advertise service routes go n.advertise(ctrlClient, advertChan) // accept and process routes - go n.processCtrlChan(ctrlListener) + go n.processCtrlChan(ctrlClient, ctrlListener) // set connected to true n.connected = true diff --git a/network/proto/network.pb.go b/network/proto/network.pb.go index 74124324..dc6c0d48 100644 --- a/network/proto/network.pb.go +++ b/network/proto/network.pb.go @@ -305,6 +305,47 @@ func (m *Close) GetNode() *Node { return nil } +// Solicit is sent when requesting route advertisement from the network nodes +type Solicit struct { + // network node + Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Solicit) Reset() { *m = Solicit{} } +func (m *Solicit) String() string { return proto.CompactTextString(m) } +func (*Solicit) ProtoMessage() {} +func (*Solicit) Descriptor() ([]byte, []int) { + return fileDescriptor_8571034d60397816, []int{7} +} + +func (m *Solicit) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Solicit.Unmarshal(m, b) +} +func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Solicit.Marshal(b, m, deterministic) +} +func (m *Solicit) XXX_Merge(src proto.Message) { + xxx_messageInfo_Solicit.Merge(m, src) +} +func (m *Solicit) XXX_Size() int { + return xxx_messageInfo_Solicit.Size(m) +} +func (m *Solicit) XXX_DiscardUnknown() { + xxx_messageInfo_Solicit.DiscardUnknown(m) +} + +var xxx_messageInfo_Solicit proto.InternalMessageInfo + +func (m *Solicit) GetNode() *Node { + if m != nil { + return m.Node + } + return nil +} + // Neighbour is used to nnounce node neighbourhood type Neighbour struct { // network node @@ -320,7 +361,7 @@ func (m *Neighbour) Reset() { *m = Neighbour{} } func (m *Neighbour) String() string { return proto.CompactTextString(m) } func (*Neighbour) ProtoMessage() {} func (*Neighbour) Descriptor() ([]byte, []int) { - return fileDescriptor_8571034d60397816, []int{7} + return fileDescriptor_8571034d60397816, []int{8} } func (m *Neighbour) XXX_Unmarshal(b []byte) error { @@ -363,33 +404,35 @@ func init() { proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Close)(nil), "go.micro.network.Close") + proto.RegisterType((*Solicit)(nil), "go.micro.network.Solicit") proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour") } func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } var fileDescriptor_8571034d60397816 = []byte{ - // 348 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x41, 0x4f, 0x32, 0x31, - 0x10, 0xfd, 0x58, 0xe0, 0x23, 0x0c, 0x62, 0x4c, 0xa3, 0x66, 0xb3, 0x06, 0x43, 0x7a, 0x40, 0x62, - 0x74, 0x31, 0x10, 0x3d, 0x79, 0x31, 0x1c, 0xbc, 0x10, 0x0e, 0x7b, 0xf4, 0xe6, 0xd2, 0x66, 0x69, - 0x94, 0x1d, 0x6c, 0xbb, 0xf1, 0x0f, 0xf8, 0xc3, 0x4d, 0xbb, 0x05, 0x97, 0x45, 0x30, 0xdc, 0xda, - 0x99, 0xf7, 0xe6, 0xcd, 0xe4, 0x3d, 0x68, 0xa7, 0x5c, 0x7f, 0xa2, 0x7c, 0x0b, 0x97, 0x12, 0x35, - 0x92, 0x93, 0x04, 0xc3, 0x85, 0x98, 0x49, 0x0c, 0x5d, 0x3d, 0x18, 0x25, 0x42, 0xcf, 0xb3, 0x38, - 0x9c, 0xe1, 0x62, 0x60, 0x3b, 0x83, 0x04, 0x6f, 0xf3, 0x87, 0xc4, 0x4c, 0x73, 0x39, 0xb0, 0x4c, - 0xf7, 0xc9, 0xc7, 0xd0, 0x36, 0xb4, 0x26, 0x42, 0xe9, 0x88, 0x7f, 0x64, 0x5c, 0x69, 0xfa, 0x08, - 0x47, 0xf9, 0x57, 0x2d, 0x31, 0x55, 0x9c, 0xdc, 0x40, 0x3d, 0x45, 0xc6, 0x95, 0x5f, 0xe9, 0x56, - 0xfb, 0xad, 0xe1, 0x79, 0x58, 0x56, 0x0d, 0xa7, 0xc8, 0x78, 0x94, 0x83, 0x68, 0x0f, 0x4e, 0xa7, - 0x5c, 0x24, 0xf3, 0x18, 0x33, 0x39, 0x47, 0x64, 0x6e, 0x2a, 0x39, 0x06, 0x4f, 0x30, 0xbf, 0xd2, - 0xad, 0xf4, 0x9b, 0x91, 0x27, 0x18, 0x7d, 0x81, 0xb3, 0x12, 0xce, 0xc9, 0x3d, 0x99, 0x2b, 0x0b, - 0x0d, 0xcb, 0x69, 0x0d, 0x2f, 0x7e, 0x91, 0x5d, 0xc1, 0xa2, 0x4d, 0x06, 0xbd, 0x83, 0x9a, 0x59, - 0xa9, 0xac, 0x49, 0x7c, 0x68, 0xbc, 0x32, 0x26, 0xb9, 0x52, 0xbe, 0x67, 0x8b, 0xab, 0x2f, 0xbd, - 0x87, 0xc6, 0x18, 0xd3, 0x94, 0xcf, 0x34, 0xb9, 0x86, 0x9a, 0xb9, 0xc4, 0xc9, 0xee, 0xba, 0xd6, - 0x62, 0xe8, 0x08, 0xea, 0xe3, 0x77, 0x54, 0xfc, 0x20, 0x12, 0x42, 0x73, 0xbd, 0xf9, 0x21, 0x44, - 0xf2, 0x00, 0xb0, 0xbe, 0x53, 0xf9, 0xd5, 0xbd, 0x6e, 0x14, 0x90, 0xc3, 0x2f, 0x0f, 0x1a, 0xd3, - 0xbc, 0x49, 0x9e, 0x01, 0xac, 0xb9, 0xc6, 0x7f, 0x45, 0xfc, 0x1f, 0xb6, 0x4b, 0x84, 0xb3, 0x2b, - 0xe8, 0x6c, 0x75, 0x8a, 0x99, 0xa0, 0xff, 0xc8, 0x04, 0x9a, 0xa6, 0x62, 0xc4, 0x14, 0xe9, 0x6c, - 0x6f, 0x51, 0x48, 0x54, 0x70, 0xb9, 0xab, 0xbd, 0x9e, 0x16, 0x43, 0x7b, 0x23, 0x0d, 0xa4, 0xb7, - 0xc7, 0xee, 0x42, 0xac, 0x82, 0xab, 0x3f, 0x71, 0x2b, 0x8d, 0xf8, 0xbf, 0x4d, 0xfb, 0xe8, 0x3b, - 0x00, 0x00, 0xff, 0xff, 0xfb, 0xa1, 0x6b, 0xb0, 0x45, 0x03, 0x00, 0x00, + // 360 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x4f, 0xf2, 0x40, + 0x10, 0xfd, 0x28, 0xf0, 0x35, 0x0c, 0x1f, 0x5f, 0xcc, 0x46, 0x4d, 0x53, 0x83, 0x21, 0x7b, 0x40, + 0x62, 0xb4, 0x18, 0x08, 0x9e, 0xbc, 0x18, 0x0e, 0x5e, 0x08, 0x87, 0x7a, 0xf3, 0x66, 0xbb, 0x9b, + 0xb2, 0x11, 0x3a, 0xb8, 0xbb, 0x8d, 0x7f, 0xc0, 0x1f, 0x6e, 0xba, 0x5d, 0xb0, 0x80, 0x60, 0xb8, + 0x75, 0xe6, 0xbd, 0x37, 0x6f, 0xa7, 0xfb, 0x16, 0x5a, 0x29, 0xd7, 0x1f, 0x28, 0xdf, 0x82, 0xa5, + 0x44, 0x8d, 0xe4, 0x24, 0xc1, 0x60, 0x21, 0x62, 0x89, 0x81, 0xed, 0xfb, 0xc3, 0x44, 0xe8, 0x59, + 0x16, 0x05, 0x31, 0x2e, 0xfa, 0x06, 0xe9, 0x27, 0x78, 0x5b, 0x7c, 0x48, 0xcc, 0x34, 0x97, 0x7d, + 0xa3, 0xb4, 0x45, 0x31, 0x86, 0xb6, 0xa0, 0x39, 0x11, 0x4a, 0x87, 0xfc, 0x3d, 0xe3, 0x4a, 0xd3, + 0x07, 0xf8, 0x57, 0x94, 0x6a, 0x89, 0xa9, 0xe2, 0xe4, 0x06, 0xea, 0x29, 0x32, 0xae, 0xbc, 0x4a, + 0xa7, 0xda, 0x6b, 0x0e, 0xce, 0x83, 0x6d, 0xd7, 0x60, 0x8a, 0x8c, 0x87, 0x05, 0x89, 0x76, 0xe1, + 0x74, 0xca, 0x45, 0x32, 0x8b, 0x30, 0x93, 0x33, 0x44, 0x66, 0xa7, 0x92, 0xff, 0xe0, 0x08, 0xe6, + 0x55, 0x3a, 0x95, 0x5e, 0x23, 0x74, 0x04, 0xa3, 0x2f, 0x70, 0xb6, 0xc5, 0xb3, 0x76, 0x8f, 0xf9, + 0x96, 0x25, 0xc0, 0x68, 0x9a, 0x83, 0x8b, 0x1f, 0x6c, 0x57, 0xb4, 0x70, 0x53, 0x41, 0xef, 0xa0, + 0x96, 0x1f, 0x69, 0xdb, 0x93, 0x78, 0xe0, 0xbe, 0x32, 0x26, 0xb9, 0x52, 0x9e, 0x63, 0x9a, 0xab, + 0x92, 0x8e, 0xc0, 0x1d, 0x63, 0x9a, 0xf2, 0x58, 0x93, 0x6b, 0xa8, 0xe5, 0x9b, 0x58, 0xdb, 0x7d, + 0xdb, 0x1a, 0x0e, 0x1d, 0x42, 0x7d, 0x3c, 0x47, 0xc5, 0x8f, 0x12, 0x8d, 0xc0, 0x7d, 0xc6, 0xb9, + 0x88, 0xc5, 0x71, 0x5e, 0x08, 0x8d, 0xf5, 0xc2, 0xc7, 0x08, 0xc9, 0x3d, 0xc0, 0xfa, 0xf7, 0x28, + 0xaf, 0x7a, 0xf0, 0x12, 0x4b, 0xcc, 0xc1, 0xa7, 0x03, 0xee, 0xb4, 0x00, 0xc9, 0x13, 0x80, 0xc9, + 0x44, 0x1e, 0x1b, 0x45, 0xbc, 0x6f, 0xb5, 0x0d, 0x92, 0xbd, 0x65, 0xbf, 0xbd, 0x83, 0x94, 0xa3, + 0x44, 0xff, 0x90, 0x09, 0x34, 0xf2, 0x4e, 0x6e, 0xa6, 0x48, 0x7b, 0xf7, 0x14, 0xa5, 0x20, 0xfa, + 0x97, 0xfb, 0xe0, 0xf5, 0xb4, 0x08, 0x5a, 0x1b, 0x21, 0x22, 0xdd, 0x03, 0x29, 0x29, 0xa5, 0xd1, + 0xbf, 0xfa, 0x95, 0xb7, 0xf2, 0x88, 0xfe, 0x9a, 0x47, 0x32, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, + 0x59, 0xcf, 0xab, 0xb5, 0x7c, 0x03, 0x00, 0x00, } diff --git a/network/proto/network.proto b/network/proto/network.proto index cfba6d04..6025d90b 100644 --- a/network/proto/network.proto +++ b/network/proto/network.proto @@ -49,6 +49,12 @@ message Close { Node node = 1; } +// Solicit is sent when requesting route advertisement from the network nodes +message Solicit { + // network node + Node node = 1; +} + // Neighbour is used to nnounce node neighbourhood message Neighbour { // network node diff --git a/router/default.go b/router/default.go index 1fa8161a..2294a149 100644 --- a/router/default.go +++ b/router/default.go @@ -120,6 +120,9 @@ func (r *router) manageRoute(route Route, action string) error { if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound { return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) } + case "solicit": + // nothing to do here + return nil default: return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action) } @@ -603,9 +606,9 @@ func (r *router) Advertise() (<-chan *Advert, error) { return advertChan, nil case Running: // list all the routes and pack them into even slice to advertise - events, err := r.flushRouteEvents() + events, err := r.flushRouteEvents(Create) if err != nil { - return nil, fmt.Errorf("failed to advertise routes: %s", err) + return nil, fmt.Errorf("failed to flush routes: %s", err) } // create event channels @@ -676,8 +679,8 @@ func (r *router) Process(a *Advert) error { return nil } -// flushRouteEvents lists all the routes and builds a slice of events -func (r *router) flushRouteEvents() ([]*Event, error) { +// flushRouteEvents returns a slice of events, one per each route in the routing table +func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) { // list all routes routes, err := r.table.List() if err != nil { @@ -688,7 +691,7 @@ func (r *router) flushRouteEvents() ([]*Event, error) { events := make([]*Event, len(routes)) for i, route := range routes { event := &Event{ - Type: Create, + Type: evType, Timestamp: time.Now(), Route: route, } @@ -701,7 +704,7 @@ func (r *router) flushRouteEvents() ([]*Event, error) { // Solicit advertises all of its routes to the network // It returns error if the router fails to list the routes func (r *router) Solicit() error { - events, err := r.flushRouteEvents() + events, err := r.flushRouteEvents(Update) if err != nil { return fmt.Errorf("failed solicit routes: %s", err) } diff --git a/router/proto/router.proto b/router/proto/router.proto index 543fdd97..64445ff3 100644 --- a/router/proto/router.proto +++ b/router/proto/router.proto @@ -7,6 +7,7 @@ service Router { rpc Lookup(LookupRequest) returns (LookupResponse) {}; rpc Watch(WatchRequest) returns (stream Event) {}; rpc Advertise(Request) returns (stream Advert) {}; + rpc Solicit(Request) returns (Response) {}; rpc Process(Advert) returns (ProcessResponse) {}; rpc Status(Request) returns (StatusResponse) {}; } @@ -22,6 +23,9 @@ service Table { // Empty request message Request {} +// Empty response +message Response {} + // ListResponse is returned by List message ListResponse { repeated Route routes = 1; diff --git a/router/service/service.go b/router/service/service.go index 77619762..f3063546 100644 --- a/router/service/service.go +++ b/router/service/service.go @@ -220,6 +220,42 @@ func (s *svc) Process(advert *router.Advert) error { return nil } +// Solicit advertise all routes +func (s *svc) Solicit() error { + // list all the routes + routes, err := s.table.List() + if err != nil { + return err + } + + // build events to advertise + events := make([]*router.Event, len(routes)) + for i, _ := range events { + events[i] = &router.Event{ + Type: router.Update, + Timestamp: time.Now(), + Route: routes[i], + } + } + + advert := &router.Advert{ + Id: s.opts.Id, + Type: router.RouteUpdate, + Timestamp: time.Now(), + TTL: time.Duration(router.DefaultAdvertTTL), + Events: events, + } + + select { + case s.advertChan <- advert: + case <-s.exit: + close(s.advertChan) + return nil + } + + return nil +} + // Status returns router status func (s *svc) Status() router.Status { s.Lock() From 5ddfd911ba37abfcd17908e4ca2459db5142abe3 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 5 Sep 2019 17:18:16 +0100 Subject: [PATCH 3/6] Replace send message code by one network method --- network/default.go | 196 +++++++++++++++++---------------------------- 1 file changed, 75 insertions(+), 121 deletions(-) diff --git a/network/default.go b/network/default.go index 75731c43..4d8b1156 100644 --- a/network/default.go +++ b/network/default.go @@ -2,6 +2,7 @@ package network import ( "container/list" + "errors" "sync" "time" @@ -26,6 +27,10 @@ var ( ControlChannel = "control" // DefaultLink is default network link DefaultLink = "network" + // ErrMsgUnknown is returned when unknown message is attempted to send or receive + ErrMsgUnknown = errors.New("unknown message") + // ErrChannelUnknown is returned when attempting to send or received on unknown channel + ErrChannelUnknown = errors.New("unknown channel") ) // node is network node @@ -319,8 +324,8 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen lastSeen: now, } n.Unlock() - // advertise the new neighbour to the network - if err := n.advertiseNeighbours(client); err != nil { + // advertise yourself to the network + if err := n.sendMsg("neighbour", NetworkChannel); err != nil { log.Debugf("Network failed to advertise neighbours: %v", err) } // advertise all the routes when a new node has connected @@ -342,43 +347,21 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen n.Lock() log.Debugf("Network received neighbour message from: %s", pbNetNeighbour.Node.Id) // only add the neighbour if it is NOT already in node's list of neighbours - if _, ok := n.neighbours[pbNetNeighbour.Node.Id]; !ok { + _, exists := n.neighbours[pbNetNeighbour.Node.Id] + if !exists { n.neighbours[pbNetNeighbour.Node.Id] = &node{ id: pbNetNeighbour.Node.Id, address: pbNetNeighbour.Node.Address, neighbours: make(map[string]*node), lastSeen: now, } - // send a solicit message when discovering a new node - node := &pbNet.Node{ - Id: n.options.Id, - Address: n.options.Address, - } - pbNetSolicit := &pbNet.Solicit{ - Node: node, - } - - if body, err := proto.Marshal(pbNetSolicit); err == nil { - // create transport message and chuck it down the pipe - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": "solicit", - }, - Body: body, - } - - log.Debugf("Network sending solicit message from: %s", node.Id) - if err := client.Send(&m); err != nil { - log.Debugf("Network failed to send solicit messsage: %v", err) - } - } } // update lastSeen timestamp if n.neighbours[pbNetNeighbour.Node.Id].lastSeen.Before(now) { n.neighbours[pbNetNeighbour.Node.Id].lastSeen = now } // update/store the neighbour node neighbours - // NOTE: * we dont update lastSeen time for the neighbours of the neighbour + // NOTE: * we do NOT update lastSeen time for the neighbours of the neighbour // * even though we are NOT interested in neighbours of neighbours here // we still allocate the map of neighbours for each of them for _, pbNeighbour := range pbNetNeighbour.Neighbours { @@ -390,6 +373,13 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen n.neighbours[pbNetNeighbour.Node.Id].neighbours[neighbourNode.id] = neighbourNode } n.Unlock() + // send a solicit message when discovering a new node + // NOTE: we need to send the solicit message here after the Lock is released as sendMsg locs + if !exists { + if err := n.sendMsg("solicit", NetworkChannel); err != nil { + log.Debugf("Network failed to send solicit message: %s", err) + } + } case "close": pbNetClose := &pbNet.Close{} if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { @@ -414,46 +404,70 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen } } -// advertiseNeighbours sends a neighbour message to the network -func (n *network) advertiseNeighbours(client transport.Client) error { - n.RLock() - nodes := make([]*pbNet.Node, len(n.neighbours)) - i := 0 - for id, _ := range n.neighbours { - nodes[i] = &pbNet.Node{ - Id: id, - Address: n.neighbours[id].address, - } - i++ - } - n.RUnlock() - +// sendMsg sends a message to the tunnel channel +func (n *network) sendMsg(msgType string, channel string) error { node := &pbNet.Node{ Id: n.options.Id, Address: n.options.Address, } - pbNetNeighbour := &pbNet.Neighbour{ - Node: node, - Neighbours: nodes, + + var protoMsg proto.Message + + switch msgType { + case "connect": + protoMsg = &pbNet.Connect{ + Node: node, + } + case "close": + protoMsg = &pbNet.Close{ + Node: node, + } + case "solicit": + protoMsg = &pbNet.Solicit{ + Node: node, + } + case "neighbour": + n.RLock() + nodes := make([]*pbNet.Node, len(n.neighbours)) + i := 0 + for id, _ := range n.neighbours { + nodes[i] = &pbNet.Node{ + Id: id, + Address: n.neighbours[id].address, + } + i++ + } + n.RUnlock() + protoMsg = &pbNet.Neighbour{ + Node: node, + Neighbours: nodes, + } + default: + return ErrMsgUnknown } - body, err := proto.Marshal(pbNetNeighbour) + body, err := proto.Marshal(protoMsg) if err != nil { - // TODO: should we bail here? - log.Debugf("Network failed to marshal neighbour message: %v", err) return err } // create transport message and chuck it down the pipe m := transport.Message{ Header: map[string]string{ - "Micro-Method": "neighbour", + "Micro-Method": msgType, }, Body: body, } - log.Debugf("Network sending neighbour message from: %s", node.Id) + n.RLock() + client, ok := n.tunClient[channel] + if !ok { + n.RUnlock() + return ErrChannelUnknown + } + n.RUnlock() + + log.Debugf("Network sending %s message from: %s", msgType, node.Id) if err := client.Send(&m); err != nil { - log.Debugf("Network failed to send neighbour messsage: %v", err) return err } @@ -471,7 +485,7 @@ func (n *network) announce(client transport.Client) { return case <-announce.C: // advertise yourself to the network - if err := n.advertiseNeighbours(client); err != nil { + if err := n.sendMsg("neighbour", NetworkChannel); err != nil { log.Debugf("Network failed to advertise neighbours: %v", err) continue } @@ -644,27 +658,8 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste } n.neighbours[pbRtrAdvert.Id] = advertNode // send a solicit message when discovering a new node - node := &pbNet.Node{ - Id: n.options.Id, - Address: n.options.Address, - } - pbNetSolicit := &pbNet.Solicit{ - Node: node, - } - - if body, err := proto.Marshal(pbNetSolicit); err == nil { - // create transport message and chuck it down the pipe - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": "solicit", - }, - Body: body, - } - - log.Debugf("Network sending solicit message from: %s", node.Id) - if err := client.Send(&m); err != nil { - log.Debugf("Network failed to send solicit messsage: %v", err) - } + if err := n.sendMsg("solicit", NetworkChannel); err != nil { + log.Debugf("Network failed to send solicit message: %s", err) } } n.RUnlock() @@ -794,8 +789,6 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A // Connect connects the network func (n *network) Connect() error { n.Lock() - defer n.Unlock() - // return if already connected if n.connected { return nil @@ -863,32 +856,14 @@ func (n *network) Connect() error { if err := n.server.Start(); err != nil { return err } + n.Unlock() // send connect message to NetworkChannel // NOTE: in theory we could do this as soon as // Dial to NetworkChannel succeeds, but instead // we initialize all other node resources first - node := &pbNet.Node{ - Id: n.options.Id, - Address: n.options.Address, - } - pbNetConnect := &pbNet.Connect{ - Node: node, - } - - // only proceed with sending to NetworkChannel if marshal succeeds - if body, err := proto.Marshal(pbNetConnect); err == nil { - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": "connect", - }, - Body: body, - } - - log.Debugf("Network sending connect message: %s", node.Id) - if err := netClient.Send(&m); err != nil { - log.Debugf("Network failed to send connect messsage: %v", err) - } + if err := n.sendMsg("connect", NetworkChannel); err != nil { + log.Debugf("Network failed to send connect message: %s", err) } // go resolving network nodes @@ -904,8 +879,9 @@ func (n *network) Connect() error { // accept and process routes go n.processCtrlChan(ctrlClient, ctrlListener) - // set connected to true + n.Lock() n.connected = true + n.Unlock() return nil } @@ -982,31 +958,9 @@ func (n *network) Close() error { return nil default: // send close message only if we managed to connect to NetworkChannel - if netClient, ok := n.tunClient[NetworkChannel]; ok { - // send connect message to NetworkChannel - node := &pbNet.Node{ - Id: n.options.Id, - Address: n.options.Address, - } - pbNetClose := &pbNet.Close{ - Node: node, - } - - // only proceed with sending to NetworkChannel if marshal succeeds - if body, err := proto.Marshal(pbNetClose); err == nil { - // create transport message and chuck it down the pipe - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": "close", - }, - Body: body, - } - - log.Debugf("Network sending close message from: %s", node.Id) - if err := netClient.Send(&m); err != nil { - log.Debugf("Network failed to send close messsage: %v", err) - } - } + log.Debugf("Sending close message from: %s", n.options.Id) + if err := n.sendMsg("close", NetworkChannel); err != nil { + log.Debugf("Network failed to send close message: %s", err) } // TODO: send close message to the network channel close(n.closed) From b01c8e06e08ce1dc22a3680e3f3a02efdf02b6f9 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 5 Sep 2019 17:43:59 +0100 Subject: [PATCH 4/6] Update error name to ErrClientNotFound --- network/default.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/network/default.go b/network/default.go index 4d8b1156..3da04650 100644 --- a/network/default.go +++ b/network/default.go @@ -29,8 +29,8 @@ var ( DefaultLink = "network" // ErrMsgUnknown is returned when unknown message is attempted to send or receive ErrMsgUnknown = errors.New("unknown message") - // ErrChannelUnknown is returned when attempting to send or received on unknown channel - ErrChannelUnknown = errors.New("unknown channel") + // ErrClientNotFound is returned when client for tunnel channel could not be found + ErrClientNotFound = errors.New("client not found") ) // node is network node @@ -462,7 +462,7 @@ func (n *network) sendMsg(msgType string, channel string) error { client, ok := n.tunClient[channel] if !ok { n.RUnlock() - return ErrChannelUnknown + return ErrClientNotFound } n.RUnlock() From ec354934e3f7bc93075b0e73fb2e9996ad4de02d Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 5 Sep 2019 17:44:47 +0100 Subject: [PATCH 5/6] Move Errors to separate init block --- network/default.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/network/default.go b/network/default.go index 3da04650..e0123c3f 100644 --- a/network/default.go +++ b/network/default.go @@ -27,6 +27,9 @@ var ( ControlChannel = "control" // DefaultLink is default network link DefaultLink = "network" +) + +var ( // ErrMsgUnknown is returned when unknown message is attempted to send or receive ErrMsgUnknown = errors.New("unknown message") // ErrClientNotFound is returned when client for tunnel channel could not be found From dddfb6f878a0856aba3fd22b91943ebe49fb3655 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 5 Sep 2019 17:59:14 +0100 Subject: [PATCH 6/6] Fixed typos and simplified map iteration --- network/default.go | 6 +++--- router/router.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/network/default.go b/network/default.go index e0123c3f..674bba3c 100644 --- a/network/default.go +++ b/network/default.go @@ -377,7 +377,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen } n.Unlock() // send a solicit message when discovering a new node - // NOTE: we need to send the solicit message here after the Lock is released as sendMsg locs + // NOTE: we need to send the solicit message here after the Lock is released as sendMsg locks, too if !exists { if err := n.sendMsg("solicit", NetworkChannel); err != nil { log.Debugf("Network failed to send solicit message: %s", err) @@ -433,7 +433,7 @@ func (n *network) sendMsg(msgType string, channel string) error { n.RLock() nodes := make([]*pbNet.Node, len(n.neighbours)) i := 0 - for id, _ := range n.neighbours { + for id := range n.neighbours { nodes[i] = &pbNet.Node{ Id: id, Address: n.neighbours[id].address, @@ -609,7 +609,7 @@ func (n *network) setRouteMetric(route *router.Route) { // check if the route origin is the neighbour of our neighbour for _, node := range n.neighbours { - for id, _ := range node.neighbours { + for id := range node.neighbours { if route.Router == id { route.Metric = 100 n.RUnlock() diff --git a/router/router.go b/router/router.go index 224ef124..3e8f00cd 100644 --- a/router/router.go +++ b/router/router.go @@ -28,7 +28,7 @@ type Router interface { Advertise() (<-chan *Advert, error) // Process processes incoming adverts Process(*Advert) error - // Solicit advertises the whole routing table ot the network + // Solicit advertises the whole routing table to the network Solicit() error // Lookup queries routes in the routing table Lookup(Query) ([]Route, error)