Merge pull request #689 from milosgajdos83/router-stop
Make router.Stop idempotent
This commit is contained in:
		| @@ -181,10 +181,6 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro | ||||
| // watchRegistry watches registry and updates routing table based on the received events. | ||||
| // It returns error if either the registry watcher fails with error or if the routing table update fails. | ||||
| func (r *router) watchRegistry(w registry.Watcher) error { | ||||
| 	// wait in the background for the router to stop | ||||
| 	// when the router stops, stop the watcher and exit | ||||
| 	r.wg.Add(1) | ||||
|  | ||||
| 	exit := make(chan bool) | ||||
|  | ||||
| 	defer func() { | ||||
| @@ -193,6 +189,9 @@ func (r *router) watchRegistry(w registry.Watcher) error { | ||||
| 		r.wg.Done() | ||||
| 	}() | ||||
|  | ||||
| 	// wait in the background for the router to stop | ||||
| 	// when the router stops, stop the watcher and exit | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer w.Stop() | ||||
|  | ||||
| @@ -226,9 +225,6 @@ func (r *router) watchRegistry(w registry.Watcher) error { | ||||
| // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry | ||||
| // It returns error if the locally registered services either fails to be added/deleted to/from network registry. | ||||
| func (r *router) watchTable(w Watcher) error { | ||||
| 	// wait in the background for the router to stop | ||||
| 	// when the router stops, stop the watcher and exit | ||||
| 	r.wg.Add(1) | ||||
| 	exit := make(chan bool) | ||||
|  | ||||
| 	defer func() { | ||||
| @@ -237,6 +233,9 @@ func (r *router) watchTable(w Watcher) error { | ||||
| 		r.wg.Done() | ||||
| 	}() | ||||
|  | ||||
| 	// wait in the background for the router to stop | ||||
| 	// when the router stops, stop the watcher and exit | ||||
| 	r.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer w.Stop() | ||||
|  | ||||
| @@ -471,12 +470,38 @@ func (r *router) advertiseEvents() error { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // close closes exit channels | ||||
| func (r *router) close() { | ||||
| 	// notify all goroutines to finish | ||||
| 	close(r.exit) | ||||
|  | ||||
| 	// drain the advertise channel only if advertising | ||||
| 	if r.status.Code == Advertising { | ||||
| 		// drain the event channel | ||||
| 		for range r.eventChan { | ||||
| 		} | ||||
|  | ||||
| 		// close advert subscribers | ||||
| 		for id, sub := range r.subscribers { | ||||
| 			// close the channel | ||||
| 			close(sub) | ||||
|  | ||||
| 			// delete the subscriber | ||||
| 			delete(r.subscribers, id) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// mark the router as Stopped and set its Error to nil | ||||
| 	r.status = Status{Code: Stopped, Error: nil} | ||||
| } | ||||
|  | ||||
| // watchErrors watches router errors and takes appropriate actions | ||||
| func (r *router) watchErrors() { | ||||
| 	var err error | ||||
|  | ||||
| 	select { | ||||
| 	case <-r.exit: | ||||
| 		return | ||||
| 	case err = <-r.errChan: | ||||
| 	} | ||||
|  | ||||
| @@ -484,22 +509,12 @@ func (r *router) watchErrors() { | ||||
| 	defer r.Unlock() | ||||
| 	// if the router is not stopped, stop it | ||||
| 	if r.status.Code != Stopped { | ||||
| 		// notify all goroutines to finish | ||||
| 		close(r.exit) | ||||
|  | ||||
| 		// drain the advertise channel only if the router is advertising | ||||
| 		if r.status.Code == Advertising { | ||||
| 			// drain the event channel | ||||
| 			for range r.eventChan { | ||||
| 			} | ||||
| 		// close all the channels | ||||
| 		r.close() | ||||
| 		// set the status error | ||||
| 		if err != nil { | ||||
| 			r.status.Error = err | ||||
| 		} | ||||
|  | ||||
| 		// mark the router as Stopped and set its Error to nil | ||||
| 		r.status = Status{Code: Stopped, Error: nil} | ||||
| 	} | ||||
|  | ||||
| 	if err != nil { | ||||
| 		r.status = Status{Code: Error, Error: err} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -665,10 +680,12 @@ func (r *router) Process(a *Advert) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Lookup routes in the routing table | ||||
| func (r *router) Lookup(q Query) ([]Route, error) { | ||||
| 	return r.table.Query(q) | ||||
| } | ||||
|  | ||||
| // Watch routes | ||||
| func (r *router) Watch(opts ...WatchOption) (Watcher, error) { | ||||
| 	return r.table.Watch(opts...) | ||||
| } | ||||
| @@ -687,31 +704,15 @@ func (r *router) Status() Status { | ||||
| // Stop stops the router | ||||
| func (r *router) Stop() error { | ||||
| 	r.Lock() | ||||
| 	// only close the channel if the router is running and/or advertising | ||||
| 	if r.status.Code == Running || r.status.Code == Advertising { | ||||
| 		// notify all goroutines to finish | ||||
| 		close(r.exit) | ||||
| 	defer r.Unlock() | ||||
|  | ||||
| 		// drain the advertise channel only if advertising | ||||
| 		if r.status.Code == Advertising { | ||||
| 			// drain the event channel | ||||
| 			for range r.eventChan { | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// close advert subscribers | ||||
| 		for id, sub := range r.subscribers { | ||||
| 			// close the channel | ||||
| 			close(sub) | ||||
|  | ||||
| 			// delete the subscriber | ||||
| 			delete(r.subscribers, id) | ||||
| 		} | ||||
|  | ||||
| 		// mark the router as Stopped and set its Error to nil | ||||
| 		r.status = Status{Code: Stopped, Error: nil} | ||||
| 	switch r.status.Code { | ||||
| 	case Stopped, Error: | ||||
| 		return r.status.Error | ||||
| 	case Running, Advertising: | ||||
| 		// close all the channels | ||||
| 		r.close() | ||||
| 	} | ||||
| 	r.Unlock() | ||||
|  | ||||
| 	// wait for all goroutines to finish | ||||
| 	r.wg.Wait() | ||||
| @@ -721,5 +722,5 @@ func (r *router) Stop() error { | ||||
|  | ||||
| // String prints debugging information about router | ||||
| func (r *router) String() string { | ||||
| 	return "mucp" | ||||
| 	return "memory" | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user