diff --git a/client/node_selector.go b/client/node_selector.go index 3a71e8db..e492ff3b 100644 --- a/client/node_selector.go +++ b/client/node_selector.go @@ -8,10 +8,18 @@ import ( "github.com/micro/go-micro/registry" ) +// NodeSelector is used to retrieve a node to which a request +// should be routed. It takes a list of services and selects +// a single node. If a node cannot be selected it should return +// an error. A list of services is provided as a service may +// have 1 or more versions. +type NodeSelector func(service []*registry.Service) (*registry.Node, error) + func init() { rand.Seed(time.Now().UnixNano()) } +// Built in random hashed node selector func nodeSelector(service []*registry.Service) (*registry.Node, error) { if len(service) == 0 { return nil, errors.NotFound("go.micro.client", "Service not found") diff --git a/client/options.go b/client/options.go index e8be919f..46663909 100644 --- a/client/options.go +++ b/client/options.go @@ -14,6 +14,7 @@ type options struct { registry registry.Registry transport transport.Transport wrappers []Wrapper + selector NodeSelector } // Broker to be used for pub/sub @@ -51,6 +52,13 @@ func Transport(t transport.Transport) Option { } } +// Selector is used to select a node to route a request to +func Selector(s NodeSelector) Option { + return func(o *options) { + o.selector = s + } +} + // Adds a Wrapper to a list of options passed into the client func Wrap(w Wrapper) Option { return func(o *options) { diff --git a/client/rpc_client.go b/client/rpc_client.go index 05773ed0..ae6e075e 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -43,6 +43,10 @@ func newRpcClient(opt ...Option) Client { opts.broker = broker.DefaultBroker } + if opts.selector == nil { + opts.selector = nodeSelector + } + rc := &rpcClient{ once: once, opts: opts, @@ -146,7 +150,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return errors.InternalServerError("go.micro.client", err.Error()) } - node, err := nodeSelector(service) + node, err := r.opts.selector(service) if err != nil { return err } @@ -169,7 +173,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in return nil, errors.InternalServerError("go.micro.client", err.Error()) } - node, err := nodeSelector(service) + node, err := r.opts.selector(service) if err != nil { return nil, err } diff --git a/examples/client/node_selector/node_selector.go b/examples/client/node_selector/node_selector.go new file mode 100644 index 00000000..387984ac --- /dev/null +++ b/examples/client/node_selector/node_selector.go @@ -0,0 +1,84 @@ +package main + +import ( + "fmt" + "math/rand" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + c "github.com/micro/go-micro/context" + "github.com/micro/go-micro/errors" + example "github.com/micro/go-micro/examples/server/proto/example" + "github.com/micro/go-micro/registry" + "golang.org/x/net/context" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +// A random node selector +func randomSelector(s []*registry.Service) (*registry.Node, error) { + if len(s) == 0 { + return nil, errors.NotFound("go.micro.client", "Service not found") + } + + i := rand.Int() + j := i % len(s) + + if len(s[j].Nodes) == 0 { + return nil, errors.NotFound("go.micro.client", "Service not found") + } + + n := i % len(s[j].Nodes) + return s[j].Nodes[n], nil +} + +// Wraps the node selector so that it will log what node was selected +func wrapSelector(fn client.NodeSelector) client.NodeSelector { + return func(s []*registry.Service) (*registry.Node, error) { + n, err := fn(s) + if err != nil { + return nil, err + } + fmt.Printf("Selected node %v\n", n) + return n, nil + } +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + // create context with metadata + ctx := c.WithMetadata(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(ctx, req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func main() { + cmd.Init() + + client.DefaultClient = client.NewClient( + client.Selector(wrapSelector(randomSelector)), + ) + + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } +}