Remove go routines for mdns watcher and cache registry (#919)

This commit is contained in:
Asim Aslam 2019-11-06 15:49:40 +00:00 committed by Vasiliy Tolstov
parent 537ce0a258
commit e1994ce661

View File

@ -39,6 +39,8 @@ type cache struct {
// used to stop the cache // used to stop the cache
exit chan bool exit chan bool
// indicate whether its running
running bool
// status of the registry // status of the registry
// used to hold onto the cache // used to hold onto the cache
// in failure state // in failure state
@ -157,13 +159,26 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
} }
// watch service if not watched // watch service if not watched
if _, ok := c.watched[service]; !ok { _, ok := c.watched[service]
go c.run(service)
}
// unlock the read lock // unlock the read lock
c.RUnlock() c.RUnlock()
// check if its being watched
if !ok {
c.Lock()
// set to watched
c.watched[service] = true
// only kick it off if not running
if !c.running {
go c.run()
}
c.Unlock()
}
// get and return services // get and return services
return get(service, cp) return get(service, cp)
} }
@ -181,6 +196,11 @@ func (c *cache) update(res *registry.Result) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
// only save watched services
if _, ok := c.watched[res.Service.Name]; !ok {
return
}
services, ok := c.cache[res.Service.Name] services, ok := c.cache[res.Service.Name]
if !ok { if !ok {
// we're not going to cache anything // we're not going to cache anything
@ -283,16 +303,16 @@ func (c *cache) update(res *registry.Result) {
// run starts the cache watcher loop // run starts the cache watcher loop
// it creates a new watcher if there's a problem // it creates a new watcher if there's a problem
func (c *cache) run(service string) { func (c *cache) run() {
// set watcher
c.Lock() c.Lock()
c.watched[service] = true c.running = true
c.Unlock() c.Unlock()
// delete watcher on exit // reset watcher on exit
defer func() { defer func() {
c.Lock() c.Lock()
delete(c.watched, service) c.watched = make(map[string]bool)
c.running = false
c.Unlock() c.Unlock()
}() }()
@ -309,10 +329,7 @@ func (c *cache) run(service string) {
time.Sleep(time.Duration(j) * time.Millisecond) time.Sleep(time.Duration(j) * time.Millisecond)
// create new watcher // create new watcher
w, err := c.Registry.Watch( w, err := c.Registry.Watch()
registry.WatchService(service),
)
if err != nil { if err != nil {
if c.quit() { if c.quit() {
return return
@ -414,6 +431,9 @@ func (c *cache) GetService(service string) ([]*registry.Service, error) {
} }
func (c *cache) Stop() { func (c *cache) Stop() {
c.Lock()
defer c.Unlock()
select { select {
case <-c.exit: case <-c.exit:
return return