diff --git a/registry/cache/cache.go b/registry/cache/cache.go index 2799f2d4..70c359ca 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -31,26 +31,25 @@ type cache struct { registry.Registry opts Options - // registry cache + // registry cache. services,ttls,watched,running are grouped by doman sync.RWMutex - cache map[string][]*registry.Service - ttls map[string]time.Time - watched map[string]bool + services map[string]services + ttls map[string]ttls + watched map[string]watched + running map[string]bool - // used to stop the cache + // used to stop the caches exit chan bool - // indicate whether its running - running bool - // status of the registry - // used to hold onto the cache - // in failure state + // indicate whether its running status of the registry used to hold onto the cache in failure state status error } -var ( - DefaultTTL = time.Minute -) +type services map[string][]*registry.Service +type ttls map[string]time.Time +type watched map[string]bool + +var defaultTTL = time.Minute func backoff(attempts int) time.Duration { if attempts == 0 { @@ -101,47 +100,56 @@ func (c *cache) quit() bool { } } -func (c *cache) del(service string) { +func (c *cache) del(domain, service string) { // don't blow away cache in error state - if err := c.status; err != nil { + if err := c.getStatus(); err != nil { return } - // otherwise delete entries - delete(c.cache, service) - delete(c.ttls, service) + + c.Lock() + defer c.Unlock() + + if _, ok := c.services[domain]; ok { + delete(c.services[domain], service) + } + + if _, ok := c.ttls[domain]; ok { + delete(c.ttls[domain], service) + } } -func (c *cache) get(service string) ([]*registry.Service, error) { - // read lock +func (c *cache) get(domain, service string) ([]*registry.Service, error) { + var services []*registry.Service + var ttl time.Time + + // lookup the values in the cache before calling the underlying registrry c.RLock() + if srvs, ok := c.services[domain]; ok { + services = srvs[service] + } + if tt, ok := c.ttls[domain]; ok { + ttl = tt[service] + } + c.RUnlock() - // check the cache first - services := c.cache[service] - // get cache ttl - ttl := c.ttls[service] - // make a copy - cp := util.Copy(services) - - // got services && within ttl so return cache - if c.isValid(cp, ttl) { - c.RUnlock() - // return services - return cp, nil + // got services && within ttl so return a copy of the services + if c.isValid(services, ttl) { + return util.Copy(services), nil } // get does the actual request for a service and cache it - get := func(service string, cached []*registry.Service) ([]*registry.Service, error) { + get := func(domain string, service string, cached []*registry.Service) ([]*registry.Service, error) { // ask the registry - services, err := c.Registry.GetService(service) + services, err := c.Registry.GetService(service, registry.GetDomain(domain)) if err != nil { + // set the error status + c.setStatus(err) + // check the cache if len(cached) > 0 { - // set the error status - c.setStatus(err) - - // return the stale cache return cached, nil } + // otherwise return error return nil, err } @@ -152,67 +160,87 @@ func (c *cache) get(service string) ([]*registry.Service, error) { } // cache results - c.Lock() - c.set(service, util.Copy(services)) - c.Unlock() + c.set(domain, service, util.Copy(services)) return services, nil } // watch service if not watched - _, ok := c.watched[service] - - // unlock the read lock + c.RLock() + var ok bool + if _, d := c.watched[domain]; d { + if _, s := c.watched[domain][service]; s { + ok = true + } + } c.RUnlock() // check if its being watched if !ok { c.Lock() - // set to watched - c.watched[service] = true - - // only kick it off if not running - if !c.running { - go c.run() + // add domain if not registered + if _, ok := c.watched[domain]; !ok { + c.watched[domain] = make(map[string]bool) } + // set to watched + c.watched[domain][service] = true + + running := c.running[domain] c.Unlock() + + // only kick it off if not running + if !running { + go c.run(domain) + } } // get and return services - return get(service, cp) + return get(domain, service, services) } -func (c *cache) set(service string, services []*registry.Service) { - c.cache[service] = services - c.ttls[service] = time.Now().Add(c.opts.TTL) +func (c *cache) set(domain string, service string, srvs []*registry.Service) { + c.Lock() + defer c.Unlock() + + if _, ok := c.services[domain]; !ok { + c.services[domain] = make(services) + } + if _, ok := c.ttls[domain]; !ok { + c.ttls[domain] = make(ttls) + } + + c.services[domain][service] = srvs + c.ttls[domain][service] = time.Now().Add(c.opts.TTL) } -func (c *cache) update(res *registry.Result) { +func (c *cache) update(domain string, res *registry.Result) { if res == nil || res.Service == nil { return } - c.Lock() - defer c.Unlock() - - // only save watched services + // only save watched services since the service using the cache may only depend on a handful + // of other services + c.RLock() if _, ok := c.watched[res.Service.Name]; !ok { + c.RUnlock() return } - services, ok := c.cache[res.Service.Name] + // we're not going to cache anything unless there was already a lookup + services, ok := c.services[domain][res.Service.Name] if !ok { - // we're not going to cache anything - // unless there was already a lookup + c.RUnlock() return } + c.RUnlock() + if len(res.Service.Nodes) == 0 { switch res.Action { case "delete": - c.del(res.Service.Name) + c.del(domain, res.Service.Name) } return } @@ -230,7 +258,7 @@ func (c *cache) update(res *registry.Result) { switch res.Action { case "create", "update": if service == nil { - c.set(res.Service.Name, append(services, res.Service)) + c.set(domain, res.Service.Name, append(services, res.Service)) return } @@ -249,7 +277,7 @@ func (c *cache) update(res *registry.Result) { } services[index] = res.Service - c.set(res.Service.Name, services) + c.set(domain, res.Service.Name, services) case "delete": if service == nil { return @@ -275,7 +303,7 @@ func (c *cache) update(res *registry.Result) { if len(nodes) > 0 { service.Nodes = nodes services[index] = service - c.set(service.Name, services) + c.set(domain, service.Name, services) return } @@ -284,7 +312,7 @@ func (c *cache) update(res *registry.Result) { // only have one thing to delete // nuke the thing if len(services) == 1 { - c.del(service.Name) + c.del(domain, service.Name) return } @@ -298,22 +326,22 @@ func (c *cache) update(res *registry.Result) { } // save - c.set(service.Name, srvs) + c.set(domain, service.Name, srvs) } } // run starts the cache watcher loop // it creates a new watcher if there's a problem -func (c *cache) run() { +func (c *cache) run(domain string) { c.Lock() - c.running = true + c.running[domain] = true c.Unlock() // reset watcher on exit defer func() { c.Lock() - c.watched = make(map[string]bool) - c.running = false + c.watched[domain] = make(map[string]bool) + c.running[domain] = false c.Unlock() }() @@ -330,7 +358,7 @@ func (c *cache) run() { time.Sleep(time.Duration(j) * time.Millisecond) // create new watcher - w, err := c.Registry.Watch() + w, err := c.Registry.Watch(registry.WatchDomain(domain)) if err != nil { if c.quit() { return @@ -356,7 +384,7 @@ func (c *cache) run() { a = 0 // watch for events - if err := c.watch(w); err != nil { + if err := c.watch(domain, w); err != nil { if c.quit() { return } @@ -384,7 +412,7 @@ func (c *cache) run() { // watch loops the next event and calls update // it returns if there's an error -func (c *cache) watch(w registry.Watcher) error { +func (c *cache) watch(domain string, w registry.Watcher) error { // used to stop the watch stop := make(chan bool) @@ -415,13 +443,29 @@ func (c *cache) watch(w registry.Watcher) error { c.setStatus(nil) } - c.update(res) + // for wildcard queries, the domain will be * and not the services domain, so we'll check to + // see if it was provided in the metadata. + dom := domain + if res.Service.Metadata != nil && len(res.Service.Metadata["domain"]) > 0 { + dom = res.Service.Metadata["domain"] + } + + c.update(dom, res) } } func (c *cache) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) { + // parse the options, fallback to the default domain + var options registry.GetOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = registry.DefaultDomain + } + // get the service - services, err := c.get(service) + services, err := c.get(options.Domain, service) if err != nil { return nil, err } @@ -455,7 +499,7 @@ func (c *cache) String() string { func New(r registry.Registry, opts ...Option) Cache { rand.Seed(time.Now().UnixNano()) options := Options{ - TTL: DefaultTTL, + TTL: defaultTTL, } for _, o := range opts { @@ -465,9 +509,10 @@ func New(r registry.Registry, opts ...Option) Cache { return &cache{ Registry: r, opts: options, - watched: make(map[string]bool), - cache: make(map[string][]*registry.Service), - ttls: make(map[string]time.Time), + running: make(map[string]bool), + watched: make(map[string]watched), + services: make(map[string]services), + ttls: make(map[string]ttls), exit: make(chan bool), } }