519 lines
9.6 KiB
Go
519 lines
9.6 KiB
Go
// Package cache provides a registry cache
|
|
package cache
|
|
|
|
import (
|
|
"math"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/unistack-org/micro/v3/logger"
|
|
"github.com/unistack-org/micro/v3/registry"
|
|
util "github.com/unistack-org/micro/v3/util/registry"
|
|
)
|
|
|
|
// Cache is the registry cache interface
|
|
type Cache interface {
|
|
// embed the registry interface
|
|
registry.Registry
|
|
// stop the cache watcher
|
|
Stop()
|
|
}
|
|
|
|
type Options struct {
|
|
// TTL is the cache TTL
|
|
TTL time.Duration
|
|
}
|
|
|
|
type Option func(o *Options)
|
|
|
|
type cache struct {
|
|
registry.Registry
|
|
opts Options
|
|
|
|
// registry cache. services,ttls,watched,running are grouped by doman
|
|
sync.RWMutex
|
|
services map[string]services
|
|
ttls map[string]ttls
|
|
watched map[string]watched
|
|
running map[string]bool
|
|
|
|
// used to stop the caches
|
|
exit chan bool
|
|
|
|
// indicate whether its running status of the registry used to hold onto the cache in failure state
|
|
status error
|
|
}
|
|
|
|
type services map[string][]*registry.Service
|
|
type ttls map[string]time.Time
|
|
type watched map[string]bool
|
|
|
|
var defaultTTL = time.Minute
|
|
|
|
func backoff(attempts int) time.Duration {
|
|
if attempts == 0 {
|
|
return time.Duration(0)
|
|
}
|
|
return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond
|
|
}
|
|
|
|
func (c *cache) getStatus() error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.status
|
|
}
|
|
|
|
func (c *cache) setStatus(err error) {
|
|
c.Lock()
|
|
c.status = err
|
|
c.Unlock()
|
|
}
|
|
|
|
// isValid checks if the service is valid
|
|
func (c *cache) isValid(services []*registry.Service, ttl time.Time) bool {
|
|
// no services exist
|
|
if len(services) == 0 {
|
|
return false
|
|
}
|
|
|
|
// ttl is invalid
|
|
if ttl.IsZero() {
|
|
return false
|
|
}
|
|
|
|
// time since ttl is longer than timeout
|
|
if time.Since(ttl) > 0 {
|
|
return false
|
|
}
|
|
|
|
// ok
|
|
return true
|
|
}
|
|
|
|
func (c *cache) quit() bool {
|
|
select {
|
|
case <-c.exit:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *cache) del(domain, service string) {
|
|
// don't blow away cache in error state
|
|
if err := c.getStatus(); err != nil {
|
|
return
|
|
}
|
|
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
if _, ok := c.services[domain]; ok {
|
|
delete(c.services[domain], service)
|
|
}
|
|
|
|
if _, ok := c.ttls[domain]; ok {
|
|
delete(c.ttls[domain], service)
|
|
}
|
|
}
|
|
|
|
func (c *cache) get(domain, service string) ([]*registry.Service, error) {
|
|
var services []*registry.Service
|
|
var ttl time.Time
|
|
|
|
// lookup the values in the cache before calling the underlying registrry
|
|
c.RLock()
|
|
if srvs, ok := c.services[domain]; ok {
|
|
services = srvs[service]
|
|
}
|
|
if tt, ok := c.ttls[domain]; ok {
|
|
ttl = tt[service]
|
|
}
|
|
c.RUnlock()
|
|
|
|
// got services && within ttl so return a copy of the services
|
|
if c.isValid(services, ttl) {
|
|
return util.Copy(services), nil
|
|
}
|
|
|
|
// get does the actual request for a service and cache it
|
|
get := func(domain string, service string, cached []*registry.Service) ([]*registry.Service, error) {
|
|
// ask the registry
|
|
services, err := c.Registry.GetService(service, registry.GetDomain(domain))
|
|
if err != nil {
|
|
// set the error status
|
|
c.setStatus(err)
|
|
|
|
// check the cache
|
|
if len(cached) > 0 {
|
|
return cached, nil
|
|
}
|
|
|
|
// otherwise return error
|
|
return nil, err
|
|
}
|
|
|
|
// reset the status
|
|
if err := c.getStatus(); err != nil {
|
|
c.setStatus(nil)
|
|
}
|
|
|
|
// cache results
|
|
c.set(domain, service, util.Copy(services))
|
|
|
|
return services, nil
|
|
}
|
|
|
|
// watch service if not watched
|
|
c.RLock()
|
|
var ok bool
|
|
if _, d := c.watched[domain]; d {
|
|
if _, s := c.watched[domain][service]; s {
|
|
ok = true
|
|
}
|
|
}
|
|
c.RUnlock()
|
|
|
|
// check if its being watched
|
|
if !ok {
|
|
c.Lock()
|
|
|
|
// add domain if not registered
|
|
if _, ok := c.watched[domain]; !ok {
|
|
c.watched[domain] = make(map[string]bool)
|
|
}
|
|
|
|
// set to watched
|
|
c.watched[domain][service] = true
|
|
|
|
running := c.running[domain]
|
|
c.Unlock()
|
|
|
|
// only kick it off if not running
|
|
if !running {
|
|
go c.run(domain)
|
|
}
|
|
}
|
|
|
|
// get and return services
|
|
return get(domain, service, services)
|
|
}
|
|
|
|
func (c *cache) set(domain string, service string, srvs []*registry.Service) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
if _, ok := c.services[domain]; !ok {
|
|
c.services[domain] = make(services)
|
|
}
|
|
if _, ok := c.ttls[domain]; !ok {
|
|
c.ttls[domain] = make(ttls)
|
|
}
|
|
|
|
c.services[domain][service] = srvs
|
|
c.ttls[domain][service] = time.Now().Add(c.opts.TTL)
|
|
}
|
|
|
|
func (c *cache) update(domain string, res *registry.Result) {
|
|
if res == nil || res.Service == nil {
|
|
return
|
|
}
|
|
|
|
// only save watched services since the service using the cache may only depend on a handful
|
|
// of other services
|
|
c.RLock()
|
|
if _, ok := c.watched[res.Service.Name]; !ok {
|
|
c.RUnlock()
|
|
return
|
|
}
|
|
|
|
// we're not going to cache anything unless there was already a lookup
|
|
services, ok := c.services[domain][res.Service.Name]
|
|
if !ok {
|
|
c.RUnlock()
|
|
return
|
|
}
|
|
|
|
c.RUnlock()
|
|
|
|
if len(res.Service.Nodes) == 0 {
|
|
switch res.Action {
|
|
case "delete":
|
|
c.del(domain, res.Service.Name)
|
|
}
|
|
return
|
|
}
|
|
|
|
// existing service found
|
|
var service *registry.Service
|
|
var index int
|
|
for i, s := range services {
|
|
if s.Version == res.Service.Version {
|
|
service = s
|
|
index = i
|
|
}
|
|
}
|
|
|
|
switch res.Action {
|
|
case "create", "update":
|
|
if service == nil {
|
|
c.set(domain, res.Service.Name, append(services, res.Service))
|
|
return
|
|
}
|
|
|
|
// append old nodes to new service
|
|
for _, cur := range service.Nodes {
|
|
var seen bool
|
|
for _, node := range res.Service.Nodes {
|
|
if cur.Id == node.Id {
|
|
seen = true
|
|
break
|
|
}
|
|
}
|
|
if !seen {
|
|
res.Service.Nodes = append(res.Service.Nodes, cur)
|
|
}
|
|
}
|
|
|
|
services[index] = res.Service
|
|
c.set(domain, res.Service.Name, services)
|
|
case "delete":
|
|
if service == nil {
|
|
return
|
|
}
|
|
|
|
var nodes []*registry.Node
|
|
|
|
// filter cur nodes to remove the dead one
|
|
for _, cur := range service.Nodes {
|
|
var seen bool
|
|
for _, del := range res.Service.Nodes {
|
|
if del.Id == cur.Id {
|
|
seen = true
|
|
break
|
|
}
|
|
}
|
|
if !seen {
|
|
nodes = append(nodes, cur)
|
|
}
|
|
}
|
|
|
|
// still got nodes, save and return
|
|
if len(nodes) > 0 {
|
|
service.Nodes = nodes
|
|
services[index] = service
|
|
c.set(domain, service.Name, services)
|
|
return
|
|
}
|
|
|
|
// zero nodes left
|
|
|
|
// only have one thing to delete
|
|
// nuke the thing
|
|
if len(services) == 1 {
|
|
c.del(domain, service.Name)
|
|
return
|
|
}
|
|
|
|
// still have more than 1 service
|
|
// check the version and keep what we know
|
|
var srvs []*registry.Service
|
|
for _, s := range services {
|
|
if s.Version != service.Version {
|
|
srvs = append(srvs, s)
|
|
}
|
|
}
|
|
|
|
// save
|
|
c.set(domain, service.Name, srvs)
|
|
}
|
|
}
|
|
|
|
// run starts the cache watcher loop
|
|
// it creates a new watcher if there's a problem
|
|
func (c *cache) run(domain string) {
|
|
c.Lock()
|
|
c.running[domain] = true
|
|
c.Unlock()
|
|
|
|
// reset watcher on exit
|
|
defer func() {
|
|
c.Lock()
|
|
c.watched[domain] = make(map[string]bool)
|
|
c.running[domain] = false
|
|
c.Unlock()
|
|
}()
|
|
|
|
var a, b int
|
|
|
|
for {
|
|
// exit early if already dead
|
|
if c.quit() {
|
|
return
|
|
}
|
|
|
|
// jitter before starting
|
|
j := rand.Int63n(100)
|
|
time.Sleep(time.Duration(j) * time.Millisecond)
|
|
|
|
// create new watcher
|
|
w, err := c.Registry.Watch(registry.WatchDomain(domain))
|
|
if err != nil {
|
|
if c.quit() {
|
|
return
|
|
}
|
|
|
|
d := backoff(a)
|
|
c.setStatus(err)
|
|
|
|
if a > 3 {
|
|
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
|
logger.Debug("rcache: ", err, " backing off ", d)
|
|
}
|
|
a = 0
|
|
}
|
|
|
|
time.Sleep(d)
|
|
a++
|
|
|
|
continue
|
|
}
|
|
|
|
// reset a
|
|
a = 0
|
|
|
|
// watch for events
|
|
if err := c.watch(domain, w); err != nil {
|
|
if c.quit() {
|
|
return
|
|
}
|
|
|
|
d := backoff(b)
|
|
c.setStatus(err)
|
|
|
|
if b > 3 {
|
|
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
|
logger.Debug("rcache: ", err, " backing off ", d)
|
|
}
|
|
b = 0
|
|
}
|
|
|
|
time.Sleep(d)
|
|
b++
|
|
|
|
continue
|
|
}
|
|
|
|
// reset b
|
|
b = 0
|
|
}
|
|
}
|
|
|
|
// watch loops the next event and calls update
|
|
// it returns if there's an error
|
|
func (c *cache) watch(domain string, w registry.Watcher) error {
|
|
// used to stop the watch
|
|
stop := make(chan bool)
|
|
|
|
// manage this loop
|
|
go func() {
|
|
defer w.Stop()
|
|
|
|
select {
|
|
// wait for exit
|
|
case <-c.exit:
|
|
return
|
|
// we've been stopped
|
|
case <-stop:
|
|
return
|
|
}
|
|
}()
|
|
|
|
for {
|
|
res, err := w.Next()
|
|
if err != nil {
|
|
close(stop)
|
|
return err
|
|
}
|
|
|
|
// reset the error status since we succeeded
|
|
if err := c.getStatus(); err != nil {
|
|
// reset status
|
|
c.setStatus(nil)
|
|
}
|
|
|
|
// for wildcard queries, the domain will be * and not the services domain, so we'll check to
|
|
// see if it was provided in the metadata.
|
|
dom := domain
|
|
if res.Service.Metadata != nil && len(res.Service.Metadata["domain"]) > 0 {
|
|
dom = res.Service.Metadata["domain"]
|
|
}
|
|
|
|
c.update(dom, res)
|
|
}
|
|
}
|
|
|
|
func (c *cache) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
|
// parse the options, fallback to the default domain
|
|
var options registry.GetOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
if len(options.Domain) == 0 {
|
|
options.Domain = registry.DefaultDomain
|
|
}
|
|
|
|
// get the service
|
|
services, err := c.get(options.Domain, service)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if there's nothing return err
|
|
if len(services) == 0 {
|
|
return nil, registry.ErrNotFound
|
|
}
|
|
|
|
// return services
|
|
return services, nil
|
|
}
|
|
|
|
func (c *cache) Stop() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
select {
|
|
case <-c.exit:
|
|
return
|
|
default:
|
|
close(c.exit)
|
|
}
|
|
}
|
|
|
|
func (c *cache) String() string {
|
|
return "cache"
|
|
}
|
|
|
|
// New returns a new cache
|
|
func New(r registry.Registry, opts ...Option) Cache {
|
|
rand.Seed(time.Now().UnixNano())
|
|
options := Options{
|
|
TTL: defaultTTL,
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
return &cache{
|
|
Registry: r,
|
|
opts: options,
|
|
running: make(map[string]bool),
|
|
watched: make(map[string]watched),
|
|
services: make(map[string]services),
|
|
ttls: make(map[string]ttls),
|
|
exit: make(chan bool),
|
|
}
|
|
}
|