Strip blacklist

This commit is contained in:
Asim 2016-06-19 14:41:33 +01:00
parent 95f1e80af4
commit 30e0fef615
5 changed files with 6 additions and 393 deletions

View File

@ -7,7 +7,6 @@ import (
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/selector/internal/blacklist"
) )
/* /*
@ -27,9 +26,6 @@ type cacheSelector struct {
// used to close or reload watcher // used to close or reload watcher
reload chan bool reload chan bool
exit chan bool exit chan bool
// blacklist
bl *blacklist.BlackList
} }
var ( var (
@ -349,13 +345,6 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
services = filter(services) services = filter(services)
} }
/*
services, err = c.bl.Filter(services)
if err != nil {
return nil, err
}
*/
// if there's nothing left, return // if there's nothing left, return
if len(services) == 0 { if len(services) == 0 {
return nil, selector.ErrNoneAvailable return nil, selector.ErrNoneAvailable
@ -365,14 +354,11 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
} }
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
c.bl.Mark(service, node, err) return
} }
func (c *cacheSelector) Reset(service string) { func (c *cacheSelector) Reset(service string) {
c.Lock() return
c.del(service)
c.Unlock()
c.bl.Reset(service)
} }
// Close stops the watcher and destroys the cache // Close stops the watcher and destroys the cache
@ -386,7 +372,6 @@ func (c *cacheSelector) Close() error {
return nil return nil
default: default:
close(c.exit) close(c.exit)
c.bl.Close()
} }
return nil return nil
} }
@ -423,7 +408,6 @@ func NewSelector(opts ...selector.Option) selector.Selector {
ttls: make(map[string]time.Time), ttls: make(map[string]time.Time),
reload: make(chan bool, 1), reload: make(chan bool, 1),
exit: make(chan bool), exit: make(chan bool),
bl: blacklist.New(),
} }
go c.run() go c.run()

View File

@ -5,13 +5,10 @@ import (
"time" "time"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector/internal/blacklist"
) )
type defaultSelector struct { type defaultSelector struct {
so Options so Options
exit chan bool
bl *blacklist.BlackList
} }
func init() { func init() {
@ -49,14 +46,6 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
services = filter(services) services = filter(services)
} }
// apply the blacklist
/*
services, err = r.bl.Filter(services)
if err != nil {
return nil, err
}
*/
// if there's nothing left, return // if there's nothing left, return
if len(services) == 0 { if len(services) == 0 {
return nil, ErrNoneAvailable return nil, ErrNoneAvailable
@ -66,21 +55,14 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
} }
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
r.bl.Mark(service, node, err) return
} }
func (r *defaultSelector) Reset(service string) { func (r *defaultSelector) Reset(service string) {
r.bl.Reset(service) return
} }
func (r *defaultSelector) Close() error { func (r *defaultSelector) Close() error {
select {
case <-r.exit:
return nil
default:
close(r.exit)
r.bl.Close()
}
return nil return nil
} }
@ -103,7 +85,5 @@ func newDefaultSelector(opts ...Option) Selector {
return &defaultSelector{ return &defaultSelector{
so: sopts, so: sopts,
exit: make(chan bool),
bl: blacklist.New(),
} }
} }

View File

@ -1,10 +1,8 @@
package selector package selector
import ( import (
"errors"
"testing" "testing"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock" "github.com/micro/go-micro/registry/mock"
) )
@ -28,81 +26,3 @@ func TestDefaultSelector(t *testing.T) {
t.Logf("Default Counts %v", counts) t.Logf("Default Counts %v", counts)
} }
func TestBlackList(t *testing.T) {
return
r := mock.NewRegistry()
r.Register(&registry.Service{
Name: "test",
Nodes: []*registry.Node{
&registry.Node{
Id: "test-1",
Address: "localhost",
Port: 10001,
},
&registry.Node{
Id: "test-2",
Address: "localhost",
Port: 10002,
},
&registry.Node{
Id: "test-3",
Address: "localhost",
Port: 10002,
},
},
})
rs := newDefaultSelector(Registry(r))
next, err := rs.Select("test")
if err != nil {
t.Fatal(err)
}
node, err := next()
if err != nil {
t.Fatal(err)
}
for i := 0; i < 4; i++ {
rs.Mark("test", node, errors.New("error"))
}
next, err = rs.Select("test")
if err != nil {
t.Fatal(err)
}
// still expecting 2 nodes
seen := make(map[string]bool)
for i := 0; i < 10; i++ {
node, err = next()
if err != nil {
t.Fatal(err)
}
seen[node.Id] = true
}
if len(seen) != 2 {
t.Fatalf("Expected seen to be 2 %+v", seen)
}
// blacklist all of it
for i := 0; i < 20; i++ {
node, err = next()
if err != nil {
t.Fatal(err)
}
rs.Mark("test", node, errors.New("error"))
}
next, err = rs.Select("test")
if err != ErrNoneAvailable {
t.Fatalf("Expected %v got %v", ErrNoneAvailable, err)
}
}

View File

@ -1,164 +0,0 @@
package blacklist
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/registry"
)
type blackListNode struct {
age time.Time
id string
service string
count int
}
type BlackList struct {
ttl int
exit chan bool
sync.RWMutex
bl map[string]blackListNode
}
var (
// number of times we see an error before blacklisting
count = 3
// the ttl to blacklist for
ttl = 30
)
func init() {
rand.Seed(time.Now().Unix())
}
func (r *BlackList) purge() {
now := time.Now()
r.Lock()
for k, v := range r.bl {
if d := v.age.Sub(now); d.Seconds() < 0 {
delete(r.bl, k)
}
}
r.Unlock()
}
func (r *BlackList) run() {
t := time.NewTicker(time.Duration(r.ttl) * time.Second)
for {
select {
case <-r.exit:
t.Stop()
return
case <-t.C:
r.purge()
}
}
}
func (r *BlackList) Filter(services []*registry.Service) ([]*registry.Service, error) {
var viableServices []*registry.Service
r.RLock()
for _, service := range services {
var viableNodes []*registry.Node
for _, node := range service.Nodes {
n, ok := r.bl[node.Id]
if !ok {
// blacklist miss so add it
viableNodes = append(viableNodes, node)
continue
}
// got some blacklist info
// skip the node if it exceeds count
if n.count >= count {
continue
}
// doesn't exceed count, still viable
viableNodes = append(viableNodes, node)
}
if len(viableNodes) == 0 {
continue
}
viableService := new(registry.Service)
*viableService = *service
viableService.Nodes = viableNodes
viableServices = append(viableServices, viableService)
}
r.RUnlock()
return viableServices, nil
}
func (r *BlackList) Mark(service string, node *registry.Node, err error) {
r.Lock()
defer r.Unlock()
// reset when error is nil
// basically closing the circuit
if err == nil {
delete(r.bl, node.Id)
return
}
n, ok := r.bl[node.Id]
if !ok {
n = blackListNode{
id: node.Id,
service: service,
}
}
// mark it
n.count++
// set age to ttl seconds in future
n.age = time.Now().Add(time.Duration(r.ttl) * time.Second)
// save
r.bl[node.Id] = n
}
func (r *BlackList) Reset(service string) {
r.Lock()
defer r.Unlock()
for k, v := range r.bl {
// delete every node that matches the service
if v.service == service {
delete(r.bl, k)
}
}
}
func (r *BlackList) Close() error {
select {
case <-r.exit:
return nil
default:
close(r.exit)
}
return nil
}
func New() *BlackList {
bl := &BlackList{
ttl: ttl,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
return bl
}

View File

@ -1,107 +0,0 @@
package blacklist
import (
"errors"
"testing"
"time"
"github.com/micro/go-micro/registry"
)
func TestBlackList(t *testing.T) {
bl := &BlackList{
ttl: 1,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
defer bl.Close()
services := []*registry.Service{
&registry.Service{
Name: "foo",
Nodes: []*registry.Node{
&registry.Node{
Id: "foo-1",
Address: "localhost",
Port: 10001,
},
&registry.Node{
Id: "foo-2",
Address: "localhost",
Port: 10002,
},
&registry.Node{
Id: "foo-3",
Address: "localhost",
Port: 10002,
},
},
},
}
// check nothing is filtered on clean run
filterTest := func() {
for i := 0; i < 3; i++ {
srvs, err := bl.Filter(services)
if err != nil {
t.Fatal(err)
}
if len(srvs) != len(services) {
t.Fatal("nodes were filtered when they shouldn't be")
}
for _, node := range srvs[0].Nodes {
var seen bool
for _, n := range srvs[0].Nodes {
if n.Id == node.Id {
seen = true
break
}
}
if !seen {
t.Fatalf("Missing node %s", node.Id)
}
}
}
}
// run filter test
filterTest()
blacklistTest := func() {
// test blacklisting
// mark until failure
for i := 0; i < count+1; i++ {
for _, node := range services[0].Nodes {
bl.Mark("foo", node, errors.New("blacklist"))
}
}
filtered, err := bl.Filter(services)
if err != nil {
t.Fatal(err)
}
if len(filtered) > 0 {
t.Fatalf("Expected zero nodes got %+v", filtered)
}
}
// sleep the ttl duration
time.Sleep(time.Second * time.Duration(bl.ttl) * 2)
// now run filterTest again
filterTest()
// run the blacklist test
blacklistTest()
// reset
bl.Reset("foo")
// check again
filterTest()
}