diff --git a/registry/cache/README.md b/registry/cache/README.md new file mode 100644 index 00000000..1c2d6e90 --- /dev/null +++ b/registry/cache/README.md @@ -0,0 +1,31 @@ +# Registry Cache + +Cache is a library that provides a caching layer for the go-micro [registry](https://godoc.org/github.com/micro/go-micro/registry#Registry). + +If you're looking for caching in your microservices use the [selector](https://micro.mu/docs/fault-tolerance.html#caching-discovery). + +## Interface + +``` +// Cache is the registry cache interface +type Cache interface { + // embed the registry interface + registry.Registry + // stop the cache watcher + Stop() +} +``` + +## Usage + +``` +import ( + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/registry/cache" +) + +r := registry.NewRegistry() +cache := cache.New(r) + +services, _ := cache.GetService("my.service") +``` diff --git a/registry/cache/options.go b/registry/cache/options.go new file mode 100644 index 00000000..284721a9 --- /dev/null +++ b/registry/cache/options.go @@ -0,0 +1,12 @@ +package cache + +import ( + "time" +) + +// WithTTL sets the cache TTL +func WithTTL(t time.Duration) Option { + return func(o *Options) { + o.TTL = t + } +} diff --git a/registry/cache/rcache.go b/registry/cache/rcache.go new file mode 100644 index 00000000..53b11576 --- /dev/null +++ b/registry/cache/rcache.go @@ -0,0 +1,429 @@ +// Package cache provides a registry cache +package cache + +import ( + "math" + "math/rand" + "sync" + "time" + + log "github.com/micro/go-log" + "github.com/micro/go-micro/registry" +) + +// Cache is the registry cache interface +type Cache interface { + // embed the registry interface + registry.Registry + // stop the cache watcher + Stop() +} + +type Options struct { + // TTL is the cache TTL + TTL time.Duration +} + +type Option func(o *Options) + +type cache struct { + registry.Registry + opts Options + + // registry cache + sync.RWMutex + cache map[string][]*registry.Service + ttls map[string]time.Time + watched map[string]bool + + exit chan bool +} + +var ( + DefaultTTL = time.Minute +) + +func backoff(attempts int) time.Duration { + if attempts == 0 { + return time.Duration(0) + } + return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond +} + +// isValid checks if the service is valid +func (c *cache) isValid(services []*registry.Service, ttl time.Time) bool { + // no services exist + if len(services) == 0 { + return false + } + + // ttl is invalid + if ttl.IsZero() { + return false + } + + // time since ttl is longer than timeout + if time.Since(ttl) > c.opts.TTL { + return false + } + + // ok + return true +} + +func (c *cache) quit() bool { + select { + case <-c.exit: + return true + default: + return false + } +} + +// cp copies a service. Because we're caching handing back pointers would +// create a race condition, so we do this instead its fast enough +func (c *cache) cp(current []*registry.Service) []*registry.Service { + var services []*registry.Service + + for _, service := range current { + // copy service + s := new(registry.Service) + *s = *service + + // copy nodes + var nodes []*registry.Node + for _, node := range service.Nodes { + n := new(registry.Node) + *n = *node + nodes = append(nodes, n) + } + s.Nodes = nodes + + // copy endpoints + var eps []*registry.Endpoint + for _, ep := range service.Endpoints { + e := new(registry.Endpoint) + *e = *ep + eps = append(eps, e) + } + s.Endpoints = eps + + // append service + services = append(services, s) + } + + return services +} + +func (c *cache) del(service string) { + delete(c.cache, service) + delete(c.ttls, service) +} + +func (c *cache) get(service string) ([]*registry.Service, error) { + // read lock + c.RLock() + + // check the cache first + services := c.cache[service] + // get cache ttl + ttl := c.ttls[service] + + // got services && within ttl so return cache + if c.isValid(services, ttl) { + // make a copy + cp := c.cp(services) + // unlock the read + c.RUnlock() + // return servics + return cp, nil + } + + // get does the actual request for a service and cache it + get := func(service string) ([]*registry.Service, error) { + // ask the registry + services, err := c.Registry.GetService(service) + if err != nil { + return nil, err + } + + // cache results + c.Lock() + c.set(service, c.cp(services)) + c.Unlock() + + return services, nil + } + + // watch service if not watched + if _, ok := c.watched[service]; !ok { + go c.run(service) + } + + // unlock the read lock + c.RUnlock() + + // get and return services + return get(service) +} + +func (c *cache) set(service string, services []*registry.Service) { + c.cache[service] = services + c.ttls[service] = time.Now().Add(c.opts.TTL) +} + +func (c *cache) update(res *registry.Result) { + if res == nil || res.Service == nil { + return + } + + c.Lock() + defer c.Unlock() + + services, ok := c.cache[res.Service.Name] + if !ok { + // we're not going to cache anything + // unless there was already a lookup + return + } + + if len(res.Service.Nodes) == 0 { + switch res.Action { + case "delete": + c.del(res.Service.Name) + } + return + } + + // existing service found + var service *registry.Service + var index int + for i, s := range services { + if s.Version == res.Service.Version { + service = s + index = i + } + } + + switch res.Action { + case "create", "update": + if service == nil { + c.set(res.Service.Name, append(services, res.Service)) + return + } + + // append old nodes to new service + for _, cur := range service.Nodes { + var seen bool + for _, node := range res.Service.Nodes { + if cur.Id == node.Id { + seen = true + break + } + } + if !seen { + res.Service.Nodes = append(res.Service.Nodes, cur) + } + } + + services[index] = res.Service + c.set(res.Service.Name, services) + case "delete": + if service == nil { + return + } + + var nodes []*registry.Node + + // filter cur nodes to remove the dead one + for _, cur := range service.Nodes { + var seen bool + for _, del := range res.Service.Nodes { + if del.Id == cur.Id { + seen = true + break + } + } + if !seen { + nodes = append(nodes, cur) + } + } + + // still got nodes, save and return + if len(nodes) > 0 { + service.Nodes = nodes + services[index] = service + c.set(service.Name, services) + return + } + + // zero nodes left + + // only have one thing to delete + // nuke the thing + if len(services) == 1 { + c.del(service.Name) + return + } + + // still have more than 1 service + // check the version and keep what we know + var srvs []*registry.Service + for _, s := range services { + if s.Version != service.Version { + srvs = append(srvs, s) + } + } + + // save + c.set(service.Name, srvs) + } +} + +// run starts the cache watcher loop +// it creates a new watcher if there's a problem +func (c *cache) run(service string) { + // set watcher + c.Lock() + c.watched[service] = true + c.Unlock() + + // delete watcher on exit + defer func() { + c.Lock() + delete(c.watched, service) + c.Unlock() + }() + + var a, b int + + for { + // exit early if already dead + if c.quit() { + return + } + + // jitter before starting + j := rand.Int63n(100) + time.Sleep(time.Duration(j) * time.Millisecond) + + // create new watcher + w, err := c.Registry.Watch( + registry.WatchService(service), + ) + + if err != nil { + if c.quit() { + return + } + + d := backoff(a) + + if a > 3 { + log.Log("rcache: ", err, " backing off ", d) + a = 0 + } + + time.Sleep(d) + a++ + + continue + } + + // reset a + a = 0 + + // watch for events + if err := c.watch(w); err != nil { + if c.quit() { + return + } + + d := backoff(b) + + if b > 3 { + log.Log("rcache: ", err, " backing off ", d) + b = 0 + } + + time.Sleep(d) + b++ + + continue + } + + // reset b + b = 0 + } +} + +// watch loops the next event and calls update +// it returns if there's an error +func (c *cache) watch(w registry.Watcher) error { + defer w.Stop() + + // manage this loop + go func() { + // wait for exit + <-c.exit + w.Stop() + }() + + for { + res, err := w.Next() + if err != nil { + return err + } + c.update(res) + } +} + +func (c *cache) GetService(service string) ([]*registry.Service, error) { + // get the service + services, err := c.get(service) + if err != nil { + return nil, err + } + + // if there's nothing return err + if len(services) == 0 { + return nil, registry.ErrNotFound + } + + // return services + return services, nil +} + +func (c *cache) Stop() { + select { + case <-c.exit: + return + default: + close(c.exit) + } +} + +func (c *cache) String() string { + return "rcache" +} + +// New returns a new cache +func New(r registry.Registry, opts ...Option) Cache { + rand.Seed(time.Now().UnixNano()) + options := Options{ + TTL: DefaultTTL, + } + + for _, o := range opts { + o(&options) + } + + return &cache{ + Registry: r, + opts: options, + watched: make(map[string]bool), + cache: make(map[string][]*registry.Service), + ttls: make(map[string]time.Time), + exit: make(chan bool), + } +}