From 770c16a66d008f4e33af5efbade73c55e87c4310 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Dec 2018 16:51:42 +0000 Subject: [PATCH 1/2] move to using rwmutex for selector --- selector/cache/cache.go | 49 ++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index 482c068b..b89a9bef 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -15,7 +15,7 @@ type cacheSelector struct { ttl time.Duration // registry cache - sync.Mutex + sync.RWMutex cache map[string][]*registry.Service ttls map[string]time.Time @@ -81,13 +81,12 @@ func (c *cacheSelector) del(service string) { } func (c *cacheSelector) get(service string) ([]*registry.Service, error) { - c.Lock() - defer c.Unlock() + // read lock for the duration + c.RLock() // watch service if not watched if _, ok := c.watched[service]; !ok { go c.run(service) - c.watched[service] = true } // get does the actual request for a service @@ -100,15 +99,24 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { } // cache results + c.Lock() c.set(service, c.cp(services)) + c.Unlock() + return services, nil } // check the cache first services, ok := c.cache[service] + // make a copy + cp := c.cp(services) // cache miss or no services if !ok || len(services) == 0 { + // unlock the read + c.RUnlock() + + // get and return services return get(service) } @@ -117,15 +125,22 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { // within ttl so return cache if kk && time.Since(ttl) < c.ttl { - return c.cp(services), nil + // unlock the read + c.RUnlock() + + // return servics + return cp, nil } - // expired entry so get service - services, err := get(service) + // unlock read + c.RUnlock() - // no error then return error + // expired entry so get service + rservices, err := get(service) + + // no error then return services if err == nil { - return services, nil + return rservices, nil } // not found error then return @@ -135,8 +150,8 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { // other error - // return expired cache as last resort - return c.cp(services), nil + // return expired cache copy as last resort + return cp, nil } func (c *cacheSelector) set(service string, services []*registry.Service) { @@ -257,6 +272,18 @@ func (c *cacheSelector) update(res *registry.Result) { // reloads the watcher if Init is called // and returns when Close is called func (c *cacheSelector) run(name string) { + // set watcher + c.Lock() + c.watched[name] = true + c.Unlock() + + // delete watcher on exit + defer func() { + c.Lock() + delete(c.watched, name) + c.Unlock() + }() + for { // exit early if already dead if c.quit() { From 67d10e5f39a473cae345ad49d40ca0395f8bff31 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Dec 2018 18:06:34 +0000 Subject: [PATCH 2/2] simplify get code --- selector/cache/cache.go | 69 +++++++++++++---------------------------- 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index b89a9bef..4947d95c 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -81,16 +81,25 @@ func (c *cacheSelector) del(service string) { } func (c *cacheSelector) get(service string) ([]*registry.Service, error) { - // read lock for the duration + // read lock c.RLock() - // watch service if not watched - if _, ok := c.watched[service]; !ok { - go c.run(service) + // check the cache first + services, ok := c.cache[service] + // get cache ttl + ttl, kk := c.ttls[service] + + // got services && within ttl so return cache + if ok && kk && time.Since(ttl) < c.ttl { + // make a copy + cp := c.cp(services) + // unlock the read + c.RUnlock() + // return servics + return cp, nil } - // get does the actual request for a service - // it also caches it + // get does the actual request for a service and cache it get := func(service string) ([]*registry.Service, error) { // ask the registry services, err := c.so.Registry.GetService(service) @@ -106,52 +115,16 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { return services, nil } - // check the cache first - services, ok := c.cache[service] - // make a copy - cp := c.cp(services) - - // cache miss or no services - if !ok || len(services) == 0 { - // unlock the read - c.RUnlock() - - // get and return services - return get(service) + // watch service if not watched + if _, ok := c.watched[service]; !ok { + go c.run(service) } - // got cache but lets check ttl - ttl, kk := c.ttls[service] - - // within ttl so return cache - if kk && time.Since(ttl) < c.ttl { - // unlock the read - c.RUnlock() - - // return servics - return cp, nil - } - - // unlock read + // unlock the read lock c.RUnlock() - // expired entry so get service - rservices, err := get(service) - - // no error then return services - if err == nil { - return rservices, nil - } - - // not found error then return - if err == registry.ErrNotFound { - return nil, selector.ErrNotFound - } - - // other error - - // return expired cache copy as last resort - return cp, nil + // get and return services + return get(service) } func (c *cacheSelector) set(service string, services []*registry.Service) {