registry/cache: add support for the domain option (#1722)

This commit is contained in:
ben-toogood 2020-06-19 13:16:44 +01:00 committed by GitHub
parent 2b889087bd
commit 5f9c3a6efd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -31,26 +31,25 @@ type cache struct {
registry.Registry registry.Registry
opts Options opts Options
// registry cache // registry cache. services,ttls,watched,running are grouped by doman
sync.RWMutex sync.RWMutex
cache map[string][]*registry.Service services map[string]services
ttls map[string]time.Time ttls map[string]ttls
watched map[string]bool watched map[string]watched
running map[string]bool
// used to stop the cache // used to stop the caches
exit chan bool exit chan bool
// indicate whether its running // indicate whether its running status of the registry used to hold onto the cache in failure state
running bool
// status of the registry
// used to hold onto the cache
// in failure state
status error status error
} }
var ( type services map[string][]*registry.Service
DefaultTTL = time.Minute type ttls map[string]time.Time
) type watched map[string]bool
var defaultTTL = time.Minute
func backoff(attempts int) time.Duration { func backoff(attempts int) time.Duration {
if attempts == 0 { if attempts == 0 {
@ -101,47 +100,56 @@ func (c *cache) quit() bool {
} }
} }
func (c *cache) del(service string) { func (c *cache) del(domain, service string) {
// don't blow away cache in error state // don't blow away cache in error state
if err := c.status; err != nil { if err := c.getStatus(); err != nil {
return return
} }
// otherwise delete entries
delete(c.cache, service) c.Lock()
delete(c.ttls, service) defer c.Unlock()
if _, ok := c.services[domain]; ok {
delete(c.services[domain], service)
}
if _, ok := c.ttls[domain]; ok {
delete(c.ttls[domain], service)
}
} }
func (c *cache) get(service string) ([]*registry.Service, error) { func (c *cache) get(domain, service string) ([]*registry.Service, error) {
// read lock var services []*registry.Service
var ttl time.Time
// lookup the values in the cache before calling the underlying registrry
c.RLock() c.RLock()
if srvs, ok := c.services[domain]; ok {
// check the cache first services = srvs[service]
services := c.cache[service] }
// get cache ttl if tt, ok := c.ttls[domain]; ok {
ttl := c.ttls[service] ttl = tt[service]
// make a copy }
cp := util.Copy(services)
// got services && within ttl so return cache
if c.isValid(cp, ttl) {
c.RUnlock() c.RUnlock()
// return services
return cp, nil // got services && within ttl so return a copy of the services
if c.isValid(services, ttl) {
return util.Copy(services), nil
} }
// get does the actual request for a service and cache it // get does the actual request for a service and cache it
get := func(service string, cached []*registry.Service) ([]*registry.Service, error) { get := func(domain string, service string, cached []*registry.Service) ([]*registry.Service, error) {
// ask the registry // ask the registry
services, err := c.Registry.GetService(service) services, err := c.Registry.GetService(service, registry.GetDomain(domain))
if err != nil { if err != nil {
// check the cache
if len(cached) > 0 {
// set the error status // set the error status
c.setStatus(err) c.setStatus(err)
// return the stale cache // check the cache
if len(cached) > 0 {
return cached, nil return cached, nil
} }
// otherwise return error // otherwise return error
return nil, err return nil, err
} }
@ -152,67 +160,87 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
} }
// cache results // cache results
c.Lock() c.set(domain, service, util.Copy(services))
c.set(service, util.Copy(services))
c.Unlock()
return services, nil return services, nil
} }
// watch service if not watched // watch service if not watched
_, ok := c.watched[service] c.RLock()
var ok bool
// unlock the read lock if _, d := c.watched[domain]; d {
if _, s := c.watched[domain][service]; s {
ok = true
}
}
c.RUnlock() c.RUnlock()
// check if its being watched // check if its being watched
if !ok { if !ok {
c.Lock() c.Lock()
// set to watched // add domain if not registered
c.watched[service] = true if _, ok := c.watched[domain]; !ok {
c.watched[domain] = make(map[string]bool)
// only kick it off if not running
if !c.running {
go c.run()
} }
// set to watched
c.watched[domain][service] = true
running := c.running[domain]
c.Unlock() c.Unlock()
// only kick it off if not running
if !running {
go c.run(domain)
}
} }
// get and return services // get and return services
return get(service, cp) return get(domain, service, services)
} }
func (c *cache) set(service string, services []*registry.Service) { func (c *cache) set(domain string, service string, srvs []*registry.Service) {
c.cache[service] = services c.Lock()
c.ttls[service] = time.Now().Add(c.opts.TTL) defer c.Unlock()
if _, ok := c.services[domain]; !ok {
c.services[domain] = make(services)
}
if _, ok := c.ttls[domain]; !ok {
c.ttls[domain] = make(ttls)
}
c.services[domain][service] = srvs
c.ttls[domain][service] = time.Now().Add(c.opts.TTL)
} }
func (c *cache) update(res *registry.Result) { func (c *cache) update(domain string, res *registry.Result) {
if res == nil || res.Service == nil { if res == nil || res.Service == nil {
return return
} }
c.Lock() // only save watched services since the service using the cache may only depend on a handful
defer c.Unlock() // of other services
c.RLock()
// only save watched services
if _, ok := c.watched[res.Service.Name]; !ok { if _, ok := c.watched[res.Service.Name]; !ok {
c.RUnlock()
return return
} }
services, ok := c.cache[res.Service.Name] // we're not going to cache anything unless there was already a lookup
services, ok := c.services[domain][res.Service.Name]
if !ok { if !ok {
// we're not going to cache anything c.RUnlock()
// unless there was already a lookup
return return
} }
c.RUnlock()
if len(res.Service.Nodes) == 0 { if len(res.Service.Nodes) == 0 {
switch res.Action { switch res.Action {
case "delete": case "delete":
c.del(res.Service.Name) c.del(domain, res.Service.Name)
} }
return return
} }
@ -230,7 +258,7 @@ func (c *cache) update(res *registry.Result) {
switch res.Action { switch res.Action {
case "create", "update": case "create", "update":
if service == nil { if service == nil {
c.set(res.Service.Name, append(services, res.Service)) c.set(domain, res.Service.Name, append(services, res.Service))
return return
} }
@ -249,7 +277,7 @@ func (c *cache) update(res *registry.Result) {
} }
services[index] = res.Service services[index] = res.Service
c.set(res.Service.Name, services) c.set(domain, res.Service.Name, services)
case "delete": case "delete":
if service == nil { if service == nil {
return return
@ -275,7 +303,7 @@ func (c *cache) update(res *registry.Result) {
if len(nodes) > 0 { if len(nodes) > 0 {
service.Nodes = nodes service.Nodes = nodes
services[index] = service services[index] = service
c.set(service.Name, services) c.set(domain, service.Name, services)
return return
} }
@ -284,7 +312,7 @@ func (c *cache) update(res *registry.Result) {
// only have one thing to delete // only have one thing to delete
// nuke the thing // nuke the thing
if len(services) == 1 { if len(services) == 1 {
c.del(service.Name) c.del(domain, service.Name)
return return
} }
@ -298,22 +326,22 @@ func (c *cache) update(res *registry.Result) {
} }
// save // save
c.set(service.Name, srvs) c.set(domain, service.Name, srvs)
} }
} }
// run starts the cache watcher loop // run starts the cache watcher loop
// it creates a new watcher if there's a problem // it creates a new watcher if there's a problem
func (c *cache) run() { func (c *cache) run(domain string) {
c.Lock() c.Lock()
c.running = true c.running[domain] = true
c.Unlock() c.Unlock()
// reset watcher on exit // reset watcher on exit
defer func() { defer func() {
c.Lock() c.Lock()
c.watched = make(map[string]bool) c.watched[domain] = make(map[string]bool)
c.running = false c.running[domain] = false
c.Unlock() c.Unlock()
}() }()
@ -330,7 +358,7 @@ func (c *cache) run() {
time.Sleep(time.Duration(j) * time.Millisecond) time.Sleep(time.Duration(j) * time.Millisecond)
// create new watcher // create new watcher
w, err := c.Registry.Watch() w, err := c.Registry.Watch(registry.WatchDomain(domain))
if err != nil { if err != nil {
if c.quit() { if c.quit() {
return return
@ -356,7 +384,7 @@ func (c *cache) run() {
a = 0 a = 0
// watch for events // watch for events
if err := c.watch(w); err != nil { if err := c.watch(domain, w); err != nil {
if c.quit() { if c.quit() {
return return
} }
@ -384,7 +412,7 @@ func (c *cache) run() {
// watch loops the next event and calls update // watch loops the next event and calls update
// it returns if there's an error // it returns if there's an error
func (c *cache) watch(w registry.Watcher) error { func (c *cache) watch(domain string, w registry.Watcher) error {
// used to stop the watch // used to stop the watch
stop := make(chan bool) stop := make(chan bool)
@ -415,13 +443,29 @@ func (c *cache) watch(w registry.Watcher) error {
c.setStatus(nil) c.setStatus(nil)
} }
c.update(res) // for wildcard queries, the domain will be * and not the services domain, so we'll check to
// see if it was provided in the metadata.
dom := domain
if res.Service.Metadata != nil && len(res.Service.Metadata["domain"]) > 0 {
dom = res.Service.Metadata["domain"]
}
c.update(dom, res)
} }
} }
func (c *cache) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) { func (c *cache) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) {
// parse the options, fallback to the default domain
var options registry.GetOptions
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = registry.DefaultDomain
}
// get the service // get the service
services, err := c.get(service) services, err := c.get(options.Domain, service)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -455,7 +499,7 @@ func (c *cache) String() string {
func New(r registry.Registry, opts ...Option) Cache { func New(r registry.Registry, opts ...Option) Cache {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
options := Options{ options := Options{
TTL: DefaultTTL, TTL: defaultTTL,
} }
for _, o := range opts { for _, o := range opts {
@ -465,9 +509,10 @@ func New(r registry.Registry, opts ...Option) Cache {
return &cache{ return &cache{
Registry: r, Registry: r,
opts: options, opts: options,
watched: make(map[string]bool), running: make(map[string]bool),
cache: make(map[string][]*registry.Service), watched: make(map[string]watched),
ttls: make(map[string]time.Time), services: make(map[string]services),
ttls: make(map[string]ttls),
exit: make(chan bool), exit: make(chan bool),
} }
} }