First commit in strategy rework
This commit is contained in:
parent
b13361d010
commit
0a4484b406
38
selector/cache/cache.go
vendored
38
selector/cache/cache.go
vendored
@ -2,7 +2,6 @@ package cache
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -33,10 +32,6 @@ var (
|
||||
DefaultTTL = time.Minute
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (c *cacheSelector) quit() bool {
|
||||
select {
|
||||
case <-c.exit:
|
||||
@ -329,7 +324,10 @@ func (c *cacheSelector) Options() selector.Options {
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
var sopts selector.SelectOptions
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
@ -352,29 +350,7 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, service := range services {
|
||||
for _, node := range service.Nodes {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
i := rand.Int()
|
||||
j := i % len(services)
|
||||
|
||||
if len(services[j].Nodes) == 0 {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
k := i % len(services[j].Nodes)
|
||||
return services[j].Nodes[k], nil
|
||||
}, nil
|
||||
return sopts.Strategy(services), nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
|
||||
@ -407,7 +383,9 @@ func (c *cacheSelector) String() string {
|
||||
}
|
||||
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
var sopts selector.Options
|
||||
sopts := selector.Options{
|
||||
Strategy: selector.Random,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
|
87
selector/default.go
Normal file
87
selector/default.go
Normal file
@ -0,0 +1,87 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type defaultSelector struct {
|
||||
so Options
|
||||
}
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().Unix())
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&r.so)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Options() Options {
|
||||
return r.so
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) {
|
||||
sopts := SelectOptions{
|
||||
Strategy: r.so.Strategy,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
// get the service
|
||||
services, err := r.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// apply the filters
|
||||
for _, filter := range sopts.Filters {
|
||||
services = filter(services)
|
||||
}
|
||||
|
||||
// if there's nothing left, return
|
||||
if len(services) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
return sopts.Strategy(services), nil
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Reset(service string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *defaultSelector) String() string {
|
||||
return "default"
|
||||
}
|
||||
|
||||
func newDefaultSelector(opts ...Option) Selector {
|
||||
sopts := Options{
|
||||
Strategy: Random,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
if sopts.Registry == nil {
|
||||
sopts.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
return &defaultSelector{sopts}
|
||||
}
|
@ -9,15 +9,11 @@ import (
|
||||
func TestRandomSelector(t *testing.T) {
|
||||
counts := map[string]int{}
|
||||
|
||||
rs := &randomSelector{
|
||||
so: Options{
|
||||
Registry: mock.NewRegistry(),
|
||||
},
|
||||
}
|
||||
rs := newDefaultSelector(Registry(mock.NewRegistry()))
|
||||
|
||||
next, err := rs.Select("foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error calling random select: %v", err)
|
||||
t.Errorf("Unexpected error calling default select: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
type Options struct {
|
||||
Registry registry.Registry
|
||||
Strategy Strategy
|
||||
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
@ -15,7 +16,8 @@ type Options struct {
|
||||
}
|
||||
|
||||
type SelectOptions struct {
|
||||
Filters []Filter
|
||||
Filters []Filter
|
||||
Strategy Strategy
|
||||
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
@ -35,6 +37,13 @@ func Registry(r registry.Registry) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// SetStrategy sets the default strategy for the selector
|
||||
func SetStrategy(fn Strategy) Option {
|
||||
return func(o *Options) {
|
||||
o.Strategy = fn
|
||||
}
|
||||
}
|
||||
|
||||
// WithFilter adds a filter function to the list of filters
|
||||
// used during the Select call.
|
||||
func WithFilter(fn ...Filter) SelectOption {
|
||||
@ -42,3 +51,10 @@ func WithFilter(fn ...Filter) SelectOption {
|
||||
o.Filters = append(o.Filters, fn...)
|
||||
}
|
||||
}
|
||||
|
||||
// Strategy sets the selector strategy
|
||||
func WithStrategy(fn Strategy) SelectOption {
|
||||
return func(o *SelectOptions) {
|
||||
o.Strategy = fn
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +0,0 @@
|
||||
package random
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
return selector.NewSelector(opts...)
|
||||
}
|
@ -1,104 +0,0 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type randomSelector struct {
|
||||
so Options
|
||||
}
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().Unix())
|
||||
}
|
||||
|
||||
func (r *randomSelector) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&r.so)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *randomSelector) Options() Options {
|
||||
return r.so
|
||||
}
|
||||
|
||||
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
|
||||
var sopts SelectOptions
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
// get the service
|
||||
services, err := r.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// apply the filters
|
||||
for _, filter := range sopts.Filters {
|
||||
services = filter(services)
|
||||
}
|
||||
|
||||
// if there's nothing left, return
|
||||
if len(services) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, service := range services {
|
||||
for _, node := range service.Nodes {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
i := rand.Int()
|
||||
j := i % len(services)
|
||||
|
||||
if len(services[j].Nodes) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
k := i % len(services[j].Nodes)
|
||||
return services[j].Nodes[k], nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *randomSelector) Mark(service string, node *registry.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *randomSelector) Reset(service string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *randomSelector) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *randomSelector) String() string {
|
||||
return "random"
|
||||
}
|
||||
|
||||
func newRandomSelector(opts ...Option) Selector {
|
||||
var sopts Options
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
if sopts.Registry == nil {
|
||||
sopts.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
return &randomSelector{sopts}
|
||||
}
|
@ -1,98 +0,0 @@
|
||||
package roundrobin
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
type roundRobinSelector struct {
|
||||
so selector.Options
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) Init(opts ...selector.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&r.so)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) Options() selector.Options {
|
||||
return r.so
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
var sopts selector.SelectOptions
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
// get the service
|
||||
services, err := r.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// apply the filters
|
||||
for _, filter := range sopts.Filters {
|
||||
services = filter(services)
|
||||
}
|
||||
|
||||
// if there's nothing left, return
|
||||
if len(services) == 0 {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, service := range services {
|
||||
for _, node := range service.Nodes {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
var i int
|
||||
var mtx sync.Mutex
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
i++
|
||||
return nodes[i%len(nodes)], nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) Reset(service string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *roundRobinSelector) String() string {
|
||||
return "roundrobin"
|
||||
}
|
||||
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
var sopts selector.Options
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
if sopts.Registry == nil {
|
||||
sopts.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
return &roundRobinSelector{sopts}
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package roundrobin
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
func TestRoundRobinSelector(t *testing.T) {
|
||||
counts := map[string]int{}
|
||||
|
||||
rr := &roundRobinSelector{
|
||||
so: selector.Options{
|
||||
Registry: mock.NewRegistry(),
|
||||
},
|
||||
}
|
||||
|
||||
next, err := rr.Select("foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error calling rr select: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
node, err := next()
|
||||
if err != nil {
|
||||
t.Errorf("Expected node err, got err: %v", err)
|
||||
}
|
||||
counts[node.Id]++
|
||||
}
|
||||
|
||||
t.Logf("Round Robin Counts %v", counts)
|
||||
}
|
@ -79,19 +79,22 @@ type Selector interface {
|
||||
}
|
||||
|
||||
// Next is a function that returns the next node
|
||||
// based on the selector's algorithm
|
||||
// based on the selector's strategy
|
||||
type Next func() (*registry.Node, error)
|
||||
|
||||
// Filter is used to filter a service during the selection process
|
||||
type Filter func([]*registry.Service) []*registry.Service
|
||||
|
||||
// Strategy is a selection strategy e.g random, round robin
|
||||
type Strategy func([]*registry.Service) Next
|
||||
|
||||
var (
|
||||
DefaultSelector = newRandomSelector()
|
||||
DefaultSelector = newDefaultSelector()
|
||||
|
||||
ErrNotFound = errors.New("not found")
|
||||
ErrNoneAvailable = errors.New("none available")
|
||||
)
|
||||
|
||||
func NewSelector(opts ...Option) Selector {
|
||||
return newRandomSelector(opts...)
|
||||
return newDefaultSelector(opts...)
|
||||
}
|
||||
|
56
selector/strategy.go
Normal file
56
selector/strategy.go
Normal file
@ -0,0 +1,56 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// Random is a random strategy algorithm for node selection
|
||||
func Random(services []*registry.Service) Next {
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, service := range services {
|
||||
nodes = append(nodes, service.Nodes...)
|
||||
}
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
if len(nodes) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
i := rand.Int() % len(nodes)
|
||||
return nodes[i], nil
|
||||
}
|
||||
}
|
||||
|
||||
// RoundRobin is a roundrobin strategy algorithm for node selection
|
||||
func RoundRobin(services []*registry.Service) Next {
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, service := range services {
|
||||
nodes = append(nodes, service.Nodes...)
|
||||
}
|
||||
|
||||
var i int
|
||||
var mtx sync.Mutex
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
if len(nodes) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
node := nodes[i%len(nodes)]
|
||||
i++
|
||||
mtx.Unlock()
|
||||
|
||||
return node, nil
|
||||
}
|
||||
}
|
59
selector/strategy_test.go
Normal file
59
selector/strategy_test.go
Normal file
@ -0,0 +1,59 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func TestStrategies(t *testing.T) {
|
||||
testData := []*registry.Service{
|
||||
®istry.Service{
|
||||
Name: "test1",
|
||||
Version: "latest",
|
||||
Nodes: []*registry.Node{
|
||||
®istry.Node{
|
||||
Id: "test1-1",
|
||||
Address: "10.0.0.1",
|
||||
Port: 1001,
|
||||
},
|
||||
®istry.Node{
|
||||
Id: "test1-2",
|
||||
Address: "10.0.0.2",
|
||||
Port: 1002,
|
||||
},
|
||||
},
|
||||
},
|
||||
®istry.Service{
|
||||
Name: "test1",
|
||||
Version: "default",
|
||||
Nodes: []*registry.Node{
|
||||
®istry.Node{
|
||||
Id: "test1-3",
|
||||
Address: "10.0.0.3",
|
||||
Port: 1003,
|
||||
},
|
||||
®istry.Node{
|
||||
Id: "test1-4",
|
||||
Address: "10.0.0.4",
|
||||
Port: 1004,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} {
|
||||
next := strategy(testData)
|
||||
counts := make(map[string]int)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
node, err := next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
counts[node.Id]++
|
||||
}
|
||||
|
||||
t.Logf("%s: %+v\n", name, counts)
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user