Merge pull request #81 from micro/selector

Make Selector pluggable with Strategy's, Filters, etc.
This commit is contained in:
Asim Aslam 2016-05-07 00:24:57 +01:00
commit 24220fe615
17 changed files with 666 additions and 626 deletions

View File

@ -24,10 +24,7 @@ import (
// selectors // selectors
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/selector/blacklist"
"github.com/micro/go-micro/selector/cache" "github.com/micro/go-micro/selector/cache"
"github.com/micro/go-micro/selector/random"
"github.com/micro/go-micro/selector/roundrobin"
// transports // transports
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
@ -119,7 +116,7 @@ var (
cli.StringFlag{ cli.StringFlag{
Name: "selector", Name: "selector",
EnvVar: "MICRO_SELECTOR", EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying. random, roundrobin, blacklist", Usage: "Selector used to pick nodes for querying",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "transport", Name: "transport",
@ -144,10 +141,8 @@ var (
} }
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
"default": selector.NewSelector,
"cache": cache.NewSelector, "cache": cache.NewSelector,
"random": random.NewSelector,
"roundrobin": roundrobin.NewSelector,
"blacklist": blacklist.NewSelector,
} }
DefaultTransports = map[string]func(...transport.Option) transport.Transport{ DefaultTransports = map[string]func(...transport.Option) transport.Transport{
@ -157,7 +152,7 @@ var (
// used for default selection as the fall back // used for default selection as the fall back
defaultBroker = "http" defaultBroker = "http"
defaultRegistry = "consul" defaultRegistry = "consul"
defaultSelector = "random" defaultSelector = "default"
defaultTransport = "http" defaultTransport = "http"
) )

View File

@ -1,182 +0,0 @@
package blacklist
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type blackListNode struct {
age time.Time
id string
service string
}
type blackListSelector struct {
so selector.Options
ttl int64
exit chan bool
once sync.Once
sync.RWMutex
bl map[string]blackListNode
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *blackListSelector) purge() {
now := time.Now()
r.Lock()
for k, v := range r.bl {
if d := v.age.Sub(now); d.Seconds() < 0 {
delete(r.bl, k)
}
}
r.Unlock()
}
func (r *blackListSelector) run() {
t := time.NewTicker(time.Duration(r.ttl) * time.Second)
for {
select {
case <-r.exit:
t.Stop()
return
case <-t.C:
r.purge()
}
}
}
func (r *blackListSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&r.so)
}
return nil
}
func (r *blackListSelector) Options() selector.Options {
return r.so
}
func (r *blackListSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, selector.ErrNotFound
}
return func() (*registry.Node, error) {
var viable []*registry.Node
r.RLock()
for _, node := range nodes {
if _, ok := r.bl[node.Id]; !ok {
viable = append(viable, node)
}
}
r.RUnlock()
if len(viable) == 0 {
return nil, selector.ErrNoneAvailable
}
return viable[rand.Int()%len(viable)], nil
}, nil
}
func (r *blackListSelector) Mark(service string, node *registry.Node, err error) {
r.Lock()
defer r.Unlock()
if err == nil {
delete(r.bl, node.Id)
return
}
r.bl[node.Id] = blackListNode{
age: time.Now().Add(time.Duration(r.ttl) * time.Second),
id: node.Id,
service: service,
}
return
}
func (r *blackListSelector) Reset(service string) {
r.Lock()
defer r.Unlock()
for k, v := range r.bl {
if v.service == service {
delete(r.bl, k)
}
}
return
}
func (r *blackListSelector) Close() error {
r.once.Do(func() {
close(r.exit)
})
return nil
}
func (r *blackListSelector) String() string {
return "blacklist"
}
func NewSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
var once sync.Once
bl := &blackListSelector{
once: once,
so: sopts,
ttl: 60,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
return bl
}

View File

@ -1,73 +0,0 @@
package blacklist
import (
"errors"
"testing"
"time"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestBlackListSelector(t *testing.T) {
counts := map[string]int{}
bl := &blackListSelector{
so: selector.Options{
Registry: mock.NewRegistry(),
},
ttl: 2,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
defer bl.Close()
next, err := bl.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling bl select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("BlackList Counts %v", counts)
// test blacklisting
for i := 0; i < 4; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
bl.Mark("foo", node, errors.New("blacklist"))
}
if node, err := next(); err != selector.ErrNoneAvailable {
t.Errorf("Expected none available err, got node %v err %v", node, err)
}
time.Sleep(time.Second * time.Duration(bl.ttl) * 2)
if _, err := next(); err != nil {
t.Errorf("Unexpected err %v", err)
}
// test resetting
for i := 0; i < 4; i++ {
node, err := next()
if err != nil {
t.Errorf("Unexpected err: %v", err)
}
bl.Mark("foo", node, errors.New("blacklist"))
}
if node, err := next(); err != selector.ErrNoneAvailable {
t.Errorf("Expected none available err, got node %v err %v", node, err)
}
bl.Reset("foo")
if _, err := next(); err != nil {
t.Errorf("Unexpected err %v", err)
}
}

View File

@ -2,12 +2,12 @@ package cache
import ( import (
"log" "log"
"math/rand"
"sync" "sync"
"time" "time"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/selector/internal/blacklist"
) )
/* /*
@ -27,16 +27,15 @@ type cacheSelector struct {
// used to close or reload watcher // used to close or reload watcher
reload chan bool reload chan bool
exit chan bool exit chan bool
// blacklist
bl *blacklist.BlackList
} }
var ( var (
DefaultTTL = time.Minute DefaultTTL = time.Minute
) )
func init() {
rand.Seed(time.Now().UnixNano())
}
func (c *cacheSelector) quit() bool { func (c *cacheSelector) quit() bool {
select { select {
case <-c.exit: case <-c.exit:
@ -329,7 +328,10 @@ func (c *cacheSelector) Options() selector.Options {
} }
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions sopts := selector.SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts { for _, opt := range opts {
opt(&sopts) opt(&sopts)
} }
@ -347,44 +349,28 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
services = filter(services) services = filter(services)
} }
services, err = c.bl.Filter(services)
if err != nil {
return nil, err
}
// if there's nothing left, return // if there's nothing left, return
if len(services) == 0 { if len(services) == 0 {
return nil, selector.ErrNotFound return nil, selector.ErrNoneAvailable
} }
var nodes []*registry.Node return sopts.Strategy(services), nil
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, selector.ErrNotFound
}
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
if len(services[j].Nodes) == 0 {
return nil, selector.ErrNotFound
}
k := i % len(services[j].Nodes)
return services[j].Nodes[k], nil
}, nil
} }
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
return c.bl.Mark(service, node, err)
} }
func (c *cacheSelector) Reset(service string) { func (c *cacheSelector) Reset(service string) {
c.Lock() c.Lock()
delete(c.cache, service) c.del(service)
c.Unlock() c.Unlock()
c.bl.Reset(service)
} }
// Close stops the watcher and destroys the cache // Close stops the watcher and destroys the cache
@ -398,6 +384,7 @@ func (c *cacheSelector) Close() error {
return nil return nil
default: default:
close(c.exit) close(c.exit)
c.bl.Close()
} }
return nil return nil
} }
@ -407,7 +394,9 @@ func (c *cacheSelector) String() string {
} }
func NewSelector(opts ...selector.Option) selector.Selector { func NewSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options sopts := selector.Options{
Strategy: selector.Random,
}
for _, opt := range opts { for _, opt := range opts {
opt(&sopts) opt(&sopts)
@ -432,6 +421,7 @@ func NewSelector(opts ...selector.Option) selector.Selector {
ttls: make(map[string]time.Time), ttls: make(map[string]time.Time),
reload: make(chan bool, 1), reload: make(chan bool, 1),
exit: make(chan bool), exit: make(chan bool),
bl: blacklist.New(),
} }
go c.run() go c.run()

124
selector/default.go Normal file
View File

@ -0,0 +1,124 @@
package selector
import (
"math/rand"
"time"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector/internal/blacklist"
)
type defaultSelector struct {
so Options
exit chan bool
bl *blacklist.BlackList
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *defaultSelector) run() {
t := time.NewTicker(time.Second * 30)
for {
select {
case <-t.C:
// TODO
case <-r.exit:
t.Stop()
return
}
}
}
func (r *defaultSelector) Init(opts ...Option) error {
for _, o := range opts {
o(&r.so)
}
return nil
}
func (r *defaultSelector) Options() Options {
return r.so
}
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: r.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// apply the blacklist
services, err = r.bl.Filter(services)
if err != nil {
return nil, err
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
r.bl.Mark(service, node, err)
}
func (r *defaultSelector) Reset(service string) {
r.bl.Reset(service)
}
func (r *defaultSelector) Close() error {
select {
case <-r.exit:
return nil
default:
close(r.exit)
r.bl.Close()
}
return nil
}
func (r *defaultSelector) String() string {
return "default"
}
func newDefaultSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
se := &defaultSelector{
so: sopts,
exit: make(chan bool),
bl: blacklist.New(),
}
go se.run()
return se
}

106
selector/default_test.go Normal file
View File

@ -0,0 +1,106 @@
package selector
import (
"errors"
"testing"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
)
func TestDefaultSelector(t *testing.T) {
counts := map[string]int{}
rs := newDefaultSelector(Registry(mock.NewRegistry()))
next, err := rs.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling default select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Default Counts %v", counts)
}
func TestBlackList(t *testing.T) {
r := mock.NewRegistry()
r.Register(&registry.Service{
Name: "test",
Nodes: []*registry.Node{
&registry.Node{
Id: "test-1",
Address: "localhost",
Port: 10001,
},
&registry.Node{
Id: "test-2",
Address: "localhost",
Port: 10002,
},
&registry.Node{
Id: "test-3",
Address: "localhost",
Port: 10002,
},
},
})
rs := newDefaultSelector(Registry(r))
next, err := rs.Select("test")
if err != nil {
t.Fatal(err)
}
node, err := next()
if err != nil {
t.Fatal(err)
}
for i := 0; i < 4; i++ {
rs.Mark("test", node, errors.New("error"))
}
next, err = rs.Select("test")
if err != nil {
t.Fatal(err)
}
// still expecting 2 nodes
seen := make(map[string]bool)
for i := 0; i < 10; i++ {
node, err = next()
if err != nil {
t.Fatal(err)
}
seen[node.Id] = true
}
if len(seen) != 2 {
t.Fatalf("Expected seen to be 2 %+v", seen)
}
// blacklist all of it
for i := 0; i < 9; i++ {
node, err = next()
if err != nil {
t.Fatal(err)
}
rs.Mark("test", node, errors.New("error"))
}
next, err = rs.Select("test")
if err != ErrNoneAvailable {
t.Fatalf("Expected %v got %v", ErrNoneAvailable, err)
}
}

View File

@ -0,0 +1,164 @@
package blacklist
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/registry"
)
type blackListNode struct {
age time.Time
id string
service string
count int
}
type BlackList struct {
ttl int
exit chan bool
sync.RWMutex
bl map[string]blackListNode
}
var (
// number of times we see an error before blacklisting
count = 3
// the ttl to blacklist for
ttl = 30
)
func init() {
rand.Seed(time.Now().Unix())
}
func (r *BlackList) purge() {
now := time.Now()
r.Lock()
for k, v := range r.bl {
if d := v.age.Sub(now); d.Seconds() < 0 {
delete(r.bl, k)
}
}
r.Unlock()
}
func (r *BlackList) run() {
t := time.NewTicker(time.Duration(r.ttl) * time.Second)
for {
select {
case <-r.exit:
t.Stop()
return
case <-t.C:
r.purge()
}
}
}
func (r *BlackList) Filter(services []*registry.Service) ([]*registry.Service, error) {
var viableServices []*registry.Service
r.RLock()
for _, service := range services {
var viableNodes []*registry.Node
for _, node := range service.Nodes {
n, ok := r.bl[node.Id]
if !ok {
// blacklist miss so add it
viableNodes = append(viableNodes, node)
continue
}
// got some blacklist info
// skip the node if it exceeds count
if n.count >= count {
continue
}
// doesn't exceed count, still viable
viableNodes = append(viableNodes, node)
}
if len(viableNodes) == 0 {
continue
}
viableService := new(registry.Service)
*viableService = *service
viableService.Nodes = viableNodes
viableServices = append(viableServices, viableService)
}
r.RUnlock()
return viableServices, nil
}
func (r *BlackList) Mark(service string, node *registry.Node, err error) {
r.Lock()
defer r.Unlock()
// reset when error is nil
// basically closing the circuit
if err == nil {
delete(r.bl, node.Id)
return
}
n, ok := r.bl[node.Id]
if !ok {
n = blackListNode{
id: node.Id,
service: service,
}
}
// mark it
n.count++
// set age to ttl seconds in future
n.age = time.Now().Add(time.Duration(r.ttl) * time.Second)
// save
r.bl[node.Id] = n
}
func (r *BlackList) Reset(service string) {
r.Lock()
defer r.Unlock()
for k, v := range r.bl {
// delete every node that matches the service
if v.service == service {
delete(r.bl, k)
}
}
}
func (r *BlackList) Close() error {
select {
case <-r.exit:
return nil
default:
close(r.exit)
}
return nil
}
func New() *BlackList {
bl := &BlackList{
ttl: ttl,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
return bl
}

View File

@ -0,0 +1,107 @@
package blacklist
import (
"errors"
"testing"
"time"
"github.com/micro/go-micro/registry"
)
func TestBlackList(t *testing.T) {
bl := &BlackList{
ttl: 1,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
defer bl.Close()
services := []*registry.Service{
&registry.Service{
Name: "foo",
Nodes: []*registry.Node{
&registry.Node{
Id: "foo-1",
Address: "localhost",
Port: 10001,
},
&registry.Node{
Id: "foo-2",
Address: "localhost",
Port: 10002,
},
&registry.Node{
Id: "foo-3",
Address: "localhost",
Port: 10002,
},
},
},
}
// check nothing is filtered on clean run
filterTest := func() {
for i := 0; i < 3; i++ {
srvs, err := bl.Filter(services)
if err != nil {
t.Fatal(err)
}
if len(srvs) != len(services) {
t.Fatal("nodes were filtered when they shouldn't be")
}
for _, node := range srvs[0].Nodes {
var seen bool
for _, n := range srvs[0].Nodes {
if n.Id == node.Id {
seen = true
break
}
}
if !seen {
t.Fatalf("Missing node %s", node.Id)
}
}
}
}
// run filter test
filterTest()
blacklistTest := func() {
// test blacklisting
// mark until failure
for i := 0; i < count+1; i++ {
for _, node := range services[0].Nodes {
bl.Mark("foo", node, errors.New("blacklist"))
}
}
filtered, err := bl.Filter(services)
if err != nil {
t.Fatal(err)
}
if len(filtered) > 0 {
t.Fatalf("Expected zero nodes got %+v", filtered)
}
}
// sleep the ttl duration
time.Sleep(time.Second * time.Duration(bl.ttl) * 2)
// now run filterTest again
filterTest()
// run the blacklist test
blacklistTest()
// reset
bl.Reset("foo")
// check again
filterTest()
}

View File

@ -8,6 +8,7 @@ import (
type Options struct { type Options struct {
Registry registry.Registry Registry registry.Registry
Strategy Strategy
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
@ -16,6 +17,7 @@ type Options struct {
type SelectOptions struct { type SelectOptions struct {
Filters []Filter Filters []Filter
Strategy Strategy
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
@ -35,6 +37,13 @@ func Registry(r registry.Registry) Option {
} }
} }
// SetStrategy sets the default strategy for the selector
func SetStrategy(fn Strategy) Option {
return func(o *Options) {
o.Strategy = fn
}
}
// WithFilter adds a filter function to the list of filters // WithFilter adds a filter function to the list of filters
// used during the Select call. // used during the Select call.
func WithFilter(fn ...Filter) SelectOption { func WithFilter(fn ...Filter) SelectOption {
@ -42,3 +51,10 @@ func WithFilter(fn ...Filter) SelectOption {
o.Filters = append(o.Filters, fn...) o.Filters = append(o.Filters, fn...)
} }
} }
// Strategy sets the selector strategy
func WithStrategy(fn Strategy) SelectOption {
return func(o *SelectOptions) {
o.Strategy = fn
}
}

View File

@ -1,9 +0,0 @@
package random
import (
"github.com/micro/go-micro/selector"
)
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@ -1,104 +0,0 @@
package selector
import (
"math/rand"
"time"
"github.com/micro/go-micro/registry"
)
type randomSelector struct {
so Options
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *randomSelector) Init(opts ...Option) error {
for _, o := range opts {
o(&r.so)
}
return nil
}
func (r *randomSelector) Options() Options {
return r.so
}
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
var sopts SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, ErrNotFound
}
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
if len(services[j].Nodes) == 0 {
return nil, ErrNotFound
}
k := i % len(services[j].Nodes)
return services[j].Nodes[k], nil
}, nil
}
func (r *randomSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *randomSelector) Reset(service string) {
return
}
func (r *randomSelector) Close() error {
return nil
}
func (r *randomSelector) String() string {
return "random"
}
func newRandomSelector(opts ...Option) Selector {
var sopts Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
return &randomSelector{sopts}
}

View File

@ -1,32 +0,0 @@
package selector
import (
"testing"
"github.com/micro/go-micro/registry/mock"
)
func TestRandomSelector(t *testing.T) {
counts := map[string]int{}
rs := &randomSelector{
so: Options{
Registry: mock.NewRegistry(),
},
}
next, err := rs.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling random select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Random Counts %v", counts)
}

View File

@ -1,98 +0,0 @@
package roundrobin
import (
"sync"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type roundRobinSelector struct {
so selector.Options
}
func (r *roundRobinSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&r.so)
}
return nil
}
func (r *roundRobinSelector) Options() selector.Options {
return r.so
}
func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, selector.ErrNotFound
}
var i int
var mtx sync.Mutex
return func() (*registry.Node, error) {
mtx.Lock()
defer mtx.Unlock()
i++
return nodes[i%len(nodes)], nil
}, nil
}
func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *roundRobinSelector) Reset(service string) {
return
}
func (r *roundRobinSelector) Close() error {
return nil
}
func (r *roundRobinSelector) String() string {
return "roundrobin"
}
func NewSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
return &roundRobinSelector{sopts}
}

View File

@ -1,33 +0,0 @@
package roundrobin
import (
"testing"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestRoundRobinSelector(t *testing.T) {
counts := map[string]int{}
rr := &roundRobinSelector{
so: selector.Options{
Registry: mock.NewRegistry(),
},
}
next, err := rr.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling rr select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Round Robin Counts %v", counts)
}

View File

@ -3,55 +3,6 @@ The Selector package provides a way to algorithmically filter and return
nodes required by the client or any other system. Selector's implemented nodes required by the client or any other system. Selector's implemented
by Micro build on the registry but it's of optional use. One could by Micro build on the registry but it's of optional use. One could
provide a static Selector that has a fixed pool. provide a static Selector that has a fixed pool.
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
var sopts SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, ErrNotFound
}
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
if len(services[j].Nodes) == 0 {
return nil, ErrNotFound
}
k := i % len(services[j].Nodes)
return services[j].Nodes[k], nil
}, nil
}
*/ */
package selector package selector
@ -79,19 +30,22 @@ type Selector interface {
} }
// Next is a function that returns the next node // Next is a function that returns the next node
// based on the selector's algorithm // based on the selector's strategy
type Next func() (*registry.Node, error) type Next func() (*registry.Node, error)
// Filter is used to filter a service during the selection process // Filter is used to filter a service during the selection process
type Filter func([]*registry.Service) []*registry.Service type Filter func([]*registry.Service) []*registry.Service
// Strategy is a selection strategy e.g random, round robin
type Strategy func([]*registry.Service) Next
var ( var (
DefaultSelector = newRandomSelector() DefaultSelector = newDefaultSelector()
ErrNotFound = errors.New("not found") ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available") ErrNoneAvailable = errors.New("none available")
) )
func NewSelector(opts ...Option) Selector { func NewSelector(opts ...Option) Selector {
return newRandomSelector(opts...) return newDefaultSelector(opts...)
} }

56
selector/strategy.go Normal file
View File

@ -0,0 +1,56 @@
package selector
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/registry"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Random is a random strategy algorithm for node selection
func Random(services []*registry.Service) Next {
var nodes []*registry.Node
for _, service := range services {
nodes = append(nodes, service.Nodes...)
}
return func() (*registry.Node, error) {
if len(nodes) == 0 {
return nil, ErrNoneAvailable
}
i := rand.Int() % len(nodes)
return nodes[i], nil
}
}
// RoundRobin is a roundrobin strategy algorithm for node selection
func RoundRobin(services []*registry.Service) Next {
var nodes []*registry.Node
for _, service := range services {
nodes = append(nodes, service.Nodes...)
}
var i int
var mtx sync.Mutex
return func() (*registry.Node, error) {
if len(nodes) == 0 {
return nil, ErrNoneAvailable
}
mtx.Lock()
node := nodes[i%len(nodes)]
i++
mtx.Unlock()
return node, nil
}
}

59
selector/strategy_test.go Normal file
View File

@ -0,0 +1,59 @@
package selector
import (
"testing"
"github.com/micro/go-micro/registry"
)
func TestStrategies(t *testing.T) {
testData := []*registry.Service{
&registry.Service{
Name: "test1",
Version: "latest",
Nodes: []*registry.Node{
&registry.Node{
Id: "test1-1",
Address: "10.0.0.1",
Port: 1001,
},
&registry.Node{
Id: "test1-2",
Address: "10.0.0.2",
Port: 1002,
},
},
},
&registry.Service{
Name: "test1",
Version: "default",
Nodes: []*registry.Node{
&registry.Node{
Id: "test1-3",
Address: "10.0.0.3",
Port: 1003,
},
&registry.Node{
Id: "test1-4",
Address: "10.0.0.4",
Port: 1004,
},
},
},
}
for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} {
next := strategy(testData)
counts := make(map[string]int)
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Fatal(err)
}
counts[node.Id]++
}
t.Logf("%s: %+v\n", name, counts)
}
}