From 7f9b3b5556471d574490eccbbde33abbf20ae962 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 16 Jan 2020 19:43:10 +0000 Subject: [PATCH] Remove Solicitation from the network Instead, when a new peer is discovered it is sent a sync message i.e. we do the full sync when discovering peers --- network/default.go | 232 +++++++----------------- router/default.go | 22 --- router/router.go | 6 - router/service/proto/router.pb.go | 182 +++++++------------ router/service/proto/router.pb.micro.go | 19 +- router/service/proto/router.proto | 7 - router/service/service.go | 36 ---- 7 files changed, 130 insertions(+), 374 deletions(-) diff --git a/network/default.go b/network/default.go index 9123949e..fec4a9ce 100644 --- a/network/default.go +++ b/network/default.go @@ -80,8 +80,6 @@ type network struct { closed chan bool // whether we've discovered by the network discovered chan bool - // solicted checks whether routes were solicited by one node - solicited chan *node } // message is network message @@ -176,7 +174,6 @@ func newNetwork(opts ...Option) Network { tunClient: make(map[string]tunnel.Session), peerLinks: make(map[string]tunnel.Link), discovered: make(chan bool, 1), - solicited: make(chan *node, 32), } network.node.network = network @@ -346,60 +343,23 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { Events: events, } - // send the advert to a select number of random peers - if advert.Type != router.Solicitation { - // get a list of node peers - peers := n.Peers() - - // there is no one to send to - if len(peers) == 0 { - continue - } - - // advertise to max 3 peers - max := len(peers) - if max > 3 { - max = 3 - } - - for i := 0; i < max; i++ { - if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { - if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err) - } - } - } + // get a list of node peers + peers := n.Peers() + // continue if there is no one to send to + if len(peers) == 0 { continue } - // it's a solication, someone asked for it - // so we're going to pick off the node and send it - select { - case peer := <-n.solicited: - // someone requested the route - n.sendTo("advert", ControlChannel, peer, msg) - default: - // get a list of node peers - peers := n.Peers() + // advertise to max 3 peers + max := len(peers) + if max > 3 { + max = 3 + } - // only proceed if we have a peer - if len(peers) == 0 { - continue - } - - // pick a random peer from the list of peers - peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()) - // only proceed with a peer - if peer == nil { - continue - } - - // attempt to send the advert to the peer - if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise routes to %s: %v, sending multicast", peer.Id(), err) - // send a multicast message if we fail to send Unicast message - if err := n.sendMsg("advert", ControlChannel, msg); err != nil { + for i := 0; i < max; i++ { + if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { + if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err) } } @@ -740,38 +700,6 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { if err := n.router.Process(advert); err != nil { log.Debugf("Network failed to process advert %s: %v", advert.Id, err) } - case "solicit": - pbRtrSolicit := new(pbRtr.Solicit) - if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil { - log.Debugf("Network fail to unmarshal solicit message: %v", err) - continue - } - - log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id) - - // ignore solicitation when requested by you - if pbRtrSolicit.Id == n.options.Id { - continue - } - - log.Tracef("Network router flushing routes for: %s", pbRtrSolicit.Id) - - peer := &node{ - id: pbRtrSolicit.Id, - link: m.msg.Header["Micro-Link"], - } - - // specify that someone solicited the route - select { - case n.solicited <- peer: - default: - // don't block - } - - // advertise all the routes when a new node has connected - if err := n.router.Solicit(); err != nil { - log.Debugf("Network failed to solicit routes: %s", err) - } } case <-n.closed: return @@ -851,54 +779,17 @@ func (n *network) processNetChan(listener tunnel.Listener) { } // get a list of the best routes for each service in our routing table - q := []router.QueryOption{ - router.QueryStrategy(n.router.Options().Advertise), - } - routes, err := n.router.Table().Query(q...) - switch err { - case nil: - // encode the routes to protobuf - pbRoutes := make([]*pbRtr.Route, 0, len(routes)) - for _, route := range routes { - // generate new route proto - pbRoute := pbUtil.RouteToProto(route) - // mask the route before outbounding - n.maskRoute(pbRoute) - // add to list of routes - pbRoutes = append(pbRoutes, pbRoute) - } - // pack the routes into the sync message - msg.Routes = pbRoutes - default: - // we can't list the routes + routes, err := n.getProtoRoutes() + if err != nil { log.Debugf("Network node %s failed listing routes: %v", n.id, err) } + // attached the routes to the message + msg.Routes = routes // send sync message to the newly connected peer if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { log.Debugf("Network failed to send sync message: %v", err) } - // wait for a short period of time before sending a solicit message - <-time.After(time.Millisecond * 100) - - // send a solicit message when discovering new peer - // this triggers the node to flush its routing table to the network - // and leads to faster convergence of the network - solicit := &pbRtr.Solicit{ - Id: n.options.Id, - } - - // ask for the new nodes routes - if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil { - log.Debugf("Network failed to send solicit message: %s", err) - } - - // now advertise our own routes - select { - case n.solicited <- peer: - default: - // don't block - } }() case "peer": // mark the time the message has been received @@ -934,38 +825,27 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network failed updating peer links: %s", err) } - // if it's a new peer i.e. we do not have it in our graph, we solicit its routes + // if it's a new peer i.e. we do not have it in our graph, we request full sync if err := n.node.AddPeer(peer); err == nil { go func() { - msg := PeersToProto(n.node, MaxDepth) + // marshal node graph into protobuf + node := PeersToProto(n.node, MaxDepth) - // advertise yourself to the peer - if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise peers: %v", err) + msg := &pbNet.Sync{ + Peer: node, } - <-time.After(time.Millisecond * 100) - - // send a solicit message when discovering new peer - solicit := &pbRtr.Solicit{ - Id: n.options.Id, + // get a list of the best routes for each service in our routing table + routes, err := n.getProtoRoutes() + if err != nil { + log.Debugf("Network node %s failed listing routes: %v", n.id, err) } + // attached the routes to the message + msg.Routes = routes - // then solicit this peer - if err := n.sendTo("solicit", ControlChannel, peer, solicit); err != nil { - log.Debugf("Network failed to send solicit message: %s", err) - } - - // now advertise our own routes - select { - case n.solicited <- peer: - default: - // don't block - } - - // advertise all the routes when a new node has connected - if err := n.router.Solicit(); err != nil { - log.Debugf("Network failed to solicit routes: %s", err) + // send sync message to the newly connected peer + if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { + log.Debugf("Network failed to send sync message: %v", err) } }() @@ -1377,28 +1257,12 @@ func (n *network) manage() { } // get a list of the best routes for each service in our routing table - q := []router.QueryOption{ - router.QueryStrategy(n.router.Options().Advertise), - } - routes, err := n.router.Table().Query(q...) - switch err { - case nil: - // encode the routes to protobuf - pbRoutes := make([]*pbRtr.Route, 0, len(routes)) - for _, route := range routes { - // generate new route proto - pbRoute := pbUtil.RouteToProto(route) - // mask the route before outbounding - n.maskRoute(pbRoute) - // add to list of routes - pbRoutes = append(pbRoutes, pbRoute) - } - // pack the routes into the sync message - msg.Routes = pbRoutes - default: - // we can't list the routes + routes, err := n.getProtoRoutes() + if err != nil { log.Debugf("Network node %s failed listing routes: %v", n.id, err) } + // attached the routes to the message + msg.Routes = routes // send sync message to the newly connected peer if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { @@ -1411,6 +1275,33 @@ func (n *network) manage() { } } +// getAdvertProtoRoutes returns a list of routes to advertise to remote peer +// based on the advertisement strategy encoded in protobuf +// It returns error if the routes failed to be retrieved from the routing table +func (n *network) getProtoRoutes() ([]*pbRtr.Route, error) { + // get a list of the best routes for each service in our routing table + q := []router.QueryOption{ + router.QueryStrategy(n.router.Options().Advertise), + } + + routes, err := n.router.Table().Query(q...) + if err != nil { + return nil, err + } + + // encode the routes to protobuf + pbRoutes := make([]*pbRtr.Route, 0, len(routes)) + for _, route := range routes { + // generate new route proto + pbRoute := pbUtil.RouteToProto(route) + // mask the route before outbounding + n.maskRoute(pbRoute) + // add to list of routes + pbRoutes = append(pbRoutes, pbRoute) + } + return pbRoutes, nil +} + func (n *network) sendConnect() { // send connect message to NetworkChannel // NOTE: in theory we could do this as soon as @@ -1822,7 +1713,6 @@ func (n *network) Close() error { n.Unlock() return nil default: - // TODO: send close message to the network channel close(n.closed) // set connected to false diff --git a/router/default.go b/router/default.go index 459488df..688d5337 100644 --- a/router/default.go +++ b/router/default.go @@ -115,9 +115,6 @@ func (r *router) manageRoute(route Route, action string) error { if err := r.table.Update(route); err != nil { return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) } - case "solicit": - // nothing to do here - return nil default: return fmt.Errorf("failed to manage route for service %s: unknown action %s", route.Service, action) } @@ -816,25 +813,6 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) { return events, nil } -// Solicit advertises all of its routes to the network -// It returns error if the router fails to list the routes -func (r *router) Solicit() error { - events, err := r.flushRouteEvents(Update) - if err != nil { - return fmt.Errorf("failed solicit routes: %s", err) - } - - // advertise the routes - r.advertWg.Add(1) - - go func() { - r.publishAdvert(Solicitation, events) - r.advertWg.Done() - }() - - return nil -} - // Lookup routes in the routing table func (r *router) Lookup(q ...QueryOption) ([]Route, error) { return r.table.Query(q...) diff --git a/router/router.go b/router/router.go index cf6294ef..b71cbcac 100644 --- a/router/router.go +++ b/router/router.go @@ -28,8 +28,6 @@ type Router interface { Advertise() (<-chan *Advert, error) // Process processes incoming adverts Process(*Advert) error - // Solicit advertises the whole routing table - Solicit() error // Lookup queries routes in the routing table Lookup(...QueryOption) ([]Route, error) // Watch returns a watcher which tracks updates to the routing table @@ -111,8 +109,6 @@ const ( Announce AdvertType = iota // RouteUpdate advertises route updates RouteUpdate - // Solicitation indicates routes were solicited - Solicitation ) // String returns human readable advertisement type @@ -122,8 +118,6 @@ func (t AdvertType) String() string { return "announce" case RouteUpdate: return "update" - case Solicitation: - return "solicitation" default: return "unknown" } diff --git a/router/service/proto/router.pb.go b/router/service/proto/router.pb.go index 4e2c4cc0..29904323 100644 --- a/router/service/proto/router.pb.go +++ b/router/service/proto/router.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: github.com/micro/go-micro/router/proto/router.proto +// source: router.proto package go_micro_router @@ -43,7 +43,7 @@ func (x AdvertType) String() string { } func (AdvertType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{0} + return fileDescriptor_367072455c71aedc, []int{0} } // EventType defines the type of event @@ -72,7 +72,7 @@ func (x EventType) String() string { } func (EventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{1} + return fileDescriptor_367072455c71aedc, []int{1} } // Empty request @@ -86,7 +86,7 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} func (*Request) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{0} + return fileDescriptor_367072455c71aedc, []int{0} } func (m *Request) XXX_Unmarshal(b []byte) error { @@ -118,7 +118,7 @@ func (m *Response) Reset() { *m = Response{} } func (m *Response) String() string { return proto.CompactTextString(m) } func (*Response) ProtoMessage() {} func (*Response) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{1} + return fileDescriptor_367072455c71aedc, []int{1} } func (m *Response) XXX_Unmarshal(b []byte) error { @@ -151,7 +151,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (*ListResponse) ProtoMessage() {} func (*ListResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{2} + return fileDescriptor_367072455c71aedc, []int{2} } func (m *ListResponse) XXX_Unmarshal(b []byte) error { @@ -191,7 +191,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} } func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{3} + return fileDescriptor_367072455c71aedc, []int{3} } func (m *LookupRequest) XXX_Unmarshal(b []byte) error { @@ -231,7 +231,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} } func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{4} + return fileDescriptor_367072455c71aedc, []int{4} } func (m *LookupResponse) XXX_Unmarshal(b []byte) error { @@ -271,7 +271,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} } func (m *QueryRequest) String() string { return proto.CompactTextString(m) } func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{5} + return fileDescriptor_367072455c71aedc, []int{5} } func (m *QueryRequest) XXX_Unmarshal(b []byte) error { @@ -311,7 +311,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} } func (m *QueryResponse) String() string { return proto.CompactTextString(m) } func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{6} + return fileDescriptor_367072455c71aedc, []int{6} } func (m *QueryResponse) XXX_Unmarshal(b []byte) error { @@ -350,7 +350,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} } func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{7} + return fileDescriptor_367072455c71aedc, []int{7} } func (m *WatchRequest) XXX_Unmarshal(b []byte) error { @@ -392,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} } func (m *Advert) String() string { return proto.CompactTextString(m) } func (*Advert) ProtoMessage() {} func (*Advert) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{8} + return fileDescriptor_367072455c71aedc, []int{8} } func (m *Advert) XXX_Unmarshal(b []byte) error { @@ -448,47 +448,6 @@ func (m *Advert) GetEvents() []*Event { return nil } -// Solicit solicits routes -type Solicit struct { - // id of the soliciting router - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Solicit) Reset() { *m = Solicit{} } -func (m *Solicit) String() string { return proto.CompactTextString(m) } -func (*Solicit) ProtoMessage() {} -func (*Solicit) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{9} -} - -func (m *Solicit) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Solicit.Unmarshal(m, b) -} -func (m *Solicit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Solicit.Marshal(b, m, deterministic) -} -func (m *Solicit) XXX_Merge(src proto.Message) { - xxx_messageInfo_Solicit.Merge(m, src) -} -func (m *Solicit) XXX_Size() int { - return xxx_messageInfo_Solicit.Size(m) -} -func (m *Solicit) XXX_DiscardUnknown() { - xxx_messageInfo_Solicit.DiscardUnknown(m) -} - -var xxx_messageInfo_Solicit proto.InternalMessageInfo - -func (m *Solicit) GetId() string { - if m != nil { - return m.Id - } - return "" -} - // ProcessResponse is returned by Process type ProcessResponse struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -500,7 +459,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} } func (m *ProcessResponse) String() string { return proto.CompactTextString(m) } func (*ProcessResponse) ProtoMessage() {} func (*ProcessResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{10} + return fileDescriptor_367072455c71aedc, []int{9} } func (m *ProcessResponse) XXX_Unmarshal(b []byte) error { @@ -532,7 +491,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} } func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{11} + return fileDescriptor_367072455c71aedc, []int{10} } func (m *CreateResponse) XXX_Unmarshal(b []byte) error { @@ -564,7 +523,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{12} + return fileDescriptor_367072455c71aedc, []int{11} } func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { @@ -596,7 +555,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{13} + return fileDescriptor_367072455c71aedc, []int{12} } func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { @@ -634,7 +593,7 @@ func (m *Event) Reset() { *m = Event{} } func (m *Event) String() string { return proto.CompactTextString(m) } func (*Event) ProtoMessage() {} func (*Event) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{14} + return fileDescriptor_367072455c71aedc, []int{13} } func (m *Event) XXX_Unmarshal(b []byte) error { @@ -693,7 +652,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{15} + return fileDescriptor_367072455c71aedc, []int{14} } func (m *Query) XXX_Unmarshal(b []byte) error { @@ -760,7 +719,7 @@ func (m *Route) Reset() { *m = Route{} } func (m *Route) String() string { return proto.CompactTextString(m) } func (*Route) ProtoMessage() {} func (*Route) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{16} + return fileDescriptor_367072455c71aedc, []int{15} } func (m *Route) XXX_Unmarshal(b []byte) error { @@ -842,7 +801,7 @@ func (m *Status) Reset() { *m = Status{} } func (m *Status) String() string { return proto.CompactTextString(m) } func (*Status) ProtoMessage() {} func (*Status) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{17} + return fileDescriptor_367072455c71aedc, []int{16} } func (m *Status) XXX_Unmarshal(b []byte) error { @@ -888,7 +847,7 @@ func (m *StatusResponse) Reset() { *m = StatusResponse{} } func (m *StatusResponse) String() string { return proto.CompactTextString(m) } func (*StatusResponse) ProtoMessage() {} func (*StatusResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_2dd64c6ec344e37e, []int{18} + return fileDescriptor_367072455c71aedc, []int{17} } func (m *StatusResponse) XXX_Unmarshal(b []byte) error { @@ -928,7 +887,6 @@ func init() { proto.RegisterType((*QueryResponse)(nil), "go.micro.router.QueryResponse") proto.RegisterType((*WatchRequest)(nil), "go.micro.router.WatchRequest") proto.RegisterType((*Advert)(nil), "go.micro.router.Advert") - proto.RegisterType((*Solicit)(nil), "go.micro.router.Solicit") proto.RegisterType((*ProcessResponse)(nil), "go.micro.router.ProcessResponse") proto.RegisterType((*CreateResponse)(nil), "go.micro.router.CreateResponse") proto.RegisterType((*DeleteResponse)(nil), "go.micro.router.DeleteResponse") @@ -940,56 +898,52 @@ func init() { proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse") } -func init() { - proto.RegisterFile("github.com/micro/go-micro/router/proto/router.proto", fileDescriptor_2dd64c6ec344e37e) -} +func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } -var fileDescriptor_2dd64c6ec344e37e = []byte{ - // 736 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x4f, 0xdb, 0x4a, - 0x10, 0xb7, 0x93, 0xd8, 0x79, 0x99, 0x17, 0x42, 0xde, 0xe8, 0x09, 0x4c, 0xde, 0x03, 0x22, 0x9f, - 0x10, 0xa2, 0x4e, 0x15, 0xae, 0xfd, 0x43, 0xa0, 0x54, 0x95, 0xca, 0xa1, 0x35, 0xa0, 0x9e, 0x8d, - 0xb3, 0x0a, 0x16, 0x49, 0xd6, 0xec, 0xae, 0x41, 0x39, 0xf7, 0xd3, 0xf4, 0xd2, 0x4b, 0x3f, 0x52, - 0xbf, 0x48, 0xe5, 0xdd, 0x75, 0x08, 0x71, 0x16, 0x09, 0x4e, 0xd9, 0xf9, 0xf7, 0x9b, 0x99, 0xdd, - 0xdf, 0x8c, 0x03, 0x87, 0xa3, 0x44, 0x5c, 0x67, 0x57, 0x41, 0x4c, 0x27, 0xbd, 0x49, 0x12, 0x33, - 0xda, 0x1b, 0xd1, 0x57, 0xea, 0xc0, 0x68, 0x26, 0x08, 0xeb, 0xa5, 0x8c, 0x8a, 0x42, 0x08, 0xa4, - 0x80, 0xeb, 0x23, 0x1a, 0x48, 0x9f, 0x40, 0xa9, 0xfd, 0x06, 0xd4, 0x43, 0x72, 0x9b, 0x11, 0x2e, - 0x7c, 0x80, 0xbf, 0x42, 0xc2, 0x53, 0x3a, 0xe5, 0xc4, 0x7f, 0x07, 0xcd, 0xb3, 0x84, 0x8b, 0x42, - 0xc6, 0x00, 0x5c, 0x19, 0xc0, 0x3d, 0xbb, 0x5b, 0xdd, 0xfb, 0xbb, 0xbf, 0x11, 0x2c, 0x01, 0x05, - 0x61, 0xfe, 0x13, 0x6a, 0x2f, 0xff, 0x2d, 0xac, 0x9d, 0x51, 0x7a, 0x93, 0xa5, 0x1a, 0x1c, 0x0f, - 0xc0, 0xb9, 0xcd, 0x08, 0x9b, 0x79, 0x76, 0xd7, 0x5e, 0x19, 0xff, 0x35, 0xb7, 0x86, 0xca, 0xc9, - 0x3f, 0x82, 0x56, 0x11, 0xfe, 0xc2, 0x02, 0xde, 0x40, 0x53, 0x21, 0xbe, 0x28, 0xff, 0x7b, 0x58, - 0xd3, 0xd1, 0x2f, 0x4c, 0xdf, 0x82, 0xe6, 0xb7, 0x48, 0xc4, 0xd7, 0xc5, 0xdd, 0xfe, 0xb0, 0xc1, - 0x1d, 0x0c, 0xef, 0x08, 0x13, 0xd8, 0x82, 0x4a, 0x32, 0x94, 0x65, 0x34, 0xc2, 0x4a, 0x32, 0xc4, - 0x1e, 0xd4, 0xc4, 0x2c, 0x25, 0x5e, 0xa5, 0x6b, 0xef, 0xb5, 0xfa, 0xff, 0x95, 0x80, 0x55, 0xd8, - 0xc5, 0x2c, 0x25, 0xa1, 0x74, 0xc4, 0xff, 0xa1, 0x21, 0x92, 0x09, 0xe1, 0x22, 0x9a, 0xa4, 0x5e, - 0xb5, 0x6b, 0xef, 0x55, 0xc3, 0x07, 0x05, 0xb6, 0xa1, 0x2a, 0xc4, 0xd8, 0xab, 0x49, 0x7d, 0x7e, - 0xcc, 0x6b, 0x27, 0x77, 0x64, 0x2a, 0xb8, 0xe7, 0x18, 0x6a, 0x3f, 0xcd, 0xcd, 0xa1, 0xf6, 0xf2, - 0xb7, 0xa0, 0x7e, 0x4e, 0xc7, 0x49, 0x9c, 0x94, 0x6a, 0xf5, 0xff, 0x81, 0xf5, 0x2f, 0x8c, 0xc6, - 0x84, 0xf3, 0x39, 0x53, 0xda, 0xd0, 0x3a, 0x61, 0x24, 0x12, 0x64, 0x51, 0xf3, 0x81, 0x8c, 0xc9, - 0x63, 0xcd, 0x65, 0x3a, 0x5c, 0xf4, 0xf9, 0x6e, 0x83, 0x23, 0xb3, 0x62, 0xa0, 0xdb, 0xb7, 0x65, - 0xfb, 0x9d, 0xd5, 0xb5, 0x99, 0xba, 0xaf, 0x2c, 0x77, 0x7f, 0x00, 0x8e, 0x8c, 0x93, 0xf7, 0x62, - 0x7e, 0x26, 0xe5, 0xe4, 0x5f, 0x82, 0x23, 0x9f, 0x19, 0x3d, 0xa8, 0x73, 0xc2, 0xee, 0x92, 0x98, - 0xe8, 0x66, 0x0b, 0x31, 0xb7, 0x8c, 0x22, 0x41, 0xee, 0xa3, 0x99, 0x4c, 0xd6, 0x08, 0x0b, 0x31, - 0xb7, 0x4c, 0x89, 0xb8, 0xa7, 0xec, 0x46, 0x26, 0x6b, 0x84, 0x85, 0xe8, 0xff, 0xb2, 0xc1, 0x91, - 0x79, 0x9e, 0xc6, 0x8d, 0x86, 0x43, 0x46, 0x38, 0x2f, 0x70, 0xb5, 0xb8, 0x98, 0xb1, 0x6a, 0xcc, - 0x58, 0x7b, 0x94, 0x11, 0x37, 0x34, 0x3d, 0x99, 0xe7, 0x48, 0x83, 0x96, 0x10, 0xa1, 0x36, 0x4e, - 0xa6, 0x37, 0x9e, 0x2b, 0xb5, 0xf2, 0x9c, 0xfb, 0x4e, 0x88, 0x60, 0x49, 0xec, 0xd5, 0xe5, 0xed, - 0x69, 0xc9, 0xef, 0x83, 0x7b, 0x2e, 0x22, 0x91, 0xf1, 0x3c, 0x2a, 0xa6, 0xc3, 0xa2, 0x64, 0x79, - 0xc6, 0x7f, 0xc1, 0x21, 0x8c, 0x51, 0xa6, 0xab, 0x55, 0x82, 0x3f, 0x80, 0x96, 0x8a, 0x99, 0x0f, - 0x4a, 0x0f, 0x5c, 0x2e, 0x35, 0x7a, 0xd0, 0x36, 0x4b, 0x2f, 0xa0, 0x03, 0xb4, 0xdb, 0x7e, 0x1f, - 0xe0, 0x81, 0xe1, 0x88, 0xd0, 0x52, 0xd2, 0x60, 0x3a, 0xa5, 0xd9, 0x34, 0x26, 0x6d, 0x0b, 0xdb, - 0xd0, 0x54, 0x3a, 0xc5, 0xa1, 0xb6, 0xbd, 0xdf, 0x83, 0xc6, 0x9c, 0x16, 0x08, 0xe0, 0x2a, 0x02, - 0xb6, 0xad, 0xfc, 0xac, 0xa8, 0xd7, 0xb6, 0xf3, 0xb3, 0x0e, 0xa8, 0xf4, 0x7f, 0x56, 0xc1, 0x0d, - 0xd5, 0x95, 0x7c, 0x06, 0x57, 0xad, 0x16, 0xdc, 0x29, 0x95, 0xf6, 0x68, 0x65, 0x75, 0x76, 0x8d, - 0x76, 0x4d, 0x62, 0x0b, 0x8f, 0xc1, 0x91, 0x63, 0x8e, 0xdb, 0x25, 0xdf, 0xc5, 0xf1, 0xef, 0x18, - 0x46, 0xce, 0xb7, 0x5e, 0xdb, 0x78, 0x0c, 0x0d, 0xd5, 0x5e, 0xc2, 0x09, 0x7a, 0x65, 0xc2, 0x6a, - 0x88, 0x4d, 0xc3, 0x62, 0x90, 0x18, 0x47, 0x0f, 0x23, 0x6b, 0x46, 0xd8, 0x5a, 0x61, 0x99, 0x77, - 0xf2, 0x11, 0xea, 0x7a, 0xb2, 0xd1, 0x94, 0xa9, 0xd3, 0x2d, 0x19, 0x96, 0x97, 0x81, 0x85, 0xa7, - 0x73, 0x16, 0x99, 0x0b, 0xd9, 0x35, 0x71, 0x62, 0x0e, 0xd3, 0xff, 0x5d, 0x01, 0xe7, 0x22, 0xba, - 0x1a, 0x13, 0x3c, 0x29, 0x9e, 0x17, 0x0d, 0xc3, 0xbc, 0x02, 0x6e, 0x69, 0x21, 0x59, 0x39, 0x88, - 0xe2, 0xc5, 0x33, 0x40, 0x96, 0x76, 0x98, 0x04, 0x51, 0x84, 0x7a, 0x06, 0xc8, 0xd2, 0xda, 0xb3, - 0x70, 0x00, 0xb5, 0xfc, 0xc3, 0xfa, 0xc4, 0xed, 0x94, 0xa9, 0xb4, 0xf8, 0x25, 0xf6, 0x2d, 0xfc, - 0x54, 0x6c, 0xad, 0x6d, 0xc3, 0x47, 0x4c, 0x03, 0xed, 0x98, 0xcc, 0x05, 0xd2, 0x95, 0x2b, 0xff, - 0x14, 0x1c, 0xfe, 0x09, 0x00, 0x00, 0xff, 0xff, 0x47, 0x98, 0xd8, 0x20, 0x4b, 0x08, 0x00, 0x00, +var fileDescriptor_367072455c71aedc = []byte{ + // 693 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x4f, 0xdb, 0x4a, + 0x10, 0xb7, 0x93, 0xd8, 0x79, 0x99, 0x17, 0x8c, 0xdf, 0xe8, 0x09, 0xac, 0xb4, 0x40, 0xe4, 0x13, + 0x42, 0xc8, 0x54, 0xe9, 0xb5, 0xff, 0x02, 0xa5, 0xaa, 0x54, 0x0e, 0xad, 0x0b, 0xea, 0xd9, 0xd8, + 0x23, 0x6a, 0x91, 0xd8, 0x66, 0x77, 0x03, 0xca, 0xb9, 0x9f, 0xa6, 0xe7, 0x7e, 0xa4, 0x5e, 0xfb, + 0x21, 0x2a, 0xef, 0xae, 0x43, 0x88, 0x31, 0x12, 0x9c, 0xbc, 0xf3, 0xef, 0x37, 0xb3, 0x3b, 0xbf, + 0x19, 0x43, 0x9f, 0xe5, 0x33, 0x41, 0x2c, 0x28, 0x58, 0x2e, 0x72, 0x5c, 0xbf, 0xc8, 0x83, 0x69, + 0x1a, 0xb3, 0x3c, 0x50, 0x6a, 0xbf, 0x07, 0xdd, 0x90, 0xae, 0x66, 0xc4, 0x85, 0x0f, 0xf0, 0x4f, + 0x48, 0xbc, 0xc8, 0x33, 0x4e, 0xfe, 0x1b, 0xe8, 0x9f, 0xa4, 0x5c, 0x54, 0x32, 0x06, 0x60, 0xcb, + 0x00, 0xee, 0x99, 0xc3, 0xf6, 0xee, 0xbf, 0xa3, 0x8d, 0x60, 0x05, 0x28, 0x08, 0xcb, 0x4f, 0xa8, + 0xbd, 0xfc, 0xd7, 0xb0, 0x76, 0x92, 0xe7, 0x97, 0xb3, 0x42, 0x83, 0xe3, 0x3e, 0x58, 0x57, 0x33, + 0x62, 0x73, 0xcf, 0x1c, 0x9a, 0xf7, 0xc6, 0x7f, 0x29, 0xad, 0xa1, 0x72, 0xf2, 0xdf, 0x81, 0x53, + 0x85, 0x3f, 0xb1, 0x80, 0x57, 0xd0, 0x57, 0x88, 0x4f, 0xca, 0xff, 0x16, 0xd6, 0x74, 0xf4, 0x13, + 0xd3, 0x3b, 0xd0, 0xff, 0x16, 0x89, 0xf8, 0x7b, 0xf5, 0xb6, 0x3f, 0x4d, 0xb0, 0xc7, 0xc9, 0x35, + 0x31, 0x81, 0x0e, 0xb4, 0xd2, 0x44, 0x96, 0xd1, 0x0b, 0x5b, 0x69, 0x82, 0x07, 0xd0, 0x11, 0xf3, + 0x82, 0xbc, 0xd6, 0xd0, 0xdc, 0x75, 0x46, 0xcf, 0x6a, 0xc0, 0x2a, 0xec, 0x74, 0x5e, 0x50, 0x28, + 0x1d, 0xf1, 0x39, 0xf4, 0x44, 0x3a, 0x25, 0x2e, 0xa2, 0x69, 0xe1, 0xb5, 0x87, 0xe6, 0x6e, 0x3b, + 0xbc, 0x55, 0xa0, 0x0b, 0x6d, 0x21, 0x26, 0x5e, 0x47, 0xea, 0xcb, 0x63, 0x59, 0x3b, 0x5d, 0x53, + 0x26, 0xb8, 0x67, 0x35, 0xd4, 0x7e, 0x5c, 0x9a, 0x43, 0xed, 0xe5, 0xff, 0x07, 0xeb, 0x9f, 0x59, + 0x1e, 0x13, 0xe7, 0x0b, 0x3a, 0xb8, 0xe0, 0x1c, 0x31, 0x8a, 0x04, 0x2d, 0x6b, 0xde, 0xd3, 0x84, + 0xee, 0x6a, 0xce, 0x8a, 0x64, 0xd9, 0xe7, 0x87, 0x09, 0x96, 0x84, 0xc6, 0x40, 0xdf, 0xd1, 0x94, + 0x77, 0x1c, 0xdc, 0x5f, 0x40, 0xd3, 0x15, 0x5b, 0xab, 0x57, 0xdc, 0x07, 0x4b, 0xc6, 0xc9, 0xcb, + 0x37, 0xf7, 0x42, 0x39, 0xf9, 0x67, 0x60, 0xc9, 0x5e, 0xa2, 0x07, 0x5d, 0x4e, 0xec, 0x3a, 0x8d, + 0x49, 0xbf, 0x7e, 0x25, 0x96, 0x96, 0x8b, 0x48, 0xd0, 0x4d, 0x34, 0x97, 0xc9, 0x7a, 0x61, 0x25, + 0x96, 0x96, 0x8c, 0xc4, 0x4d, 0xce, 0x2e, 0x65, 0xb2, 0x5e, 0x58, 0x89, 0xfe, 0x2f, 0x13, 0x2c, + 0x99, 0xe7, 0x61, 0xdc, 0x28, 0x49, 0x18, 0x71, 0x5e, 0xe1, 0x6a, 0x71, 0x39, 0x63, 0xbb, 0x31, + 0x63, 0xe7, 0x4e, 0x46, 0xdc, 0xd0, 0x1c, 0x64, 0x9e, 0x25, 0x0d, 0x5a, 0x42, 0x84, 0xce, 0x24, + 0xcd, 0x2e, 0x3d, 0x5b, 0x6a, 0xe5, 0xb9, 0xf4, 0x9d, 0x92, 0x60, 0x69, 0xec, 0x75, 0xe5, 0xeb, + 0x69, 0xc9, 0x1f, 0x81, 0xfd, 0x55, 0x44, 0x62, 0xc6, 0xcb, 0xa8, 0x38, 0x4f, 0xaa, 0x92, 0xe5, + 0x19, 0xff, 0x07, 0x8b, 0x18, 0xcb, 0x99, 0xae, 0x56, 0x09, 0xfe, 0x18, 0x1c, 0x15, 0xb3, 0x98, + 0x86, 0x03, 0xb0, 0xb9, 0xd4, 0xe8, 0x69, 0xda, 0xac, 0x75, 0x40, 0x07, 0x68, 0xb7, 0xbd, 0x11, + 0xc0, 0x2d, 0x8d, 0x11, 0xc1, 0x51, 0xd2, 0x38, 0xcb, 0xf2, 0x59, 0x16, 0x93, 0x6b, 0xa0, 0x0b, + 0x7d, 0xa5, 0x53, 0x1c, 0x72, 0xcd, 0xbd, 0x03, 0xe8, 0x2d, 0x68, 0x81, 0x00, 0xb6, 0x22, 0xa0, + 0x6b, 0x94, 0x67, 0x45, 0x3d, 0xd7, 0x2c, 0xcf, 0x3a, 0xa0, 0x35, 0xfa, 0xd3, 0x02, 0x3b, 0x54, + 0x4f, 0xf2, 0x09, 0x6c, 0xb5, 0x3f, 0x70, 0xbb, 0x56, 0xda, 0x9d, 0xbd, 0x34, 0xd8, 0x69, 0xb4, + 0x6b, 0x12, 0x1b, 0x78, 0x08, 0x96, 0x9c, 0x65, 0xdc, 0xaa, 0xf9, 0x2e, 0xcf, 0xf8, 0xa0, 0x61, + 0xae, 0x7c, 0xe3, 0x85, 0x89, 0x87, 0xd0, 0x53, 0xd7, 0x4b, 0x39, 0xa1, 0x57, 0x27, 0xac, 0x86, + 0xd8, 0x6c, 0x98, 0x7e, 0x89, 0xf1, 0x01, 0xba, 0x7a, 0x2e, 0xb1, 0xc9, 0x6f, 0x30, 0xac, 0x19, + 0x56, 0x47, 0xd9, 0xc0, 0xe3, 0x05, 0x07, 0x9a, 0x0b, 0xd9, 0x69, 0xea, 0xe8, 0x02, 0x66, 0xf4, + 0xbb, 0x05, 0xd6, 0x69, 0x74, 0x3e, 0x21, 0x3c, 0xaa, 0x9a, 0x83, 0x0d, 0xa3, 0x78, 0x0f, 0xdc, + 0xca, 0x3a, 0x31, 0x4a, 0x10, 0xd5, 0xd5, 0x47, 0x80, 0xac, 0x6c, 0x20, 0x09, 0xa2, 0xe8, 0xf0, + 0x08, 0x90, 0x95, 0xa5, 0x65, 0xe0, 0x18, 0x3a, 0xe5, 0xbf, 0xef, 0x81, 0xd7, 0xa9, 0x13, 0x61, + 0xf9, 0x67, 0xe9, 0x1b, 0xf8, 0xb1, 0xda, 0x39, 0x5b, 0x0d, 0xff, 0x19, 0x0d, 0xb4, 0xdd, 0x64, + 0xae, 0x90, 0xce, 0x6d, 0xf9, 0xdf, 0x7e, 0xf9, 0x37, 0x00, 0x00, 0xff, 0xff, 0x86, 0x75, 0x28, + 0x0b, 0xc7, 0x07, 0x00, 0x00, } diff --git a/router/service/proto/router.pb.micro.go b/router/service/proto/router.pb.micro.go index 024989b9..3c0059f8 100644 --- a/router/service/proto/router.pb.micro.go +++ b/router/service/proto/router.pb.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: github.com/micro/go-micro/router/proto/router.proto +// source: router.proto package go_micro_router @@ -37,7 +37,6 @@ type RouterService interface { Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error) - Solicit(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) } @@ -158,16 +157,6 @@ func (x *routerServiceAdvertise) Recv() (*Advert, error) { return m, nil } -func (c *routerService) Solicit(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) { - req := c.c.NewRequest(c.name, "Router.Solicit", in) - out := new(Response) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) { req := c.c.NewRequest(c.name, "Router.Process", in) out := new(ProcessResponse) @@ -194,7 +183,6 @@ type RouterHandler interface { Lookup(context.Context, *LookupRequest, *LookupResponse) error Watch(context.Context, *WatchRequest, Router_WatchStream) error Advertise(context.Context, *Request, Router_AdvertiseStream) error - Solicit(context.Context, *Request, *Response) error Process(context.Context, *Advert, *ProcessResponse) error Status(context.Context, *Request, *StatusResponse) error } @@ -204,7 +192,6 @@ func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.H Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error Watch(ctx context.Context, stream server.Stream) error Advertise(ctx context.Context, stream server.Stream) error - Solicit(ctx context.Context, in *Request, out *Response) error Process(ctx context.Context, in *Advert, out *ProcessResponse) error Status(ctx context.Context, in *Request, out *StatusResponse) error } @@ -293,10 +280,6 @@ func (x *routerAdvertiseStream) Send(m *Advert) error { return x.stream.Send(m) } -func (h *routerHandler) Solicit(ctx context.Context, in *Request, out *Response) error { - return h.RouterHandler.Solicit(ctx, in, out) -} - func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessResponse) error { return h.RouterHandler.Process(ctx, in, out) } diff --git a/router/service/proto/router.proto b/router/service/proto/router.proto index 0ae180c9..44539332 100644 --- a/router/service/proto/router.proto +++ b/router/service/proto/router.proto @@ -7,7 +7,6 @@ service Router { rpc Lookup(LookupRequest) returns (LookupResponse) {}; rpc Watch(WatchRequest) returns (stream Event) {}; rpc Advertise(Request) returns (stream Advert) {}; - rpc Solicit(Request) returns (Response) {}; rpc Process(Advert) returns (ProcessResponse) {}; rpc Status(Request) returns (StatusResponse) {}; } @@ -74,12 +73,6 @@ message Advert { repeated Event events = 5; } -// Solicit solicits routes -message Solicit { - // id of the soliciting router - string id = 1; -} - // ProcessResponse is returned by Process message ProcessResponse {} diff --git a/router/service/service.go b/router/service/service.go index 1087da3d..b92a03fd 100644 --- a/router/service/service.go +++ b/router/service/service.go @@ -220,42 +220,6 @@ func (s *svc) Process(advert *router.Advert) error { return nil } -// Solicit advertise all routes -func (s *svc) Solicit() error { - // list all the routes - routes, err := s.table.List() - if err != nil { - return err - } - - // build events to advertise - events := make([]*router.Event, len(routes)) - for i := range events { - events[i] = &router.Event{ - Type: router.Update, - Timestamp: time.Now(), - Route: routes[i], - } - } - - advert := &router.Advert{ - Id: s.opts.Id, - Type: router.RouteUpdate, - Timestamp: time.Now(), - TTL: time.Duration(router.DefaultAdvertTTL), - Events: events, - } - - select { - case s.advertChan <- advert: - case <-s.exit: - close(s.advertChan) - return nil - } - - return nil -} - // Status returns router status func (s *svc) Status() router.Status { s.Lock()