diff --git a/router/default.go b/router/default.go index 07bddc0d..f699651c 100644 --- a/router/default.go +++ b/router/default.go @@ -1,6 +1,7 @@ package router import ( + "errors" "fmt" "math" "sort" @@ -16,8 +17,6 @@ import ( var ( // AdvertiseEventsTick is time interval in which the router advertises route updates AdvertiseEventsTick = 10 * time.Second - // AdvertiseTableTick is time interval in which router advertises all routes found in routing table - AdvertiseTableTick = 2 * time.Minute // DefaultAdvertTTL is default advertisement TTL DefaultAdvertTTL = 2 * time.Minute // AdvertSuppress is advert suppression threshold @@ -37,14 +36,12 @@ var ( // router implements default router type router struct { sync.RWMutex - options Options - status Status + + running bool table *table - exit chan struct{} - errChan chan error + options Options + exit chan bool eventChan chan *Event - advertWg *sync.WaitGroup - wg *sync.WaitGroup // advert subscribers sub sync.RWMutex @@ -61,15 +58,9 @@ func newRouter(opts ...Option) Router { o(&options) } - // set initial status to Stopped - status := Status{Code: Stopped, Error: nil} - return &router{ options: options, - status: status, table: newTable(), - advertWg: &sync.WaitGroup{}, - wg: &sync.WaitGroup{}, subscribers: make(map[string]chan *Advert), } } @@ -125,7 +116,7 @@ func (r *router) manageRoute(route Route, action string) error { // manageServiceRoutes applies action to all routes of the service. // It returns error of the action fails with error. -func (r *router) manageServiceRoutes(service *registry.Service, action string) error { +func (r *router) manageRoutes(service *registry.Service, action string) error { // action is the routing table action action = strings.ToLower(action) @@ -166,7 +157,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro } // manage the routes for all returned services for _, srv := range srvs { - if err := r.manageServiceRoutes(srv, action); err != nil { + if err := r.manageRoutes(srv, action); err != nil { return err } } @@ -181,42 +172,35 @@ func (r *router) watchRegistry(w registry.Watcher) error { exit := make(chan bool) defer func() { - // close the exit channel when the go routine finishes close(exit) }() - // wait in the background for the router to stop - // when the router stops, stop the watcher and exit - r.wg.Add(1) go func() { defer w.Stop() - defer r.wg.Done() select { - case <-r.exit: - return case <-exit: return + case <-r.exit: + return } }() - var watchErr error - for { res, err := w.Next() if err != nil { if err != registry.ErrWatcherStopped { - watchErr = err + return err } break } - if err := r.manageServiceRoutes(res.Service, res.Action); err != nil { + if err := r.manageRoutes(res.Service, res.Action); err != nil { return err } } - return watchErr + return nil } // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry @@ -225,16 +209,13 @@ func (r *router) watchTable(w Watcher) error { exit := make(chan bool) defer func() { - // close the exit channel when the go routine finishes close(exit) }() // wait in the background for the router to stop // when the router stops, stop the watcher and exit - r.wg.Add(1) go func() { defer w.Stop() - defer r.wg.Done() select { case <-r.exit: @@ -244,13 +225,11 @@ func (r *router) watchTable(w Watcher) error { } }() - var watchErr error - for { event, err := w.Next() if err != nil { if err != ErrWatcherStopped { - watchErr = err + return err } break } @@ -260,13 +239,11 @@ func (r *router) watchTable(w Watcher) error { close(r.eventChan) return nil case r.eventChan <- event: + // process event } } - // close event channel on error - close(r.eventChan) - - return watchErr + return nil } // publishAdvert publishes router advert to advert channel @@ -292,36 +269,6 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) { r.sub.RUnlock() } -// advertiseTable advertises the whole routing table to the network -func (r *router) advertiseTable() error { - // create table advertisement ticker - ticker := time.NewTicker(AdvertiseTableTick) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // do full table flush - events, err := r.flushRouteEvents(Update) - if err != nil { - return fmt.Errorf("failed flushing routes: %s", err) - } - - // advertise routes to subscribers - if len(events) > 0 { - log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id) - r.advertWg.Add(1) - go func() { - defer r.advertWg.Done() - r.publishAdvert(RouteUpdate, events) - }() - } - case <-r.exit: - return nil - } - } -} - // advert contains a route event to be advertised type advert struct { // event received from routing table @@ -392,17 +339,39 @@ func (r *router) advertiseEvents() error { adverts := make(adverts) // routing table watcher - tableWatcher, err := r.Watch() + w, err := r.Watch() if err != nil { - return fmt.Errorf("failed creating routing table watcher: %v", err) + return err } + defer w.Stop() - r.wg.Add(1) go func() { - defer r.wg.Done() - select { - case r.errChan <- r.watchTable(tableWatcher): - case <-r.exit: + var err error + + for { + select { + case <-r.exit: + return + default: + if w == nil { + // routing table watcher + w, err = r.Watch() + if err != nil { + log.Logf("Error creating watcher: %v", err) + time.Sleep(time.Second) + continue + } + } + + if err := r.watchTable(w); err != nil { + log.Logf("Error watching table: %v", err) + time.Sleep(time.Second) + } + + // reset + w.Stop() + w = nil + } } }() @@ -446,11 +415,7 @@ func (r *router) advertiseEvents() error { // advertise events to subscribers if len(events) > 0 { log.Debugf("Router publishing %d events", len(events)) - r.advertWg.Add(1) - go func() { - defer r.advertWg.Done() - r.publishAdvert(RouteUpdate, events) - }() + go r.publishAdvert(RouteUpdate, events) } case e := <-r.eventChan: // if event is nil, continue @@ -502,65 +467,19 @@ func (r *router) advertiseEvents() error { a.penalty += Penalty log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty) case <-r.exit: - // first wait for the advertiser to finish - r.advertWg.Wait() + w.Stop() return nil } } } -// close closes exit channels -func (r *router) close() { - log.Debugf("Router closing remaining channels") - // drain the advertise channel only if advertising - if r.status.Code == Advertising { - // drain the event channel - for range r.eventChan { - } - - // close advert subscribers - for id, sub := range r.subscribers { - select { - case <-sub: - default: - } - - // close the channel - close(sub) - - // delete the subscriber - r.sub.Lock() - delete(r.subscribers, id) - r.sub.Unlock() - } - } - - // mark the router as Stopped and set its Error to nil - r.status = Status{Code: Stopped, Error: nil} -} - -// watchErrors watches router errors and takes appropriate actions -func (r *router) watchErrors() { - var err error - - select { - case <-r.exit: - return - case err = <-r.errChan: - } - - r.Lock() - defer r.Unlock() - // if the router is not stopped, stop it - if r.status.Code != Stopped { - // notify all goroutines to finish - close(r.exit) - - // close all the channels - r.close() - // set the status error - if err != nil { - r.status.Error = err +// drain all the events, only called on Stop +func (r *router) drain() { + for { + select { + case <-r.eventChan: + default: + return } } } @@ -570,16 +489,13 @@ func (r *router) Start() error { r.Lock() defer r.Unlock() - // only start if we're stopped - if r.status.Code != Stopped { + if r.running { return nil } // add all local service routes into the routing table if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil { - e := fmt.Errorf("failed adding registry routes: %s", err) - r.status = Status{Code: Error, Error: e} - return e + return fmt.Errorf("failed adding registry routes: %s", err) } // add default gateway into routing table @@ -595,42 +511,49 @@ func (r *router) Start() error { Metric: DefaultLocalMetric, } if err := r.table.Create(route); err != nil { - e := fmt.Errorf("failed adding default gateway route: %s", err) - r.status = Status{Code: Error, Error: e} - return e + return fmt.Errorf("failed adding default gateway route: %s", err) } } // create error and exit channels - r.errChan = make(chan error, 1) - r.exit = make(chan struct{}) + r.exit = make(chan bool) // registry watcher - regWatcher, err := r.options.Registry.Watch() + w, err := r.options.Registry.Watch() if err != nil { - e := fmt.Errorf("failed creating registry watcher: %v", err) - r.status = Status{Code: Error, Error: e} - return e + return fmt.Errorf("failed creating registry watcher: %v", err) } - r.wg.Add(1) go func() { - defer r.wg.Done() - select { - case r.errChan <- r.watchRegistry(regWatcher): - case <-r.exit: + var err error + + for { + select { + case <-r.exit: + w.Stop() + return + default: + if w == nil { + w, err = r.options.Registry.Watch() + if err != nil { + log.Logf("failed creating registry watcher: %v", err) + time.Sleep(time.Second) + continue + } + } + + if err := r.watchRegistry(w); err != nil { + log.Logf("Error watching the registry: %v", err) + time.Sleep(time.Second) + } + + w.Stop() + w = nil + } } }() - // watch for errors and cleanup - r.wg.Add(1) - go func() { - defer r.wg.Done() - r.watchErrors() - }() - - // mark router as Running - r.status = Status{Code: Running, Error: nil} + r.running = true return nil } @@ -642,61 +565,46 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.Lock() defer r.Unlock() - switch r.status.Code { - case Advertising: - advertChan := make(chan *Advert, 128) - r.subscribers[uuid.New().String()] = advertChan - return advertChan, nil - case Running: - // list all the routes and pack them into even slice to advertise - events, err := r.flushRouteEvents(Create) - if err != nil { - return nil, fmt.Errorf("failed to flush routes: %s", err) - } - - // create event channels - r.eventChan = make(chan *Event) - - // create advert channel - advertChan := make(chan *Advert, 128) - r.subscribers[uuid.New().String()] = advertChan - - // advertise your presence - r.advertWg.Add(1) - go func() { - defer r.advertWg.Done() - r.publishAdvert(Announce, events) - }() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - select { - case r.errChan <- r.advertiseEvents(): - case <-r.exit: - } - }() - - r.advertWg.Add(1) - go func() { - defer r.advertWg.Done() - // advertise the whole routing table - select { - case r.errChan <- r.advertiseTable(): - case <-r.exit: - } - }() - - // mark router as Running and set its Error to nil - r.status = Status{Code: Advertising, Error: nil} - - log.Debugf("Router starting to advertise") - return advertChan, nil - case Stopped: - return nil, fmt.Errorf("not running") + if !r.running { + return nil, errors.New("not running") } - return nil, fmt.Errorf("error: %s", r.status.Error) + // already advertising + if r.eventChan != nil { + advertChan := make(chan *Advert, 128) + r.subscribers[uuid.New().String()] = advertChan + return advertChan, nil + } + + // list all the routes and pack them into even slice to advertise + events, err := r.flushRouteEvents(Create) + if err != nil { + return nil, fmt.Errorf("failed to flush routes: %s", err) + } + + // create event channels + r.eventChan = make(chan *Event) + + // create advert channel + advertChan := make(chan *Advert, 128) + r.subscribers[uuid.New().String()] = advertChan + + // advertise your presence + go r.publishAdvert(Announce, events) + + go func() { + select { + case <-r.exit: + return + default: + if err := r.advertiseEvents(); err != nil { + log.Logf("Error adveritising events: %v", err) + } + } + }() + + return advertChan, nil + } // Process updates the routing table using the advertised values @@ -774,48 +682,39 @@ func (r *router) Watch(opts ...WatchOption) (Watcher, error) { return r.table.Watch(opts...) } -// Status returns router status -func (r *router) Status() Status { - r.RLock() - defer r.RUnlock() - - // make a copy of the status - status := r.status - - return status -} - // Stop stops the router func (r *router) Stop() error { r.Lock() + defer r.Unlock() - log.Debugf("Router shutting down") - - switch r.status.Code { - case Stopped, Error: - r.Unlock() - return r.status.Error - case Running, Advertising: - // notify all goroutines to finish + select { + case <-r.exit: + return nil + default: close(r.exit) - // close all the channels - // NOTE: close marks the router status as Stopped - r.close() + // extract the events + r.drain() + + // close advert subscribers + for id, sub := range r.subscribers { + // close the channel + close(sub) + + // delete the subscriber + r.sub.Lock() + delete(r.subscribers, id) + r.sub.Unlock() + } } - r.Unlock() - log.Tracef("Router waiting for all goroutines to finish") - - // wait for all goroutines to finish - r.wg.Wait() - - log.Debugf("Router successfully stopped") + // remove event chan + r.eventChan = nil return nil } // String prints debugging information about router func (r *router) String() string { - return "memory" + return "registry" } diff --git a/router/default_test.go b/router/default_test.go index 9427ecbc..581ef7ae 100644 --- a/router/default_test.go +++ b/router/default_test.go @@ -38,7 +38,6 @@ func TestRouterAdvertise(t *testing.T) { // lower the advertise interval AdvertiseEventsTick = 500 * time.Millisecond - AdvertiseTableTick = 1 * time.Second if err := r.Start(); err != nil { t.Errorf("failed to start router: %v", err) diff --git a/router/router.go b/router/router.go index b71cbcac..20316477 100644 --- a/router/router.go +++ b/router/router.go @@ -34,8 +34,6 @@ type Router interface { Watch(opts ...WatchOption) (Watcher, error) // Start starts the router Start() error - // Status returns router status - Status() Status // Stop stops the router Stop() error // Returns the router implementation @@ -73,34 +71,6 @@ const ( Error ) -func (s StatusCode) String() string { - switch s { - case Running: - return "running" - case Advertising: - return "advertising" - case Stopped: - return "stopped" - case Error: - return "error" - default: - return "unknown" - } -} - -// Status is router status -type Status struct { - // Code defines router status - Code StatusCode - // Error contains error description - Error error -} - -// String returns human readable status -func (s Status) String() string { - return s.Code.String() -} - // AdvertType is route advertisement type type AdvertType int diff --git a/router/service/proto/router.pb.go b/router/service/proto/router.pb.go index 29904323..c6259a72 100644 --- a/router/service/proto/router.pb.go +++ b/router/service/proto/router.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: router.proto +// source: micro/go-micro/router/service/proto/router.proto package go_micro_router @@ -43,7 +43,7 @@ func (x AdvertType) String() string { } func (AdvertType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{0} + return fileDescriptor_c2b04f200fb3e806, []int{0} } // EventType defines the type of event @@ -72,7 +72,7 @@ func (x EventType) String() string { } func (EventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{1} + return fileDescriptor_c2b04f200fb3e806, []int{1} } // Empty request @@ -86,7 +86,7 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} func (*Request) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{0} + return fileDescriptor_c2b04f200fb3e806, []int{0} } func (m *Request) XXX_Unmarshal(b []byte) error { @@ -118,7 +118,7 @@ func (m *Response) Reset() { *m = Response{} } func (m *Response) String() string { return proto.CompactTextString(m) } func (*Response) ProtoMessage() {} func (*Response) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{1} + return fileDescriptor_c2b04f200fb3e806, []int{1} } func (m *Response) XXX_Unmarshal(b []byte) error { @@ -151,7 +151,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (*ListResponse) ProtoMessage() {} func (*ListResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{2} + return fileDescriptor_c2b04f200fb3e806, []int{2} } func (m *ListResponse) XXX_Unmarshal(b []byte) error { @@ -191,7 +191,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} } func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{3} + return fileDescriptor_c2b04f200fb3e806, []int{3} } func (m *LookupRequest) XXX_Unmarshal(b []byte) error { @@ -231,7 +231,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} } func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{4} + return fileDescriptor_c2b04f200fb3e806, []int{4} } func (m *LookupResponse) XXX_Unmarshal(b []byte) error { @@ -271,7 +271,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} } func (m *QueryRequest) String() string { return proto.CompactTextString(m) } func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{5} + return fileDescriptor_c2b04f200fb3e806, []int{5} } func (m *QueryRequest) XXX_Unmarshal(b []byte) error { @@ -311,7 +311,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} } func (m *QueryResponse) String() string { return proto.CompactTextString(m) } func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{6} + return fileDescriptor_c2b04f200fb3e806, []int{6} } func (m *QueryResponse) XXX_Unmarshal(b []byte) error { @@ -350,7 +350,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} } func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{7} + return fileDescriptor_c2b04f200fb3e806, []int{7} } func (m *WatchRequest) XXX_Unmarshal(b []byte) error { @@ -392,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} } func (m *Advert) String() string { return proto.CompactTextString(m) } func (*Advert) ProtoMessage() {} func (*Advert) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{8} + return fileDescriptor_c2b04f200fb3e806, []int{8} } func (m *Advert) XXX_Unmarshal(b []byte) error { @@ -459,7 +459,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} } func (m *ProcessResponse) String() string { return proto.CompactTextString(m) } func (*ProcessResponse) ProtoMessage() {} func (*ProcessResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{9} + return fileDescriptor_c2b04f200fb3e806, []int{9} } func (m *ProcessResponse) XXX_Unmarshal(b []byte) error { @@ -491,7 +491,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} } func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{10} + return fileDescriptor_c2b04f200fb3e806, []int{10} } func (m *CreateResponse) XXX_Unmarshal(b []byte) error { @@ -523,7 +523,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{11} + return fileDescriptor_c2b04f200fb3e806, []int{11} } func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { @@ -555,7 +555,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{12} + return fileDescriptor_c2b04f200fb3e806, []int{12} } func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { @@ -593,7 +593,7 @@ func (m *Event) Reset() { *m = Event{} } func (m *Event) String() string { return proto.CompactTextString(m) } func (*Event) ProtoMessage() {} func (*Event) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{13} + return fileDescriptor_c2b04f200fb3e806, []int{13} } func (m *Event) XXX_Unmarshal(b []byte) error { @@ -652,7 +652,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{14} + return fileDescriptor_c2b04f200fb3e806, []int{14} } func (m *Query) XXX_Unmarshal(b []byte) error { @@ -719,7 +719,7 @@ func (m *Route) Reset() { *m = Route{} } func (m *Route) String() string { return proto.CompactTextString(m) } func (*Route) ProtoMessage() {} func (*Route) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{15} + return fileDescriptor_c2b04f200fb3e806, []int{15} } func (m *Route) XXX_Unmarshal(b []byte) error { @@ -789,92 +789,6 @@ func (m *Route) GetMetric() int64 { return 0 } -type Status struct { - Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Status) Reset() { *m = Status{} } -func (m *Status) String() string { return proto.CompactTextString(m) } -func (*Status) ProtoMessage() {} -func (*Status) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{16} -} - -func (m *Status) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Status.Unmarshal(m, b) -} -func (m *Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Status.Marshal(b, m, deterministic) -} -func (m *Status) XXX_Merge(src proto.Message) { - xxx_messageInfo_Status.Merge(m, src) -} -func (m *Status) XXX_Size() int { - return xxx_messageInfo_Status.Size(m) -} -func (m *Status) XXX_DiscardUnknown() { - xxx_messageInfo_Status.DiscardUnknown(m) -} - -var xxx_messageInfo_Status proto.InternalMessageInfo - -func (m *Status) GetCode() string { - if m != nil { - return m.Code - } - return "" -} - -func (m *Status) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -type StatusResponse struct { - Status *Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *StatusResponse) Reset() { *m = StatusResponse{} } -func (m *StatusResponse) String() string { return proto.CompactTextString(m) } -func (*StatusResponse) ProtoMessage() {} -func (*StatusResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{17} -} - -func (m *StatusResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_StatusResponse.Unmarshal(m, b) -} -func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic) -} -func (m *StatusResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_StatusResponse.Merge(m, src) -} -func (m *StatusResponse) XXX_Size() int { - return xxx_messageInfo_StatusResponse.Size(m) -} -func (m *StatusResponse) XXX_DiscardUnknown() { - xxx_messageInfo_StatusResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_StatusResponse proto.InternalMessageInfo - -func (m *StatusResponse) GetStatus() *Status { - if m != nil { - return m.Status - } - return nil -} - func init() { proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value) proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value) @@ -894,56 +808,53 @@ func init() { proto.RegisterType((*Event)(nil), "go.micro.router.Event") proto.RegisterType((*Query)(nil), "go.micro.router.Query") proto.RegisterType((*Route)(nil), "go.micro.router.Route") - proto.RegisterType((*Status)(nil), "go.micro.router.Status") - proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse") } -func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } - -var fileDescriptor_367072455c71aedc = []byte{ - // 693 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x4f, 0xdb, 0x4a, - 0x10, 0xb7, 0x93, 0xd8, 0x79, 0x99, 0x17, 0x8c, 0xdf, 0xe8, 0x09, 0xac, 0xb4, 0x40, 0xe4, 0x13, - 0x42, 0xc8, 0x54, 0xe9, 0xb5, 0xff, 0x02, 0xa5, 0xaa, 0x54, 0x0e, 0xad, 0x0b, 0xea, 0xd9, 0xd8, - 0x23, 0x6a, 0x91, 0xd8, 0x66, 0x77, 0x03, 0xca, 0xb9, 0x9f, 0xa6, 0xe7, 0x7e, 0xa4, 0x5e, 0xfb, - 0x21, 0x2a, 0xef, 0xae, 0x43, 0x88, 0x31, 0x12, 0x9c, 0xbc, 0xf3, 0xef, 0x37, 0xb3, 0x3b, 0xbf, - 0x19, 0x43, 0x9f, 0xe5, 0x33, 0x41, 0x2c, 0x28, 0x58, 0x2e, 0x72, 0x5c, 0xbf, 0xc8, 0x83, 0x69, - 0x1a, 0xb3, 0x3c, 0x50, 0x6a, 0xbf, 0x07, 0xdd, 0x90, 0xae, 0x66, 0xc4, 0x85, 0x0f, 0xf0, 0x4f, - 0x48, 0xbc, 0xc8, 0x33, 0x4e, 0xfe, 0x1b, 0xe8, 0x9f, 0xa4, 0x5c, 0x54, 0x32, 0x06, 0x60, 0xcb, - 0x00, 0xee, 0x99, 0xc3, 0xf6, 0xee, 0xbf, 0xa3, 0x8d, 0x60, 0x05, 0x28, 0x08, 0xcb, 0x4f, 0xa8, - 0xbd, 0xfc, 0xd7, 0xb0, 0x76, 0x92, 0xe7, 0x97, 0xb3, 0x42, 0x83, 0xe3, 0x3e, 0x58, 0x57, 0x33, - 0x62, 0x73, 0xcf, 0x1c, 0x9a, 0xf7, 0xc6, 0x7f, 0x29, 0xad, 0xa1, 0x72, 0xf2, 0xdf, 0x81, 0x53, - 0x85, 0x3f, 0xb1, 0x80, 0x57, 0xd0, 0x57, 0x88, 0x4f, 0xca, 0xff, 0x16, 0xd6, 0x74, 0xf4, 0x13, - 0xd3, 0x3b, 0xd0, 0xff, 0x16, 0x89, 0xf8, 0x7b, 0xf5, 0xb6, 0x3f, 0x4d, 0xb0, 0xc7, 0xc9, 0x35, - 0x31, 0x81, 0x0e, 0xb4, 0xd2, 0x44, 0x96, 0xd1, 0x0b, 0x5b, 0x69, 0x82, 0x07, 0xd0, 0x11, 0xf3, - 0x82, 0xbc, 0xd6, 0xd0, 0xdc, 0x75, 0x46, 0xcf, 0x6a, 0xc0, 0x2a, 0xec, 0x74, 0x5e, 0x50, 0x28, - 0x1d, 0xf1, 0x39, 0xf4, 0x44, 0x3a, 0x25, 0x2e, 0xa2, 0x69, 0xe1, 0xb5, 0x87, 0xe6, 0x6e, 0x3b, - 0xbc, 0x55, 0xa0, 0x0b, 0x6d, 0x21, 0x26, 0x5e, 0x47, 0xea, 0xcb, 0x63, 0x59, 0x3b, 0x5d, 0x53, - 0x26, 0xb8, 0x67, 0x35, 0xd4, 0x7e, 0x5c, 0x9a, 0x43, 0xed, 0xe5, 0xff, 0x07, 0xeb, 0x9f, 0x59, - 0x1e, 0x13, 0xe7, 0x0b, 0x3a, 0xb8, 0xe0, 0x1c, 0x31, 0x8a, 0x04, 0x2d, 0x6b, 0xde, 0xd3, 0x84, - 0xee, 0x6a, 0xce, 0x8a, 0x64, 0xd9, 0xe7, 0x87, 0x09, 0x96, 0x84, 0xc6, 0x40, 0xdf, 0xd1, 0x94, - 0x77, 0x1c, 0xdc, 0x5f, 0x40, 0xd3, 0x15, 0x5b, 0xab, 0x57, 0xdc, 0x07, 0x4b, 0xc6, 0xc9, 0xcb, - 0x37, 0xf7, 0x42, 0x39, 0xf9, 0x67, 0x60, 0xc9, 0x5e, 0xa2, 0x07, 0x5d, 0x4e, 0xec, 0x3a, 0x8d, - 0x49, 0xbf, 0x7e, 0x25, 0x96, 0x96, 0x8b, 0x48, 0xd0, 0x4d, 0x34, 0x97, 0xc9, 0x7a, 0x61, 0x25, - 0x96, 0x96, 0x8c, 0xc4, 0x4d, 0xce, 0x2e, 0x65, 0xb2, 0x5e, 0x58, 0x89, 0xfe, 0x2f, 0x13, 0x2c, - 0x99, 0xe7, 0x61, 0xdc, 0x28, 0x49, 0x18, 0x71, 0x5e, 0xe1, 0x6a, 0x71, 0x39, 0x63, 0xbb, 0x31, - 0x63, 0xe7, 0x4e, 0x46, 0xdc, 0xd0, 0x1c, 0x64, 0x9e, 0x25, 0x0d, 0x5a, 0x42, 0x84, 0xce, 0x24, - 0xcd, 0x2e, 0x3d, 0x5b, 0x6a, 0xe5, 0xb9, 0xf4, 0x9d, 0x92, 0x60, 0x69, 0xec, 0x75, 0xe5, 0xeb, - 0x69, 0xc9, 0x1f, 0x81, 0xfd, 0x55, 0x44, 0x62, 0xc6, 0xcb, 0xa8, 0x38, 0x4f, 0xaa, 0x92, 0xe5, - 0x19, 0xff, 0x07, 0x8b, 0x18, 0xcb, 0x99, 0xae, 0x56, 0x09, 0xfe, 0x18, 0x1c, 0x15, 0xb3, 0x98, - 0x86, 0x03, 0xb0, 0xb9, 0xd4, 0xe8, 0x69, 0xda, 0xac, 0x75, 0x40, 0x07, 0x68, 0xb7, 0xbd, 0x11, - 0xc0, 0x2d, 0x8d, 0x11, 0xc1, 0x51, 0xd2, 0x38, 0xcb, 0xf2, 0x59, 0x16, 0x93, 0x6b, 0xa0, 0x0b, - 0x7d, 0xa5, 0x53, 0x1c, 0x72, 0xcd, 0xbd, 0x03, 0xe8, 0x2d, 0x68, 0x81, 0x00, 0xb6, 0x22, 0xa0, - 0x6b, 0x94, 0x67, 0x45, 0x3d, 0xd7, 0x2c, 0xcf, 0x3a, 0xa0, 0x35, 0xfa, 0xd3, 0x02, 0x3b, 0x54, - 0x4f, 0xf2, 0x09, 0x6c, 0xb5, 0x3f, 0x70, 0xbb, 0x56, 0xda, 0x9d, 0xbd, 0x34, 0xd8, 0x69, 0xb4, - 0x6b, 0x12, 0x1b, 0x78, 0x08, 0x96, 0x9c, 0x65, 0xdc, 0xaa, 0xf9, 0x2e, 0xcf, 0xf8, 0xa0, 0x61, - 0xae, 0x7c, 0xe3, 0x85, 0x89, 0x87, 0xd0, 0x53, 0xd7, 0x4b, 0x39, 0xa1, 0x57, 0x27, 0xac, 0x86, - 0xd8, 0x6c, 0x98, 0x7e, 0x89, 0xf1, 0x01, 0xba, 0x7a, 0x2e, 0xb1, 0xc9, 0x6f, 0x30, 0xac, 0x19, - 0x56, 0x47, 0xd9, 0xc0, 0xe3, 0x05, 0x07, 0x9a, 0x0b, 0xd9, 0x69, 0xea, 0xe8, 0x02, 0x66, 0xf4, - 0xbb, 0x05, 0xd6, 0x69, 0x74, 0x3e, 0x21, 0x3c, 0xaa, 0x9a, 0x83, 0x0d, 0xa3, 0x78, 0x0f, 0xdc, - 0xca, 0x3a, 0x31, 0x4a, 0x10, 0xd5, 0xd5, 0x47, 0x80, 0xac, 0x6c, 0x20, 0x09, 0xa2, 0xe8, 0xf0, - 0x08, 0x90, 0x95, 0xa5, 0x65, 0xe0, 0x18, 0x3a, 0xe5, 0xbf, 0xef, 0x81, 0xd7, 0xa9, 0x13, 0x61, - 0xf9, 0x67, 0xe9, 0x1b, 0xf8, 0xb1, 0xda, 0x39, 0x5b, 0x0d, 0xff, 0x19, 0x0d, 0xb4, 0xdd, 0x64, - 0xae, 0x90, 0xce, 0x6d, 0xf9, 0xdf, 0x7e, 0xf9, 0x37, 0x00, 0x00, 0xff, 0xff, 0x86, 0x75, 0x28, - 0x0b, 0xc7, 0x07, 0x00, 0x00, +func init() { + proto.RegisterFile("micro/go-micro/router/service/proto/router.proto", fileDescriptor_c2b04f200fb3e806) +} + +var fileDescriptor_c2b04f200fb3e806 = []byte{ + // 646 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcb, 0x4e, 0xdb, 0x4c, + 0x14, 0xb6, 0x9d, 0xd8, 0xf9, 0x7d, 0xfe, 0x10, 0xdc, 0xb3, 0xa0, 0x56, 0x5a, 0x68, 0xe4, 0x15, + 0x42, 0xd4, 0x41, 0xe9, 0xb6, 0x37, 0xa0, 0xad, 0x2a, 0x95, 0x45, 0x6b, 0x81, 0xba, 0x36, 0xc9, + 0x11, 0xb5, 0x48, 0x6c, 0x33, 0x33, 0x01, 0x65, 0xdd, 0x67, 0xe9, 0xa2, 0xeb, 0x3e, 0x52, 0x5f, + 0xa4, 0x9a, 0x8b, 0x21, 0xc4, 0x18, 0x09, 0x56, 0x99, 0x73, 0xfb, 0xce, 0xf5, 0x8b, 0x61, 0x6f, + 0x96, 0x8d, 0x59, 0x31, 0x3c, 0x2b, 0x5e, 0xea, 0x07, 0x2b, 0xe6, 0x82, 0xd8, 0x90, 0x13, 0xbb, + 0xcc, 0xc6, 0x34, 0x2c, 0x59, 0x21, 0x2a, 0x65, 0xac, 0x04, 0x5c, 0x3f, 0x2b, 0x62, 0xe5, 0x1b, + 0x6b, 0x75, 0xe4, 0x43, 0x27, 0xa1, 0x8b, 0x39, 0x71, 0x11, 0x01, 0xfc, 0x97, 0x10, 0x2f, 0x8b, + 0x9c, 0x53, 0xf4, 0x16, 0xba, 0x47, 0x19, 0x17, 0x95, 0x8c, 0x31, 0x78, 0x2a, 0x80, 0x87, 0xf6, + 0xa0, 0xb5, 0xfd, 0xff, 0x68, 0x23, 0x5e, 0x01, 0x8a, 0x13, 0xf9, 0x93, 0x18, 0xaf, 0xe8, 0x0d, + 0xac, 0x1d, 0x15, 0xc5, 0xf9, 0xbc, 0x34, 0xe0, 0xb8, 0x0b, 0xee, 0xc5, 0x9c, 0xd8, 0x22, 0xb4, + 0x07, 0xf6, 0x9d, 0xf1, 0xdf, 0xa4, 0x35, 0xd1, 0x4e, 0xd1, 0x7b, 0xe8, 0x55, 0xe1, 0x8f, 0x2c, + 0xe0, 0x35, 0x74, 0x35, 0xe2, 0xa3, 0xf2, 0xbf, 0x83, 0x35, 0x13, 0xfd, 0xc8, 0xf4, 0x3d, 0xe8, + 0x7e, 0x4f, 0xc5, 0xf8, 0x47, 0x35, 0xdb, 0xdf, 0x36, 0x78, 0xfb, 0x93, 0x4b, 0x62, 0x02, 0x7b, + 0xe0, 0x64, 0x13, 0x55, 0x86, 0x9f, 0x38, 0xd9, 0x04, 0x87, 0xd0, 0x16, 0x8b, 0x92, 0x42, 0x67, + 0x60, 0x6f, 0xf7, 0x46, 0xcf, 0x6a, 0xc0, 0x3a, 0xec, 0x78, 0x51, 0x52, 0xa2, 0x1c, 0xf1, 0x39, + 0xf8, 0x22, 0x9b, 0x11, 0x17, 0xe9, 0xac, 0x0c, 0x5b, 0x03, 0x7b, 0xbb, 0x95, 0xdc, 0x28, 0x30, + 0x80, 0x96, 0x10, 0xd3, 0xb0, 0xad, 0xf4, 0xf2, 0x29, 0x6b, 0xa7, 0x4b, 0xca, 0x05, 0x0f, 0xdd, + 0x86, 0xda, 0x3f, 0x4a, 0x73, 0x62, 0xbc, 0xa2, 0x27, 0xb0, 0xfe, 0x95, 0x15, 0x63, 0xe2, 0xfc, + 0xfa, 0x1c, 0x02, 0xe8, 0x1d, 0x32, 0x4a, 0x05, 0x2d, 0x6b, 0x3e, 0xd0, 0x94, 0x6e, 0x6b, 0x4e, + 0xca, 0xc9, 0xb2, 0xcf, 0x4f, 0x1b, 0x5c, 0x05, 0x8d, 0xb1, 0xe9, 0xd1, 0x56, 0x3d, 0xf6, 0xef, + 0x2e, 0xa0, 0xa9, 0x45, 0x67, 0xb5, 0xc5, 0x5d, 0x70, 0x55, 0x9c, 0x6a, 0xbe, 0x79, 0x17, 0xda, + 0x29, 0x3a, 0x01, 0x57, 0xed, 0x12, 0x43, 0xe8, 0x18, 0x66, 0x98, 0xe9, 0x57, 0xa2, 0xb4, 0x9c, + 0xa5, 0x82, 0xae, 0xd2, 0x85, 0x4a, 0xe6, 0x27, 0x95, 0x28, 0x2d, 0x39, 0x89, 0xab, 0x82, 0x9d, + 0xab, 0x64, 0x7e, 0x52, 0x89, 0xd1, 0x1f, 0x1b, 0x5c, 0x95, 0xe7, 0x7e, 0xdc, 0x74, 0x32, 0x61, + 0xc4, 0x79, 0x85, 0x6b, 0xc4, 0xe5, 0x8c, 0xad, 0xc6, 0x8c, 0xed, 0x5b, 0x19, 0x71, 0xc3, 0xdc, + 0x20, 0x0b, 0x5d, 0x65, 0x30, 0x12, 0x22, 0xb4, 0xa7, 0x59, 0x7e, 0x1e, 0x7a, 0x4a, 0xab, 0xde, + 0xd2, 0x77, 0x46, 0x82, 0x65, 0xe3, 0xb0, 0xa3, 0xa6, 0x67, 0xa4, 0x9d, 0x11, 0xc0, 0xcd, 0x3d, + 0x21, 0x42, 0x4f, 0x4b, 0xfb, 0x79, 0x5e, 0xcc, 0xf3, 0x31, 0x05, 0x16, 0x06, 0xd0, 0xd5, 0x3a, + 0xbd, 0xcc, 0xc0, 0xde, 0x19, 0x82, 0x7f, 0xbd, 0x1f, 0x04, 0xf0, 0xf4, 0x25, 0x04, 0x96, 0x7c, + 0xeb, 0x1b, 0x08, 0x6c, 0xf9, 0x36, 0x01, 0xce, 0xe8, 0x97, 0x03, 0x5e, 0xa2, 0x6b, 0xfb, 0x02, + 0x9e, 0x26, 0x32, 0x6e, 0xd5, 0xb6, 0x74, 0xeb, 0x0f, 0xa2, 0xff, 0xa2, 0xd1, 0x6e, 0xae, 0xc9, + 0xc2, 0x03, 0x70, 0x15, 0xa9, 0x70, 0xb3, 0xe6, 0xbb, 0x4c, 0xb6, 0x7e, 0xc3, 0x81, 0x47, 0xd6, + 0x9e, 0x8d, 0x07, 0xe0, 0xeb, 0xf6, 0x32, 0x4e, 0x18, 0xd6, 0x2f, 0xc7, 0x40, 0x3c, 0x6d, 0xa0, + 0xa1, 0xc2, 0xf8, 0x04, 0x1d, 0x43, 0x10, 0x6c, 0xf2, 0xeb, 0x0f, 0x6a, 0x86, 0x55, 0x4e, 0x59, + 0xa3, 0xbf, 0x0e, 0xb8, 0xc7, 0xe9, 0xe9, 0x94, 0xf0, 0xb0, 0x9a, 0x2a, 0x36, 0x1c, 0xf3, 0x1d, + 0xe3, 0x59, 0x21, 0xa4, 0x25, 0x41, 0xf4, 0x3a, 0x1e, 0x00, 0xb2, 0xc2, 0x61, 0x05, 0xa2, 0xf7, + 0xf8, 0x00, 0x90, 0x15, 0xda, 0x5b, 0xb8, 0x0f, 0x6d, 0xf9, 0xf5, 0xb8, 0x67, 0xbe, 0xf5, 0x0d, + 0x2e, 0x7f, 0x6e, 0x22, 0x0b, 0x3f, 0x57, 0xac, 0xdd, 0x6c, 0xf8, 0xa7, 0x36, 0x40, 0x5b, 0x4d, + 0xe6, 0x0a, 0xe9, 0xd4, 0x53, 0x5f, 0xbe, 0x57, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x53, 0xd8, + 0xc9, 0xf6, 0x2d, 0x07, 0x00, 0x00, } diff --git a/router/service/proto/router.pb.micro.go b/router/service/proto/router.pb.micro.go index 3c0059f8..ae8143bc 100644 --- a/router/service/proto/router.pb.micro.go +++ b/router/service/proto/router.pb.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: router.proto +// source: micro/go-micro/router/service/proto/router.proto package go_micro_router @@ -38,7 +38,6 @@ type RouterService interface { Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) - Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) } type routerService struct { @@ -167,16 +166,6 @@ func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client. return out, nil } -func (c *routerService) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) { - req := c.c.NewRequest(c.name, "Router.Status", in) - out := new(StatusResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // Server API for Router service type RouterHandler interface { @@ -184,7 +173,6 @@ type RouterHandler interface { Watch(context.Context, *WatchRequest, Router_WatchStream) error Advertise(context.Context, *Request, Router_AdvertiseStream) error Process(context.Context, *Advert, *ProcessResponse) error - Status(context.Context, *Request, *StatusResponse) error } func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error { @@ -193,7 +181,6 @@ func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.H Watch(ctx context.Context, stream server.Stream) error Advertise(ctx context.Context, stream server.Stream) error Process(ctx context.Context, in *Advert, out *ProcessResponse) error - Status(ctx context.Context, in *Request, out *StatusResponse) error } type Router struct { router @@ -284,10 +271,6 @@ func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessRes return h.RouterHandler.Process(ctx, in, out) } -func (h *routerHandler) Status(ctx context.Context, in *Request, out *StatusResponse) error { - return h.RouterHandler.Status(ctx, in, out) -} - // Client API for Table service type TableService interface { diff --git a/router/service/proto/router.proto b/router/service/proto/router.proto index 44539332..ec2de13b 100644 --- a/router/service/proto/router.proto +++ b/router/service/proto/router.proto @@ -8,7 +8,6 @@ service Router { rpc Watch(WatchRequest) returns (stream Event) {}; rpc Advertise(Request) returns (stream Advert) {}; rpc Process(Advert) returns (ProcessResponse) {}; - rpc Status(Request) returns (StatusResponse) {}; } service Table { @@ -129,12 +128,3 @@ message Route { // the metric / score of this route int64 metric = 7; } - -message Status { - string code = 1; - string error = 2; -} - -message StatusResponse { - Status status = 1; -} diff --git a/router/service/service.go b/router/service/service.go index b92a03fd..3184283d 100644 --- a/router/service/service.go +++ b/router/service/service.go @@ -2,7 +2,6 @@ package service import ( "context" - "errors" "fmt" "io" "sync" @@ -19,7 +18,6 @@ type svc struct { callOpts []client.CallOption router pb.RouterService table *table - status *router.Status exit chan bool errChan chan error advertChan chan *router.Advert @@ -43,16 +41,9 @@ func NewRouter(opts ...router.Option) router.Router { cli = options.Client } - // set the status to Stopped - status := &router.Status{ - Code: router.Stopped, - Error: nil, - } - // NOTE: should we have Client/Service option in router.Options? s := &svc{ opts: options, - status: status, router: pb.NewRouterService(router.DefaultName, cli), } @@ -98,12 +89,6 @@ func (s *svc) Table() router.Table { func (s *svc) Start() error { s.Lock() defer s.Unlock() - - s.status = &router.Status{ - Code: router.Running, - Error: nil, - } - return nil } @@ -169,21 +154,16 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) { s.Lock() defer s.Unlock() - switch s.status.Code { - case router.Running, router.Advertising: - stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...) - if err != nil { - return nil, fmt.Errorf("failed getting advert stream: %s", err) - } - // create advertise and event channels - advertChan := make(chan *router.Advert) - go s.advertiseEvents(advertChan, stream) - return advertChan, nil - case router.Stopped: - return nil, fmt.Errorf("not running") + stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...) + if err != nil { + return nil, fmt.Errorf("failed getting advert stream: %s", err) } - return nil, fmt.Errorf("error: %s", s.status.Error) + // create advertise and event channels + advertChan := make(chan *router.Advert) + go s.advertiseEvents(advertChan, stream) + + return advertChan, nil } // Process processes incoming adverts @@ -220,55 +200,6 @@ func (s *svc) Process(advert *router.Advert) error { return nil } -// Status returns router status -func (s *svc) Status() router.Status { - s.Lock() - defer s.Unlock() - - // check if its stopped - select { - case <-s.exit: - return router.Status{ - Code: router.Stopped, - Error: nil, - } - default: - // don't block - } - - // check the remote router - rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...) - if err != nil { - return router.Status{ - Code: router.Error, - Error: err, - } - } - - code := router.Running - var serr error - - switch rsp.Status.Code { - case "running": - code = router.Running - case "advertising": - code = router.Advertising - case "stopped": - code = router.Stopped - case "error": - code = router.Error - } - - if len(rsp.Status.Error) > 0 { - serr = errors.New(rsp.Status.Error) - } - - return router.Status{ - Code: code, - Error: serr, - } -} - // Remote router cannot be stopped func (s *svc) Stop() error { s.Lock()