Merge pull request #25 from micro/selector

Add selector interface and implementation
This commit is contained in:
Asim 2015-12-09 19:55:11 +00:00
commit b4e319ee65
22 changed files with 1090 additions and 248 deletions

View File

@ -16,6 +16,7 @@ Feature | Package | Built-in Plugin | Description
Discovery | [Registry](https://godoc.org/github.com/micro/go-micro/registry) | consul | A way of locating services to communicate with Discovery | [Registry](https://godoc.org/github.com/micro/go-micro/registry) | consul | A way of locating services to communicate with
Client | [Client](https://godoc.org/github.com/micro/go-micro/client) | rpc | Used to make RPC requests to a service Client | [Client](https://godoc.org/github.com/micro/go-micro/client) | rpc | Used to make RPC requests to a service
Codec | [Codec](https://godoc.org/github.com/micro/go-micro/codec) | proto,json | Encoding/Decoding handler for requests Codec | [Codec](https://godoc.org/github.com/micro/go-micro/codec) | proto,json | Encoding/Decoding handler for requests
Selector | [Selector](https://godoc.org/github.com/micro/go-micro/selector) | random | Service node filter and pool
Server | [Server](https://godoc.org/github.com/micro/go-micro/server) | rpc | Listens and serves RPC requests Server | [Server](https://godoc.org/github.com/micro/go-micro/server) | rpc | Listens and serves RPC requests
Pub/Sub | [Broker](https://godoc.org/github.com/micro/go-micro/broker) | http | Publish and Subscribe to events Pub/Sub | [Broker](https://godoc.org/github.com/micro/go-micro/broker) | http | Publish and Subscribe to events
Transport | [Transport](https://godoc.org/github.com/micro/go-micro/transport) | http | Communication mechanism between services Transport | [Transport](https://godoc.org/github.com/micro/go-micro/transport) | http | Communication mechanism between services

View File

@ -1,9 +0,0 @@
package client
import (
"github.com/micro/go-micro/registry"
)
// Selector takes a Registry and returns a NodeSelector.
// Used by the client to initialise a selector.
type Selector func(registry.Registry) NodeSelector

View File

@ -1,60 +0,0 @@
package client
import (
"math/rand"
"time"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
// NodeSelector is used to Retrieve a node to which a request
// should be routed. It takes context and Request and returns a
// single node. If a node cannot be selected it should return
// an error. Response is called to inform the selector of the
// response from a client call. Reset is called to zero out
// any state.
type NodeSelector interface {
Select(context.Context, Request) (*registry.Node, error)
Response(*registry.Node, error)
Reset()
}
func init() {
rand.Seed(time.Now().UnixNano())
}
// Built in random hashed node selector
type nodeSelector struct {
r registry.Registry
}
func (n *nodeSelector) Select(ctx context.Context, req Request) (*registry.Node, error) {
service, err := n.r.GetService(req.Service())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
if len(service) == 0 {
return nil, errors.NotFound("go.micro.client", "Service not found")
}
i := rand.Int()
j := i % len(service)
if len(service[j].Nodes) == 0 {
return nil, errors.NotFound("go.micro.client", "Service not found")
}
k := i % len(service[j].Nodes)
return service[j].Nodes[k], nil
}
func (n *nodeSelector) Response(node *registry.Node, err error) {
return
}
func (n *nodeSelector) Reset() {
return
}

View File

@ -1,70 +0,0 @@
package client
import (
"testing"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
type mockRegistry struct{}
func (m *mockRegistry) GetService(service string) ([]*registry.Service, error) {
return []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-123",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-321",
Address: "localhost",
Port: 6666,
},
},
},
}, nil
}
func (m *mockRegistry) ListServices() ([]*registry.Service, error) {
return []*registry.Service{}, nil
}
func (m *mockRegistry) Register(s *registry.Service) error {
return nil
}
func (m *mockRegistry) Deregister(s *registry.Service) error {
return nil
}
func (m *mockRegistry) Watch() (registry.Watcher, error) {
return nil, nil
}
func TestNodeSelector(t *testing.T) {
counts := map[string]int{}
n := &nodeSelector{
&mockRegistry{},
}
for i := 0; i < 100; i++ {
n, err := n.Select(context.Background(), newRpcRequest("foo", "Foo.Bar", nil, ""))
if err != nil {
t.Errorf("Expected node, got err: %v", err)
}
counts[n.Id]++
}
t.Logf("Counts %v", counts)
}

View File

@ -4,6 +4,7 @@ import (
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
@ -12,12 +13,14 @@ type options struct {
broker broker.Broker broker broker.Broker
codecs map[string]codec.NewCodec codecs map[string]codec.NewCodec
registry registry.Registry registry registry.Registry
selector selector.Selector
transport transport.Transport transport transport.Transport
wrappers []Wrapper wrappers []Wrapper
selector Selector
} }
type callOptions struct{} type callOptions struct {
selectOptions []selector.SelectOption
}
type publishOptions struct{} type publishOptions struct{}
@ -57,7 +60,7 @@ func Transport(t transport.Transport) Option {
} }
// Select is used to select a node to route a request to // Select is used to select a node to route a request to
func Select(s Selector) Option { func Selector(s selector.Selector) Option {
return func(o *options) { return func(o *options) {
o.selector = s o.selector = s
} }
@ -69,3 +72,11 @@ func Wrap(w Wrapper) Option {
o.wrappers = append(o.wrappers, w) o.wrappers = append(o.wrappers, w)
} }
} }
// Call Options
func WithSelectOption(so selector.SelectOption) CallOption {
return func(o *callOptions) {
o.selectOptions = append(o.selectOptions, so)
}
}

View File

@ -10,6 +10,7 @@ import (
c "github.com/micro/go-micro/context" c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -18,11 +19,9 @@ import (
type rpcClient struct { type rpcClient struct {
once sync.Once once sync.Once
opts options opts options
sel NodeSelector
} }
func newRpcClient(opt ...Option) Client { func newRpcClient(opt ...Option) Client {
var sel NodeSelector
var once sync.Once var once sync.Once
opts := options{ opts := options{
@ -37,28 +36,27 @@ func newRpcClient(opt ...Option) Client {
opts.contentType = defaultContentType opts.contentType = defaultContentType
} }
if opts.transport == nil { if opts.broker == nil {
opts.transport = transport.DefaultTransport opts.broker = broker.DefaultBroker
} }
if opts.registry == nil { if opts.registry == nil {
opts.registry = registry.DefaultRegistry opts.registry = registry.DefaultRegistry
} }
if opts.broker == nil { if opts.selector == nil {
opts.broker = broker.DefaultBroker opts.selector = selector.NewSelector(
selector.Registry(opts.registry),
)
} }
if opts.selector != nil { if opts.transport == nil {
sel = opts.selector(opts.registry) opts.transport = transport.DefaultTransport
} else {
sel = &nodeSelector{opts.registry}
} }
rc := &rpcClient{ rc := &rpcClient{
once: once, once: once,
opts: opts, opts: opts,
sel: sel,
} }
c := Client(rc) c := Client(rc)
@ -153,9 +151,23 @@ func (r *rpcClient) CallRemote(ctx context.Context, address string, request Requ
} }
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
node, err := r.sel.Select(ctx, request) var copts callOptions
if err != nil { for _, opt := range opts {
return err opt(&copts)
}
next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
} }
address := node.Address address := node.Address
@ -164,7 +176,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
} }
err = r.call(ctx, address, request, response) err = r.call(ctx, address, request, response)
r.sel.Response(node, err) r.opts.selector.Mark(request.Service(), node, err)
return err return err
} }
@ -173,9 +185,23 @@ func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Re
} }
func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) {
node, err := r.sel.Select(ctx, request) var copts callOptions
if err != nil { for _, opt := range opts {
return nil, err opt(&copts)
}
next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
address := node.Address address := node.Address
@ -184,7 +210,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in
} }
stream, err := r.stream(ctx, address, request, responseChan) stream, err := r.stream(ctx, address, request, responseChan)
r.sel.Response(node, err) r.opts.selector.Mark(request.Service(), node, err)
return stream, err return stream, err
} }

View File

@ -19,6 +19,7 @@ import (
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/pborman/uuid" "github.com/pborman/uuid"
@ -80,6 +81,12 @@ var (
EnvVar: "MICRO_REGISTRY_ADDRESS", EnvVar: "MICRO_REGISTRY_ADDRESS",
Usage: "Comma-separated list of registry addresses", Usage: "Comma-separated list of registry addresses",
}, },
cli.StringFlag{
Name: "selector",
EnvVar: "MICRO_SELECTOR",
Value: "selector",
Usage: "Selector used to pick nodes for querying. random, roundrobin, blacklist",
},
cli.StringFlag{ cli.StringFlag{
Name: "transport", Name: "transport",
EnvVar: "MICRO_TRANSPORT", EnvVar: "MICRO_TRANSPORT",
@ -137,6 +144,10 @@ var (
"consul": registry.NewRegistry, "consul": registry.NewRegistry,
} }
Selectors = map[string]func(...selector.Option) selector.Selector{
"random": selector.NewSelector,
}
Transports = map[string]func([]string, ...transport.Option) transport.Transport{ Transports = map[string]func([]string, ...transport.Option) transport.Transport{
"http": transport.NewTransport, "http": transport.NewTransport,
} }
@ -214,6 +225,10 @@ func Setup(c *cli.Context) error {
registry.DefaultRegistry = r(strings.Split(c.String("registry_address"), ",")) registry.DefaultRegistry = r(strings.Split(c.String("registry_address"), ","))
} }
if s, ok := Selectors[c.String("selector")]; ok {
selector.DefaultSelector = s(selector.Registry(registry.DefaultRegistry))
}
if t, ok := Transports[c.String("transport")]; ok { if t, ok := Transports[c.String("transport")]; ok {
transport.DefaultTransport = t(strings.Split(c.String("transport_address"), ",")) transport.DefaultTransport = t(strings.Split(c.String("transport_address"), ","))
} }

View File

@ -0,0 +1,88 @@
package main
import (
"fmt"
"math/rand"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
example "github.com/micro/go-micro/examples/server/proto/example"
)
func init() {
rand.Seed(time.Now().Unix())
}
// A Wrapper that creates a Datacenter Selector Option
type dcWrapper struct {
client.Client
}
func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
md, _ := c.GetMetadata(ctx)
filter := func(services []*registry.Service) []*registry.Service {
for _, service := range services {
var nodes []*registry.Node
for _, node := range service.Nodes {
if node.Metadata["datacenter"] == md["datacenter"] {
nodes = append(nodes, node)
}
}
service.Nodes = nodes
}
return services
}
callOptions := append(opts, client.WithSelectOption(
selector.Filter(filter),
))
fmt.Printf("[DC Wrapper] filtering for datacenter %s\n", md["datacenter"])
return dc.Client.Call(ctx, req, rsp, callOptions...)
}
func NewDCWrapper(c client.Client) client.Client {
return &dcWrapper{c}
}
func call(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
Name: "John",
})
// create context with metadata
ctx := c.WithMetadata(context.Background(), map[string]string{
"datacenter": "local",
})
rsp := &example.Response{}
// Call service
if err := client.Call(ctx, req, rsp); err != nil {
fmt.Println("call err: ", err, rsp)
return
}
fmt.Println("Call:", i, "rsp:", rsp.Msg)
}
func main() {
cmd.Init()
client.DefaultClient = client.NewClient(
client.Wrap(NewDCWrapper),
)
fmt.Println("\n--- Call example ---\n")
for i := 0; i < 10; i++ {
call(i)
}
}

View File

@ -0,0 +1,119 @@
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
example "github.com/micro/go-micro/examples/server/proto/example"
)
// Built in random hashed node selector
type dcSelector struct {
opts selector.Options
}
var (
datacenter = "local"
)
func init() {
rand.Seed(time.Now().Unix())
}
func (n *dcSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
services, err := n.opts.Registry.GetService(service)
if err != nil {
return nil, err
}
if len(services) == 0 {
return nil, selector.ErrNotFound
}
var nodes []*registry.Node
// Filter the nodes for datacenter
for _, service := range services {
for _, node := range service.Nodes {
if node.Metadata["datacenter"] == datacenter {
nodes = append(nodes, node)
}
}
}
if len(nodes) == 0 {
return nil, selector.ErrNotFound
}
var i int
var mtx sync.Mutex
return func() (*registry.Node, error) {
mtx.Lock()
defer mtx.Unlock()
i++
return nodes[i%len(nodes)], nil
}, nil
}
func (n *dcSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (n *dcSelector) Reset(service string) {
return
}
func (n *dcSelector) Close() error {
return nil
}
// Return a new first node selector
func DCSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
return &dcSelector{sopts}
}
func call(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
Name: "John",
})
rsp := &example.Response{}
// Call service
if err := client.Call(context.Background(), req, rsp); err != nil {
fmt.Println("call err: ", err, rsp)
return
}
fmt.Println("Call:", i, "rsp:", rsp.Msg)
}
func main() {
cmd.Init()
client.DefaultClient = client.NewClient(
client.Selector(DCSelector()),
)
fmt.Println("\n--- Call example ---\n")
for i := 0; i < 10; i++ {
call(i)
}
}

View File

@ -1,87 +0,0 @@
package main
import (
"fmt"
"math/rand"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/errors"
example "github.com/micro/go-micro/examples/server/proto/example"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
func init() {
rand.Seed(time.Now().Unix())
}
// Built in random hashed node selector
type nodeSelector struct {
r registry.Registry
}
func (n *nodeSelector) Select(ctx context.Context, req client.Request) (*registry.Node, error) {
service, err := n.r.GetService(req.Service())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
if len(service) == 0 {
return nil, errors.NotFound("go.micro.client", "Service not found")
}
i := rand.Int()
j := i % len(service)
if len(service[j].Nodes) == 0 {
return nil, errors.NotFound("go.micro.client", "Service not found")
}
k := i % len(service[j].Nodes)
return service[j].Nodes[k], nil
}
func (n *nodeSelector) Response(node *registry.Node, err error) {
return
}
func (n *nodeSelector) Reset() {
return
}
// Return a new random node selector
func RandomSelector(r registry.Registry) client.NodeSelector {
return &nodeSelector{r}
}
func call(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
Name: "John",
})
rsp := &example.Response{}
// Call service
if err := client.Call(context.Background(), req, rsp); err != nil {
fmt.Println("call err: ", err, rsp)
return
}
fmt.Println("Call:", i, "rsp:", rsp.Msg)
}
func main() {
cmd.Init()
client.DefaultClient = client.NewClient(
client.Select(RandomSelector),
)
fmt.Println("\n--- Call example ---\n")
for i := 0; i < 10; i++ {
call(i)
}
}

View File

@ -0,0 +1,109 @@
package main
import (
"fmt"
"math/rand"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
example "github.com/micro/go-micro/examples/server/proto/example"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
func init() {
rand.Seed(time.Now().Unix())
}
// Built in random hashed node selector
type firstNodeSelector struct {
opts selector.Options
}
func (n *firstNodeSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
services, err := n.opts.Registry.GetService(service)
if err != nil {
return nil, err
}
if len(services) == 0 {
return nil, selector.ErrNotFound
}
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
for _, filter := range sopts.Filters {
services = filter(services)
}
if len(services) == 0 {
return nil, selector.ErrNotFound
}
if len(services[0].Nodes) == 0 {
return nil, selector.ErrNotFound
}
return func() (*registry.Node, error) {
return services[0].Nodes[0], nil
}, nil
}
func (n *firstNodeSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (n *firstNodeSelector) Reset(service string) {
return
}
func (n *firstNodeSelector) Close() error {
return nil
}
// Return a new first node selector
func FirstNodeSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
return &firstNodeSelector{sopts}
}
func call(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
Name: "John",
})
rsp := &example.Response{}
// Call service
if err := client.Call(context.Background(), req, rsp); err != nil {
fmt.Println("call err: ", err, rsp)
return
}
fmt.Println("Call:", i, "rsp:", rsp.Msg)
}
func main() {
cmd.Init()
client.DefaultClient = client.NewClient(
client.Selector(FirstNodeSelector()),
)
fmt.Println("\n--- Call example ---\n")
for i := 0; i < 10; i++ {
call(i)
}
}

View File

@ -31,9 +31,13 @@ func main() {
// optionally setup command line usage // optionally setup command line usage
cmd.Init() cmd.Init()
md := server.Config().Metadata()
md["datacenter"] = "local"
server.DefaultServer = server.NewServer( server.DefaultServer = server.NewServer(
server.WrapHandler(logWrapper), server.WrapHandler(logWrapper),
server.WrapSubscriber(logSubWrapper), server.WrapSubscriber(logSubWrapper),
server.Metadata(md),
) )
// Initialise Server // Initialise Server

70
registry/mock/mock.go Normal file
View File

@ -0,0 +1,70 @@
package mock
import (
"github.com/micro/go-micro/registry"
)
type MockRegistry struct{}
func (m *MockRegistry) GetService(service string) ([]*registry.Service, error) {
return []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
}, nil
}
func (m *MockRegistry) ListServices() ([]*registry.Service, error) {
return []*registry.Service{}, nil
}
func (m *MockRegistry) Register(s *registry.Service) error {
return nil
}
func (m *MockRegistry) Deregister(s *registry.Service) error {
return nil
}
func (m *MockRegistry) Watch() (registry.Watcher, error) {
return nil, nil
}
func NewRegistry() *MockRegistry {
return &MockRegistry{}
}

View File

@ -0,0 +1,169 @@
package blacklist
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type blackListNode struct {
age time.Time
id string
service string
}
type blackListSelector struct {
so selector.Options
ttl int64
exit chan bool
once sync.Once
sync.RWMutex
bl map[string]blackListNode
}
func init() {
cmd.Selectors["blacklist"] = NewSelector
rand.Seed(time.Now().Unix())
}
func (r *blackListSelector) purge() {
now := time.Now()
r.Lock()
for k, v := range r.bl {
if d := v.age.Sub(now); d.Seconds() < 0 {
delete(r.bl, k)
}
}
r.Unlock()
}
func (r *blackListSelector) run() {
t := time.NewTicker(time.Duration(r.ttl) * time.Second)
for {
select {
case <-r.exit:
t.Stop()
return
case <-t.C:
r.purge()
}
}
}
func (r *blackListSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, selector.ErrNotFound
}
return func() (*registry.Node, error) {
var viable []*registry.Node
r.RLock()
for _, node := range nodes {
if _, ok := r.bl[node.Id]; !ok {
viable = append(viable, node)
}
}
r.RUnlock()
if len(viable) == 0 {
return nil, selector.ErrNoneAvailable
}
return viable[rand.Int()%len(viable)], nil
}, nil
}
func (r *blackListSelector) Mark(service string, node *registry.Node, err error) {
r.Lock()
defer r.Unlock()
if err == nil {
delete(r.bl, node.Id)
return
}
r.bl[node.Id] = blackListNode{
age: time.Now().Add(time.Duration(r.ttl) * time.Second),
id: node.Id,
service: service,
}
return
}
func (r *blackListSelector) Reset(service string) {
r.Lock()
defer r.Unlock()
for k, v := range r.bl {
if v.service == service {
delete(r.bl, k)
}
}
return
}
func (r *blackListSelector) Close() error {
r.once.Do(func() {
close(r.exit)
})
return nil
}
func NewSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
var once sync.Once
bl := &blackListSelector{
once: once,
so: sopts,
ttl: 60,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
return bl
}

View File

@ -0,0 +1,73 @@
package blacklist
import (
"errors"
"testing"
"time"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestBlackListSelector(t *testing.T) {
counts := map[string]int{}
bl := &blackListSelector{
so: selector.Options{
Registry: mock.NewRegistry(),
},
ttl: 2,
bl: make(map[string]blackListNode),
exit: make(chan bool),
}
go bl.run()
defer bl.Close()
next, err := bl.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling bl select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("BlackList Counts %v", counts)
// test blacklisting
for i := 0; i < 4; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
bl.Mark("foo", node, errors.New("blacklist"))
}
if node, err := next(); err != selector.ErrNoneAvailable {
t.Errorf("Expected none available err, got node %v err %v", node, err)
}
time.Sleep(time.Second * time.Duration(bl.ttl) * 2)
if _, err := next(); err != nil {
t.Errorf("Unexpected err %v", err)
}
// test resetting
for i := 0; i < 4; i++ {
node, err := next()
if err != nil {
t.Errorf("Unexpected err: %v", err)
}
bl.Mark("foo", node, errors.New("blacklist"))
}
if node, err := next(); err != selector.ErrNoneAvailable {
t.Errorf("Expected none available err, got node %v err %v", node, err)
}
bl.Reset("foo")
if _, err := next(); err != nil {
t.Errorf("Unexpected err %v", err)
}
}

34
selector/options.go Normal file
View File

@ -0,0 +1,34 @@
package selector
import (
"github.com/micro/go-micro/registry"
)
type Options struct {
Registry registry.Registry
}
type SelectOptions struct {
Filters []SelectFilter
}
// Option used to initialise the selector
type Option func(*Options)
// SelectOption used when making a select call
type SelectOption func(*SelectOptions)
// Registry sets the registry used by the selector
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
// Filter adds a filter function to the list of filters
// used during the Select call.
func Filter(fn SelectFilter) SelectOption {
return func(o *SelectOptions) {
o.Filters = append(o.Filters, fn)
}
}

14
selector/random/random.go Normal file
View File

@ -0,0 +1,14 @@
package random
import (
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/selector"
)
func init() {
cmd.Selectors["random"] = NewSelector
}
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@ -0,0 +1,89 @@
package selector
import (
"math/rand"
"time"
"github.com/micro/go-micro/registry"
)
type randomSelector struct {
so Options
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
var sopts SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, ErrNotFound
}
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
if len(services[j].Nodes) == 0 {
return nil, ErrNotFound
}
k := i % len(services[j].Nodes)
return services[j].Nodes[k], nil
}, nil
}
func (r *randomSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *randomSelector) Reset(service string) {
return
}
func (r *randomSelector) Close() error {
return nil
}
func newRandomSelector(opts ...Option) Selector {
var sopts Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
return &randomSelector{sopts}
}

View File

@ -0,0 +1,32 @@
package selector
import (
"testing"
"github.com/micro/go-micro/registry/mock"
)
func TestRandomSelector(t *testing.T) {
counts := map[string]int{}
bl := &randomSelector{
so: Options{
Registry: mock.NewRegistry(),
},
}
next, err := bl.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling random select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Random Counts %v", counts)
}

View File

@ -0,0 +1,88 @@
package roundrobin
import (
"sync"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type roundRobinSelector struct {
so selector.Options
}
func init() {
cmd.Selectors["roundrobin"] = NewSelector
}
func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, selector.ErrNotFound
}
var i int
var mtx sync.Mutex
return func() (*registry.Node, error) {
mtx.Lock()
defer mtx.Unlock()
i++
return nodes[i%len(nodes)], nil
}, nil
}
func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *roundRobinSelector) Reset(service string) {
return
}
func (r *roundRobinSelector) Close() error {
return nil
}
func NewSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
return &roundRobinSelector{sopts}
}

View File

@ -0,0 +1,33 @@
package roundrobin
import (
"testing"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestRoundRobinSelector(t *testing.T) {
counts := map[string]int{}
rr := &roundRobinSelector{
so: selector.Options{
Registry: mock.NewRegistry(),
},
}
next, err := rr.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling rr select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Round Robin Counts %v", counts)
}

93
selector/selector.go Normal file
View File

@ -0,0 +1,93 @@
/*
The Selector package provides a way to algorithmically filter and return
nodes required by the client or any other system. Selector's implemented
by Micro build on the registry but it's of optional use. One could
provide a static Selector that has a fixed pool.
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
var sopts SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, ErrNotFound
}
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
if len(services[j].Nodes) == 0 {
return nil, ErrNotFound
}
k := i % len(services[j].Nodes)
return services[j].Nodes[k], nil
}, nil
}
*/
package selector
import (
"errors"
"github.com/micro/go-micro/registry"
)
// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
// Select returns a function which should return the next node
Select(service string, opts ...SelectOption) (Next, error)
// Mark sets the success/error against a node
Mark(service string, node *registry.Node, err error)
// Reset returns state back to zero for a service
Reset(service string)
// Close renders the selector unusable
Close() error
}
// Next is a function that returns the next node
// based on the selector's algorithm
type Next func() (*registry.Node, error)
// SelectFilter is used to filter a service during the selection process
type SelectFilter func([]*registry.Service) []*registry.Service
var (
DefaultSelector = newRandomSelector()
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)
func NewSelector(opts ...Option) Selector {
return newRandomSelector(opts...)
}