From 77e4d4d9c4947fbf82b3838315d51f22c109871c Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 6 May 2016 23:12:37 +0100 Subject: [PATCH] Next phase of selector --- selector/blacklist/black_list_selector.go | 182 ------------------ .../blacklist/black_list_selector_test.go | 73 ------- selector/default.go | 45 ++++- selector/internal/blacklist/blacklist.go | 164 ++++++++++++++++ selector/internal/blacklist/blacklist_test.go | 107 ++++++++++ 5 files changed, 312 insertions(+), 259 deletions(-) delete mode 100644 selector/blacklist/black_list_selector.go delete mode 100644 selector/blacklist/black_list_selector_test.go create mode 100644 selector/internal/blacklist/blacklist.go create mode 100644 selector/internal/blacklist/blacklist_test.go diff --git a/selector/blacklist/black_list_selector.go b/selector/blacklist/black_list_selector.go deleted file mode 100644 index ff7e2b19..00000000 --- a/selector/blacklist/black_list_selector.go +++ /dev/null @@ -1,182 +0,0 @@ -package blacklist - -import ( - "math/rand" - "sync" - "time" - - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/selector" -) - -type blackListNode struct { - age time.Time - id string - service string -} - -type blackListSelector struct { - so selector.Options - ttl int64 - exit chan bool - once sync.Once - - sync.RWMutex - bl map[string]blackListNode -} - -func init() { - rand.Seed(time.Now().Unix()) -} - -func (r *blackListSelector) purge() { - now := time.Now() - r.Lock() - for k, v := range r.bl { - if d := v.age.Sub(now); d.Seconds() < 0 { - delete(r.bl, k) - } - } - r.Unlock() -} - -func (r *blackListSelector) run() { - t := time.NewTicker(time.Duration(r.ttl) * time.Second) - - for { - select { - case <-r.exit: - t.Stop() - return - case <-t.C: - r.purge() - } - } -} - -func (r *blackListSelector) Init(opts ...selector.Option) error { - for _, o := range opts { - o(&r.so) - } - return nil -} - -func (r *blackListSelector) Options() selector.Options { - return r.so -} - -func (r *blackListSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - var sopts selector.SelectOptions - for _, opt := range opts { - opt(&sopts) - } - - // get the service - services, err := r.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, selector.ErrNotFound - } - - var nodes []*registry.Node - - for _, service := range services { - for _, node := range service.Nodes { - nodes = append(nodes, node) - } - } - - if len(nodes) == 0 { - return nil, selector.ErrNotFound - } - - return func() (*registry.Node, error) { - var viable []*registry.Node - - r.RLock() - for _, node := range nodes { - if _, ok := r.bl[node.Id]; !ok { - viable = append(viable, node) - } - } - r.RUnlock() - - if len(viable) == 0 { - return nil, selector.ErrNoneAvailable - } - - return viable[rand.Int()%len(viable)], nil - }, nil -} - -func (r *blackListSelector) Mark(service string, node *registry.Node, err error) { - r.Lock() - defer r.Unlock() - if err == nil { - delete(r.bl, node.Id) - return - } - - r.bl[node.Id] = blackListNode{ - age: time.Now().Add(time.Duration(r.ttl) * time.Second), - id: node.Id, - service: service, - } - return -} - -func (r *blackListSelector) Reset(service string) { - r.Lock() - defer r.Unlock() - for k, v := range r.bl { - if v.service == service { - delete(r.bl, k) - } - } - return -} - -func (r *blackListSelector) Close() error { - r.once.Do(func() { - close(r.exit) - }) - return nil -} - -func (r *blackListSelector) String() string { - return "blacklist" -} - -func NewSelector(opts ...selector.Option) selector.Selector { - var sopts selector.Options - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - var once sync.Once - bl := &blackListSelector{ - once: once, - so: sopts, - ttl: 60, - bl: make(map[string]blackListNode), - exit: make(chan bool), - } - - go bl.run() - - return bl -} diff --git a/selector/blacklist/black_list_selector_test.go b/selector/blacklist/black_list_selector_test.go deleted file mode 100644 index a9fefc75..00000000 --- a/selector/blacklist/black_list_selector_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package blacklist - -import ( - "errors" - "testing" - "time" - - "github.com/micro/go-micro/registry/mock" - "github.com/micro/go-micro/selector" -) - -func TestBlackListSelector(t *testing.T) { - counts := map[string]int{} - - bl := &blackListSelector{ - so: selector.Options{ - Registry: mock.NewRegistry(), - }, - ttl: 2, - bl: make(map[string]blackListNode), - exit: make(chan bool), - } - - go bl.run() - defer bl.Close() - - next, err := bl.Select("foo") - if err != nil { - t.Errorf("Unexpected error calling bl select: %v", err) - } - - for i := 0; i < 100; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - counts[node.Id]++ - } - - t.Logf("BlackList Counts %v", counts) - - // test blacklisting - for i := 0; i < 4; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - bl.Mark("foo", node, errors.New("blacklist")) - } - if node, err := next(); err != selector.ErrNoneAvailable { - t.Errorf("Expected none available err, got node %v err %v", node, err) - } - time.Sleep(time.Second * time.Duration(bl.ttl) * 2) - if _, err := next(); err != nil { - t.Errorf("Unexpected err %v", err) - } - - // test resetting - for i := 0; i < 4; i++ { - node, err := next() - if err != nil { - t.Errorf("Unexpected err: %v", err) - } - bl.Mark("foo", node, errors.New("blacklist")) - } - if node, err := next(); err != selector.ErrNoneAvailable { - t.Errorf("Expected none available err, got node %v err %v", node, err) - } - bl.Reset("foo") - if _, err := next(); err != nil { - t.Errorf("Unexpected err %v", err) - } -} diff --git a/selector/default.go b/selector/default.go index 48a23914..7f12e127 100644 --- a/selector/default.go +++ b/selector/default.go @@ -5,16 +5,33 @@ import ( "time" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector/internal/blacklist" ) type defaultSelector struct { - so Options + so Options + exit chan bool + bl *blacklist.BlackList } func init() { rand.Seed(time.Now().Unix()) } +func (r *defaultSelector) run() { + t := time.NewTicker(time.Second * 30) + + for { + select { + case <-t.C: + // TODO + case <-r.exit: + t.Stop() + return + } + } +} + func (r *defaultSelector) Init(opts ...Option) error { for _, o := range opts { o(&r.so) @@ -46,6 +63,12 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er services = filter(services) } + // apply the blacklist + services, err = r.bl.Filter(services) + if err != nil { + return nil, err + } + // if there's nothing left, return if len(services) == 0 { return nil, ErrNotFound @@ -55,14 +78,21 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er } func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { - return + r.bl.Mark(service, node, err) } func (r *defaultSelector) Reset(service string) { - return + r.bl.Reset(service) } func (r *defaultSelector) Close() error { + select { + case <-r.exit: + return nil + default: + close(r.exit) + r.bl.Close() + } return nil } @@ -83,5 +113,12 @@ func newDefaultSelector(opts ...Option) Selector { sopts.Registry = registry.DefaultRegistry } - return &defaultSelector{sopts} + se := &defaultSelector{ + so: sopts, + exit: make(chan bool), + bl: blacklist.New(), + } + + go se.run() + return se } diff --git a/selector/internal/blacklist/blacklist.go b/selector/internal/blacklist/blacklist.go new file mode 100644 index 00000000..fc72c059 --- /dev/null +++ b/selector/internal/blacklist/blacklist.go @@ -0,0 +1,164 @@ +package blacklist + +import ( + "math/rand" + "sync" + "time" + + "github.com/micro/go-micro/registry" +) + +type blackListNode struct { + age time.Time + id string + service string + count int +} + +type BlackList struct { + ttl int + exit chan bool + + sync.RWMutex + bl map[string]blackListNode +} + +var ( + // number of times we see an error before blacklisting + count = 3 + + // the ttl to blacklist for + ttl = 30 +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func (r *BlackList) purge() { + now := time.Now() + r.Lock() + for k, v := range r.bl { + if d := v.age.Sub(now); d.Seconds() < 0 { + delete(r.bl, k) + } + } + r.Unlock() +} + +func (r *BlackList) run() { + t := time.NewTicker(time.Duration(r.ttl) * time.Second) + + for { + select { + case <-r.exit: + t.Stop() + return + case <-t.C: + r.purge() + } + } +} + +func (r *BlackList) Filter(services []*registry.Service) ([]*registry.Service, error) { + var viableServices []*registry.Service + + r.RLock() + + for _, service := range services { + var viableNodes []*registry.Node + + for _, node := range service.Nodes { + n, ok := r.bl[node.Id] + if !ok { + // blacklist miss so add it + viableNodes = append(viableNodes, node) + continue + } + + // got some blacklist info + // skip the node if it exceeds count + if n.count >= count { + continue + } + + // doesn't exceed count, still viable + viableNodes = append(viableNodes, node) + } + + if len(viableNodes) == 0 { + continue + } + + viableService := new(registry.Service) + *viableService = *service + viableService.Nodes = viableNodes + viableServices = append(viableServices, viableService) + } + + r.RUnlock() + + return viableServices, nil +} + +func (r *BlackList) Mark(service string, node *registry.Node, err error) { + r.Lock() + defer r.Unlock() + + // reset when error is nil + // basically closing the circuit + if err == nil { + delete(r.bl, node.Id) + return + } + + n, ok := r.bl[node.Id] + if !ok { + n = blackListNode{ + id: node.Id, + service: service, + } + } + + // mark it + n.count++ + + // set age to ttl seconds in future + n.age = time.Now().Add(time.Duration(r.ttl) * time.Second) + + // save + r.bl[node.Id] = n +} + +func (r *BlackList) Reset(service string) { + r.Lock() + defer r.Unlock() + + for k, v := range r.bl { + // delete every node that matches the service + if v.service == service { + delete(r.bl, k) + } + } +} + +func (r *BlackList) Close() error { + select { + case <-r.exit: + return nil + default: + close(r.exit) + } + return nil +} + +func New() *BlackList { + bl := &BlackList{ + ttl: ttl, + bl: make(map[string]blackListNode), + exit: make(chan bool), + } + + go bl.run() + return bl +} diff --git a/selector/internal/blacklist/blacklist_test.go b/selector/internal/blacklist/blacklist_test.go new file mode 100644 index 00000000..656b8c28 --- /dev/null +++ b/selector/internal/blacklist/blacklist_test.go @@ -0,0 +1,107 @@ +package blacklist + +import ( + "errors" + "testing" + "time" + + "github.com/micro/go-micro/registry" +) + +func TestBlackList(t *testing.T) { + bl := &BlackList{ + ttl: 1, + bl: make(map[string]blackListNode), + exit: make(chan bool), + } + + go bl.run() + defer bl.Close() + + services := []*registry.Service{ + ®istry.Service{ + Name: "foo", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "foo-1", + Address: "localhost", + Port: 10001, + }, + ®istry.Node{ + Id: "foo-2", + Address: "localhost", + Port: 10002, + }, + ®istry.Node{ + Id: "foo-3", + Address: "localhost", + Port: 10002, + }, + }, + }, + } + + // check nothing is filtered on clean run + filterTest := func() { + for i := 0; i < 3; i++ { + srvs, err := bl.Filter(services) + if err != nil { + t.Fatal(err) + } + + if len(srvs) != len(services) { + t.Fatal("nodes were filtered when they shouldn't be") + } + + for _, node := range srvs[0].Nodes { + var seen bool + for _, n := range srvs[0].Nodes { + if n.Id == node.Id { + seen = true + break + } + } + if !seen { + t.Fatalf("Missing node %s", node.Id) + } + } + } + } + + // run filter test + filterTest() + + blacklistTest := func() { + // test blacklisting + // mark until failure + for i := 0; i < count+1; i++ { + for _, node := range services[0].Nodes { + bl.Mark("foo", node, errors.New("blacklist")) + } + } + + filtered, err := bl.Filter(services) + if err != nil { + t.Fatal(err) + } + + if len(filtered) > 0 { + t.Fatalf("Expected zero nodes got %+v", filtered) + } + } + + // sleep the ttl duration + time.Sleep(time.Second * time.Duration(bl.ttl) * 2) + + // now run filterTest again + filterTest() + + // run the blacklist test + blacklistTest() + + // reset + bl.Reset("foo") + + // check again + filterTest() +}