Merge pull request #352 from micro/rwmutex

move to using rwmutex for selector
This commit is contained in:
Asim Aslam 2018-12-18 18:10:19 +00:00 committed by GitHub
commit f2efc685d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -15,7 +15,7 @@ type cacheSelector struct {
ttl time.Duration
// registry cache
sync.Mutex
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
@ -81,17 +81,25 @@ func (c *cacheSelector) del(service string) {
}
func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// read lock
c.RLock()
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
// check the cache first
services, ok := c.cache[service]
// get cache ttl
ttl, kk := c.ttls[service]
// got services && within ttl so return cache
if ok && kk && time.Since(ttl) < c.ttl {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service
// it also caches it
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
@ -100,45 +108,25 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// check the cache first
services, ok := c.cache[service]
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// cache miss or no services
if !ok || len(services) == 0 {
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
// got cache but lets check ttl
ttl, kk := c.ttls[service]
// within ttl so return cache
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// expired entry so get service
services, err := get(service)
// no error then return error
if err == nil {
return services, nil
}
// not found error then return
if err == registry.ErrNotFound {
return nil, selector.ErrNotFound
}
// other error
// return expired cache as last resort
return c.cp(services), nil
}
func (c *cacheSelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
@ -257,6 +245,18 @@ func (c *cacheSelector) update(res *registry.Result) {
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
for {
// exit early if already dead
if c.quit() {