Move to a selector package

This commit is contained in:
Asim 2015-12-09 19:23:16 +00:00
parent 91405e5993
commit eefb9c53d4
18 changed files with 304 additions and 188 deletions

View File

@ -4,6 +4,7 @@ import (
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
)
@ -12,13 +13,13 @@ type options struct {
broker broker.Broker
codecs map[string]codec.NewCodec
registry registry.Registry
selector registry.Selector
selector selector.Selector
transport transport.Transport
wrappers []Wrapper
}
type callOptions struct {
selectOptions []registry.SelectOption
selectOptions []selector.SelectOption
}
type publishOptions struct{}
@ -59,7 +60,7 @@ func Transport(t transport.Transport) Option {
}
// Select is used to select a node to route a request to
func Selector(s registry.Selector) Option {
func Selector(s selector.Selector) Option {
return func(o *options) {
o.selector = s
}
@ -74,7 +75,7 @@ func Wrap(w Wrapper) Option {
// Call Options
func WithSelectOption(so registry.SelectOption) CallOption {
func WithSelectOption(so selector.SelectOption) CallOption {
return func(o *callOptions) {
o.selectOptions = append(o.selectOptions, so)
}

View File

@ -10,6 +10,7 @@ import (
c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
@ -44,8 +45,8 @@ func newRpcClient(opt ...Option) Client {
}
if opts.selector == nil {
opts.selector = registry.NewRandomSelector(
registry.SelectorRegistry(opts.registry),
opts.selector = selector.NewSelector(
selector.Registry(opts.registry),
)
}
@ -156,14 +157,14 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
}
next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...)
if err != nil && err == registry.ErrNotFound {
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
node, err := next()
if err != nil && err == registry.ErrNotFound {
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
@ -190,14 +191,14 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in
}
next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...)
if err != nil && err == registry.ErrNotFound {
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
node, err := next()
if err != nil && err == registry.ErrNotFound {
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())

View File

@ -8,9 +8,11 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
c "github.com/micro/go-micro/context"
example "github.com/micro/go-micro/examples/server/proto/example"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
example "github.com/micro/go-micro/examples/server/proto/example"
)
func init() {
@ -38,7 +40,9 @@ func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface
return services
}
callOptions := append(opts, client.WithSelectOption(registry.SelectFilter(filter)))
callOptions := append(opts, client.WithSelectOption(
selector.Filter(filter),
))
fmt.Printf("[DC Wrapper] filtering for datacenter %s\n", md["datacenter"])
return dc.Client.Call(ctx, req, rsp, callOptions...)

View File

@ -8,14 +8,16 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
example "github.com/micro/go-micro/examples/server/proto/example"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
example "github.com/micro/go-micro/examples/server/proto/example"
)
// Built in random hashed node selector
type dcSelector struct {
opts registry.SelectorOptions
opts selector.Options
}
var (
@ -26,14 +28,14 @@ func init() {
rand.Seed(time.Now().Unix())
}
func (n *dcSelector) Select(service string, opts ...registry.SelectOption) (registry.SelectNext, error) {
func (n *dcSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
services, err := n.opts.Registry.GetService(service)
if err != nil {
return nil, err
}
if len(services) == 0 {
return nil, registry.ErrNotFound
return nil, selector.ErrNotFound
}
var nodes []*registry.Node
@ -48,7 +50,7 @@ func (n *dcSelector) Select(service string, opts ...registry.SelectOption) (regi
}
if len(nodes) == 0 {
return nil, registry.ErrNotFound
return nil, selector.ErrNotFound
}
var i int
@ -75,8 +77,8 @@ func (n *dcSelector) Close() error {
}
// Return a new first node selector
func DCSelector(opts ...registry.SelectorOption) registry.Selector {
var sopts registry.SelectorOptions
func DCSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}

View File

@ -9,6 +9,7 @@ import (
"github.com/micro/go-micro/cmd"
example "github.com/micro/go-micro/examples/server/proto/example"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
@ -18,20 +19,20 @@ func init() {
// Built in random hashed node selector
type firstNodeSelector struct {
opts registry.SelectorOptions
opts selector.Options
}
func (n *firstNodeSelector) Select(service string, opts ...registry.SelectOption) (registry.SelectNext, error) {
func (n *firstNodeSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
services, err := n.opts.Registry.GetService(service)
if err != nil {
return nil, err
}
if len(services) == 0 {
return nil, registry.ErrNotFound
return nil, selector.ErrNotFound
}
var sopts registry.SelectOptions
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
@ -41,11 +42,11 @@ func (n *firstNodeSelector) Select(service string, opts ...registry.SelectOption
}
if len(services) == 0 {
return nil, registry.ErrNotFound
return nil, selector.ErrNotFound
}
if len(services[0].Nodes) == 0 {
return nil, registry.ErrNotFound
return nil, selector.ErrNotFound
}
return func() (*registry.Node, error) {
@ -66,8 +67,8 @@ func (n *firstNodeSelector) Close() error {
}
// Return a new first node selector
func FirstNodeSelector(opts ...registry.SelectorOption) registry.Selector {
var sopts registry.SelectorOptions
func FirstNodeSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}

70
registry/mock/mock.go Normal file
View File

@ -0,0 +1,70 @@
package mock
import (
"github.com/micro/go-micro/registry"
)
type MockRegistry struct{}
func (m *MockRegistry) GetService(service string) ([]*registry.Service, error) {
return []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
}, nil
}
func (m *MockRegistry) ListServices() ([]*registry.Service, error) {
return []*registry.Service{}, nil
}
func (m *MockRegistry) Register(s *registry.Service) error {
return nil
}
func (m *MockRegistry) Deregister(s *registry.Service) error {
return nil
}
func (m *MockRegistry) Watch() (registry.Watcher, error) {
return nil, nil
}
func NewRegistry() *MockRegistry {
return &MockRegistry{}
}

View File

@ -1,62 +0,0 @@
package registry
type mockRegistry struct{}
func (m *mockRegistry) GetService(service string) ([]*Service, error) {
return []*Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
}, nil
}
func (m *mockRegistry) ListServices() ([]*Service, error) {
return []*Service{}, nil
}
func (m *mockRegistry) Register(s *Service) error {
return nil
}
func (m *mockRegistry) Deregister(s *Service) error {
return nil
}
func (m *mockRegistry) Watch() (Watcher, error) {
return nil, nil
}

View File

@ -1,9 +1,5 @@
package registry
import (
"errors"
)
type Registry interface {
Register(*Service) error
Deregister(*Service) error
@ -18,9 +14,6 @@ type Option func(*options)
var (
DefaultRegistry = newConsulRegistry([]string{})
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)
func NewRegistry(addrs []string, opt ...Option) Registry {

View File

@ -1,44 +0,0 @@
package registry
// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
Select(service string, opts ...SelectOption) (SelectNext, error)
Mark(service string, node *Node, err error)
Reset(service string)
Close() error
}
// SelectNext is a function that returns the next node
// based on the selector's algorithm
type SelectNext func() (*Node, error)
type SelectorOptions struct {
Registry Registry
}
type SelectOptions struct {
Filters []func([]*Service) []*Service
}
// Option used to initialise the selector
type SelectorOption func(*SelectorOptions)
// Option used when making a select call
type SelectOption func(*SelectOptions)
// SelectorRegistry sets the registry used by the selector
func SelectorRegistry(r Registry) SelectorOption {
return func(o *SelectorOptions) {
o.Registry = r
}
}
// SelectFilter adds a filter function to the list of filters
// used during the Select call.
func SelectFilter(fn func([]*Service) []*Service) SelectOption {
return func(o *SelectOptions) {
o.Filters = append(o.Filters, fn)
}
}

View File

@ -1,9 +1,12 @@
package registry
package blacklist
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type blackListNode struct {
@ -13,7 +16,7 @@ type blackListNode struct {
}
type blackListSelector struct {
so SelectorOptions
so selector.Options
ttl int64
exit chan bool
once sync.Once
@ -51,8 +54,8 @@ func (r *blackListSelector) run() {
}
}
func (r *blackListSelector) Select(service string, opts ...SelectOption) (SelectNext, error) {
var sopts SelectOptions
func (r *blackListSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
@ -70,10 +73,10 @@ func (r *blackListSelector) Select(service string, opts ...SelectOption) (Select
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
return nil, selector.ErrNotFound
}
var nodes []*Node
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
@ -82,11 +85,11 @@ func (r *blackListSelector) Select(service string, opts ...SelectOption) (Select
}
if len(nodes) == 0 {
return nil, ErrNotFound
return nil, selector.ErrNotFound
}
return func() (*Node, error) {
var viable []*Node
return func() (*registry.Node, error) {
var viable []*registry.Node
r.RLock()
for _, node := range nodes {
@ -97,14 +100,14 @@ func (r *blackListSelector) Select(service string, opts ...SelectOption) (Select
r.RUnlock()
if len(viable) == 0 {
return nil, ErrNoneAvailable
return nil, selector.ErrNoneAvailable
}
return viable[rand.Int()%len(viable)], nil
}, nil
}
func (r *blackListSelector) Mark(service string, node *Node, err error) {
func (r *blackListSelector) Mark(service string, node *registry.Node, err error) {
r.Lock()
defer r.Unlock()
if err == nil {
@ -138,15 +141,15 @@ func (r *blackListSelector) Close() error {
return nil
}
func NewBlackListSelector(opts ...SelectorOption) Selector {
var sopts SelectorOptions
func NewSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = DefaultRegistry
sopts.Registry = registry.DefaultRegistry
}
var once sync.Once

View File

@ -1,17 +1,20 @@
package registry
package blacklist
import (
"errors"
"testing"
"time"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestBlackListSelector(t *testing.T) {
counts := map[string]int{}
bl := &blackListSelector{
so: SelectorOptions{
Registry: &mockRegistry{},
so: selector.Options{
Registry: mock.NewRegistry(),
},
ttl: 2,
bl: make(map[string]blackListNode),
@ -44,7 +47,7 @@ func TestBlackListSelector(t *testing.T) {
}
bl.Mark("foo", node, errors.New("blacklist"))
}
if node, err := next(); err != ErrNoneAvailable {
if node, err := next(); err != selector.ErrNoneAvailable {
t.Errorf("Expected none available err, got node %v err %v", node, err)
}
time.Sleep(time.Second * time.Duration(bl.ttl) * 2)
@ -60,7 +63,7 @@ func TestBlackListSelector(t *testing.T) {
}
bl.Mark("foo", node, errors.New("blacklist"))
}
if node, err := next(); err != ErrNoneAvailable {
if node, err := next(); err != selector.ErrNoneAvailable {
t.Errorf("Expected none available err, got node %v err %v", node, err)
}
bl.Reset("foo")

34
selector/options.go Normal file
View File

@ -0,0 +1,34 @@
package selector
import (
"github.com/micro/go-micro/registry"
)
type Options struct {
Registry registry.Registry
}
type SelectOptions struct {
Filters []SelectFilter
}
// Option used to initialise the selector
type Option func(*Options)
// SelectOption used when making a select call
type SelectOption func(*SelectOptions)
// Registry sets the registry used by the selector
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
// Filter adds a filter function to the list of filters
// used during the Select call.
func Filter(fn SelectFilter) SelectOption {
return func(o *SelectOptions) {
o.Filters = append(o.Filters, fn)
}
}

View File

@ -0,0 +1,7 @@
package random
import "github.com/micro/go-micro/selector"
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@ -1,19 +1,21 @@
package registry
package selector
import (
"math/rand"
"time"
"github.com/micro/go-micro/registry"
)
type randomSelector struct {
so SelectorOptions
so Options
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNext, error) {
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
var sopts SelectOptions
for _, opt := range opts {
opt(&sopts)
@ -35,7 +37,7 @@ func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNex
return nil, ErrNotFound
}
var nodes []*Node
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
@ -47,7 +49,7 @@ func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNex
return nil, ErrNotFound
}
return func() (*Node, error) {
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
@ -60,7 +62,7 @@ func (r *randomSelector) Select(service string, opts ...SelectOption) (SelectNex
}, nil
}
func (r *randomSelector) Mark(service string, node *Node, err error) {
func (r *randomSelector) Mark(service string, node *registry.Node, err error) {
return
}
@ -72,15 +74,15 @@ func (r *randomSelector) Close() error {
return nil
}
func NewRandomSelector(opts ...SelectorOption) Selector {
var sopts SelectorOptions
func newRandomSelector(opts ...Option) Selector {
var sopts Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = DefaultRegistry
sopts.Registry = registry.DefaultRegistry
}
return &randomSelector{sopts}

View File

@ -1,19 +1,21 @@
package registry
package selector
import (
"testing"
"github.com/micro/go-micro/registry/mock"
)
func TestRandomSelector(t *testing.T) {
counts := map[string]int{}
rr := &randomSelector{
so: SelectorOptions{
Registry: &mockRegistry{},
bl := &randomSelector{
so: Options{
Registry: mock.NewRegistry(),
},
}
next, err := rr.Select("foo")
next, err := bl.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling random select: %v", err)
}

View File

@ -1,15 +1,18 @@
package registry
package roundrobin
import (
"sync"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type roundRobinSelector struct {
so SelectorOptions
so selector.Options
}
func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (SelectNext, error) {
var sopts SelectOptions
func (r *roundRobinSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var sopts selector.SelectOptions
for _, opt := range opts {
opt(&sopts)
}
@ -27,10 +30,10 @@ func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (Selec
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
return nil, selector.ErrNotFound
}
var nodes []*Node
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
@ -39,13 +42,13 @@ func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (Selec
}
if len(nodes) == 0 {
return nil, ErrNotFound
return nil, selector.ErrNotFound
}
var i int
var mtx sync.Mutex
return func() (*Node, error) {
return func() (*registry.Node, error) {
mtx.Lock()
defer mtx.Unlock()
i++
@ -53,7 +56,7 @@ func (r *roundRobinSelector) Select(service string, opts ...SelectOption) (Selec
}, nil
}
func (r *roundRobinSelector) Mark(service string, node *Node, err error) {
func (r *roundRobinSelector) Mark(service string, node *registry.Node, err error) {
return
}
@ -65,15 +68,15 @@ func (r *roundRobinSelector) Close() error {
return nil
}
func NewRoundRobinSelector(opts ...SelectorOption) Selector {
var sopts SelectorOptions
func NewRoundRobinSelector(opts ...selector.Option) selector.Selector {
var sopts selector.Options
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = DefaultRegistry
sopts.Registry = registry.DefaultRegistry
}
return &roundRobinSelector{sopts}

View File

@ -1,15 +1,18 @@
package registry
package roundrobin
import (
"testing"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestRoundRobinSelector(t *testing.T) {
counts := map[string]int{}
rr := &roundRobinSelector{
so: SelectorOptions{
Registry: &mockRegistry{},
so: selector.Options{
Registry: mock.NewRegistry(),
},
}

93
selector/selector.go Normal file
View File

@ -0,0 +1,93 @@
/*
The Selector package provides a way to algorithmically filter and return
nodes required by the client or any other system. Selector's implemented
by Micro build on the registry but it's of optional use. One could
provide a static Selector that has a fixed pool.
func (r *randomSelector) Select(service string, opts ...SelectOption) (Next, error) {
var sopts SelectOptions
for _, opt := range opts {
opt(&sopts)
}
// get the service
services, err := r.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNotFound
}
var nodes []*registry.Node
for _, service := range services {
for _, node := range service.Nodes {
nodes = append(nodes, node)
}
}
if len(nodes) == 0 {
return nil, ErrNotFound
}
return func() (*registry.Node, error) {
i := rand.Int()
j := i % len(services)
if len(services[j].Nodes) == 0 {
return nil, ErrNotFound
}
k := i % len(services[j].Nodes)
return services[j].Nodes[k], nil
}, nil
}
*/
package selector
import (
"errors"
"github.com/micro/go-micro/registry"
)
// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
// Select returns a function which should return the next node
Select(service string, opts ...SelectOption) (Next, error)
// Mark sets the success/error against a node
Mark(service string, node *registry.Node, err error)
// Reset returns state back to zero for a service
Reset(service string)
// Close renders the selector unusable
Close() error
}
// Next is a function that returns the next node
// based on the selector's algorithm
type Next func() (*registry.Node, error)
// SelectFilter is used to filter a service during the selection process
type SelectFilter func([]*registry.Service) []*registry.Service
var (
DefaultSelector = newRandomSelector()
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)
func NewSelector(opts ...Option) Selector {
return newRandomSelector(opts...)
}