move to using rwmutex for selector

This commit is contained in:
Asim Aslam 2018-12-18 16:51:42 +00:00
parent c2cc03a472
commit 770c16a66d

View File

@ -15,7 +15,7 @@ type cacheSelector struct {
ttl time.Duration ttl time.Duration
// registry cache // registry cache
sync.Mutex sync.RWMutex
cache map[string][]*registry.Service cache map[string][]*registry.Service
ttls map[string]time.Time ttls map[string]time.Time
@ -81,13 +81,12 @@ func (c *cacheSelector) del(service string) {
} }
func (c *cacheSelector) get(service string) ([]*registry.Service, error) { func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock() // read lock for the duration
defer c.Unlock() c.RLock()
// watch service if not watched // watch service if not watched
if _, ok := c.watched[service]; !ok { if _, ok := c.watched[service]; !ok {
go c.run(service) go c.run(service)
c.watched[service] = true
} }
// get does the actual request for a service // get does the actual request for a service
@ -100,15 +99,24 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
} }
// cache results // cache results
c.Lock()
c.set(service, c.cp(services)) c.set(service, c.cp(services))
c.Unlock()
return services, nil return services, nil
} }
// check the cache first // check the cache first
services, ok := c.cache[service] services, ok := c.cache[service]
// make a copy
cp := c.cp(services)
// cache miss or no services // cache miss or no services
if !ok || len(services) == 0 { if !ok || len(services) == 0 {
// unlock the read
c.RUnlock()
// get and return services
return get(service) return get(service)
} }
@ -117,15 +125,22 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
// within ttl so return cache // within ttl so return cache
if kk && time.Since(ttl) < c.ttl { if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil // unlock the read
c.RUnlock()
// return servics
return cp, nil
} }
// expired entry so get service // unlock read
services, err := get(service) c.RUnlock()
// no error then return error // expired entry so get service
rservices, err := get(service)
// no error then return services
if err == nil { if err == nil {
return services, nil return rservices, nil
} }
// not found error then return // not found error then return
@ -135,8 +150,8 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
// other error // other error
// return expired cache as last resort // return expired cache copy as last resort
return c.cp(services), nil return cp, nil
} }
func (c *cacheSelector) set(service string, services []*registry.Service) { func (c *cacheSelector) set(service string, services []*registry.Service) {
@ -257,6 +272,18 @@ func (c *cacheSelector) update(res *registry.Result) {
// reloads the watcher if Init is called // reloads the watcher if Init is called
// and returns when Close is called // and returns when Close is called
func (c *cacheSelector) run(name string) { func (c *cacheSelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
for { for {
// exit early if already dead // exit early if already dead
if c.quit() { if c.quit() {