Deprecate client/selector (#1767)

* client/{grpc,rpc}: depricate selector (wip)

* {client,cmd}: remove client/selector

* deprecate client/selector

* router/static: fix lookup

* config/cmd: add support for legacy static selector flag

* config/cmd: add support for legacy dns selector flag
This commit is contained in:
ben-toogood
2020-07-01 17:06:59 +01:00
committed by GitHub
parent a63480a81a
commit 174e44b846
46 changed files with 428 additions and 1572 deletions

View File

@@ -22,11 +22,6 @@ type Client interface {
String() string
}
// Router manages request routing
type Router interface {
SendRequest(context.Context, Request) (Response, error)
}
// Message is the interface for publishing asynchronously
type Message interface {
Topic() string

View File

@@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net"
"reflect"
"strings"
@@ -13,11 +14,11 @@ import (
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/client/selector"
raw "github.com/micro/go-micro/v2/codec/bytes"
"github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router"
"github.com/micro/go-micro/v2/selector"
pnet "github.com/micro/go-micro/v2/util/net"
"google.golang.org/grpc"
@@ -72,41 +73,59 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
return grpc.WithInsecure()
}
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
service, address, _ := pnet.Proxy(request.Service(), opts.Address)
// return remote address
if len(address) > 0 {
return func() (*registry.Node, error) {
return &registry.Node{
Address: address[0],
}, nil
// lookupRoute for a request using the router and then choose one using the selector
func (g *grpcClient) lookupRoute(req client.Request, opts client.CallOptions) (*router.Route, error) {
// check to see if the proxy has been set, if it has we don't need to lookup the routes; net.Proxy
// returns a slice of addresses, so we'll use a random one. Eventually we should to use the
// selector for this.
service, addresses, _ := pnet.Proxy(req.Service(), opts.Address)
if len(addresses) > 0 {
return &router.Route{
Service: service,
Address: addresses[rand.Int()%len(addresses)],
}, nil
}
// if the network was set, pass it to the selector
sopts := opts.SelectOptions
// construct the router query
query := []router.QueryOption{router.QueryService(req.Service())}
// if a custom network was requested, pass this to the router. By default the router will use it's
// own network, which is set during initialisation.
if len(opts.Network) > 0 {
sopts = append(sopts, selector.WithDomain(opts.Network))
query = append(query, router.QueryNetwork(opts.Network))
}
// get next nodes from the selector
next, err := g.opts.Selector.Select(service, sopts...)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
// use the router passed as a call option, or fallback to the grpc clients router
if opts.Router == nil {
opts.Router = g.opts.Router
}
return next, nil
// lookup the routes which can be used to execute the request
routes, err := opts.Router.Lookup(query...)
if err == router.ErrRouteNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
}
// use the selector passed as a call option, or fallback to the grpc clients selector
if opts.Selector == nil {
opts.Selector = g.opts.Selector
}
// select the route to use for the request
if route, err := opts.Selector.Select(routes); err == selector.ErrNoneAvailable {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
} else {
return route, nil
}
}
func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string
address := node.Address
header = make(map[string]string)
if md, ok := metadata.FromContext(ctx); ok {
header = make(map[string]string, len(md))
@@ -137,7 +156,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout),
g.secure(address),
g.secure(route.Address),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
@@ -148,13 +167,13 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
grpcDialOptions = append(grpcDialOptions, opts...)
}
cc, err := g.pool.getConn(address, grpcDialOptions...)
cc, err := g.pool.getConn(route.Address, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
defer func() {
// defer execution of release
g.pool.release(address, cc, grr)
g.pool.release(route.Address, cc, grr)
}()
ch := make(chan error, 1)
@@ -180,11 +199,9 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
return grr
}
func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string
address := node.Address
if md, ok := metadata.FromContext(ctx); ok {
header = make(map[string]string, len(md))
for k, v := range md {
@@ -222,14 +239,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout),
g.secure(address),
g.secure(route.Address),
}
if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...)
cc, err := grpc.DialContext(dialCtx, route.Address, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
@@ -396,11 +413,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
opt(&callOpts)
}
next, err := g.next(req, callOpts)
if err != nil {
return err
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
@@ -443,19 +455,19 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
time.Sleep(t)
}
// select next node
node, err := next()
service := req.Service()
// lookup the route to send the reques to
route, err := g.lookupRoute(req, callOpts)
if err != nil {
if err == selector.ErrNotFound {
return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
return err
}
// make the call
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(service, node, err)
err = gcall(ctx, route, req, rsp, callOpts)
// record the result of the call to inform future routing decisions
g.opts.Selector.Record(*route, err)
// try and transform the error to a go-micro error
if verr, ok := err.(*errors.Error); ok {
return verr
}
@@ -503,11 +515,6 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
opt(&callOpts)
}
next, err := g.next(req, callOpts)
if err != nil {
return nil, err
}
// #200 - streams shouldn't have a request timeout set on the context
// should we noop right here?
@@ -537,20 +544,25 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
time.Sleep(t)
}
node, err := next()
service := req.Service()
// lookup the route to send the reques to
route, err := g.lookupRoute(req, callOpts)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
return nil, err
}
// make the call
stream := &grpcStream{}
err = g.stream(ctx, node, req, stream, callOpts)
err = g.stream(ctx, route, req, stream, callOpts)
g.opts.Selector.Mark(service, node, err)
// record the result of the call to inform future routing decisions
g.opts.Selector.Record(*route, err)
// try and transform the error to a go-micro error
if verr, ok := err.(*errors.Error); ok {
return nil, verr
}
g.opts.Selector.Record(*route, err)
return stream, err
}
@@ -577,7 +589,7 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
return rsp.stream, nil
}
retry, rerr := callOpts.Retry(ctx, req, i, err)
retry, rerr := callOpts.Retry(ctx, req, i, grr)
if rerr != nil {
return nil, rerr
}

View File

@@ -6,10 +6,10 @@ import (
"testing"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/registry/memory"
"github.com/micro/go-micro/v2/router"
pgrpc "google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
@@ -56,16 +56,11 @@ func TestGRPCClient(t *testing.T) {
},
})
// create selector
se := selector.NewSelector(
selector.Registry(r),
)
// create router
rtr := router.NewRouter(router.Registry(r))
// create client
c := NewClient(
client.Registry(r),
client.Selector(se),
)
c := NewClient(client.Router(rtr))
testMethods := []string{
"/helloworld.Greeter/SayHello",

View File

@@ -5,9 +5,10 @@ import (
"time"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/codec"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router"
"github.com/micro/go-micro/v2/selector"
"github.com/micro/go-micro/v2/transport"
)
@@ -18,13 +19,10 @@ type Options struct {
// Plugged interfaces
Broker broker.Broker
Codecs map[string]codec.NewCodec
Registry registry.Registry
Router router.Router
Selector selector.Selector
Transport transport.Transport
// Router sets the router
Router Router
// Connection Pool
PoolSize int
PoolTTL time.Duration
@@ -44,26 +42,28 @@ type Options struct {
}
type CallOptions struct {
SelectOptions []selector.SelectOption
// Address of remote hosts
Address []string
// Backoff func
Backoff BackoffFunc
// Check if retriable func
Retry RetryFunc
// Duration to cache the response for
CacheExpiry time.Duration
// Transport Dial Timeout
DialTimeout time.Duration
// Number of Call attempts
Retries int
// Check if retriable func
Retry RetryFunc
// Request/Response timeout
RequestTimeout time.Duration
// Router to use for this call
Router router.Router
// Selector to use for the call
Selector selector.Selector
// Stream timeout for the stream
StreamTimeout time.Duration
// Use the services own auth token
ServiceToken bool
// Duration to cache the response for
CacheExpiry time.Duration
// Network to lookup the route within
Network string
@@ -112,8 +112,8 @@ func NewOptions(options ...Option) Options {
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
Broker: broker.DefaultBroker,
Router: router.DefaultRouter,
Selector: selector.DefaultSelector,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
}
@@ -159,15 +159,6 @@ func PoolTTL(d time.Duration) Option {
}
}
// Registry to find nodes for a given service
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
// set in the selector
o.Selector.Init(selector.Registry(r))
}
}
// Transport to use for communication e.g http, rabbitmq, etc
func Transport(t transport.Transport) Option {
return func(o *Options) {
@@ -175,7 +166,14 @@ func Transport(t transport.Transport) Option {
}
}
// Select is used to select a node to route a request to
// Router is used to lookup routes for a service
func Router(r router.Router) Option {
return func(o *Options) {
o.Router = r
}
}
// Selector is used to select a route
func Selector(s selector.Selector) Option {
return func(o *Options) {
o.Selector = s
@@ -219,6 +217,13 @@ func Retry(fn RetryFunc) Option {
}
}
// Registry sets the routers registry
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Router.Init(router.Registry(r))
}
}
// The request timeout.
// Should this be a Call Option?
func RequestTimeout(d time.Duration) Option {
@@ -264,12 +269,6 @@ func WithAddress(a ...string) CallOption {
}
}
func WithSelectOption(so ...selector.SelectOption) CallOption {
return func(o *CallOptions) {
o.SelectOptions = append(o.SelectOptions, so...)
}
}
// WithCallWrapper is a CallOption which adds to the existing CallFunc wrappers
func WithCallWrapper(cw ...CallWrapper) CallOption {
return func(o *CallOptions) {
@@ -347,6 +346,20 @@ func WithNetwork(n string) CallOption {
}
}
// WithRouter sets the router to use for this call
func WithRouter(r router.Router) CallOption {
return func(o *CallOptions) {
o.Router = r
}
}
// WithSelector sets the selector to use for this call
func WithSelector(s selector.Selector) CallOption {
return func(o *CallOptions) {
o.Selector = s
}
}
func WithMessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
@@ -366,10 +379,3 @@ func StreamingRequest() RequestOption {
o.Stream = true
}
}
// WithRouter sets the client router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}

View File

@@ -3,17 +3,18 @@ package client
import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/codec"
raw "github.com/micro/go-micro/v2/codec/bytes"
"github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router"
"github.com/micro/go-micro/v2/selector"
"github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/util/buf"
"github.com/micro/go-micro/v2/util/net"
@@ -63,9 +64,56 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
address := node.Address
func (r *rpcClient) lookupRoute(req Request, opts CallOptions) (*router.Route, error) {
// check to see if the proxy has been set, if it has we don't need to lookup the routes; net.Proxy
// returns a slice of addresses, so we'll use a random one. Eventually we should to use the
// selector for this.
service, addresses, _ := net.Proxy(req.Service(), opts.Address)
if len(addresses) > 0 {
return &router.Route{
Service: service,
Address: addresses[rand.Int()%len(addresses)],
}, nil
}
// construct the router query
query := []router.QueryOption{router.QueryService(req.Service())}
// if a custom network was requested, pass this to the router. By default the router will use it's
// own network, which is set during initialisation.
if len(opts.Network) > 0 {
query = append(query, router.QueryNetwork(opts.Network))
}
// use the router passed as a call option, or fallback to the rpc clients router
if opts.Router == nil {
opts.Router = r.opts.Router
}
// lookup the routes which can be used to execute the request
routes, err := opts.Router.Lookup(query...)
if err == router.ErrRouteNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
}
// use the selector passed as a call option, or fallback to the rpc clients selector
if opts.Selector == nil {
opts.Selector = r.opts.Selector
}
// select the route to use for the request
if route, err := opts.Selector.Select(routes); err == selector.ErrNoneAvailable {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
} else {
return route, nil
}
}
func (r *rpcClient) call(ctx context.Context, route *router.Route, req Request, resp interface{}, opts CallOptions) error {
msg := &transport.Message{
Header: make(map[string]string),
}
@@ -90,7 +138,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
msg.Header["Accept"] = req.ContentType()
// setup old protocol
cf := setupProtocol(msg, node)
cf := setupProtocol(msg, route)
// no codec specified
if cf == nil {
@@ -109,7 +157,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
}
c, err := r.pool.Get(address, dOpts...)
c, err := r.pool.Get(route.Address, dOpts...)
if err != nil {
return errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
@@ -182,9 +230,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
return nil
}
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
address := node.Address
func (r *rpcClient) stream(ctx context.Context, route *router.Route, req Request, opts CallOptions) (Stream, error) {
msg := &transport.Message{
Header: make(map[string]string),
}
@@ -206,7 +252,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
msg.Header["Accept"] = req.ContentType()
// set old codecs
cf := setupProtocol(msg, node)
cf := setupProtocol(msg, route)
// no codec specified
if cf == nil {
@@ -225,7 +271,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
}
c, err := r.opts.Transport.Dial(address, dOpts...)
c, err := r.opts.Transport.Dial(route.Address, dOpts...)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
@@ -320,43 +366,6 @@ func (r *rpcClient) Options() Options {
return r.opts
}
// next returns an iterator for the next nodes to call
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// try get the proxy
service, address, _ := net.Proxy(request.Service(), opts.Address)
// return remote address
if len(address) > 0 {
nodes := make([]*registry.Node, len(address))
for i, addr := range address {
nodes[i] = &registry.Node{
Address: addr,
// Set the protocol
Metadata: map[string]string{
"protocol": "mucp",
},
}
}
// crude return method
return func() (*registry.Node, error) {
return nodes[time.Now().Unix()%int64(len(nodes))], nil
}, nil
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
}
return next, nil
}
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
@@ -364,14 +373,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
opt(&callOpts)
}
next, err := r.next(request, callOpts)
if err != nil {
return err
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
if d, ok := ctx.Deadline(); !ok {
// no deadline so we create a new one
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
@@ -379,8 +382,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
remaining := d.Sub(time.Now())
WithRequestTimeout(remaining)(&callOpts)
}
// should we noop right here?
@@ -411,19 +414,18 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
time.Sleep(t)
}
// select next node
node, err := next()
service := request.Service()
// lookup the route to send the request via
route, err := r.lookupRoute(request, callOpts)
if err != nil {
if err == selector.ErrNotFound {
return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
return err
}
// make the call
err = rcall(ctx, node, request, response, callOpts)
r.opts.Selector.Mark(service, node, err)
err = rcall(ctx, route, request, response, callOpts)
// record the result of the call to inform future routing decisions
r.opts.Selector.Record(*route, err)
return err
}
@@ -475,11 +477,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
opt(&callOpts)
}
next, err := r.next(request, callOpts)
if err != nil {
return nil, err
}
// should we noop right here?
select {
case <-ctx.Done():
@@ -499,17 +496,18 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
time.Sleep(t)
}
node, err := next()
service := request.Service()
// lookup the route to send the request via
route, err := r.lookupRoute(request, callOpts)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
return nil, err
}
stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(service, node, err)
// perform the call
stream, err := r.stream(ctx, route, request, callOpts)
// record the result of the call to inform future routing decisions
r.opts.Selector.Record(*route, err)
return stream, err
}

View File

@@ -5,10 +5,10 @@ import (
"fmt"
"testing"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/registry/memory"
"github.com/micro/go-micro/v2/router"
)
func newTestRegistry() registry.Registry {
@@ -22,7 +22,7 @@ func TestCallAddress(t *testing.T) {
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
@@ -33,8 +33,8 @@ func TestCallAddress(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}
if node.Address != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address)
if route.Address != address {
return fmt.Errorf("expected address: %s got %s", address, route.Address)
}
// don't do the call
@@ -47,7 +47,6 @@ func TestCallAddress(t *testing.T) {
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, endpoint, nil)
@@ -70,7 +69,7 @@ func TestCallRetry(t *testing.T) {
var called int
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.InternalServerError("test.error", "retry request")
@@ -86,7 +85,6 @@ func TestCallRetry(t *testing.T) {
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, endpoint, nil)
@@ -109,7 +107,7 @@ func TestCallWrapper(t *testing.T) {
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
@@ -120,8 +118,8 @@ func TestCallWrapper(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}
if node.Address != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address)
if route.Address != address {
return fmt.Errorf("expected address: %s got %s", address, route.Address)
}
// don't do the call
@@ -134,7 +132,6 @@ func TestCallWrapper(t *testing.T) {
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
r.Register(&registry.Service{
Name: service,

View File

@@ -12,7 +12,7 @@ import (
"github.com/micro/go-micro/v2/codec/proto"
"github.com/micro/go-micro/v2/codec/protorpc"
"github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router"
"github.com/micro/go-micro/v2/transport"
)
@@ -128,11 +128,9 @@ func setHeaders(m *codec.Message, stream string) {
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
protocol := node.Metadata["protocol"]
// got protocol
if len(protocol) > 0 {
func setupProtocol(msg *transport.Message, route *router.Route) codec.NewCodec {
// get the protocol from route metadata
if protocol := route.Metadata["protocol"]; len(protocol) > 0 {
return nil
}

View File

@@ -1,47 +0,0 @@
package selector
import (
"github.com/micro/go-micro/v2/registry"
)
var (
// mock data
testData = map[string][]*registry.Service{
"foo": {
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost:9999",
},
{
Id: "foo-1.0.0-321",
Address: "localhost:9999",
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost:6666",
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost:8888",
},
},
},
},
}
)

View File

@@ -1,132 +0,0 @@
package selector
import (
"time"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/registry/cache"
)
type registrySelector struct {
so Options
rc cache.Cache
}
func (c *registrySelector) newCache() cache.Cache {
ropts := []cache.Option{}
if c.so.Context != nil {
if t, ok := c.so.Context.Value("selector_ttl").(time.Duration); ok {
ropts = append(ropts, cache.WithTTL(t))
}
}
return cache.New(c.so.Registry, ropts...)
}
func (c *registrySelector) Init(opts ...Option) error {
for _, o := range opts {
o(&c.so)
}
c.rc.Stop()
c.rc = c.newCache()
return nil
}
func (c *registrySelector) Options() Options {
return c.so
}
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{Strategy: c.so.Strategy}
for _, opt := range opts {
opt(&sopts)
}
// a specific domain was requested, only lookup the services in that domain
if len(sopts.Domain) > 0 {
services, err := c.rc.GetService(service, registry.GetDomain(sopts.Domain))
if err != nil && err != registry.ErrNotFound {
return nil, err
}
for _, filter := range sopts.Filters {
services = filter(services)
}
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
// get the service. Because the service could be running in the current or the default domain,
// we call both. For example, go.micro.service.foo could be running in the services current domain,
// however the runtime (go.micro.runtime) will always be run in the default domain.
services, err := c.rc.GetService(service, registry.GetDomain(c.so.Domain))
if err != nil && err != registry.ErrNotFound {
return nil, err
}
if c.so.Domain != registry.DefaultDomain {
srvs, err := c.rc.GetService(service, registry.GetDomain(registry.DefaultDomain))
if err != nil && err != registry.ErrNotFound {
return nil, err
}
if err == nil {
services = append(services, srvs...)
}
}
if services == nil {
return nil, ErrNoneAvailable
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
}
func (c *registrySelector) Reset(service string) {
}
// Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.rc.Stop()
return nil
}
func (c *registrySelector) String() string {
return "registry"
}
func NewSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
s := &registrySelector{
so: sopts,
}
s.rc = s.newCache()
return s
}

View File

@@ -1,32 +0,0 @@
package selector
import (
"os"
"testing"
"github.com/micro/go-micro/v2/registry/memory"
)
func TestRegistrySelector(t *testing.T) {
counts := map[string]int{}
r := memory.NewRegistry(memory.Services(testData))
cache := NewSelector(Registry(r))
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("Selector Counts %v", counts)
}
}

View File

@@ -1,124 +0,0 @@
// Package dns provides a dns SRV selector
package dns
import (
"fmt"
"net"
"strconv"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/registry"
)
type dnsSelector struct {
options selector.Options
domain string
}
var (
DefaultDomain = "local"
)
func (d *dnsSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&d.options)
}
return nil
}
func (d *dnsSelector) Options() selector.Options {
return d.options
}
func (d *dnsSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var srv []*net.SRV
// check if its host:port
host, port, err := net.SplitHostPort(service)
// not host:port
if err != nil {
// lookup the SRV record
_, srvs, err := net.LookupSRV(service, "tcp", d.domain)
if err != nil {
return nil, err
}
// set SRV records
srv = srvs
// got host:port
} else {
p, _ := strconv.Atoi(port)
// lookup the A record
ips, err := net.LookupHost(host)
if err != nil {
return nil, err
}
// create SRV records
for _, ip := range ips {
srv = append(srv, &net.SRV{
Target: ip,
Port: uint16(p),
})
}
}
nodes := make([]*registry.Node, 0, len(srv))
for _, node := range srv {
nodes = append(nodes, &registry.Node{
Id: node.Target,
Address: fmt.Sprintf("%s:%d", node.Target, node.Port),
})
}
services := []*registry.Service{
{
Name: service,
Nodes: nodes,
},
}
sopts := selector.SelectOptions{
Strategy: d.options.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (d *dnsSelector) Mark(service string, node *registry.Node, err error) {}
func (d *dnsSelector) Reset(service string) {}
func (d *dnsSelector) Close() error {
return nil
}
func (d *dnsSelector) String() string {
return "dns"
}
func NewSelector(opts ...selector.Option) selector.Selector {
options := selector.Options{
Strategy: selector.Random,
}
for _, o := range opts {
o(&options)
}
return &dnsSelector{options: options, domain: DefaultDomain}
}

View File

@@ -1,73 +0,0 @@
package selector
import (
"github.com/micro/go-micro/v2/registry"
)
// FilterEndpoint is an endpoint based Select Filter which will
// only return services with the endpoint specified.
func FilterEndpoint(name string) Filter {
return func(old []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range old {
for _, ep := range service.Endpoints {
if ep.Name == name {
services = append(services, service)
break
}
}
}
return services
}
}
// FilterLabel is a label based Select Filter which will
// only return services with the label specified.
func FilterLabel(key, val string) Filter {
return func(old []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range old {
serv := new(registry.Service)
var nodes []*registry.Node
for _, node := range service.Nodes {
if node.Metadata == nil {
continue
}
if node.Metadata[key] == val {
nodes = append(nodes, node)
}
}
// only add service if there's some nodes
if len(nodes) > 0 {
// copy
*serv = *service
serv.Nodes = nodes
services = append(services, serv)
}
}
return services
}
}
// FilterVersion is a version based Select Filter which will
// only return services with the version specified.
func FilterVersion(version string) Filter {
return func(old []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range old {
if service.Version == version {
services = append(services, service)
}
}
return services
}
}

View File

@@ -1,239 +0,0 @@
package selector
import (
"testing"
"github.com/micro/go-micro/v2/registry"
)
func TestFilterEndpoint(t *testing.T) {
testData := []struct {
services []*registry.Service
endpoint string
count int
}{
{
services: []*registry.Service{
{
Name: "test",
Version: "1.0.0",
Endpoints: []*registry.Endpoint{
{
Name: "Foo.Bar",
},
},
},
{
Name: "test",
Version: "1.1.0",
Endpoints: []*registry.Endpoint{
{
Name: "Baz.Bar",
},
},
},
},
endpoint: "Foo.Bar",
count: 1,
},
{
services: []*registry.Service{
{
Name: "test",
Version: "1.0.0",
Endpoints: []*registry.Endpoint{
{
Name: "Foo.Bar",
},
},
},
{
Name: "test",
Version: "1.1.0",
Endpoints: []*registry.Endpoint{
{
Name: "Foo.Bar",
},
},
},
},
endpoint: "Bar.Baz",
count: 0,
},
}
for _, data := range testData {
filter := FilterEndpoint(data.endpoint)
services := filter(data.services)
if len(services) != data.count {
t.Fatalf("Expected %d services, got %d", data.count, len(services))
}
for _, service := range services {
var seen bool
for _, ep := range service.Endpoints {
if ep.Name == data.endpoint {
seen = true
break
}
}
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
}
}
func TestFilterLabel(t *testing.T) {
testData := []struct {
services []*registry.Service
label [2]string
count int
}{
{
services: []*registry.Service{
{
Name: "test",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "test-1",
Address: "localhost",
Metadata: map[string]string{
"foo": "bar",
},
},
},
},
{
Name: "test",
Version: "1.1.0",
Nodes: []*registry.Node{
{
Id: "test-2",
Address: "localhost",
Metadata: map[string]string{
"foo": "baz",
},
},
},
},
},
label: [2]string{"foo", "bar"},
count: 1,
},
{
services: []*registry.Service{
{
Name: "test",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "test-1",
Address: "localhost",
},
},
},
{
Name: "test",
Version: "1.1.0",
Nodes: []*registry.Node{
{
Id: "test-2",
Address: "localhost",
},
},
},
},
label: [2]string{"foo", "bar"},
count: 0,
},
}
for _, data := range testData {
filter := FilterLabel(data.label[0], data.label[1])
services := filter(data.services)
if len(services) != data.count {
t.Fatalf("Expected %d services, got %d", data.count, len(services))
}
for _, service := range services {
var seen bool
for _, node := range service.Nodes {
if node.Metadata[data.label[0]] != data.label[1] {
t.Fatalf("Expected %s=%s but got %s=%s for service %+v node %+v",
data.label[0], data.label[1], data.label[0], node.Metadata[data.label[0]], service, node)
}
seen = true
}
if !seen {
t.Fatalf("Expected node for %s=%s but saw none; results %+v", data.label[0], data.label[1], service)
}
}
}
}
func TestFilterVersion(t *testing.T) {
testData := []struct {
services []*registry.Service
version string
count int
}{
{
services: []*registry.Service{
{
Name: "test",
Version: "1.0.0",
},
{
Name: "test",
Version: "1.1.0",
},
},
version: "1.0.0",
count: 1,
},
{
services: []*registry.Service{
{
Name: "test",
Version: "1.0.0",
},
{
Name: "test",
Version: "1.1.0",
},
},
version: "2.0.0",
count: 0,
},
}
for _, data := range testData {
filter := FilterVersion(data.version)
services := filter(data.services)
if len(services) != data.count {
t.Fatalf("Expected %d services, got %d", data.count, len(services))
}
var seen bool
for _, service := range services {
if service.Version != data.version {
t.Fatalf("Expected version %s, got %s", data.version, service.Version)
}
seen = true
}
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
}

View File

@@ -1,78 +0,0 @@
package selector
import (
"context"
"github.com/micro/go-micro/v2/registry"
)
type Options struct {
Registry registry.Registry
Strategy Strategy
// Domain to lookup services from within the registry
Domain string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
type SelectOptions struct {
Filters []Filter
Strategy Strategy
Domain string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// Option used to initialise the selector
type Option func(*Options)
// SelectOption used when making a select call
type SelectOption func(*SelectOptions)
// Registry sets the registry used by the selector
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
// Domain sets the domain used by the selector
func Domain(d string) Option {
return func(o *Options) {
o.Domain = d
}
}
// SetStrategy sets the default strategy for the selector
func SetStrategy(fn Strategy) Option {
return func(o *Options) {
o.Strategy = fn
}
}
// WithFilter adds a filter function to the list of filters
// used during the Select call.
func WithFilter(fn ...Filter) SelectOption {
return func(o *SelectOptions) {
o.Filters = append(o.Filters, fn...)
}
}
// Strategy sets the selector strategy
func WithStrategy(fn Strategy) SelectOption {
return func(o *SelectOptions) {
o.Strategy = fn
}
}
// WithDomain sets the registry domain to use for the selection
func WithDomain(d string) SelectOption {
return func(o *SelectOptions) {
o.Domain = d
}
}

View File

@@ -1,18 +0,0 @@
package registry
import (
"context"
"time"
"github.com/micro/go-micro/v2/client/selector"
)
// Set the registry cache ttl
func TTL(t time.Duration) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "selector_ttl", t)
}
}

View File

@@ -1,11 +0,0 @@
// Package registry uses the go-micro registry for selection
package registry
import (
"github.com/micro/go-micro/v2/client/selector"
)
// NewSelector returns a new registry selector
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@@ -1,270 +0,0 @@
// Package router is a network/router selector
package router
import (
"context"
"os"
"sort"
"sync"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router"
pb "github.com/micro/go-micro/v2/router/service/proto"
)
type routerSelector struct {
opts selector.Options
// the router
r router.Router
// the client we have
c client.Client
// the client for the remote router
rs pb.RouterService
// name of the router
name string
// address of the remote router
addr string
// whether to use the remote router
remote bool
}
type clientKey struct{}
type routerKey struct{}
// getRoutes returns the routes whether they are remote or local
func (r *routerSelector) getRoutes(service string) ([]router.Route, error) {
if !r.remote {
// lookup router for routes for the service
return r.r.Lookup(
router.QueryService(service),
)
}
// lookup the remote router
var addrs []string
// set the remote address if specified
if len(r.addr) > 0 {
addrs = append(addrs, r.addr)
} else {
// we have a name so we need to check the registry
services, err := r.c.Options().Registry.GetService(r.name)
if err != nil {
return nil, err
}
for _, service := range services {
for _, node := range service.Nodes {
addrs = append(addrs, node.Address)
}
}
}
// no router addresses available
if len(addrs) == 0 {
return nil, selector.ErrNoneAvailable
}
var pbRoutes *pb.LookupResponse
var err error
// TODO: implement backoff and retries
for _, addr := range addrs {
// call the router
pbRoutes, err = r.rs.Lookup(context.Background(), &pb.LookupRequest{
Query: &pb.Query{
Service: service,
},
}, client.WithAddress(addr))
if err != nil {
continue
}
break
}
// errored out
if err != nil {
return nil, err
}
// no routes
if pbRoutes == nil {
return nil, selector.ErrNoneAvailable
}
routes := make([]router.Route, 0, len(pbRoutes.Routes))
// convert from pb to []*router.Route
for _, r := range pbRoutes.Routes {
routes = append(routes, router.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: r.Metric,
})
}
return routes, nil
}
func (r *routerSelector) Init(opts ...selector.Option) error {
// no op
return nil
}
func (r *routerSelector) Options() selector.Options {
return r.opts
}
func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
// TODO: pull routes asynchronously and cache
routes, err := r.getRoutes(service)
if err != nil {
return nil, err
}
// no routes return not found error
if len(routes) == 0 {
return nil, selector.ErrNotFound
}
// TODO: apply filters by pseudo constructing service
// sort the routes based on metric
sort.Slice(routes, func(i, j int) bool {
return routes[i].Metric < routes[j].Metric
})
// roundrobin assuming routes are in metric preference order
var i int
var mtx sync.Mutex
return func() (*registry.Node, error) {
// get index and increment counter with every call to next
mtx.Lock()
idx := i
i++
mtx.Unlock()
// get route based on idx
route := routes[idx%len(routes)]
// defaults to gateway and no port
address := route.Address
if len(route.Gateway) > 0 {
address = route.Gateway
}
// return as a node
return &registry.Node{
// TODO: add id and metadata if we can
Address: address,
}, nil
}, nil
}
func (r *routerSelector) Mark(service string, node *registry.Node, err error) {
// TODO: pass back metrics or information to the router
}
func (r *routerSelector) Reset(service string) {
// TODO: reset the metrics or information at the router
}
func (r *routerSelector) Close() error {
// stop the router advertisements
return r.r.Close()
}
func (r *routerSelector) String() string {
return "router"
}
// NewSelector returns a new router based selector
func NewSelector(opts ...selector.Option) selector.Selector {
options := selector.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
// set default registry if not set
if options.Registry == nil {
options.Registry = registry.DefaultRegistry
}
// try get router from the context
r, ok := options.Context.Value(routerKey{}).(router.Router)
if !ok {
// TODO: Use router.DefaultRouter?
r = router.NewRouter(
router.Registry(options.Registry),
)
}
// try get client from the context
c, ok := options.Context.Value(clientKey{}).(client.Client)
if !ok {
c = client.DefaultClient
}
// get the router from env vars if its a remote service
remote := true
routerName := os.Getenv("MICRO_ROUTER")
routerAddress := os.Getenv("MICRO_ROUTER_ADDRESS")
// start the router advertisements if we're running it locally
if len(routerName) == 0 && len(routerAddress) == 0 {
go r.Advertise()
remote = false
}
return &routerSelector{
opts: options,
// set the internal router
r: r,
// set the client
c: c,
// set the router client
rs: pb.NewRouterService(routerName, c),
// name of the router
name: routerName,
// address of router
addr: routerAddress,
// let ourselves know to use the remote router
remote: remote,
}
}
// WithClient sets the client for the request
func WithClient(c client.Client) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, clientKey{}, c)
}
}
// WithRouter sets the router as an option
func WithRouter(r router.Router) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, routerKey{}, r)
}
}

View File

@@ -1,43 +0,0 @@
// Package selector is a way to pick a list of service nodes
package selector
import (
"errors"
"github.com/micro/go-micro/v2/registry"
)
// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
Init(opts ...Option) error
Options() Options
// Select returns a function which should return the next node
Select(service string, opts ...SelectOption) (Next, error)
// Mark sets the success/error against a node
Mark(service string, node *registry.Node, err error)
// Reset returns state back to zero for a service
Reset(service string)
// Close renders the selector unusable
Close() error
// Name of the selector
String() string
}
// Next is a function that returns the next node
// based on the selector's strategy
type Next func() (*registry.Node, error)
// Filter is used to filter a service during the selection process
type Filter func([]*registry.Service) []*registry.Service
// Strategy is a selection strategy e.g random, round robin
type Strategy func([]*registry.Service) Next
var (
DefaultSelector = NewSelector()
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)

View File

@@ -1,58 +0,0 @@
// Package static provides a static resolver which returns the name/ip passed in without any change
package static
import (
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/registry"
)
// staticSelector is a static selector
type staticSelector struct {
opts selector.Options
}
func (s *staticSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&s.opts)
}
return nil
}
func (s *staticSelector) Options() selector.Options {
return s.opts
}
func (s *staticSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
return func() (*registry.Node, error) {
return &registry.Node{
Id: service,
Address: service,
}, nil
}, nil
}
func (s *staticSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (s *staticSelector) Reset(service string) {
return
}
func (s *staticSelector) Close() error {
return nil
}
func (s *staticSelector) String() string {
return "static"
}
func NewSelector(opts ...selector.Option) selector.Selector {
var options selector.Options
for _, o := range opts {
o(&options)
}
return &staticSelector{
opts: options,
}
}

View File

@@ -1,56 +0,0 @@
package selector
import (
"math/rand"
"sync"
"time"
"github.com/micro/go-micro/v2/registry"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Random is a random strategy algorithm for node selection
func Random(services []*registry.Service) Next {
nodes := make([]*registry.Node, 0, len(services))
for _, service := range services {
nodes = append(nodes, service.Nodes...)
}
return func() (*registry.Node, error) {
if len(nodes) == 0 {
return nil, ErrNoneAvailable
}
i := rand.Int() % len(nodes)
return nodes[i], nil
}
}
// RoundRobin is a roundrobin strategy algorithm for node selection
func RoundRobin(services []*registry.Service) Next {
nodes := make([]*registry.Node, 0, len(services))
for _, service := range services {
nodes = append(nodes, service.Nodes...)
}
var i = rand.Int()
var mtx sync.Mutex
return func() (*registry.Node, error) {
if len(nodes) == 0 {
return nil, ErrNoneAvailable
}
mtx.Lock()
node := nodes[i%len(nodes)]
i++
mtx.Unlock()
return node, nil
}
}

View File

@@ -1,58 +0,0 @@
package selector
import (
"os"
"testing"
"github.com/micro/go-micro/v2/registry"
)
func TestStrategies(t *testing.T) {
testData := []*registry.Service{
{
Name: "test1",
Version: "latest",
Nodes: []*registry.Node{
{
Id: "test1-1",
Address: "10.0.0.1:1001",
},
{
Id: "test1-2",
Address: "10.0.0.2:1002",
},
},
},
{
Name: "test1",
Version: "default",
Nodes: []*registry.Node{
{
Id: "test1-3",
Address: "10.0.0.3:1003",
},
{
Id: "test1-4",
Address: "10.0.0.4:1004",
},
},
},
}
for name, strategy := range map[string]Strategy{"random": Random, "roundrobin": RoundRobin} {
next := strategy(testData)
counts := make(map[string]int)
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Fatal(err)
}
counts[node.Id]++
}
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("%s: %+v\n", name, counts)
}
}
}

View File

@@ -3,11 +3,11 @@ package client
import (
"context"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router"
)
// CallFunc represents the individual call func
type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error
type CallFunc func(ctx context.Context, route *router.Route, req Request, rsp interface{}, opts CallOptions) error
// CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc