don't close next chan, instead use exit chan

This commit is contained in:
Asim 2016-07-30 11:58:10 +01:00
parent 720bda1439
commit 2d6c403992

View File

@ -13,18 +13,17 @@ type consulWatcher struct {
wp *watch.WatchPlan wp *watch.WatchPlan
watchers map[string]*watch.WatchPlan watchers map[string]*watch.WatchPlan
once sync.Once
next chan *Result next chan *Result
exit chan bool
sync.RWMutex sync.RWMutex
services map[string][]*Service services map[string][]*Service
} }
func newConsulWatcher(cr *consulRegistry) (Watcher, error) { func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
var once sync.Once
cw := &consulWatcher{ cw := &consulWatcher{
r: cr, r: cr,
once: once, exit: make(chan bool),
next: make(chan *Result, 10), next: make(chan *Result, 10),
watchers: make(map[string]*watch.WatchPlan), watchers: make(map[string]*watch.WatchPlan),
services: make(map[string][]*Service), services: make(map[string][]*Service),
@ -229,20 +228,36 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
} }
func (cw *consulWatcher) Next() (*Result, error) { func (cw *consulWatcher) Next() (*Result, error) {
r, ok := <-cw.next select {
case <-cw.exit:
return nil, errors.New("result chan closed")
case r, ok := <-cw.next:
if !ok { if !ok {
return nil, errors.New("chan closed") return nil, errors.New("result chan closed")
} }
return r, nil return r, nil
}
return nil, errors.New("result chan closed")
} }
func (cw *consulWatcher) Stop() { func (cw *consulWatcher) Stop() {
select {
case <-cw.exit:
return
default:
close(cw.exit)
if cw.wp == nil { if cw.wp == nil {
return return
} }
cw.wp.Stop() cw.wp.Stop()
cw.once.Do(func() { // drain results
close(cw.next) for {
}) select {
case <-cw.next:
default:
return
}
}
}
} }