Move selector to client/selector

This commit is contained in:
Asim Aslam
2019-06-21 15:13:54 +01:00
parent c350e19552
commit ca5acba0c6
28 changed files with 21 additions and 21 deletions

View File

@@ -16,7 +16,7 @@ import (
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/transport"
"google.golang.org/grpc"

View File

@@ -10,7 +10,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/memory"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/client/selector"
pgrpc "google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

View File

@@ -7,7 +7,7 @@ import (
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/transport"
)

View File

@@ -17,7 +17,7 @@ import (
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/transport"
)

View File

@@ -8,7 +8,7 @@ import (
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/memory"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/client/selector"
)
func newTestRegistry() registry.Registry {

View File

@@ -0,0 +1,51 @@
package selector
import (
"github.com/micro/go-micro/registry"
)
var (
// mock data
testData = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
)

106
client/selector/default.go Normal file
View File

@@ -0,0 +1,106 @@
package selector
import (
"time"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/cache"
)
type registrySelector struct {
so Options
rc cache.Cache
}
func (c *registrySelector) newCache() cache.Cache {
ropts := []cache.Option{}
if c.so.Context != nil {
if t, ok := c.so.Context.Value("selector_ttl").(time.Duration); ok {
ropts = append(ropts, cache.WithTTL(t))
}
}
return cache.New(c.so.Registry, ropts...)
}
func (c *registrySelector) Init(opts ...Option) error {
for _, o := range opts {
o(&c.so)
}
c.rc.Stop()
c.rc = c.newCache()
return nil
}
func (c *registrySelector) Options() Options {
return c.so
}
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.rc.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
}
func (c *registrySelector) Reset(service string) {
}
// Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.rc.Stop()
return nil
}
func (c *registrySelector) String() string {
return "registry"
}
func NewSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
s := &registrySelector{
so: sopts,
}
s.rc = s.newCache()
return s
}

View File

@@ -0,0 +1,31 @@
package selector
import (
"testing"
"github.com/micro/go-micro/registry/memory"
)
func TestRegistrySelector(t *testing.T) {
counts := map[string]int{}
r := memory.NewRegistry()
rg := r.(*memory.Registry)
rg.Services = testData
cache := NewSelector(Registry(r))
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Selector Counts %v", counts)
}

128
client/selector/dns/dns.go Normal file
View File

@@ -0,0 +1,128 @@
// Package dns provides a dns SRV selector
package dns
import (
"net"
"strconv"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/client/selector"
)
type dnsSelector struct {
options selector.Options
domain string
}
var (
DefaultDomain = "local"
)
func (d *dnsSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&d.options)
}
return nil
}
func (d *dnsSelector) Options() selector.Options {
return d.options
}
func (d *dnsSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var srv []*net.SRV
// check if its host:port
host, port, err := net.SplitHostPort(service)
// not host:port
if err != nil {
// lookup the SRV record
_, srvs, err := net.LookupSRV(service, "tcp", d.domain)
if err != nil {
return nil, err
}
// set SRV records
srv = srvs
// got host:port
} else {
p, _ := strconv.Atoi(port)
// lookup the A record
ips, err := net.LookupHost(host)
if err != nil {
return nil, err
}
// create SRV records
for _, ip := range ips {
srv = append(srv, &net.SRV{
Target: ip,
Port: uint16(p),
})
}
}
var nodes []*registry.Node
for _, node := range srv {
nodes = append(nodes, &registry.Node{
Id: node.Target,
Address: node.Target,
Port: int(node.Port),
})
}
services := []*registry.Service{
&registry.Service{
Name: service,
Nodes: nodes,
},
}
sopts := selector.SelectOptions{
Strategy: d.options.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (d *dnsSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (d *dnsSelector) Reset(service string) {
return
}
func (d *dnsSelector) Close() error {
return nil
}
func (d *dnsSelector) String() string {
return "dns"
}
func NewSelector(opts ...selector.Option) selector.Selector {
options := selector.Options{
Strategy: selector.Random,
}
for _, o := range opts {
o(&options)
}
return &dnsSelector{options: options, domain: DefaultDomain}
}

73
client/selector/filter.go Normal file
View File

@@ -0,0 +1,73 @@
package selector
import (
"github.com/micro/go-micro/registry"
)
// FilterEndpoint is an endpoint based Select Filter which will
// only return services with the endpoint specified.
func FilterEndpoint(name string) Filter {
return func(old []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range old {
for _, ep := range service.Endpoints {
if ep.Name == name {
services = append(services, service)
break
}
}
}
return services
}
}
// FilterLabel is a label based Select Filter which will
// only return services with the label specified.
func FilterLabel(key, val string) Filter {
return func(old []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range old {
serv := new(registry.Service)
var nodes []*registry.Node
for _, node := range service.Nodes {
if node.Metadata == nil {
continue
}
if node.Metadata[key] == val {
nodes = append(nodes, node)
}
}
// only add service if there's some nodes
if len(nodes) > 0 {
// copy
*serv = *service
serv.Nodes = nodes
services = append(services, serv)
}
}
return services
}
}
// FilterVersion is a version based Select Filter which will
// only return services with the version specified.
func FilterVersion(version string) Filter {
return func(old []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range old {
if service.Version == version {
services = append(services, service)
}
}
return services
}
}

View File

@@ -0,0 +1,239 @@
package selector
import (
"testing"
"github.com/micro/go-micro/registry"
)
func TestFilterEndpoint(t *testing.T) {
testData := []struct {
services []*registry.Service
endpoint string
count int
}{
{
services: []*registry.Service{
&registry.Service{
Name: "test",
Version: "1.0.0",
Endpoints: []*registry.Endpoint{
&registry.Endpoint{
Name: "Foo.Bar",
},
},
},
&registry.Service{
Name: "test",
Version: "1.1.0",
Endpoints: []*registry.Endpoint{
&registry.Endpoint{
Name: "Baz.Bar",
},
},
},
},
endpoint: "Foo.Bar",
count: 1,
},
{
services: []*registry.Service{
&registry.Service{
Name: "test",
Version: "1.0.0",
Endpoints: []*registry.Endpoint{
&registry.Endpoint{
Name: "Foo.Bar",
},
},
},
&registry.Service{
Name: "test",
Version: "1.1.0",
Endpoints: []*registry.Endpoint{
&registry.Endpoint{
Name: "Foo.Bar",
},
},
},
},
endpoint: "Bar.Baz",
count: 0,
},
}
for _, data := range testData {
filter := FilterEndpoint(data.endpoint)
services := filter(data.services)
if len(services) != data.count {
t.Fatalf("Expected %d services, got %d", data.count, len(services))
}
for _, service := range services {
var seen bool
for _, ep := range service.Endpoints {
if ep.Name == data.endpoint {
seen = true
break
}
}
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
}
}
func TestFilterLabel(t *testing.T) {
testData := []struct {
services []*registry.Service
label [2]string
count int
}{
{
services: []*registry.Service{
&registry.Service{
Name: "test",
Version: "1.0.0",
Nodes: []*registry.Node{
&registry.Node{
Id: "test-1",
Address: "localhost",
Metadata: map[string]string{
"foo": "bar",
},
},
},
},
&registry.Service{
Name: "test",
Version: "1.1.0",
Nodes: []*registry.Node{
&registry.Node{
Id: "test-2",
Address: "localhost",
Metadata: map[string]string{
"foo": "baz",
},
},
},
},
},
label: [2]string{"foo", "bar"},
count: 1,
},
{
services: []*registry.Service{
&registry.Service{
Name: "test",
Version: "1.0.0",
Nodes: []*registry.Node{
&registry.Node{
Id: "test-1",
Address: "localhost",
},
},
},
&registry.Service{
Name: "test",
Version: "1.1.0",
Nodes: []*registry.Node{
&registry.Node{
Id: "test-2",
Address: "localhost",
},
},
},
},
label: [2]string{"foo", "bar"},
count: 0,
},
}
for _, data := range testData {
filter := FilterLabel(data.label[0], data.label[1])
services := filter(data.services)
if len(services) != data.count {
t.Fatalf("Expected %d services, got %d", data.count, len(services))
}
for _, service := range services {
var seen bool
for _, node := range service.Nodes {
if node.Metadata[data.label[0]] != data.label[1] {
t.Fatalf("Expected %s=%s but got %s=%s for service %+v node %+v",
data.label[0], data.label[1], data.label[0], node.Metadata[data.label[0]], service, node)
}
seen = true
}
if !seen {
t.Fatalf("Expected node for %s=%s but saw none; results %+v", data.label[0], data.label[1], service)
}
}
}
}
func TestFilterVersion(t *testing.T) {
testData := []struct {
services []*registry.Service
version string
count int
}{
{
services: []*registry.Service{
&registry.Service{
Name: "test",
Version: "1.0.0",
},
&registry.Service{
Name: "test",
Version: "1.1.0",
},
},
version: "1.0.0",
count: 1,
},
{
services: []*registry.Service{
&registry.Service{
Name: "test",
Version: "1.0.0",
},
&registry.Service{
Name: "test",
Version: "1.1.0",
},
},
version: "2.0.0",
count: 0,
},
}
for _, data := range testData {
filter := FilterVersion(data.version)
services := filter(data.services)
if len(services) != data.count {
t.Fatalf("Expected %d services, got %d", data.count, len(services))
}
var seen bool
for _, service := range services {
if service.Version != data.version {
t.Fatalf("Expected version %s, got %s", data.version, service.Version)
}
seen = true
}
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
}

View File

@@ -0,0 +1,60 @@
package selector
import (
"context"
"github.com/micro/go-micro/registry"
)
type Options struct {
Registry registry.Registry
Strategy Strategy
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
type SelectOptions struct {
Filters []Filter
Strategy Strategy
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// Option used to initialise the selector
type Option func(*Options)
// SelectOption used when making a select call
type SelectOption func(*SelectOptions)
// Registry sets the registry used by the selector
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
// SetStrategy sets the default strategy for the selector
func SetStrategy(fn Strategy) Option {
return func(o *Options) {
o.Strategy = fn
}
}
// WithFilter adds a filter function to the list of filters
// used during the Select call.
func WithFilter(fn ...Filter) SelectOption {
return func(o *SelectOptions) {
o.Filters = append(o.Filters, fn...)
}
}
// Strategy sets the selector strategy
func WithStrategy(fn Strategy) SelectOption {
return func(o *SelectOptions) {
o.Strategy = fn
}
}

View File

@@ -0,0 +1,18 @@
package registry
import (
"context"
"time"
"github.com/micro/go-micro/client/selector"
)
// Set the registry cache ttl
func TTL(t time.Duration) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "selector_ttl", t)
}
}

View File

@@ -0,0 +1,11 @@
// Package registry uses the go-micro registry for selection
package registry
import (
"github.com/micro/go-micro/client/selector"
)
// NewSelector returns a new registry selector
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@@ -0,0 +1,42 @@
// Package selector is a way to pick a list of service nodes
package selector
import (
"errors"
"github.com/micro/go-micro/registry"
)
// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
Init(opts ...Option) error
Options() Options
// Select returns a function which should return the next node
Select(service string, opts ...SelectOption) (Next, error)
// Mark sets the success/error against a node
Mark(service string, node *registry.Node, err error)
// Reset returns state back to zero for a service
Reset(service string)
// Close renders the selector unusable
Close() error
// Name of the selector
String() string
}
// Next is a function that returns the next node
// based on the selector's strategy
type Next func() (*registry.Node, error)
// Filter is used to filter a service during the selection process
type Filter func([]*registry.Service) []*registry.Service
// Strategy is a selection strategy e.g random, round robin
type Strategy func([]*registry.Service) Next
var (
DefaultSelector = NewSelector()
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)

View File

@@ -0,0 +1,71 @@
// Package static provides a static resolver which returns the name/ip passed in without any change
package static
import (
"net"
"strconv"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/client/selector"
)
// staticSelector is a static selector
type staticSelector struct {
opts selector.Options
}
func (s *staticSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&s.opts)
}
return nil
}
func (s *staticSelector) Options() selector.Options {
return s.opts
}
func (s *staticSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var port int
addr, pt, err := net.SplitHostPort(service)
if err != nil {
addr = service
port = 0
} else {
port, _ = strconv.Atoi(pt)
}
return func() (*registry.Node, error) {
return &registry.Node{
Id: service,
Address: addr,
Port: port,
}, nil
}, nil
}
func (s *staticSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (s *staticSelector) Reset(service string) {
return
}
func (s *staticSelector) Close() error {
return nil
}
func (s *staticSelector) String() string {
return "static"
}
func NewSelector(opts ...selector.Option) selector.Selector {
var options selector.Options
for _, o := range opts {
o(&options)
}
return &staticSelector{
opts: options,
}
}

View File

@@ -0,0 +1,56 @@
package selector
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/registry"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Random is a random strategy algorithm for node selection
func Random(services []*registry.Service) Next {
var nodes []*registry.Node
for _, service := range services {
nodes = append(nodes, service.Nodes...)
}
return func() (*registry.Node, error) {
if len(nodes) == 0 {
return nil, ErrNoneAvailable
}
i := rand.Int() % len(nodes)
return nodes[i], nil
}
}
// RoundRobin is a roundrobin strategy algorithm for node selection
func RoundRobin(services []*registry.Service) Next {
var nodes []*registry.Node
for _, service := range services {
nodes = append(nodes, service.Nodes...)
}
var i = rand.Int()
var mtx sync.Mutex
return func() (*registry.Node, error) {
if len(nodes) == 0 {
return nil, ErrNoneAvailable
}
mtx.Lock()
node := nodes[i%len(nodes)]
i++
mtx.Unlock()
return node, nil
}
}

View File

@@ -0,0 +1,59 @@
package selector
import (
"testing"
"github.com/micro/go-micro/registry"
)
func TestStrategies(t *testing.T) {
testData := []*registry.Service{
&registry.Service{
Name: "test1",
Version: "latest",
Nodes: []*registry.Node{
&registry.Node{
Id: "test1-1",
Address: "10.0.0.1",
Port: 1001,
},
&registry.Node{
Id: "test1-2",
Address: "10.0.0.2",
Port: 1002,
},
},
},
&registry.Service{
Name: "test1",
Version: "default",
Nodes: []*registry.Node{
&registry.Node{
Id: "test1-3",
Address: "10.0.0.3",
Port: 1003,
},
&registry.Node{
Id: "test1-4",
Address: "10.0.0.4",
Port: 1004,
},
},
},
}
for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} {
next := strategy(testData)
counts := make(map[string]int)
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Fatal(err)
}
counts[node.Id]++
}
t.Logf("%s: %+v\n", name, counts)
}
}