From 0a4484b406a3d839c3e7da88df394c4200281dfd Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 3 May 2016 22:06:19 +0100 Subject: [PATCH] First commit in strategy rework --- selector/cache/cache.go | 38 ++----- selector/default.go | 87 +++++++++++++++ ...andom_selector_test.go => default_test.go} | 8 +- selector/options.go | 18 ++- selector/random/random.go | 9 -- selector/random_selector.go | 104 ------------------ selector/roundrobin/round_robin_selector.go | 98 ----------------- .../roundrobin/round_robin_selector_test.go | 33 ------ selector/selector.go | 9 +- selector/strategy.go | 56 ++++++++++ selector/strategy_test.go | 59 ++++++++++ 11 files changed, 235 insertions(+), 284 deletions(-) create mode 100644 selector/default.go rename selector/{random_selector_test.go => default_test.go} (73%) delete mode 100644 selector/random/random.go delete mode 100644 selector/random_selector.go delete mode 100644 selector/roundrobin/round_robin_selector.go delete mode 100644 selector/roundrobin/round_robin_selector_test.go create mode 100644 selector/strategy.go create mode 100644 selector/strategy_test.go diff --git a/selector/cache/cache.go b/selector/cache/cache.go index d9e5e2b4..15e68b13 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -2,7 +2,6 @@ package cache import ( "log" - "math/rand" "sync" "time" @@ -33,10 +32,6 @@ var ( DefaultTTL = time.Minute ) -func init() { - rand.Seed(time.Now().UnixNano()) -} - func (c *cacheSelector) quit() bool { select { case <-c.exit: @@ -329,7 +324,10 @@ func (c *cacheSelector) Options() selector.Options { } func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - var sopts selector.SelectOptions + sopts := selector.SelectOptions{ + Strategy: c.so.Strategy, + } + for _, opt := range opts { opt(&sopts) } @@ -352,29 +350,7 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s return nil, selector.ErrNotFound } - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, selector.ErrNotFound - } - - return func() (*registry.Node, error) { - i := rand.Int() - j := i % len(services) - - if len(services[j].Nodes) == 0 { - return nil, selector.ErrNotFound - } - - k := i % len(services[j].Nodes) - return services[j].Nodes[k], nil - }, nil + return sopts.Strategy(services), nil } func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { @@ -407,7 +383,9 @@ func (c *cacheSelector) String() string { } func NewSelector(opts ...selector.Option) selector.Selector { - var sopts selector.Options + sopts := selector.Options{ + Strategy: selector.Random, + } for _, opt := range opts { opt(&sopts) diff --git a/selector/default.go b/selector/default.go new file mode 100644 index 00000000..48a23914 --- /dev/null +++ b/selector/default.go @@ -0,0 +1,87 @@ +package selector + +import ( + "math/rand" + "time" + + "github.com/micro/go-micro/registry" +) + +type defaultSelector struct { + so Options +} + +func init() { + rand.Seed(time.Now().Unix()) +} + +func (r *defaultSelector) Init(opts ...Option) error { + for _, o := range opts { + o(&r.so) + } + return nil +} + +func (r *defaultSelector) Options() Options { + return r.so +} + +func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) { + sopts := SelectOptions{ + Strategy: r.so.Strategy, + } + + for _, opt := range opts { + opt(&sopts) + } + + // get the service + services, err := r.so.Registry.GetService(service) + if err != nil { + return nil, err + } + + // apply the filters + for _, filter := range sopts.Filters { + services = filter(services) + } + + // if there's nothing left, return + if len(services) == 0 { + return nil, ErrNotFound + } + + return sopts.Strategy(services), nil +} + +func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { + return +} + +func (r *defaultSelector) Reset(service string) { + return +} + +func (r *defaultSelector) Close() error { + return nil +} + +func (r *defaultSelector) String() string { + return "default" +} + +func newDefaultSelector(opts ...Option) Selector { + sopts := Options{ + Strategy: Random, + } + + for _, opt := range opts { + opt(&sopts) + } + + if sopts.Registry == nil { + sopts.Registry = registry.DefaultRegistry + } + + return &defaultSelector{sopts} +} diff --git a/selector/random_selector_test.go b/selector/default_test.go similarity index 73% rename from selector/random_selector_test.go rename to selector/default_test.go index 07115936..813ca48f 100644 --- a/selector/random_selector_test.go +++ b/selector/default_test.go @@ -9,15 +9,11 @@ import ( func TestRandomSelector(t *testing.T) { counts := map[string]int{} - rs := &randomSelector{ - so: Options{ - Registry: mock.NewRegistry(), - }, - } + rs := newDefaultSelector(Registry(mock.NewRegistry())) next, err := rs.Select("foo") if err != nil { - t.Errorf("Unexpected error calling random select: %v", err) + t.Errorf("Unexpected error calling default select: %v", err) } for i := 0; i < 100; i++ { diff --git a/selector/options.go b/selector/options.go index 47bc81c0..32ba1059 100644 --- a/selector/options.go +++ b/selector/options.go @@ -8,6 +8,7 @@ import ( type Options struct { Registry registry.Registry + Strategy Strategy // Other options for implementations of the interface // can be stored in a context @@ -15,7 +16,8 @@ type Options struct { } type SelectOptions struct { - Filters []Filter + Filters []Filter + Strategy Strategy // Other options for implementations of the interface // can be stored in a context @@ -35,6 +37,13 @@ func Registry(r registry.Registry) Option { } } +// SetStrategy sets the default strategy for the selector +func SetStrategy(fn Strategy) Option { + return func(o *Options) { + o.Strategy = fn + } +} + // WithFilter adds a filter function to the list of filters // used during the Select call. func WithFilter(fn ...Filter) SelectOption { @@ -42,3 +51,10 @@ func WithFilter(fn ...Filter) SelectOption { o.Filters = append(o.Filters, fn...) } } + +// Strategy sets the selector strategy +func WithStrategy(fn Strategy) SelectOption { + return func(o *SelectOptions) { + o.Strategy = fn + } +} diff --git a/selector/random/random.go b/selector/random/random.go deleted file mode 100644 index c30f3b53..00000000 --- a/selector/random/random.go +++ /dev/null @@ -1,9 +0,0 @@ -package random - -import ( - "github.com/micro/go-micro/selector" -) - -func NewSelector(opts ...selector.Option) selector.Selector { - return selector.NewSelector(opts...) -} diff --git a/selector/random_selector.go b/selector/random_selector.go deleted file mode 100644 index dd3babca..00000000 --- a/selector/random_selector.go +++ /dev/null @@ -1,104 +0,0 @@ -package selector - -import ( - "math/rand" - "time" - - "github.com/micro/go-micro/registry" -) - -type randomSelector struct { - so Options -} - -func init() { - rand.Seed(time.Now().Unix()) -} - -func (r *randomSelector) Init(opts ...Option) error { - for _, o := range opts { - o(&r.so) - } - return nil -} - -func (r *randomSelector) Options() Options { - return r.so -} - -func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) { - var sopts SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, ErrNotFound - } - - return func() (*registry.Node, error) { - i := rand.Int() - j := i % len(services) - - if len(services[j].Nodes) == 0 { - return nil, ErrNotFound - } - - k := i % len(services[j].Nodes) - return services[j].Nodes[k], nil - }, nil -} - -func (r *randomSelector) Mark(service string, node *registry.Node, err error) { - return -} - -func (r *randomSelector) Reset(service string) { - return -} - -func (r *randomSelector) Close() error { - return nil -} - -func (r *randomSelector) String() string { - return "random" -} - -func newRandomSelector(opts ...Option) Selector { - var sopts Options - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - return &randomSelector{sopts} -} diff --git a/selector/roundrobin/round_robin_selector.go b/selector/roundrobin/round_robin_selector.go deleted file mode 100644 index 2d1a1365..00000000 --- a/selector/roundrobin/round_robin_selector.go +++ /dev/null @@ -1,98 +0,0 @@ -package roundrobin - -import ( - "sync" - - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/selector" -) - -type roundRobinSelector struct { - so selector.Options -} - -func (r *roundRobinSelector) Init(opts ...selector.Option) error { - for _, o := range opts { - o(&r.so) - } - return nil -} - -func (r *roundRobinSelector) Options() selector.Options { - return r.so -} - -func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - var sopts selector.SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, selector.ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, selector.ErrNotFound - } - - var i int - var mtx sync.Mutex - - return func() (*registry.Node, error) { - mtx.Lock() - defer mtx.Unlock() - i++ - return nodes[i%len(nodes)], nil - }, nil -} - -func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) { - return -} - -func (r *roundRobinSelector) Reset(service string) { - return -} - -func (r *roundRobinSelector) Close() error { - return nil -} - -func (r *roundRobinSelector) String() string { - return "roundrobin" -} - -func NewSelector(opts ...selector.Option) selector.Selector { - var sopts selector.Options - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - return &roundRobinSelector{sopts} -} diff --git a/selector/roundrobin/round_robin_selector_test.go b/selector/roundrobin/round_robin_selector_test.go deleted file mode 100644 index f02babe7..00000000 --- a/selector/roundrobin/round_robin_selector_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package roundrobin - -import ( - "testing" - - "github.com/micro/go-micro/registry/mock" - "github.com/micro/go-micro/selector" -) - -func TestRoundRobinSelector(t *testing.T) { - counts := map[string]int{} - - rr := &roundRobinSelector{ - so: selector.Options{ - Registry: mock.NewRegistry(), - }, - } - - next, err := rr.Select("foo") - if err != nil { - t.Errorf("Unexpected error calling rr select: %v", err) - } - - for i := 0; i < 100; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - counts[node.Id]++ - } - - t.Logf("Round Robin Counts %v", counts) -} diff --git a/selector/selector.go b/selector/selector.go index 42ccd300..b65256d7 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -79,19 +79,22 @@ type Selector interface { } // Next is a function that returns the next node -// based on the selector's algorithm +// based on the selector's strategy type Next func() (*registry.Node, error) // Filter is used to filter a service during the selection process type Filter func([]*registry.Service) []*registry.Service +// Strategy is a selection strategy e.g random, round robin +type Strategy func([]*registry.Service) Next + var ( - DefaultSelector = newRandomSelector() + DefaultSelector = newDefaultSelector() ErrNotFound = errors.New("not found") ErrNoneAvailable = errors.New("none available") ) func NewSelector(opts ...Option) Selector { - return newRandomSelector(opts...) + return newDefaultSelector(opts...) } diff --git a/selector/strategy.go b/selector/strategy.go new file mode 100644 index 00000000..639218cc --- /dev/null +++ b/selector/strategy.go @@ -0,0 +1,56 @@ +package selector + +import ( + "math/rand" + "sync" + "time" + + "github.com/micro/go-micro/registry" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Random is a random strategy algorithm for node selection +func Random(services []*registry.Service) Next { + var nodes []*registry.Node + + for _, service := range services { + nodes = append(nodes, service.Nodes...) + } + + return func() (*registry.Node, error) { + if len(nodes) == 0 { + return nil, ErrNotFound + } + + i := rand.Int() % len(nodes) + return nodes[i], nil + } +} + +// RoundRobin is a roundrobin strategy algorithm for node selection +func RoundRobin(services []*registry.Service) Next { + var nodes []*registry.Node + + for _, service := range services { + nodes = append(nodes, service.Nodes...) + } + + var i int + var mtx sync.Mutex + + return func() (*registry.Node, error) { + if len(nodes) == 0 { + return nil, ErrNotFound + } + + mtx.Lock() + node := nodes[i%len(nodes)] + i++ + mtx.Unlock() + + return node, nil + } +} diff --git a/selector/strategy_test.go b/selector/strategy_test.go new file mode 100644 index 00000000..17090e73 --- /dev/null +++ b/selector/strategy_test.go @@ -0,0 +1,59 @@ +package selector + +import ( + "testing" + + "github.com/micro/go-micro/registry" +) + +func TestStrategies(t *testing.T) { + testData := []*registry.Service{ + ®istry.Service{ + Name: "test1", + Version: "latest", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "test1-1", + Address: "10.0.0.1", + Port: 1001, + }, + ®istry.Node{ + Id: "test1-2", + Address: "10.0.0.2", + Port: 1002, + }, + }, + }, + ®istry.Service{ + Name: "test1", + Version: "default", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "test1-3", + Address: "10.0.0.3", + Port: 1003, + }, + ®istry.Node{ + Id: "test1-4", + Address: "10.0.0.4", + Port: 1004, + }, + }, + }, + } + + for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} { + next := strategy(testData) + counts := make(map[string]int) + + for i := 0; i < 100; i++ { + node, err := next() + if err != nil { + t.Fatal(err) + } + counts[node.Id]++ + } + + t.Logf("%s: %+v\n", name, counts) + } +}