From 770c16a66d008f4e33af5efbade73c55e87c4310 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Dec 2018 16:51:42 +0000 Subject: [PATCH] move to using rwmutex for selector --- selector/cache/cache.go | 49 ++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index 482c068b..b89a9bef 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -15,7 +15,7 @@ type cacheSelector struct { ttl time.Duration // registry cache - sync.Mutex + sync.RWMutex cache map[string][]*registry.Service ttls map[string]time.Time @@ -81,13 +81,12 @@ func (c *cacheSelector) del(service string) { } func (c *cacheSelector) get(service string) ([]*registry.Service, error) { - c.Lock() - defer c.Unlock() + // read lock for the duration + c.RLock() // watch service if not watched if _, ok := c.watched[service]; !ok { go c.run(service) - c.watched[service] = true } // get does the actual request for a service @@ -100,15 +99,24 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { } // cache results + c.Lock() c.set(service, c.cp(services)) + c.Unlock() + return services, nil } // check the cache first services, ok := c.cache[service] + // make a copy + cp := c.cp(services) // cache miss or no services if !ok || len(services) == 0 { + // unlock the read + c.RUnlock() + + // get and return services return get(service) } @@ -117,15 +125,22 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { // within ttl so return cache if kk && time.Since(ttl) < c.ttl { - return c.cp(services), nil + // unlock the read + c.RUnlock() + + // return servics + return cp, nil } - // expired entry so get service - services, err := get(service) + // unlock read + c.RUnlock() - // no error then return error + // expired entry so get service + rservices, err := get(service) + + // no error then return services if err == nil { - return services, nil + return rservices, nil } // not found error then return @@ -135,8 +150,8 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { // other error - // return expired cache as last resort - return c.cp(services), nil + // return expired cache copy as last resort + return cp, nil } func (c *cacheSelector) set(service string, services []*registry.Service) { @@ -257,6 +272,18 @@ func (c *cacheSelector) update(res *registry.Result) { // reloads the watcher if Init is called // and returns when Close is called func (c *cacheSelector) run(name string) { + // set watcher + c.Lock() + c.watched[name] = true + c.Unlock() + + // delete watcher on exit + defer func() { + c.Lock() + delete(c.watched, name) + c.Unlock() + }() + for { // exit early if already dead if c.quit() {