Proper stopping of service router
This commit is contained in:
		| @@ -67,7 +67,12 @@ func (s *svc) Options() router.Options { | |||||||
|  |  | ||||||
| // watchRouter watches router and send events to all registered watchers | // watchRouter watches router and send events to all registered watchers | ||||||
| func (s *svc) watchRouter(stream pb.Router_WatchService) error { | func (s *svc) watchRouter(stream pb.Router_WatchService) error { | ||||||
| 	defer stream.Close() | 	s.wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer s.wg.Done() | ||||||
|  | 		<-s.exit | ||||||
|  | 		stream.Close() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	var watchErr error | 	var watchErr error | ||||||
|  |  | ||||||
| @@ -123,10 +128,13 @@ func (s *svc) watchErrors() { | |||||||
| 	if s.status.Code != router.Stopped { | 	if s.status.Code != router.Stopped { | ||||||
| 		// notify all goroutines to finish | 		// notify all goroutines to finish | ||||||
| 		close(s.exit) | 		close(s.exit) | ||||||
|  | 		if s.status.Code == router.Advertising { | ||||||
| 			// drain the advertise channel | 			// drain the advertise channel | ||||||
| 			for range s.advertChan { | 			for range s.advertChan { | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		s.status = router.Status{Code: router.Stopped, Error: nil} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		s.status = router.Status{Code: router.Error, Error: err} | 		s.status = router.Status{Code: router.Error, Error: err} | ||||||
| @@ -176,7 +184,13 @@ func (s *svc) run() { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error { | func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error { | ||||||
| 	defer stream.Close() | 	s.wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer s.wg.Done() | ||||||
|  | 		<-s.exit | ||||||
|  | 		stream.Close() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	var advErr error | 	var advErr error | ||||||
|  |  | ||||||
| 	for { | 	for { | ||||||
| @@ -217,10 +231,14 @@ func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error { | |||||||
| 		select { | 		select { | ||||||
| 		case s.advertChan <- advert: | 		case s.advertChan <- advert: | ||||||
| 		case <-s.exit: | 		case <-s.exit: | ||||||
|  | 			close(s.advertChan) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// close the channel on exit | ||||||
|  | 	close(s.advertChan) | ||||||
|  |  | ||||||
| 	return advErr | 	return advErr | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -451,9 +469,13 @@ func (s *svc) Stop() error { | |||||||
| 	if s.status.Code == router.Running || s.status.Code == router.Advertising { | 	if s.status.Code == router.Running || s.status.Code == router.Advertising { | ||||||
| 		// notify all goroutines to finish | 		// notify all goroutines to finish | ||||||
| 		close(s.exit) | 		close(s.exit) | ||||||
| 		// drain the advertise channel |  | ||||||
|  | 		// drain the advertise channel only if advertising | ||||||
|  | 		if s.status.Code == router.Advertising { | ||||||
| 			for range s.advertChan { | 			for range s.advertChan { | ||||||
| 			} | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		// mark the router as Stopped and set its Error to nil | 		// mark the router as Stopped and set its Error to nil | ||||||
| 		s.status = router.Status{Code: router.Stopped, Error: nil} | 		s.status = router.Status{Code: router.Stopped, Error: nil} | ||||||
| 	} | 	} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user