Add cache ttl

This commit is contained in:
Asim 2016-05-03 19:26:50 +01:00
parent e541d45f38
commit b13361d010
2 changed files with 125 additions and 36 deletions

View File

@ -10,18 +10,29 @@ import (
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
) )
/*
Cache selector is a selector which uses the registry.Watcher to Cache service entries.
It defaults to a TTL for 1 minute and causes a cache miss on the next request.
*/
type cacheSelector struct { type cacheSelector struct {
so selector.Options so selector.Options
ttl time.Duration
// registry cache // registry cache
sync.Mutex sync.Mutex
cache map[string][]*registry.Service cache map[string][]*registry.Service
ttls map[string]time.Time
// used to close or reload watcher // used to close or reload watcher
reload chan bool reload chan bool
exit chan bool exit chan bool
} }
var (
DefaultTTL = time.Minute
)
func init() { func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
@ -71,31 +82,46 @@ func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service {
return services return services
} }
func (c *cacheSelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *cacheSelector) get(service string) ([]*registry.Service, error) { func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
// check the cache first // check the cache first
services, ok := c.cache[service] services, ok := c.cache[service]
ttl, kk := c.ttls[service]
// got results, copy and return // got results, copy and return
if ok && len(services) > 0 { if ok && len(services) > 0 {
return c.cp(services), nil // only return if its less than the ttl
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
} }
// cache miss or ttl expired
// now ask the registry // now ask the registry
services, err := c.so.Registry.GetService(service) services, err := c.so.Registry.GetService(service)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// we didn't have any results so cache // we didn't have any results so cache
c.cache[service] = c.cp(services) c.cache[service] = c.cp(services)
c.ttls[service] = time.Now().Add(c.ttl)
return services, nil return services, nil
} }
func (c *cacheSelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *cacheSelector) update(res *registry.Result) { func (c *cacheSelector) update(res *registry.Result) {
if res == nil || res.Service == nil { if res == nil || res.Service == nil {
return return
@ -114,7 +140,7 @@ func (c *cacheSelector) update(res *registry.Result) {
if len(res.Service.Nodes) == 0 { if len(res.Service.Nodes) == 0 {
switch res.Action { switch res.Action {
case "delete": case "delete":
delete(c.cache, res.Service.Name) c.del(res.Service.Name)
} }
return return
} }
@ -132,8 +158,7 @@ func (c *cacheSelector) update(res *registry.Result) {
switch res.Action { switch res.Action {
case "create", "update": case "create", "update":
if service == nil { if service == nil {
services = append(services, res.Service) c.set(res.Service.Name, append(services, res.Service))
c.cache[res.Service.Name] = services
return return
} }
@ -152,7 +177,7 @@ func (c *cacheSelector) update(res *registry.Result) {
} }
services[index] = res.Service services[index] = res.Service
c.cache[res.Service.Name] = services c.set(res.Service.Name, services)
case "delete": case "delete":
if service == nil { if service == nil {
return return
@ -174,24 +199,34 @@ func (c *cacheSelector) update(res *registry.Result) {
} }
} }
if len(nodes) == 0 { // still got nodes, save and return
if len(services) == 1 { if len(nodes) > 0 {
delete(c.cache, service.Name) service.Nodes = nodes
} else { services[index] = service
var srvs []*registry.Service c.set(service.Name, services)
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
c.cache[service.Name] = srvs
}
return return
} }
service.Nodes = nodes // zero nodes left
services[index] = service
c.cache[res.Service.Name] = services // only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
} }
} }
@ -200,6 +235,8 @@ func (c *cacheSelector) update(res *registry.Result) {
// reloads the watcher if Init is called // reloads the watcher if Init is called
// and returns when Close is called // and returns when Close is called
func (c *cacheSelector) run() { func (c *cacheSelector) run() {
go c.tick()
for { for {
// exit early if already dead // exit early if already dead
if c.quit() { if c.quit() {
@ -214,18 +251,6 @@ func (c *cacheSelector) run() {
continue continue
} }
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
// watch for events // watch for events
if err := c.watch(w); err != nil { if err := c.watch(w); err != nil {
log.Println(err) log.Println(err)
@ -234,9 +259,44 @@ func (c *cacheSelector) run() {
} }
} }
// check cache and expire on each tick
func (c *cacheSelector) tick() {
t := time.NewTicker(time.Minute)
for {
select {
case <-t.C:
c.Lock()
for service, expiry := range c.ttls {
if d := time.Since(expiry); d > c.ttl {
// TODO: maybe refresh the cache rather than blowing it away
c.del(service)
}
}
c.Unlock()
case <-c.exit:
return
}
}
}
// watch loops the next event and calls update // watch loops the next event and calls update
// it returns if there's an error // it returns if there's an error
func (c *cacheSelector) watch(w registry.Watcher) error { func (c *cacheSelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for { for {
res, err := w.Next() res, err := w.Next()
if err != nil { if err != nil {
@ -357,14 +417,23 @@ func NewSelector(opts ...selector.Option) selector.Selector {
sopts.Registry = registry.DefaultRegistry sopts.Registry = registry.DefaultRegistry
} }
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok {
ttl = t
}
}
c := &cacheSelector{ c := &cacheSelector{
so: sopts, so: sopts,
ttl: ttl,
cache: make(map[string][]*registry.Service), cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1), reload: make(chan bool, 1),
exit: make(chan bool), exit: make(chan bool),
} }
go c.run() go c.run()
return c return c
} }

20
selector/cache/options.go vendored Normal file
View File

@ -0,0 +1,20 @@
package cache
import (
"time"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
type ttlKey struct{}
// Set the cache ttl
func TTL(t time.Duration) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, ttlKey{}, t)
}
}