registry/cache: add support for the domain option (#1722)
This commit is contained in:
parent
43fa6998f6
commit
0e55a08e22
205
cache.go
205
cache.go
@ -31,26 +31,25 @@ type cache struct {
|
|||||||
registry.Registry
|
registry.Registry
|
||||||
opts Options
|
opts Options
|
||||||
|
|
||||||
// registry cache
|
// registry cache. services,ttls,watched,running are grouped by doman
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
cache map[string][]*registry.Service
|
services map[string]services
|
||||||
ttls map[string]time.Time
|
ttls map[string]ttls
|
||||||
watched map[string]bool
|
watched map[string]watched
|
||||||
|
running map[string]bool
|
||||||
|
|
||||||
// used to stop the cache
|
// used to stop the caches
|
||||||
exit chan bool
|
exit chan bool
|
||||||
|
|
||||||
// indicate whether its running
|
// indicate whether its running status of the registry used to hold onto the cache in failure state
|
||||||
running bool
|
|
||||||
// status of the registry
|
|
||||||
// used to hold onto the cache
|
|
||||||
// in failure state
|
|
||||||
status error
|
status error
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
type services map[string][]*registry.Service
|
||||||
DefaultTTL = time.Minute
|
type ttls map[string]time.Time
|
||||||
)
|
type watched map[string]bool
|
||||||
|
|
||||||
|
var defaultTTL = time.Minute
|
||||||
|
|
||||||
func backoff(attempts int) time.Duration {
|
func backoff(attempts int) time.Duration {
|
||||||
if attempts == 0 {
|
if attempts == 0 {
|
||||||
@ -101,47 +100,56 @@ func (c *cache) quit() bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) del(service string) {
|
func (c *cache) del(domain, service string) {
|
||||||
// don't blow away cache in error state
|
// don't blow away cache in error state
|
||||||
if err := c.status; err != nil {
|
if err := c.getStatus(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// otherwise delete entries
|
|
||||||
delete(c.cache, service)
|
c.Lock()
|
||||||
delete(c.ttls, service)
|
defer c.Unlock()
|
||||||
|
|
||||||
|
if _, ok := c.services[domain]; ok {
|
||||||
|
delete(c.services[domain], service)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) get(service string) ([]*registry.Service, error) {
|
if _, ok := c.ttls[domain]; ok {
|
||||||
// read lock
|
delete(c.ttls[domain], service)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) get(domain, service string) ([]*registry.Service, error) {
|
||||||
|
var services []*registry.Service
|
||||||
|
var ttl time.Time
|
||||||
|
|
||||||
|
// lookup the values in the cache before calling the underlying registrry
|
||||||
c.RLock()
|
c.RLock()
|
||||||
|
if srvs, ok := c.services[domain]; ok {
|
||||||
// check the cache first
|
services = srvs[service]
|
||||||
services := c.cache[service]
|
}
|
||||||
// get cache ttl
|
if tt, ok := c.ttls[domain]; ok {
|
||||||
ttl := c.ttls[service]
|
ttl = tt[service]
|
||||||
// make a copy
|
}
|
||||||
cp := util.Copy(services)
|
|
||||||
|
|
||||||
// got services && within ttl so return cache
|
|
||||||
if c.isValid(cp, ttl) {
|
|
||||||
c.RUnlock()
|
c.RUnlock()
|
||||||
// return services
|
|
||||||
return cp, nil
|
// got services && within ttl so return a copy of the services
|
||||||
|
if c.isValid(services, ttl) {
|
||||||
|
return util.Copy(services), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// get does the actual request for a service and cache it
|
// get does the actual request for a service and cache it
|
||||||
get := func(service string, cached []*registry.Service) ([]*registry.Service, error) {
|
get := func(domain string, service string, cached []*registry.Service) ([]*registry.Service, error) {
|
||||||
// ask the registry
|
// ask the registry
|
||||||
services, err := c.Registry.GetService(service)
|
services, err := c.Registry.GetService(service, registry.GetDomain(domain))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// check the cache
|
|
||||||
if len(cached) > 0 {
|
|
||||||
// set the error status
|
// set the error status
|
||||||
c.setStatus(err)
|
c.setStatus(err)
|
||||||
|
|
||||||
// return the stale cache
|
// check the cache
|
||||||
|
if len(cached) > 0 {
|
||||||
return cached, nil
|
return cached, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise return error
|
// otherwise return error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -152,67 +160,87 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// cache results
|
// cache results
|
||||||
c.Lock()
|
c.set(domain, service, util.Copy(services))
|
||||||
c.set(service, util.Copy(services))
|
|
||||||
c.Unlock()
|
|
||||||
|
|
||||||
return services, nil
|
return services, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// watch service if not watched
|
// watch service if not watched
|
||||||
_, ok := c.watched[service]
|
c.RLock()
|
||||||
|
var ok bool
|
||||||
// unlock the read lock
|
if _, d := c.watched[domain]; d {
|
||||||
|
if _, s := c.watched[domain][service]; s {
|
||||||
|
ok = true
|
||||||
|
}
|
||||||
|
}
|
||||||
c.RUnlock()
|
c.RUnlock()
|
||||||
|
|
||||||
// check if its being watched
|
// check if its being watched
|
||||||
if !ok {
|
if !ok {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
|
|
||||||
// set to watched
|
// add domain if not registered
|
||||||
c.watched[service] = true
|
if _, ok := c.watched[domain]; !ok {
|
||||||
|
c.watched[domain] = make(map[string]bool)
|
||||||
// only kick it off if not running
|
|
||||||
if !c.running {
|
|
||||||
go c.run()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set to watched
|
||||||
|
c.watched[domain][service] = true
|
||||||
|
|
||||||
|
running := c.running[domain]
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
|
|
||||||
|
// only kick it off if not running
|
||||||
|
if !running {
|
||||||
|
go c.run(domain)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// get and return services
|
// get and return services
|
||||||
return get(service, cp)
|
return get(domain, service, services)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) set(service string, services []*registry.Service) {
|
func (c *cache) set(domain string, service string, srvs []*registry.Service) {
|
||||||
c.cache[service] = services
|
c.Lock()
|
||||||
c.ttls[service] = time.Now().Add(c.opts.TTL)
|
defer c.Unlock()
|
||||||
|
|
||||||
|
if _, ok := c.services[domain]; !ok {
|
||||||
|
c.services[domain] = make(services)
|
||||||
|
}
|
||||||
|
if _, ok := c.ttls[domain]; !ok {
|
||||||
|
c.ttls[domain] = make(ttls)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) update(res *registry.Result) {
|
c.services[domain][service] = srvs
|
||||||
|
c.ttls[domain][service] = time.Now().Add(c.opts.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) update(domain string, res *registry.Result) {
|
||||||
if res == nil || res.Service == nil {
|
if res == nil || res.Service == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Lock()
|
// only save watched services since the service using the cache may only depend on a handful
|
||||||
defer c.Unlock()
|
// of other services
|
||||||
|
c.RLock()
|
||||||
// only save watched services
|
|
||||||
if _, ok := c.watched[res.Service.Name]; !ok {
|
if _, ok := c.watched[res.Service.Name]; !ok {
|
||||||
|
c.RUnlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
services, ok := c.cache[res.Service.Name]
|
// we're not going to cache anything unless there was already a lookup
|
||||||
|
services, ok := c.services[domain][res.Service.Name]
|
||||||
if !ok {
|
if !ok {
|
||||||
// we're not going to cache anything
|
c.RUnlock()
|
||||||
// unless there was already a lookup
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.RUnlock()
|
||||||
|
|
||||||
if len(res.Service.Nodes) == 0 {
|
if len(res.Service.Nodes) == 0 {
|
||||||
switch res.Action {
|
switch res.Action {
|
||||||
case "delete":
|
case "delete":
|
||||||
c.del(res.Service.Name)
|
c.del(domain, res.Service.Name)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -230,7 +258,7 @@ func (c *cache) update(res *registry.Result) {
|
|||||||
switch res.Action {
|
switch res.Action {
|
||||||
case "create", "update":
|
case "create", "update":
|
||||||
if service == nil {
|
if service == nil {
|
||||||
c.set(res.Service.Name, append(services, res.Service))
|
c.set(domain, res.Service.Name, append(services, res.Service))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,7 +277,7 @@ func (c *cache) update(res *registry.Result) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
services[index] = res.Service
|
services[index] = res.Service
|
||||||
c.set(res.Service.Name, services)
|
c.set(domain, res.Service.Name, services)
|
||||||
case "delete":
|
case "delete":
|
||||||
if service == nil {
|
if service == nil {
|
||||||
return
|
return
|
||||||
@ -275,7 +303,7 @@ func (c *cache) update(res *registry.Result) {
|
|||||||
if len(nodes) > 0 {
|
if len(nodes) > 0 {
|
||||||
service.Nodes = nodes
|
service.Nodes = nodes
|
||||||
services[index] = service
|
services[index] = service
|
||||||
c.set(service.Name, services)
|
c.set(domain, service.Name, services)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,7 +312,7 @@ func (c *cache) update(res *registry.Result) {
|
|||||||
// only have one thing to delete
|
// only have one thing to delete
|
||||||
// nuke the thing
|
// nuke the thing
|
||||||
if len(services) == 1 {
|
if len(services) == 1 {
|
||||||
c.del(service.Name)
|
c.del(domain, service.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,22 +326,22 @@ func (c *cache) update(res *registry.Result) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// save
|
// save
|
||||||
c.set(service.Name, srvs)
|
c.set(domain, service.Name, srvs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// run starts the cache watcher loop
|
// run starts the cache watcher loop
|
||||||
// it creates a new watcher if there's a problem
|
// it creates a new watcher if there's a problem
|
||||||
func (c *cache) run() {
|
func (c *cache) run(domain string) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
c.running = true
|
c.running[domain] = true
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
|
|
||||||
// reset watcher on exit
|
// reset watcher on exit
|
||||||
defer func() {
|
defer func() {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
c.watched = make(map[string]bool)
|
c.watched[domain] = make(map[string]bool)
|
||||||
c.running = false
|
c.running[domain] = false
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -330,7 +358,7 @@ func (c *cache) run() {
|
|||||||
time.Sleep(time.Duration(j) * time.Millisecond)
|
time.Sleep(time.Duration(j) * time.Millisecond)
|
||||||
|
|
||||||
// create new watcher
|
// create new watcher
|
||||||
w, err := c.Registry.Watch()
|
w, err := c.Registry.Watch(registry.WatchDomain(domain))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.quit() {
|
if c.quit() {
|
||||||
return
|
return
|
||||||
@ -356,7 +384,7 @@ func (c *cache) run() {
|
|||||||
a = 0
|
a = 0
|
||||||
|
|
||||||
// watch for events
|
// watch for events
|
||||||
if err := c.watch(w); err != nil {
|
if err := c.watch(domain, w); err != nil {
|
||||||
if c.quit() {
|
if c.quit() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -384,7 +412,7 @@ func (c *cache) run() {
|
|||||||
|
|
||||||
// watch loops the next event and calls update
|
// watch loops the next event and calls update
|
||||||
// it returns if there's an error
|
// it returns if there's an error
|
||||||
func (c *cache) watch(w registry.Watcher) error {
|
func (c *cache) watch(domain string, w registry.Watcher) error {
|
||||||
// used to stop the watch
|
// used to stop the watch
|
||||||
stop := make(chan bool)
|
stop := make(chan bool)
|
||||||
|
|
||||||
@ -415,13 +443,29 @@ func (c *cache) watch(w registry.Watcher) error {
|
|||||||
c.setStatus(nil)
|
c.setStatus(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.update(res)
|
// for wildcard queries, the domain will be * and not the services domain, so we'll check to
|
||||||
|
// see if it was provided in the metadata.
|
||||||
|
dom := domain
|
||||||
|
if res.Service.Metadata != nil && len(res.Service.Metadata["domain"]) > 0 {
|
||||||
|
dom = res.Service.Metadata["domain"]
|
||||||
|
}
|
||||||
|
|
||||||
|
c.update(dom, res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
func (c *cache) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||||
|
// parse the options, fallback to the default domain
|
||||||
|
var options registry.GetOptions
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
if len(options.Domain) == 0 {
|
||||||
|
options.Domain = registry.DefaultDomain
|
||||||
|
}
|
||||||
|
|
||||||
// get the service
|
// get the service
|
||||||
services, err := c.get(service)
|
services, err := c.get(options.Domain, service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -455,7 +499,7 @@ func (c *cache) String() string {
|
|||||||
func New(r registry.Registry, opts ...Option) Cache {
|
func New(r registry.Registry, opts ...Option) Cache {
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
options := Options{
|
options := Options{
|
||||||
TTL: DefaultTTL,
|
TTL: defaultTTL,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
@ -465,9 +509,10 @@ func New(r registry.Registry, opts ...Option) Cache {
|
|||||||
return &cache{
|
return &cache{
|
||||||
Registry: r,
|
Registry: r,
|
||||||
opts: options,
|
opts: options,
|
||||||
watched: make(map[string]bool),
|
running: make(map[string]bool),
|
||||||
cache: make(map[string][]*registry.Service),
|
watched: make(map[string]watched),
|
||||||
ttls: make(map[string]time.Time),
|
services: make(map[string]services),
|
||||||
|
ttls: make(map[string]ttls),
|
||||||
exit: make(chan bool),
|
exit: make(chan bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user