Make stop idempotent. Small refactoring. Router name is memory.

This commit is contained in:
Milos Gajdos 2019-08-21 21:10:42 +01:00
parent fb750a0bb1
commit 75871287a1
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -181,10 +181,6 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
// watchRegistry watches registry and updates routing table based on the received events. // watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the registry watcher fails with error or if the routing table update fails. // It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *router) watchRegistry(w registry.Watcher) error { func (r *router) watchRegistry(w registry.Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
@ -193,6 +189,9 @@ func (r *router) watchRegistry(w registry.Watcher) error {
r.wg.Done() r.wg.Done()
}() }()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
@ -226,9 +225,6 @@ func (r *router) watchRegistry(w registry.Watcher) error {
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry. // It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error { func (r *router) watchTable(w Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
@ -237,6 +233,9 @@ func (r *router) watchTable(w Watcher) error {
r.wg.Done() r.wg.Done()
}() }()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
@ -471,12 +470,38 @@ func (r *router) advertiseEvents() error {
} }
} }
// close closes exit channels
func (r *router) close() {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel only if advertising
if r.status.Code == Advertising {
// drain the event channel
for range r.eventChan {
}
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
delete(r.subscribers, id)
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
// watchErrors watches router errors and takes appropriate actions // watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors() { func (r *router) watchErrors() {
var err error var err error
select { select {
case <-r.exit: case <-r.exit:
return
case err = <-r.errChan: case err = <-r.errChan:
} }
@ -484,22 +509,12 @@ func (r *router) watchErrors() {
defer r.Unlock() defer r.Unlock()
// if the router is not stopped, stop it // if the router is not stopped, stop it
if r.status.Code != Stopped { if r.status.Code != Stopped {
// notify all goroutines to finish // close all the channels
close(r.exit) r.close()
// set the status error
// drain the advertise channel only if the router is advertising
if r.status.Code == Advertising {
// drain the event channel
for range r.eventChan {
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
if err != nil { if err != nil {
r.status = Status{Code: Error, Error: err} r.status.Error = err
}
} }
} }
@ -665,10 +680,12 @@ func (r *router) Process(a *Advert) error {
return nil return nil
} }
// Lookup routes in the routing table
func (r *router) Lookup(q Query) ([]Route, error) { func (r *router) Lookup(q Query) ([]Route, error) {
return r.table.Query(q) return r.table.Query(q)
} }
// Watch routes
func (r *router) Watch(opts ...WatchOption) (Watcher, error) { func (r *router) Watch(opts ...WatchOption) (Watcher, error) {
return r.table.Watch(opts...) return r.table.Watch(opts...)
} }
@ -687,31 +704,15 @@ func (r *router) Status() Status {
// Stop stops the router // Stop stops the router
func (r *router) Stop() error { func (r *router) Stop() error {
r.Lock() r.Lock()
// only close the channel if the router is running and/or advertising defer r.Unlock()
if r.status.Code == Running || r.status.Code == Advertising {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel only if advertising switch r.status.Code {
if r.status.Code == Advertising { case Stopped, Error:
// drain the event channel return r.status.Error
for range r.eventChan { case Running, Advertising:
// close all the channels
r.close()
} }
}
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
delete(r.subscribers, id)
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
r.Unlock()
// wait for all goroutines to finish // wait for all goroutines to finish
r.wg.Wait() r.wg.Wait()
@ -721,5 +722,5 @@ func (r *router) Stop() error {
// String prints debugging information about router // String prints debugging information about router
func (r *router) String() string { func (r *router) String() string {
return "mucp" return "memory"
} }