From 0a4484b406a3d839c3e7da88df394c4200281dfd Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 3 May 2016 22:06:19 +0100 Subject: [PATCH 1/9] First commit in strategy rework --- selector/cache/cache.go | 38 ++----- selector/default.go | 87 +++++++++++++++ ...andom_selector_test.go => default_test.go} | 8 +- selector/options.go | 18 ++- selector/random/random.go | 9 -- selector/random_selector.go | 104 ------------------ selector/roundrobin/round_robin_selector.go | 98 ----------------- .../roundrobin/round_robin_selector_test.go | 33 ------ selector/selector.go | 9 +- selector/strategy.go | 56 ++++++++++ selector/strategy_test.go | 59 ++++++++++ 11 files changed, 235 insertions(+), 284 deletions(-) create mode 100644 selector/default.go rename selector/{random_selector_test.go => default_test.go} (73%) delete mode 100644 selector/random/random.go delete mode 100644 selector/random_selector.go delete mode 100644 selector/roundrobin/round_robin_selector.go delete mode 100644 selector/roundrobin/round_robin_selector_test.go create mode 100644 selector/strategy.go create mode 100644 selector/strategy_test.go diff --git a/selector/cache/cache.go b/selector/cache/cache.go index d9e5e2b4..15e68b13 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -2,7 +2,6 @@ package cache import ( "log" - "math/rand" "sync" "time" @@ -33,10 +32,6 @@ var ( DefaultTTL = time.Minute ) -func init() { - rand.Seed(time.Now().UnixNano()) -} - func (c *cacheSelector) quit() bool { select { case <-c.exit: @@ -329,7 +324,10 @@ func (c *cacheSelector) Options() selector.Options { } func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - var sopts selector.SelectOptions + sopts := selector.SelectOptions{ + Strategy: c.so.Strategy, + } + for _, opt := range opts { opt(&sopts) } @@ -352,29 +350,7 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s return nil, selector.ErrNotFound } - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, selector.ErrNotFound - } - - return func() (*registry.Node, error) { - i := rand.Int() - j := i % len(services) - - if len(services[j].Nodes) == 0 { - return nil, selector.ErrNotFound - } - - k := i % len(services[j].Nodes) - return services[j].Nodes[k], nil - }, nil + return sopts.Strategy(services), nil } func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { @@ -407,7 +383,9 @@ func (c *cacheSelector) String() string { } func NewSelector(opts ...selector.Option) selector.Selector { - var sopts selector.Options + sopts := selector.Options{ + Strategy: selector.Random, + } for _, opt := range opts { opt(&sopts) diff --git a/selector/default.go b/selector/default.go new file mode 100644 index 00000000..48a23914 --- /dev/null +++ b/selector/default.go @@ -0,0 +1,87 @@ +package selector + +import ( + "math/rand" + "time" + + "github.com/micro/go-micro/registry" +) + +type defaultSelector struct { + so Options +} + +func init() { + rand.Seed(time.Now().Unix()) +} + +func (r *defaultSelector) Init(opts ...Option) error { + for _, o := range opts { + o(&r.so) + } + return nil +} + +func (r *defaultSelector) Options() Options { + return r.so +} + +func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) { + sopts := SelectOptions{ + Strategy: r.so.Strategy, + } + + for _, opt := range opts { + opt(&sopts) + } + + // get the service + services, err := r.so.Registry.GetService(service) + if err != nil { + return nil, err + } + + // apply the filters + for _, filter := range sopts.Filters { + services = filter(services) + } + + // if there's nothing left, return + if len(services) == 0 { + return nil, ErrNotFound + } + + return sopts.Strategy(services), nil +} + +func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { + return +} + +func (r *defaultSelector) Reset(service string) { + return +} + +func (r *defaultSelector) Close() error { + return nil +} + +func (r *defaultSelector) String() string { + return "default" +} + +func newDefaultSelector(opts ...Option) Selector { + sopts := Options{ + Strategy: Random, + } + + for _, opt := range opts { + opt(&sopts) + } + + if sopts.Registry == nil { + sopts.Registry = registry.DefaultRegistry + } + + return &defaultSelector{sopts} +} diff --git a/selector/random_selector_test.go b/selector/default_test.go similarity index 73% rename from selector/random_selector_test.go rename to selector/default_test.go index 07115936..813ca48f 100644 --- a/selector/random_selector_test.go +++ b/selector/default_test.go @@ -9,15 +9,11 @@ import ( func TestRandomSelector(t *testing.T) { counts := map[string]int{} - rs := &randomSelector{ - so: Options{ - Registry: mock.NewRegistry(), - }, - } + rs := newDefaultSelector(Registry(mock.NewRegistry())) next, err := rs.Select("foo") if err != nil { - t.Errorf("Unexpected error calling random select: %v", err) + t.Errorf("Unexpected error calling default select: %v", err) } for i := 0; i < 100; i++ { diff --git a/selector/options.go b/selector/options.go index 47bc81c0..32ba1059 100644 --- a/selector/options.go +++ b/selector/options.go @@ -8,6 +8,7 @@ import ( type Options struct { Registry registry.Registry + Strategy Strategy // Other options for implementations of the interface // can be stored in a context @@ -15,7 +16,8 @@ type Options struct { } type SelectOptions struct { - Filters []Filter + Filters []Filter + Strategy Strategy // Other options for implementations of the interface // can be stored in a context @@ -35,6 +37,13 @@ func Registry(r registry.Registry) Option { } } +// SetStrategy sets the default strategy for the selector +func SetStrategy(fn Strategy) Option { + return func(o *Options) { + o.Strategy = fn + } +} + // WithFilter adds a filter function to the list of filters // used during the Select call. func WithFilter(fn ...Filter) SelectOption { @@ -42,3 +51,10 @@ func WithFilter(fn ...Filter) SelectOption { o.Filters = append(o.Filters, fn...) } } + +// Strategy sets the selector strategy +func WithStrategy(fn Strategy) SelectOption { + return func(o *SelectOptions) { + o.Strategy = fn + } +} diff --git a/selector/random/random.go b/selector/random/random.go deleted file mode 100644 index c30f3b53..00000000 --- a/selector/random/random.go +++ /dev/null @@ -1,9 +0,0 @@ -package random - -import ( - "github.com/micro/go-micro/selector" -) - -func NewSelector(opts ...selector.Option) selector.Selector { - return selector.NewSelector(opts...) -} diff --git a/selector/random_selector.go b/selector/random_selector.go deleted file mode 100644 index dd3babca..00000000 --- a/selector/random_selector.go +++ /dev/null @@ -1,104 +0,0 @@ -package selector - -import ( - "math/rand" - "time" - - "github.com/micro/go-micro/registry" -) - -type randomSelector struct { - so Options -} - -func init() { - rand.Seed(time.Now().Unix()) -} - -func (r *randomSelector) Init(opts ...Option) error { - for _, o := range opts { - o(&r.so) - } - return nil -} - -func (r *randomSelector) Options() Options { - return r.so -} - -func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) { - var sopts SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, ErrNotFound - } - - return func() (*registry.Node, error) { - i := rand.Int() - j := i % len(services) - - if len(services[j].Nodes) == 0 { - return nil, ErrNotFound - } - - k := i % len(services[j].Nodes) - return services[j].Nodes[k], nil - }, nil -} - -func (r *randomSelector) Mark(service string, node *registry.Node, err error) { - return -} - -func (r *randomSelector) Reset(service string) { - return -} - -func (r *randomSelector) Close() error { - return nil -} - -func (r *randomSelector) String() string { - return "random" -} - -func newRandomSelector(opts ...Option) Selector { - var sopts Options - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - return &randomSelector{sopts} -} diff --git a/selector/roundrobin/round_robin_selector.go b/selector/roundrobin/round_robin_selector.go deleted file mode 100644 index 2d1a1365..00000000 --- a/selector/roundrobin/round_robin_selector.go +++ /dev/null @@ -1,98 +0,0 @@ -package roundrobin - -import ( - "sync" - - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/selector" -) - -type roundRobinSelector struct { - so selector.Options -} - -func (r *roundRobinSelector) Init(opts ...selector.Option) error { - for _, o := range opts { - o(&r.so) - } - return nil -} - -func (r *roundRobinSelector) Options() selector.Options { - return r.so -} - -func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - var sopts selector.SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, selector.ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, selector.ErrNotFound - } - - var i int - var mtx sync.Mutex - - return func() (*registry.Node, error) { - mtx.Lock() - defer mtx.Unlock() - i++ - return nodes[i%len(nodes)], nil - }, nil -} - -func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) { - return -} - -func (r *roundRobinSelector) Reset(service string) { - return -} - -func (r *roundRobinSelector) Close() error { - return nil -} - -func (r *roundRobinSelector) String() string { - return "roundrobin" -} - -func NewSelector(opts ...selector.Option) selector.Selector { - var sopts selector.Options - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - return &roundRobinSelector{sopts} -} diff --git a/selector/roundrobin/round_robin_selector_test.go b/selector/roundrobin/round_robin_selector_test.go deleted file mode 100644 index f02babe7..00000000 --- a/selector/roundrobin/round_robin_selector_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package roundrobin - -import ( - "testing" - - "github.com/micro/go-micro/registry/mock" - "github.com/micro/go-micro/selector" -) - -func TestRoundRobinSelector(t *testing.T) { - counts := map[string]int{} - - rr := &roundRobinSelector{ - so: selector.Options{ - Registry: mock.NewRegistry(), - }, - } - - next, err := rr.Select("foo") - if err != nil { - t.Errorf("Unexpected error calling rr select: %v", err) - } - - for i := 0; i < 100; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - counts[node.Id]++ - } - - t.Logf("Round Robin Counts %v", counts) -} diff --git a/selector/selector.go b/selector/selector.go index 42ccd300..b65256d7 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -79,19 +79,22 @@ type Selector interface { } // Next is a function that returns the next node -// based on the selector's algorithm +// based on the selector's strategy type Next func() (*registry.Node, error) // Filter is used to filter a service during the selection process type Filter func([]*registry.Service) []*registry.Service +// Strategy is a selection strategy e.g random, round robin +type Strategy func([]*registry.Service) Next + var ( - DefaultSelector = newRandomSelector() + DefaultSelector = newDefaultSelector() ErrNotFound = errors.New("not found") ErrNoneAvailable = errors.New("none available") ) func NewSelector(opts ...Option) Selector { - return newRandomSelector(opts...) + return newDefaultSelector(opts...) } diff --git a/selector/strategy.go b/selector/strategy.go new file mode 100644 index 00000000..639218cc --- /dev/null +++ b/selector/strategy.go @@ -0,0 +1,56 @@ +package selector + +import ( + "math/rand" + "sync" + "time" + + "github.com/micro/go-micro/registry" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Random is a random strategy algorithm for node selection +func Random(services []*registry.Service) Next { + var nodes []*registry.Node + + for _, service := range services { + nodes = append(nodes, service.Nodes...) + } + + return func() (*registry.Node, error) { + if len(nodes) == 0 { + return nil, ErrNotFound + } + + i := rand.Int() % len(nodes) + return nodes[i], nil + } +} + +// RoundRobin is a roundrobin strategy algorithm for node selection +func RoundRobin(services []*registry.Service) Next { + var nodes []*registry.Node + + for _, service := range services { + nodes = append(nodes, service.Nodes...) + } + + var i int + var mtx sync.Mutex + + return func() (*registry.Node, error) { + if len(nodes) == 0 { + return nil, ErrNotFound + } + + mtx.Lock() + node := nodes[i%len(nodes)] + i++ + mtx.Unlock() + + return node, nil + } +} diff --git a/selector/strategy_test.go b/selector/strategy_test.go new file mode 100644 index 00000000..17090e73 --- /dev/null +++ b/selector/strategy_test.go @@ -0,0 +1,59 @@ +package selector + +import ( + "testing" + + "github.com/micro/go-micro/registry" +) + +func TestStrategies(t *testing.T) { + testData := []*registry.Service{ + ®istry.Service{ + Name: "test1", + Version: "latest", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "test1-1", + Address: "10.0.0.1", + Port: 1001, + }, + ®istry.Node{ + Id: "test1-2", + Address: "10.0.0.2", + Port: 1002, + }, + }, + }, + ®istry.Service{ + Name: "test1", + Version: "default", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "test1-3", + Address: "10.0.0.3", + Port: 1003, + }, + ®istry.Node{ + Id: "test1-4", + Address: "10.0.0.4", + Port: 1004, + }, + }, + }, + } + + for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} { + next := strategy(testData) + counts := make(map[string]int) + + for i := 0; i < 100; i++ { + node, err := next() + if err != nil { + t.Fatal(err) + } + counts[node.Id]++ + } + + t.Logf("%s: %+v\n", name, counts) + } +} From 818d500f98a781fc8cab7cc34a7dc084c5b3970c Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 3 May 2016 22:11:42 +0100 Subject: [PATCH 2/9] Fix compile error --- cmd/cmd.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 5c3ce99f..47b43f21 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -26,8 +26,6 @@ import ( "github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector/blacklist" "github.com/micro/go-micro/selector/cache" - "github.com/micro/go-micro/selector/random" - "github.com/micro/go-micro/selector/roundrobin" // transports "github.com/micro/go-micro/transport" @@ -119,7 +117,7 @@ var ( cli.StringFlag{ Name: "selector", EnvVar: "MICRO_SELECTOR", - Usage: "Selector used to pick nodes for querying. random, roundrobin, blacklist", + Usage: "Selector used to pick nodes for querying", }, cli.StringFlag{ Name: "transport", @@ -144,10 +142,8 @@ var ( } DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ - "cache": cache.NewSelector, - "random": random.NewSelector, - "roundrobin": roundrobin.NewSelector, - "blacklist": blacklist.NewSelector, + "cache": cache.NewSelector, + "blacklist": blacklist.NewSelector, } DefaultTransports = map[string]func(...transport.Option) transport.Transport{ From 670ed74a13e9776e4e9709262d61ee4885d36144 Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 5 May 2016 21:12:59 +0100 Subject: [PATCH 3/9] Use del method --- selector/cache/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index 15e68b13..29aa12c2 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -359,7 +359,7 @@ func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { func (c *cacheSelector) Reset(service string) { c.Lock() - delete(c.cache, service) + c.del(service) c.Unlock() } From 6070e235ebe477e07fc99f1592747475a3cfc295 Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 5 May 2016 21:13:07 +0100 Subject: [PATCH 4/9] remove comment --- selector/selector.go | 49 -------------------------------------------- 1 file changed, 49 deletions(-) diff --git a/selector/selector.go b/selector/selector.go index b65256d7..e7dd8996 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -3,55 +3,6 @@ The Selector package provides a way to algorithmically filter and return nodes required by the client or any other system. Selector's implemented by Micro build on the registry but it's of optional use. One could provide a static Selector that has a fixed pool. - - func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) { - var sopts SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, ErrNotFound - } - - return func() (*registry.Node, error) { - i := rand.Int() - j := i % len(services) - - if len(services[j].Nodes) == 0 { - return nil, ErrNotFound - } - - k := i % len(services[j].Nodes) - return services[j].Nodes[k], nil - }, nil - } - - */ package selector From ba391e228c3a51bb821cf584abe41694b41659c8 Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 5 May 2016 21:14:57 +0100 Subject: [PATCH 5/9] Add default to selector list --- cmd/cmd.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 47b43f21..c8c2c820 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -142,6 +142,7 @@ var ( } DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ + "default": selector.NewSelector, "cache": cache.NewSelector, "blacklist": blacklist.NewSelector, } @@ -153,7 +154,7 @@ var ( // used for default selection as the fall back defaultBroker = "http" defaultRegistry = "consul" - defaultSelector = "random" + defaultSelector = "default" defaultTransport = "http" ) From 77e4d4d9c4947fbf82b3838315d51f22c109871c Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 6 May 2016 23:12:37 +0100 Subject: [PATCH 6/9] Next phase of selector --- selector/blacklist/black_list_selector.go | 182 ------------------ .../blacklist/black_list_selector_test.go | 73 ------- selector/default.go | 45 ++++- selector/internal/blacklist/blacklist.go | 164 ++++++++++++++++ selector/internal/blacklist/blacklist_test.go | 107 ++++++++++ 5 files changed, 312 insertions(+), 259 deletions(-) delete mode 100644 selector/blacklist/black_list_selector.go delete mode 100644 selector/blacklist/black_list_selector_test.go create mode 100644 selector/internal/blacklist/blacklist.go create mode 100644 selector/internal/blacklist/blacklist_test.go diff --git a/selector/blacklist/black_list_selector.go b/selector/blacklist/black_list_selector.go deleted file mode 100644 index ff7e2b19..00000000 --- a/selector/blacklist/black_list_selector.go +++ /dev/null @@ -1,182 +0,0 @@ -package blacklist - -import ( - "math/rand" - "sync" - "time" - - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/selector" -) - -type blackListNode struct { - age time.Time - id string - service string -} - -type blackListSelector struct { - so selector.Options - ttl int64 - exit chan bool - once sync.Once - - sync.RWMutex - bl map[string]blackListNode -} - -func init() { - rand.Seed(time.Now().Unix()) -} - -func (r *blackListSelector) purge() { - now := time.Now() - r.Lock() - for k, v := range r.bl { - if d := v.age.Sub(now); d.Seconds() < 0 { - delete(r.bl, k) - } - } - r.Unlock() -} - -func (r *blackListSelector) run() { - t := time.NewTicker(time.Duration(r.ttl) * time.Second) - - for { - select { - case <-r.exit: - t.Stop() - return - case <-t.C: - r.purge() - } - } -} - -func (r *blackListSelector) Init(opts ...selector.Option) error { - for _, o := range opts { - o(&r.so) - } - return nil -} - -func (r *blackListSelector) Options() selector.Options { - return r.so -} - -func (r *blackListSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - var sopts selector.SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, selector.ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, selector.ErrNotFound - } - - return func() (*registry.Node, error) { - var viable []*registry.Node - - r.RLock() - for _, node := range nodes { - if _, ok := r.bl[node.Id]; !ok { - viable = append(viable, node) - } - } - r.RUnlock() - - if len(viable) == 0 { - return nil, selector.ErrNoneAvailable - } - - return viable[rand.Int()%len(viable)], nil - }, nil -} - -func (r *blackListSelector) Mark(service string, node *registry.Node, err error) { - r.Lock() - defer r.Unlock() - if err == nil { - delete(r.bl, node.Id) - return - } - - r.bl[node.Id] = blackListNode{ - age: time.Now().Add(time.Duration(r.ttl) * time.Second), - id: node.Id, - service: service, - } - return -} - -func (r *blackListSelector) Reset(service string) { - r.Lock() - defer r.Unlock() - for k, v := range r.bl { - if v.service == service { - delete(r.bl, k) - } - } - return -} - -func (r *blackListSelector) Close() error { - r.once.Do(func() { - close(r.exit) - }) - return nil -} - -func (r *blackListSelector) String() string { - return "blacklist" -} - -func NewSelector(opts ...selector.Option) selector.Selector { - var sopts selector.Options - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - var once sync.Once - bl := &blackListSelector{ - once: once, - so: sopts, - ttl: 60, - bl: make(map[string]blackListNode), - exit: make(chan bool), - } - - go bl.run() - - return bl -} diff --git a/selector/blacklist/black_list_selector_test.go b/selector/blacklist/black_list_selector_test.go deleted file mode 100644 index a9fefc75..00000000 --- a/selector/blacklist/black_list_selector_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package blacklist - -import ( - "errors" - "testing" - "time" - - "github.com/micro/go-micro/registry/mock" - "github.com/micro/go-micro/selector" -) - -func TestBlackListSelector(t *testing.T) { - counts := map[string]int{} - - bl := &blackListSelector{ - so: selector.Options{ - Registry: mock.NewRegistry(), - }, - ttl: 2, - bl: make(map[string]blackListNode), - exit: make(chan bool), - } - - go bl.run() - defer bl.Close() - - next, err := bl.Select("foo") - if err != nil { - t.Errorf("Unexpected error calling bl select: %v", err) - } - - for i := 0; i < 100; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - counts[node.Id]++ - } - - t.Logf("BlackList Counts %v", counts) - - // test blacklisting - for i := 0; i < 4; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - bl.Mark("foo", node, errors.New("blacklist")) - } - if node, err := next(); err != selector.ErrNoneAvailable { - t.Errorf("Expected none available err, got node %v err %v", node, err) - } - time.Sleep(time.Second * time.Duration(bl.ttl) * 2) - if _, err := next(); err != nil { - t.Errorf("Unexpected err %v", err) - } - - // test resetting - for i := 0; i < 4; i++ { - node, err := next() - if err != nil { - t.Errorf("Unexpected err: %v", err) - } - bl.Mark("foo", node, errors.New("blacklist")) - } - if node, err := next(); err != selector.ErrNoneAvailable { - t.Errorf("Expected none available err, got node %v err %v", node, err) - } - bl.Reset("foo") - if _, err := next(); err != nil { - t.Errorf("Unexpected err %v", err) - } -} diff --git a/selector/default.go b/selector/default.go index 48a23914..7f12e127 100644 --- a/selector/default.go +++ b/selector/default.go @@ -5,16 +5,33 @@ import ( "time" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector/internal/blacklist" ) type defaultSelector struct { - so Options + so Options + exit chan bool + bl *blacklist.BlackList } func init() { rand.Seed(time.Now().Unix()) } +func (r *defaultSelector) run() { + t := time.NewTicker(time.Second * 30) + + for { + select { + case <-t.C: + // TODO + case <-r.exit: + t.Stop() + return + } + } +} + func (r *defaultSelector) Init(opts ...Option) error { for _, o := range opts { o(&r.so) @@ -46,6 +63,12 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er services = filter(services) } + // apply the blacklist + services, err = r.bl.Filter(services) + if err != nil { + return nil, err + } + // if there's nothing left, return if len(services) == 0 { return nil, ErrNotFound @@ -55,14 +78,21 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er } func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { - return + r.bl.Mark(service, node, err) } func (r *defaultSelector) Reset(service string) { - return + r.bl.Reset(service) } func (r *defaultSelector) Close() error { + select { + case <-r.exit: + return nil + default: + close(r.exit) + r.bl.Close() + } return nil } @@ -83,5 +113,12 @@ func newDefaultSelector(opts ...Option) Selector { sopts.Registry = registry.DefaultRegistry } - return &defaultSelector{sopts} + se := &defaultSelector{ + so: sopts, + exit: make(chan bool), + bl: blacklist.New(), + } + + go se.run() + return se } diff --git a/selector/internal/blacklist/blacklist.go b/selector/internal/blacklist/blacklist.go new file mode 100644 index 00000000..fc72c059 --- /dev/null +++ b/selector/internal/blacklist/blacklist.go @@ -0,0 +1,164 @@ +package blacklist + +import ( + "math/rand" + "sync" + "time" + + "github.com/micro/go-micro/registry" +) + +type blackListNode struct { + age time.Time + id string + service string + count int +} + +type BlackList struct { + ttl int + exit chan bool + + sync.RWMutex + bl map[string]blackListNode +} + +var ( + // number of times we see an error before blacklisting + count = 3 + + // the ttl to blacklist for + ttl = 30 +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func (r *BlackList) purge() { + now := time.Now() + r.Lock() + for k, v := range r.bl { + if d := v.age.Sub(now); d.Seconds() < 0 { + delete(r.bl, k) + } + } + r.Unlock() +} + +func (r *BlackList) run() { + t := time.NewTicker(time.Duration(r.ttl) * time.Second) + + for { + select { + case <-r.exit: + t.Stop() + return + case <-t.C: + r.purge() + } + } +} + +func (r *BlackList) Filter(services []*registry.Service) ([]*registry.Service, error) { + var viableServices []*registry.Service + + r.RLock() + + for _, service := range services { + var viableNodes []*registry.Node + + for _, node := range service.Nodes { + n, ok := r.bl[node.Id] + if !ok { + // blacklist miss so add it + viableNodes = append(viableNodes, node) + continue + } + + // got some blacklist info + // skip the node if it exceeds count + if n.count >= count { + continue + } + + // doesn't exceed count, still viable + viableNodes = append(viableNodes, node) + } + + if len(viableNodes) == 0 { + continue + } + + viableService := new(registry.Service) + *viableService = *service + viableService.Nodes = viableNodes + viableServices = append(viableServices, viableService) + } + + r.RUnlock() + + return viableServices, nil +} + +func (r *BlackList) Mark(service string, node *registry.Node, err error) { + r.Lock() + defer r.Unlock() + + // reset when error is nil + // basically closing the circuit + if err == nil { + delete(r.bl, node.Id) + return + } + + n, ok := r.bl[node.Id] + if !ok { + n = blackListNode{ + id: node.Id, + service: service, + } + } + + // mark it + n.count++ + + // set age to ttl seconds in future + n.age = time.Now().Add(time.Duration(r.ttl) * time.Second) + + // save + r.bl[node.Id] = n +} + +func (r *BlackList) Reset(service string) { + r.Lock() + defer r.Unlock() + + for k, v := range r.bl { + // delete every node that matches the service + if v.service == service { + delete(r.bl, k) + } + } +} + +func (r *BlackList) Close() error { + select { + case <-r.exit: + return nil + default: + close(r.exit) + } + return nil +} + +func New() *BlackList { + bl := &BlackList{ + ttl: ttl, + bl: make(map[string]blackListNode), + exit: make(chan bool), + } + + go bl.run() + return bl +} diff --git a/selector/internal/blacklist/blacklist_test.go b/selector/internal/blacklist/blacklist_test.go new file mode 100644 index 00000000..656b8c28 --- /dev/null +++ b/selector/internal/blacklist/blacklist_test.go @@ -0,0 +1,107 @@ +package blacklist + +import ( + "errors" + "testing" + "time" + + "github.com/micro/go-micro/registry" +) + +func TestBlackList(t *testing.T) { + bl := &BlackList{ + ttl: 1, + bl: make(map[string]blackListNode), + exit: make(chan bool), + } + + go bl.run() + defer bl.Close() + + services := []*registry.Service{ + ®istry.Service{ + Name: "foo", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "foo-1", + Address: "localhost", + Port: 10001, + }, + ®istry.Node{ + Id: "foo-2", + Address: "localhost", + Port: 10002, + }, + ®istry.Node{ + Id: "foo-3", + Address: "localhost", + Port: 10002, + }, + }, + }, + } + + // check nothing is filtered on clean run + filterTest := func() { + for i := 0; i < 3; i++ { + srvs, err := bl.Filter(services) + if err != nil { + t.Fatal(err) + } + + if len(srvs) != len(services) { + t.Fatal("nodes were filtered when they shouldn't be") + } + + for _, node := range srvs[0].Nodes { + var seen bool + for _, n := range srvs[0].Nodes { + if n.Id == node.Id { + seen = true + break + } + } + if !seen { + t.Fatalf("Missing node %s", node.Id) + } + } + } + } + + // run filter test + filterTest() + + blacklistTest := func() { + // test blacklisting + // mark until failure + for i := 0; i < count+1; i++ { + for _, node := range services[0].Nodes { + bl.Mark("foo", node, errors.New("blacklist")) + } + } + + filtered, err := bl.Filter(services) + if err != nil { + t.Fatal(err) + } + + if len(filtered) > 0 { + t.Fatalf("Expected zero nodes got %+v", filtered) + } + } + + // sleep the ttl duration + time.Sleep(time.Second * time.Duration(bl.ttl) * 2) + + // now run filterTest again + filterTest() + + // run the blacklist test + blacklistTest() + + // reset + bl.Reset("foo") + + // check again + filterTest() +} From 8353b7b865faf0c30483620c92d588c162a369d3 Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 6 May 2016 23:15:40 +0100 Subject: [PATCH 7/9] Add blacklist to cache --- selector/cache/cache.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index 29aa12c2..3033650e 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -7,6 +7,7 @@ import ( "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" + "github.com/micro/go-micro/selector/internal/blacklist" ) /* @@ -26,6 +27,9 @@ type cacheSelector struct { // used to close or reload watcher reload chan bool exit chan bool + + // blacklist + bl *blacklist.BlackList } var ( @@ -345,6 +349,11 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s services = filter(services) } + services, err = c.bl.Filter(services) + if err != nil { + return nil, err + } + // if there's nothing left, return if len(services) == 0 { return nil, selector.ErrNotFound @@ -354,13 +363,14 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s } func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { - return + c.bl.Mark(service, node, err) } func (c *cacheSelector) Reset(service string) { c.Lock() c.del(service) c.Unlock() + c.bl.Reset(service) } // Close stops the watcher and destroys the cache @@ -374,6 +384,7 @@ func (c *cacheSelector) Close() error { return nil default: close(c.exit) + c.bl.Close() } return nil } @@ -410,6 +421,7 @@ func NewSelector(opts ...selector.Option) selector.Selector { ttls: make(map[string]time.Time), reload: make(chan bool, 1), exit: make(chan bool), + bl: blacklist.New(), } go c.run() From 63c6e821924eea81a90b667432fd23afb3336c5a Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 6 May 2016 23:18:47 +0100 Subject: [PATCH 8/9] Strip blacklist --- cmd/cmd.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index c8c2c820..7fbb86c7 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -24,7 +24,6 @@ import ( // selectors "github.com/micro/go-micro/selector" - "github.com/micro/go-micro/selector/blacklist" "github.com/micro/go-micro/selector/cache" // transports @@ -143,8 +142,7 @@ var ( DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ "default": selector.NewSelector, - "cache": cache.NewSelector, - "blacklist": blacklist.NewSelector, + "cache": cache.NewSelector, } DefaultTransports = map[string]func(...transport.Option) transport.Transport{ From 75f2706fd0671e1e55c9468a44ec89b0d178407a Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 7 May 2016 00:04:08 +0100 Subject: [PATCH 9/9] Use ErrNoneAvailable and test blacklisting --- selector/cache/cache.go | 2 +- selector/default.go | 2 +- selector/default_test.go | 82 +++++++++++++++++++++++++++++++++++++++- selector/strategy.go | 4 +- 4 files changed, 84 insertions(+), 6 deletions(-) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index 3033650e..40a598ed 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -356,7 +356,7 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s // if there's nothing left, return if len(services) == 0 { - return nil, selector.ErrNotFound + return nil, selector.ErrNoneAvailable } return sopts.Strategy(services), nil diff --git a/selector/default.go b/selector/default.go index 7f12e127..a1c29351 100644 --- a/selector/default.go +++ b/selector/default.go @@ -71,7 +71,7 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er // if there's nothing left, return if len(services) == 0 { - return nil, ErrNotFound + return nil, ErrNoneAvailable } return sopts.Strategy(services), nil diff --git a/selector/default_test.go b/selector/default_test.go index 813ca48f..c735040b 100644 --- a/selector/default_test.go +++ b/selector/default_test.go @@ -1,12 +1,14 @@ package selector import ( + "errors" "testing" + "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/mock" ) -func TestRandomSelector(t *testing.T) { +func TestDefaultSelector(t *testing.T) { counts := map[string]int{} rs := newDefaultSelector(Registry(mock.NewRegistry())) @@ -24,5 +26,81 @@ func TestRandomSelector(t *testing.T) { counts[node.Id]++ } - t.Logf("Random Counts %v", counts) + t.Logf("Default Counts %v", counts) +} + +func TestBlackList(t *testing.T) { + r := mock.NewRegistry() + + r.Register(®istry.Service{ + Name: "test", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "test-1", + Address: "localhost", + Port: 10001, + }, + ®istry.Node{ + Id: "test-2", + Address: "localhost", + Port: 10002, + }, + ®istry.Node{ + Id: "test-3", + Address: "localhost", + Port: 10002, + }, + }, + }) + + rs := newDefaultSelector(Registry(r)) + + next, err := rs.Select("test") + if err != nil { + t.Fatal(err) + } + + node, err := next() + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 4; i++ { + rs.Mark("test", node, errors.New("error")) + } + + next, err = rs.Select("test") + if err != nil { + t.Fatal(err) + } + + // still expecting 2 nodes + seen := make(map[string]bool) + + for i := 0; i < 10; i++ { + node, err = next() + if err != nil { + t.Fatal(err) + } + seen[node.Id] = true + } + + if len(seen) != 2 { + t.Fatalf("Expected seen to be 2 %+v", seen) + } + + // blacklist all of it + for i := 0; i < 9; i++ { + node, err = next() + if err != nil { + t.Fatal(err) + } + rs.Mark("test", node, errors.New("error")) + } + + next, err = rs.Select("test") + if err != ErrNoneAvailable { + t.Fatalf("Expected %v got %v", ErrNoneAvailable, err) + } + } diff --git a/selector/strategy.go b/selector/strategy.go index 639218cc..76b59904 100644 --- a/selector/strategy.go +++ b/selector/strategy.go @@ -22,7 +22,7 @@ func Random(services []*registry.Service) Next { return func() (*registry.Node, error) { if len(nodes) == 0 { - return nil, ErrNotFound + return nil, ErrNoneAvailable } i := rand.Int() % len(nodes) @@ -43,7 +43,7 @@ func RoundRobin(services []*registry.Service) Next { return func() (*registry.Node, error) { if len(nodes) == 0 { - return nil, ErrNotFound + return nil, ErrNoneAvailable } mtx.Lock()