cleanup client/selector/lookup (#1937)

* cleanup client/selector/lookup

* add mdns router, remove registry from client

* fix roundtripper

* remove comment

* fix compile issue

* fix mucp test

* fix api router
This commit is contained in:
Asim Aslam 2020-08-17 22:44:45 +01:00 committed by GitHub
parent 7135787b78
commit 50ec6c748f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 398 additions and 418 deletions

View File

@ -18,6 +18,8 @@ import (
"github.com/micro/go-micro/v3/client" "github.com/micro/go-micro/v3/client"
gcli "github.com/micro/go-micro/v3/client/grpc" gcli "github.com/micro/go-micro/v3/client/grpc"
rmemory "github.com/micro/go-micro/v3/registry/memory" rmemory "github.com/micro/go-micro/v3/registry/memory"
rt "github.com/micro/go-micro/v3/router"
regRouter "github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/server" "github.com/micro/go-micro/v3/server"
gsrv "github.com/micro/go-micro/v3/server/grpc" gsrv "github.com/micro/go-micro/v3/server/grpc"
pb "github.com/micro/go-micro/v3/server/grpc/proto" pb "github.com/micro/go-micro/v3/server/grpc/proto"
@ -55,9 +57,13 @@ func initial(t *testing.T) (server.Server, client.Client) {
server.Registry(r), server.Registry(r),
) )
rtr := regRouter.NewRouter(
rt.Registry(r),
)
// create a new server // create a new server
c := gcli.NewClient( c := gcli.NewClient(
client.Registry(r), client.Router(rtr),
) )
h := &testServer{} h := &testServer{}

View File

@ -16,7 +16,6 @@ import (
raw "github.com/micro/go-micro/v3/codec/bytes" raw "github.com/micro/go-micro/v3/codec/bytes"
"github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/errors"
"github.com/micro/go-micro/v3/metadata" "github.com/micro/go-micro/v3/metadata"
"github.com/micro/go-micro/v3/registry"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -70,7 +69,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
return grpc.WithInsecure() return grpc.WithInsecure()
} }
func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string var header map[string]string
header = make(map[string]string) header = make(map[string]string)
@ -103,7 +102,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout), grpc.WithTimeout(opts.DialTimeout),
g.secure(node.Address), g.secure(addr),
grpc.WithDefaultCallOptions( grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
@ -114,13 +113,13 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
cc, err := g.pool.getConn(node.Address, grpcDialOptions...) cc, err := g.pool.getConn(addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.release(node.Address, cc, grr) g.pool.release(addr, cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@ -146,7 +145,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
return grr return grr
} }
func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string var header map[string]string
if md, ok := metadata.FromContext(ctx); ok { if md, ok := metadata.FromContext(ctx); ok {
@ -186,14 +185,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout), grpc.WithTimeout(opts.DialTimeout),
g.secure(node.Address), g.secure(addr),
} }
if opts := g.getGrpcDialOptions(); opts != nil { if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
cc, err := grpc.DialContext(dialCtx, node.Address, grpcDialOptions...) cc, err := grpc.DialContext(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -389,6 +388,34 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
gcall = callOpts.CallWrappers[i-1](gcall) gcall = callOpts.CallWrappers[i-1](gcall)
} }
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
callOpts.Router = g.opts.Router
}
if callOpts.Selector == nil {
callOpts.Selector = g.opts.Selector
}
// inject proxy address
// TODO: don't even bother using Lookup/Select in this case
if len(g.opts.Proxy) > 0 {
callOpts.Address = []string{g.opts.Proxy}
}
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err := g.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return err
}
// return errors.New("go.micro.client", "request timeout", 408) // return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error { call := func(i int) error {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
@ -402,36 +429,14 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
time.Sleep(t) time.Sleep(t)
} }
// use the router passed as a call option, or fallback to the rpc clients router // get the next node
if callOpts.Router == nil { node := next()
callOpts.Router = g.opts.Router
}
// use the selector passed as a call option, or fallback to the rpc clients selector
if callOpts.Selector == nil {
callOpts.Selector = g.opts.Selector
}
// inject proxy address
if len(g.opts.Proxy) > 0 {
callOpts.Address = []string{g.opts.Proxy}
}
// lookup the route to send the reques to
route, err := client.LookupRoute(req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// pass a node to enable backwards compatability as changing the
// call func would be a breaking change.
// todo v3: change the call func to accept a route
node := &registry.Node{Address: route.Address}
// make the call // make the call
err = gcall(ctx, node, req, rsp, callOpts) err = gcall(ctx, node, req, rsp, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
g.opts.Selector.Record(*route, err) g.opts.Selector.Record(node, err)
// try and transform the error to a go-micro error // try and transform the error to a go-micro error
if verr, ok := err.(*errors.Error); ok { if verr, ok := err.(*errors.Error); ok {
@ -498,6 +503,34 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
gstream = callOpts.CallWrappers[i-1](gstream) gstream = callOpts.CallWrappers[i-1](gstream)
} }
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
callOpts.Router = g.opts.Router
}
if callOpts.Selector == nil {
callOpts.Selector = g.opts.Selector
}
// inject proxy address
// TODO: don't even bother using Lookup/Select in this case
if len(g.opts.Proxy) > 0 {
callOpts.Address = []string{g.opts.Proxy}
}
// lookup the route to send the reques to
// TODO: move to internal lookup func
routes, err := g.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
call := func(i int) (client.Stream, error) { call := func(i int) (client.Stream, error) {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
@ -510,44 +543,22 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
time.Sleep(t) time.Sleep(t)
} }
// use the router passed as a call option, or fallback to the rpc clients router // get the next node
if callOpts.Router == nil { node := next()
callOpts.Router = g.opts.Router
}
// use the selector passed as a call option, or fallback to the rpc clients selector
if callOpts.Selector == nil {
callOpts.Selector = g.opts.Selector
}
// inject proxy address
if len(g.opts.Proxy) > 0 {
callOpts.Address = []string{g.opts.Proxy}
}
// lookup the route to send the reques to
route, err := client.LookupRoute(req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// pass a node to enable backwards compatability as changing the
// call func would be a breaking change.
// todo v3: change the call func to accept a route
node := &registry.Node{Address: route.Address}
// make the call // make the call
stream := &grpcStream{} stream := &grpcStream{}
err = g.stream(ctx, node, req, stream, callOpts) err = g.stream(ctx, node, req, stream, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
g.opts.Selector.Record(*route, err) g.opts.Selector.Record(node, err)
// try and transform the error to a go-micro error // try and transform the error to a go-micro error
if verr, ok := err.(*errors.Error); ok { if verr, ok := err.(*errors.Error); ok {
return nil, verr return nil, verr
} }
g.opts.Selector.Record(*route, err) g.opts.Selector.Record(node, err)
return stream, err return stream, err
} }

View File

@ -1,21 +1,20 @@
package client package client
import ( import (
"math/rand" "context"
"github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/errors"
"github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/selector"
) )
// LookupFunc is used to lookup routes for a service
type LookupFunc func(context.Context, Request, CallOptions) ([]string, error)
// LookupRoute for a request using the router and then choose one using the selector // LookupRoute for a request using the router and then choose one using the selector
func LookupRoute(req Request, opts CallOptions) (*router.Route, error) { func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, error) {
// check to see if an address was provided as a call option // check to see if an address was provided as a call option
if len(opts.Address) > 0 { if len(opts.Address) > 0 {
return &router.Route{ return opts.Address, nil
Service: req.Service(),
Address: opts.Address[rand.Int()%len(opts.Address)],
}, nil
} }
// construct the router query // construct the router query
@ -35,12 +34,11 @@ func LookupRoute(req Request, opts CallOptions) (*router.Route, error) {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error()) return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
} }
// select the route to use for the request var addrs []string
if route, err := opts.Selector.Select(routes, opts.SelectOptions...); err == selector.ErrNoneAvailable {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error()) for _, route := range routes {
} else if err != nil { addrs = append(addrs, route.Address)
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
} else {
return route, nil
} }
return addrs, nil
} }

View File

@ -14,17 +14,11 @@ import (
raw "github.com/micro/go-micro/v3/codec/bytes" raw "github.com/micro/go-micro/v3/codec/bytes"
"github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/errors"
"github.com/micro/go-micro/v3/metadata" "github.com/micro/go-micro/v3/metadata"
"github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/util/buf" "github.com/micro/go-micro/v3/util/buf"
"github.com/micro/go-micro/v3/util/pool" "github.com/micro/go-micro/v3/util/pool"
) )
// NewClient returns a new micro client interface
func NewClient(opts ...client.Option) client.Client {
return newClient(opts...)
}
type rpcClient struct { type rpcClient struct {
once atomic.Value once atomic.Value
opts client.Options opts client.Options
@ -32,7 +26,8 @@ type rpcClient struct {
seq uint64 seq uint64
} }
func newClient(opt ...client.Option) client.Client { // NewClient returns a new micro client interface
func NewClient(opt ...client.Option) client.Client {
opts := client.NewOptions(opt...) opts := client.NewOptions(opt...)
p := pool.NewPool( p := pool.NewPool(
@ -68,7 +63,7 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
} }
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Request, resp interface{}, opts client.CallOptions) error { func (r *rpcClient) call(ctx context.Context, addr string, req client.Request, resp interface{}, opts client.CallOptions) error {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -92,16 +87,9 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Re
// set the accept header // set the accept header
msg.Header["Accept"] = req.ContentType() msg.Header["Accept"] = req.ContentType()
// setup old protocol cf, err := r.newCodec(req.ContentType())
cf := setupProtocol(msg, node) if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
// no codec specified
if cf == nil {
var err error
cf, err = r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
} }
dOpts := []transport.DialOption{ dOpts := []transport.DialOption{
@ -112,7 +100,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Re
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
} }
c, err := r.pool.Get(node.Address, dOpts...) c, err := r.pool.Get(addr, dOpts...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "connection error: %v", err) return errors.InternalServerError("go.micro.client", "connection error: %v", err)
} }
@ -185,7 +173,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req client.Re
return nil return nil
} }
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, opts client.CallOptions) (client.Stream, error) { func (r *rpcClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -206,16 +194,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req client.
// set the accept header // set the accept header
msg.Header["Accept"] = req.ContentType() msg.Header["Accept"] = req.ContentType()
// set old codecs cf, err := r.newCodec(req.ContentType())
cf := setupProtocol(msg, node) if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
// no codec specified
if cf == nil {
var err error
cf, err = r.newCodec(req.ContentType())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
} }
dOpts := []transport.DialOption{ dOpts := []transport.DialOption{
@ -226,7 +207,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req client.
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
} }
c, err := r.opts.Transport.Dial(node.Address, dOpts...) c, err := r.opts.Transport.Dial(addr, dOpts...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
} }
@ -356,6 +337,34 @@ func (r *rpcClient) Call(ctx context.Context, request client.Request, response i
rcall = callOpts.CallWrappers[i-1](rcall) rcall = callOpts.CallWrappers[i-1](rcall)
} }
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
callOpts.Router = r.opts.Router
}
if callOpts.Selector == nil {
callOpts.Selector = r.opts.Selector
}
// inject proxy address
// TODO: don't even bother using Lookup/Select in this case
if len(r.opts.Proxy) > 0 {
callOpts.Address = []string{r.opts.Proxy}
}
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err := r.opts.Lookup(ctx, request, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return err
}
// return errors.New("go.micro.client", "request timeout", 408) // return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error { call := func(i int) error {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
@ -369,36 +378,14 @@ func (r *rpcClient) Call(ctx context.Context, request client.Request, response i
time.Sleep(t) time.Sleep(t)
} }
// use the router passed as a call option, or fallback to the rpc clients router // get the next node
if callOpts.Router == nil { node := next()
callOpts.Router = r.opts.Router
}
// use the selector passed as a call option, or fallback to the rpc clients selector
if callOpts.Selector == nil {
callOpts.Selector = r.opts.Selector
}
// inject proxy address
if len(r.opts.Proxy) > 0 {
callOpts.Address = []string{r.opts.Proxy}
}
// lookup the route to send the request via
route, err := client.LookupRoute(request, callOpts)
if err != nil {
return err
}
// pass a node to enable backwards comparability as changing the
// call func would be a breaking change.
// todo v3: change the call func to accept a route
node := &registry.Node{Address: route.Address, Metadata: route.Metadata}
// make the call // make the call
err = rcall(ctx, node, request, response, callOpts) err = rcall(ctx, node, request, response, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
r.opts.Selector.Record(*route, err) r.opts.Selector.Record(node, err)
return err return err
} }
@ -458,6 +445,34 @@ func (r *rpcClient) Stream(ctx context.Context, request client.Request, opts ...
default: default:
} }
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
callOpts.Router = r.opts.Router
}
if callOpts.Selector == nil {
callOpts.Selector = r.opts.Selector
}
// inject proxy address
// TODO: don't even bother using Lookup/Select in this case
if len(r.opts.Proxy) > 0 {
callOpts.Address = []string{r.opts.Proxy}
}
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err := r.opts.Lookup(ctx, request, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
call := func(i int) (client.Stream, error) { call := func(i int) (client.Stream, error) {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i) t, err := callOpts.Backoff(ctx, request, i)
@ -470,36 +485,14 @@ func (r *rpcClient) Stream(ctx context.Context, request client.Request, opts ...
time.Sleep(t) time.Sleep(t)
} }
// use the router passed as a call option, or fallback to the rpc clients router // get the next node
if callOpts.Router == nil { node := next()
callOpts.Router = r.opts.Router
}
// use the selector passed as a call option, or fallback to the rpc clients selector
if callOpts.Selector == nil {
callOpts.Selector = r.opts.Selector
}
// inject proxy address
if len(r.opts.Proxy) > 0 {
callOpts.Address = []string{r.opts.Proxy}
}
// lookup the route to send the request via
route, err := client.LookupRoute(request, callOpts)
if err != nil {
return nil, err
}
// pass a node to enable backwards compatability as changing the
// call func would be a breaking change.
// todo v3: change the call func to accept a route
node := &registry.Node{Address: route.Address, Metadata: route.Metadata}
// perform the call // perform the call
stream, err := r.stream(ctx, node, request, callOpts) stream, err := r.stream(ctx, node, request, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
r.opts.Selector.Record(*route, err) r.opts.Selector.Record(node, err)
return stream, err return stream, err
} }

View File

@ -9,10 +9,13 @@ import (
"github.com/micro/go-micro/v3/errors" "github.com/micro/go-micro/v3/errors"
"github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/registry/memory" "github.com/micro/go-micro/v3/registry/memory"
"github.com/micro/go-micro/v3/router"
regRouter "github.com/micro/go-micro/v3/router/registry"
) )
func newTestRegistry() registry.Registry { func newTestRouter() router.Router {
return memory.NewRegistry(memory.Services(testData)) reg := memory.NewRegistry(memory.Services(testData))
return regRouter.NewRouter(router.Registry(reg))
} }
func TestCallAddress(t *testing.T) { func TestCallAddress(t *testing.T) {
@ -22,7 +25,7 @@ func TestCallAddress(t *testing.T) {
address := "10.1.10.1:8080" address := "10.1.10.1:8080"
wrap := func(cf client.CallFunc) client.CallFunc { wrap := func(cf client.CallFunc) client.CallFunc {
return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { return func(ctx context.Context, node string, req client.Request, rsp interface{}, opts client.CallOptions) error {
called = true called = true
if req.Service() != service { if req.Service() != service {
@ -33,8 +36,8 @@ func TestCallAddress(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
} }
if node.Address != address { if node != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address) return fmt.Errorf("expected address: %s got %s", address, node)
} }
// don't do the call // don't do the call
@ -42,9 +45,10 @@ func TestCallAddress(t *testing.T) {
} }
} }
r := newTestRegistry() r := newTestRouter()
c := NewClient( c := NewClient(
client.Registry(r), client.Router(r),
client.WrapCall(wrap), client.WrapCall(wrap),
) )
@ -69,7 +73,7 @@ func TestCallRetry(t *testing.T) {
var called int var called int
wrap := func(cf client.CallFunc) client.CallFunc { wrap := func(cf client.CallFunc) client.CallFunc {
return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { return func(ctx context.Context, node string, req client.Request, rsp interface{}, opts client.CallOptions) error {
called++ called++
if called == 1 { if called == 1 {
return errors.InternalServerError("test.error", "retry request") return errors.InternalServerError("test.error", "retry request")
@ -80,9 +84,9 @@ func TestCallRetry(t *testing.T) {
} }
} }
r := newTestRegistry() r := newTestRouter()
c := NewClient( c := NewClient(
client.Registry(r), client.Router(r),
client.WrapCall(wrap), client.WrapCall(wrap),
) )
@ -107,7 +111,7 @@ func TestCallWrapper(t *testing.T) {
address := "10.1.10.1:8080" address := "10.1.10.1:8080"
wrap := func(cf client.CallFunc) client.CallFunc { wrap := func(cf client.CallFunc) client.CallFunc {
return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { return func(ctx context.Context, node string, req client.Request, rsp interface{}, opts client.CallOptions) error {
called = true called = true
if req.Service() != service { if req.Service() != service {
@ -118,8 +122,8 @@ func TestCallWrapper(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
} }
if node.Address != address { if node != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address) return fmt.Errorf("expected address: %s got %s", address, node)
} }
// don't do the call // don't do the call
@ -127,22 +131,19 @@ func TestCallWrapper(t *testing.T) {
} }
} }
r := newTestRegistry() r := newTestRouter()
c := NewClient( c := NewClient(
client.Registry(r), client.Router(r),
client.WrapCall(wrap), client.WrapCall(wrap),
) )
r.Register(&registry.Service{ r.Options().Registry.Register(&registry.Service{
Name: service, Name: service,
Version: "latest", Version: "latest",
Nodes: []*registry.Node{ Nodes: []*registry.Node{
{ {
Id: id, Id: id,
Address: address, Address: address,
Metadata: map[string]string{
"protocol": "mucp",
},
}, },
}, },
}) })

View File

@ -7,10 +7,10 @@ import (
"github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/broker/http" "github.com/micro/go-micro/v3/broker/http"
"github.com/micro/go-micro/v3/codec" "github.com/micro/go-micro/v3/codec"
"github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/router"
regRouter "github.com/micro/go-micro/v3/router/registry" regRouter "github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/selector"
"github.com/micro/go-micro/v3/selector/random"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
thttp "github.com/micro/go-micro/v3/transport/http" thttp "github.com/micro/go-micro/v3/transport/http"
) )
@ -28,6 +28,9 @@ type Options struct {
Selector selector.Selector Selector selector.Selector
Transport transport.Transport Transport transport.Transport
// Lookup used for looking up routes
Lookup LookupFunc
// Connection Pool // Connection Pool
PoolSize int PoolSize int
PoolTTL time.Duration PoolTTL time.Duration
@ -116,11 +119,12 @@ func NewOptions(options ...Option) Options {
RequestTimeout: DefaultRequestTimeout, RequestTimeout: DefaultRequestTimeout,
DialTimeout: transport.DefaultDialTimeout, DialTimeout: transport.DefaultDialTimeout,
}, },
Lookup: LookupRoute,
PoolSize: DefaultPoolSize, PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL, PoolTTL: DefaultPoolTTL,
Broker: http.NewBroker(), Broker: http.NewBroker(),
Router: regRouter.NewRouter(), Router: regRouter.NewRouter(),
Selector: selector.DefaultSelector, Selector: random.NewSelector(),
Transport: thttp.NewTransport(), Transport: thttp.NewTransport(),
} }
@ -216,6 +220,13 @@ func Backoff(fn BackoffFunc) Option {
} }
} }
// Lookup sets the lookup function to use for resolving service names
func Lookup(l LookupFunc) Option {
return func(o *Options) {
o.Lookup = l
}
}
// Number of retries when making the request. // Number of retries when making the request.
// Should this be a Call Option? // Should this be a Call Option?
func Retries(i int) Option { func Retries(i int) Option {
@ -231,13 +242,6 @@ func Retry(fn RetryFunc) Option {
} }
} }
// Registry sets the routers registry
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Router.Init(router.Registry(r))
}
}
// The request timeout. // The request timeout.
// Should this be a Call Option? // Should this be a Call Option?
func RequestTimeout(d time.Duration) Option { func RequestTimeout(d time.Duration) Option {

View File

@ -2,12 +2,10 @@ package client
import ( import (
"context" "context"
"github.com/micro/go-micro/v3/registry"
) )
// CallFunc represents the individual call func // CallFunc represents the individual call func
type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error type CallFunc func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error
// CallWrapper is a low level wrapper for the CallFunc // CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc type CallWrapper func(CallFunc) CallFunc

View File

@ -10,6 +10,8 @@ import (
"github.com/micro/go-micro/v3/client" "github.com/micro/go-micro/v3/client"
cmucp "github.com/micro/go-micro/v3/client/mucp" cmucp "github.com/micro/go-micro/v3/client/mucp"
"github.com/micro/go-micro/v3/registry/memory" "github.com/micro/go-micro/v3/registry/memory"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/server" "github.com/micro/go-micro/v3/server"
"github.com/micro/go-micro/v3/server/mucp" "github.com/micro/go-micro/v3/server/mucp"
) )
@ -53,6 +55,9 @@ func TestHTTPProxy(t *testing.T) {
defer cancel() defer cancel()
reg := memory.NewRegistry() reg := memory.NewRegistry()
rtr := registry.NewRouter(
router.Registry(reg),
)
// new micro service // new micro service
service := mucp.NewServer( service := mucp.NewServer(
@ -70,7 +75,7 @@ func TestHTTPProxy(t *testing.T) {
go http.Serve(c, nil) go http.Serve(c, nil)
cl := cmucp.NewClient( cl := cmucp.NewClient(
client.Registry(reg), client.Router(rtr),
) )
for _, test := range testCases { for _, test := range testCases {

117
router/mdns/mdns.go Normal file
View File

@ -0,0 +1,117 @@
// Package mdns is an mdns router
package mdns
import (
"fmt"
"net"
"strconv"
"time"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/util/mdns"
)
// NewRouter returns an initialized dns router
func NewRouter(opts ...router.Option) router.Router {
options := router.DefaultOptions()
for _, o := range opts {
o(&options)
}
if len(options.Network) == 0 {
options.Network = "micro"
}
return &mdnsRouter{options}
}
type mdnsRouter struct {
options router.Options
}
func (m *mdnsRouter) Init(opts ...router.Option) error {
for _, o := range opts {
o(&m.options)
}
return nil
}
func (m *mdnsRouter) Options() router.Options {
return m.options
}
func (m *mdnsRouter) Table() router.Table {
return nil
}
func (m *mdnsRouter) Lookup(opts ...router.QueryOption) ([]router.Route, error) {
options := router.NewQuery(opts...)
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
service, port, err := net.SplitHostPort(options.Service)
if err != nil {
service = options.Service
}
// query for the host
entries := make(chan *mdns.ServiceEntry)
p := mdns.DefaultParams(service)
p.Timeout = time.Millisecond * 100
p.Entries = entries
// check if we're using our own network
if len(options.Network) > 0 {
p.Domain = options.Network
}
// do the query
if err := mdns.Query(p); err != nil {
return nil, err
}
var routes []router.Route
// compose the routes based on the entries
for e := range entries {
addr := e.Host
// prefer ipv4 addrs
if len(e.AddrV4) > 0 {
addr = e.AddrV4.String()
// else use ipv6
} else if len(e.AddrV6) > 0 {
addr = "[" + e.AddrV6.String() + "]"
} else if len(addr) == 0 {
continue
}
pt := 443
if e.Port > 0 {
pt = e.Port
}
// set the port
if len(port) > 0 {
pt, _ = strconv.Atoi(port)
}
routes = append(routes, router.Route{
Service: service,
Address: fmt.Sprintf("%s:%d", addr, pt),
Network: p.Domain,
})
}
return routes, nil
}
func (m *mdnsRouter) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil
}
func (m *mdnsRouter) Close() error {
return nil
}
func (m *mdnsRouter) String() string {
return "mdns"
}

View File

@ -1,31 +1,17 @@
package selector package selector
import "github.com/micro/go-micro/v3/router"
// Options used to configure a selector // Options used to configure a selector
type Options struct{} type Options struct{}
// Option updates the options // Option updates the options
type Option func(*Options) type Option func(*Options)
// Filter the routes
type Filter func([]router.Route) []router.Route
// SelectOptions used to configure selection // SelectOptions used to configure selection
type SelectOptions struct { type SelectOptions struct{}
Filters []Filter
}
// SelectOption updates the select options // SelectOption updates the select options
type SelectOption func(*SelectOptions) type SelectOption func(*SelectOptions)
// WithFilter adds a filter to the options
func WithFilter(f Filter) SelectOption {
return func(o *SelectOptions) {
o.Filters = append(o.Filters, f)
}
}
// NewSelectOptions parses select options // NewSelectOptions parses select options
func NewSelectOptions(opts ...SelectOption) SelectOptions { func NewSelectOptions(opts ...SelectOption) SelectOptions {
var options SelectOptions var options SelectOptions
@ -33,9 +19,5 @@ func NewSelectOptions(opts ...SelectOption) SelectOptions {
o(&options) o(&options)
} }
if options.Filters == nil {
options.Filters = make([]Filter, 0)
}
return options return options
} }

View File

@ -1,56 +0,0 @@
package selector
import (
"math/rand"
"github.com/micro/go-micro/v3/router"
)
type random struct{}
func (r *random) Init(opts ...Option) error {
return nil
}
func (r *random) Options() Options {
return Options{}
}
func (r *random) Select(routes []router.Route, opts ...SelectOption) (*router.Route, error) {
// parse the options
options := NewSelectOptions(opts...)
// apply the filters
for _, f := range options.Filters {
routes = f(routes)
}
// we can't select from an empty pool of routes
if len(routes) == 0 {
return nil, ErrNoneAvailable
}
// if there is only one route provided we'll select it
if len(routes) == 1 {
return &routes[0], nil
}
// select a random route from the slice
return &routes[rand.Intn(len(routes)-1)], nil
}
func (r *random) Record(route router.Route, err error) error {
return nil
}
func (r *random) Close() error {
return nil
}
func (r *random) String() string {
return "random"
}
func newSelector(...Option) Selector {
return &random{}
}

View File

@ -1,10 +1,44 @@
package random package random
import ( import (
"math/rand"
"github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/selector"
) )
type random struct{}
func (r *random) Select(routes []string, opts ...selector.SelectOption) (selector.Next, error) {
// we can't select from an empty pool of routes
if len(routes) == 0 {
return nil, selector.ErrNoneAvailable
}
// return the next func
return func() string {
// if there is only one route provided we'll select it
if len(routes) == 1 {
return routes[0]
}
// select a random route from the slice
return routes[rand.Intn(len(routes)-1)]
}, nil
}
func (r *random) Record(addr string, err error) error {
return nil
}
func (r *random) Reset() error {
return nil
}
func (r *random) String() string {
return "random"
}
// NewSelector returns a random selector // NewSelector returns a random selector
func NewSelector(opts ...selector.Option) selector.Selector { func NewSelector(opts ...selector.Option) selector.Selector {
return selector.DefaultSelector return new(random)
} }

View File

@ -1,110 +1,35 @@
package roundrobin package roundrobin
import ( import (
"sort"
"sync"
"time"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/selector"
) )
var routeTTL = time.Minute * 15
// NewSelector returns an initalised round robin selector // NewSelector returns an initalised round robin selector
func NewSelector(opts ...selector.Option) selector.Selector { func NewSelector(opts ...selector.Option) selector.Selector {
r := &roundrobin{ return new(roundrobin)
routes: make(map[uint64]time.Time),
ticker: time.NewTicker(time.Minute),
}
go r.cleanRoutes()
return r
} }
type roundrobin struct { type roundrobin struct{}
ticker *time.Ticker
// routes is a map with the key being a route's hash and the value being the last time it
// was used to perform a request
routes map[uint64]time.Time
sync.Mutex
}
func (r *roundrobin) Init(opts ...selector.Option) error {
return nil
}
func (r *roundrobin) Options() selector.Options {
return selector.Options{}
}
func (r *roundrobin) Select(routes []router.Route, opts ...selector.SelectOption) (*router.Route, error) {
// parse the options
options := selector.NewSelectOptions(opts...)
// apply the filters
for _, f := range options.Filters {
routes = f(routes)
}
func (r *roundrobin) Select(routes []string, opts ...selector.SelectOption) (selector.Next, error) {
if len(routes) == 0 { if len(routes) == 0 {
return nil, selector.ErrNoneAvailable return nil, selector.ErrNoneAvailable
} }
r.Lock() var i int
defer r.Unlock()
// setLastUsed will update the last used time for a route return func() string {
setLastUsed := func(hash uint64) { route := routes[i%len(routes)]
r.routes[hash] = time.Now() // increment
} i++
return route
// if a route hasn't yet been seen, prioritise it }, nil
for _, route := range routes {
if _, ok := r.routes[route.Hash()]; !ok {
setLastUsed(route.Hash())
return &route, nil
}
}
// sort the services by the time they were last used
sort.SliceStable(routes, func(i, j int) bool {
iLastSeen := r.routes[routes[i].Hash()]
jLastSeen := r.routes[routes[j].Hash()]
return iLastSeen.UnixNano() < jLastSeen.UnixNano()
})
// return the route which was last used
setLastUsed(routes[0].Hash())
return &routes[0], nil
} }
func (r *roundrobin) Record(srv router.Route, err error) error { func (r *roundrobin) Record(addr string, err error) error { return nil }
return nil
}
func (r *roundrobin) Close() error { func (r *roundrobin) Reset() error { return nil }
r.ticker.Stop()
return nil
}
func (r *roundrobin) String() string { func (r *roundrobin) String() string {
return "roundrobin" return "roundrobin"
} }
func (r *roundrobin) cleanRoutes() {
for _ = range r.ticker.C {
r.Lock()
// copy the slice to prevent concurrent map iteration and map write
rts := r.routes
for hash, t := range rts {
if t.Unix() < time.Now().Add(-routeTTL).Unix() {
delete(r.routes, hash)
}
}
r.Unlock()
}
}

View File

@ -3,7 +3,6 @@ package roundrobin
import ( import (
"testing" "testing"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/selector"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -11,39 +10,31 @@ import (
func TestRoundRobin(t *testing.T) { func TestRoundRobin(t *testing.T) {
selector.Tests(t, NewSelector()) selector.Tests(t, NewSelector())
r1 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8000"} r1 := "127.0.0.1:8000"
r2 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8001"} r2 := "127.0.0.1:8001"
r3 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8002"} r3 := "127.0.0.1:8002"
sel := NewSelector() sel := NewSelector()
// By passing r1 and r2 first, it forces a set sequence of (r1 => r2 => r3 => r1) // By passing r1 and r2 first, it forces a set sequence of (r1 => r2 => r3 => r1)
r, err := sel.Select([]router.Route{r1}) next, err := sel.Select([]string{r1})
r := next()
assert.Nil(t, err, "Error should be nil") assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r1, *r, "Expected route to be r1") assert.Equal(t, r1, r, "Expected route to be r1")
r, err = sel.Select([]router.Route{r2}) next, err = sel.Select([]string{r2})
r = next()
assert.Nil(t, err, "Error should be nil") assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r2, *r, "Expected route to be r2") assert.Equal(t, r2, r, "Expected route to be r2")
// Because r1 and r2 have been recently called, r3 should be chosen // Because r1 and r2 have been recently called, r3 should be chosen
r, err = sel.Select([]router.Route{r1, r2, r3}) next, err = sel.Select([]string{r1, r2, r3})
n1, n2, n3 := next(), next(), next()
assert.Nil(t, err, "Error should be nil") assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r3, *r, "Expected route to be r3") assert.Equal(t, r1, n1, "Expected route to be r3")
assert.Equal(t, r2, n2, "Expected route to be r3")
assert.Equal(t, r3, n3, "Expected route to be r3")
// r1 was called longest ago, so it should be prioritised
r, err = sel.Select([]router.Route{r1, r2, r3})
assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r1, *r, "Expected route to be r1")
r, err = sel.Select([]router.Route{r1, r2, r3})
assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r2, *r, "Expected route to be r2")
r, err = sel.Select([]router.Route{r1, r2, r3})
assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r3, *r, "Expected route to be r3")
} }

View File

@ -3,35 +3,24 @@ package selector
import ( import (
"errors" "errors"
"github.com/micro/go-micro/v3/router"
) )
var ( var (
// DefaultSelector is the default selector
DefaultSelector = NewSelector()
// ErrNoneAvailable is returned by select when no routes were provided to select from // ErrNoneAvailable is returned by select when no routes were provided to select from
ErrNoneAvailable = errors.New("none available") ErrNoneAvailable = errors.New("none available")
) )
// Selector selects a route from a pool // Selector selects a route from a pool
type Selector interface { type Selector interface {
// Init a selector with options
Init(...Option) error
// Options the selector is using
Options() Options
// Select a route from the pool using the strategy // Select a route from the pool using the strategy
Select([]router.Route, ...SelectOption) (*router.Route, error) Select([]string, ...SelectOption) (Next, error)
// Record the error returned from a route to inform future selection // Record the error returned from a route to inform future selection
Record(router.Route, error) error Record(string, error) error
// Close the selector // Reset the selector
Close() error Reset() error
// String returns the name of the selector // String returns the name of the selector
String() string String() string
} }
// NewSelector creates new selector and returns it // Next returns the next node
func NewSelector(opts ...Option) Selector { type Next func() string
return newSelector(opts...)
}

View File

@ -3,47 +3,35 @@ package selector
import ( import (
"testing" "testing"
"github.com/micro/go-micro/v3/router"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
// Tests runs all the tests against a selector to ensure the implementations are consistent // Tests runs all the tests against a selector to ensure the implementations are consistent
func Tests(t *testing.T, s Selector) { func Tests(t *testing.T, s Selector) {
r1 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8000"} r1 := "127.0.0.1:8000"
r2 := router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8001"} r2 := "127.0.0.1:8001"
t.Run("Select", func(t *testing.T) { t.Run("Select", func(t *testing.T) {
t.Run("NoRoutes", func(t *testing.T) { t.Run("NoRoutes", func(t *testing.T) {
srv, err := s.Select([]router.Route{}) _, err := s.Select([]string{})
assert.Nil(t, srv, "Route should be nil")
assert.Equal(t, ErrNoneAvailable, err, "Expected error to be none available") assert.Equal(t, ErrNoneAvailable, err, "Expected error to be none available")
}) })
t.Run("OneRoute", func(t *testing.T) { t.Run("OneRoute", func(t *testing.T) {
srv, err := s.Select([]router.Route{r1}) next, err := s.Select([]string{r1})
srv := next()
assert.Nil(t, err, "Error should be nil") assert.Nil(t, err, "Error should be nil")
assert.Equal(t, r1, *srv, "Expected the route to be returned") assert.Equal(t, r1, srv, "Expected the route to be returned")
}) })
t.Run("MultipleRoutes", func(t *testing.T) { t.Run("MultipleRoutes", func(t *testing.T) {
srv, err := s.Select([]router.Route{r1, r2}) next, err := s.Select([]string{r1, r2})
assert.Nil(t, err, "Error should be nil") assert.Nil(t, err, "Error should be nil")
if srv.Address != r1.Address && srv.Address != r2.Address { srv := next()
if srv != r1 && srv != r2 {
t.Errorf("Expected the route to be one of the inputs") t.Errorf("Expected the route to be one of the inputs")
} }
}) })
t.Run("Filters", func(t *testing.T) {
var filterApplied bool
filter := func(rts []router.Route) []router.Route {
filterApplied = true
return rts
}
_, err := s.Select([]router.Route{r1, r2}, WithFilter(filter))
assert.Nil(t, err, "Error should be nil")
assert.True(t, filterApplied, "Filters should be applied")
})
}) })
t.Run("Record", func(t *testing.T) { t.Run("Record", func(t *testing.T) {

View File

@ -21,18 +21,12 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
} }
// rudimentary retry 3 times // rudimentary retry 3 times
for i := 0; i < 3; i++ { for _, route := range routes {
route, err := r.st.Select(routes)
if err != nil {
continue
}
req.URL.Host = route.Address req.URL.Host = route.Address
w, err := r.rt.RoundTrip(req) w, err := r.rt.RoundTrip(req)
if err != nil { if err != nil {
continue continue
} }
return w, nil return w, nil
} }