Update selector race, rename cache selector

This commit is contained in:
Asim Aslam 2018-12-29 15:44:51 +00:00
parent ff982b5fd1
commit 5cae330732
9 changed files with 416 additions and 501 deletions

View File

@ -26,7 +26,6 @@ import (
// selectors // selectors
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/selector/cache"
// transports // transports
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
@ -149,7 +148,6 @@ var (
Name: "selector", Name: "selector",
EnvVar: "MICRO_SELECTOR", EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying", Usage: "Selector used to pick nodes for querying",
Value: "cache",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "transport", Name: "transport",
@ -179,7 +177,7 @@ var (
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
"default": selector.NewSelector, "default": selector.NewSelector,
"cache": cache.NewSelector, "cache": selector.NewSelector,
} }
DefaultServers = map[string]func(...server.Option) server.Server{ DefaultServers = map[string]func(...server.Option) server.Server{

View File

@ -38,20 +38,35 @@ func cp(current []*registry.Service) []*registry.Service {
} }
func addNodes(old, neu []*registry.Node) []*registry.Node { func addNodes(old, neu []*registry.Node) []*registry.Node {
var nodes []*registry.Node
// add all new nodes
for _, n := range neu { for _, n := range neu {
var seen bool node := *n
for i, o := range old { nodes = append(nodes, &node)
}
// look at old nodes
for _, o := range old {
var exists bool
// check against new nodes
for _, n := range nodes {
// ids match then skip
if o.Id == n.Id { if o.Id == n.Id {
seen = true exists = true
old[i] = n
break break
} }
} }
if !seen {
old = append(old, n) // keep old node
if !exists {
node := *o
nodes = append(nodes, &node)
} }
} }
return old
return nodes
} }
func addServices(old, neu []*registry.Service) []*registry.Service { func addServices(old, neu []*registry.Service) []*registry.Service {
@ -91,19 +106,27 @@ func delNodes(old, del []*registry.Node) []*registry.Node {
func delServices(old, del []*registry.Service) []*registry.Service { func delServices(old, del []*registry.Service) []*registry.Service {
var services []*registry.Service var services []*registry.Service
for i, o := range old {
for _, o := range old {
srv := new(registry.Service)
*srv = *o
var rem bool var rem bool
for _, s := range del { for _, s := range del {
if o.Version == s.Version { if srv.Version == s.Version {
old[i].Nodes = delNodes(o.Nodes, s.Nodes) srv.Nodes = delNodes(srv.Nodes, s.Nodes)
if len(old[i].Nodes) == 0 {
if len(srv.Nodes) == 0 {
rem = true rem = true
} }
} }
} }
if !rem { if !rem {
services = append(services, o) services = append(services, srv)
} }
} }
return services return services
} }

View File

@ -1,424 +0,0 @@
// Package cache is a caching selector. It uses the registry watcher.
package cache
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type cacheSelector struct {
so selector.Options
ttl time.Duration
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
}
var (
DefaultTTL = time.Minute
)
func (c *cacheSelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *cacheSelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services, ok := c.cache[service]
// get cache ttl
ttl, kk := c.ttls[service]
// got services && within ttl so return cache
if ok && kk && time.Since(ttl) < c.ttl {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
func (c *cacheSelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *cacheSelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *cacheSelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
return err
}
c.update(res)
}
}
func (c *cacheSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
return nil
}
func (c *cacheSelector) Options() selector.Options {
return c.so
}
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
sopts := selector.SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
}
func (c *cacheSelector) Reset(service string) {
}
// Close stops the watcher and destroys the cache
func (c *cacheSelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (c *cacheSelector) String() string {
return "cache"
}
func NewSelector(opts ...selector.Option) selector.Selector {
sopts := selector.Options{
Strategy: selector.Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok {
ttl = t
}
}
return &cacheSelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
}

View File

@ -1,29 +0,0 @@
package cache
import (
"testing"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestCacheSelector(t *testing.T) {
counts := map[string]int{}
cache := NewSelector(selector.Registry(mock.NewRegistry()))
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Cache Counts %v", counts)
}

View File

@ -1,27 +1,341 @@
package selector package selector
import ( import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
type defaultSelector struct { type registrySelector struct {
so Options so Options
ttl time.Duration
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
} }
func (r *defaultSelector) Init(opts ...Option) error { var (
for _, o := range opts { DefaultTTL = time.Minute
o(&r.so) )
func (c *registrySelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
} }
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *registrySelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *registrySelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *registrySelector) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services, ok := c.cache[service]
// get cache ttl
ttl, kk := c.ttls[service]
// got services && within ttl so return cache
if ok && kk && time.Since(ttl) < c.ttl {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
func (c *registrySelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *registrySelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *registrySelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
return err
}
c.update(res)
}
}
func (c *registrySelector) Init(opts ...Option) error {
for _, o := range opts {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
return nil return nil
} }
func (r *defaultSelector) Options() Options { func (c *registrySelector) Options() Options {
return r.so return c.so
} }
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) { func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{ sopts := SelectOptions{
Strategy: r.so.Strategy, Strategy: c.so.Strategy,
} }
for _, opt := range opts { for _, opt := range opts {
@ -29,7 +343,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
} }
// get the service // get the service
services, err := r.so.Registry.GetService(service) // try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -47,21 +363,33 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
return sopts.Strategy(services), nil return sopts.Strategy(services), nil
} }
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
} }
func (r *defaultSelector) Reset(service string) { func (c *registrySelector) Reset(service string) {
} }
func (r *defaultSelector) Close() error { // Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil return nil
} }
func (r *defaultSelector) String() string { func (c *registrySelector) String() string {
return "default" return "registry"
} }
func newDefaultSelector(opts ...Option) Selector { func NewSelector(opts ...Option) Selector {
sopts := Options{ sopts := Options{
Strategy: Random, Strategy: Random,
} }
@ -74,7 +402,21 @@ func newDefaultSelector(opts ...Option) Selector {
sopts.Registry = registry.DefaultRegistry sopts.Registry = registry.DefaultRegistry
} }
return &defaultSelector{ ttl := DefaultTTL
so: sopts,
if sopts.Context != nil {
if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok {
ttl = t
}
}
return &registrySelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
} }
} }

View File

@ -6,14 +6,14 @@ import (
"github.com/micro/go-micro/registry/mock" "github.com/micro/go-micro/registry/mock"
) )
func TestDefaultSelector(t *testing.T) { func TestRegistrySelector(t *testing.T) {
counts := map[string]int{} counts := map[string]int{}
rs := newDefaultSelector(Registry(mock.NewRegistry())) cache := NewSelector(Registry(mock.NewRegistry()))
next, err := rs.Select("foo") next, err := cache.Select("foo")
if err != nil { if err != nil {
t.Errorf("Unexpected error calling default select: %v", err) t.Errorf("Unexpected error calling cache select: %v", err)
} }
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
@ -24,5 +24,5 @@ func TestDefaultSelector(t *testing.T) {
counts[node.Id]++ counts[node.Id]++
} }
t.Logf("Default Counts %v", counts) t.Logf("Selector Counts %v", counts)
} }

View File

@ -1,4 +1,4 @@
package cache package registry
import ( import (
"context" "context"
@ -7,14 +7,12 @@ import (
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
) )
type ttlKey struct{} // Set the registry cache ttl
// Set the cache ttl
func TTL(t time.Duration) selector.Option { func TTL(t time.Duration) selector.Option {
return func(o *selector.Options) { return func(o *selector.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }
o.Context = context.WithValue(o.Context, ttlKey{}, t) o.Context = context.WithValue(o.Context, "selector_ttl", t)
} }
} }

View File

@ -0,0 +1,11 @@
// Package registry is uses the go-micro registry for selection
package registry
import (
"github.com/micro/go-micro/selector"
)
// NewSelector returns a new registry selector
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@ -35,12 +35,8 @@ type Filter func([]*registry.Service) []*registry.Service
type Strategy func([]*registry.Service) Next type Strategy func([]*registry.Service) Next
var ( var (
DefaultSelector = newDefaultSelector() DefaultSelector = NewSelector()
ErrNotFound = errors.New("not found") ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available") ErrNoneAvailable = errors.New("none available")
) )
func NewSelector(opts ...Option) Selector {
return newDefaultSelector(opts...)
}