Deprecate client/selector (#1767)
* client/{grpc,rpc}: depricate selector (wip) * {client,cmd}: remove client/selector * deprecate client/selector * router/static: fix lookup * config/cmd: add support for legacy static selector flag * config/cmd: add support for legacy dns selector flag
This commit is contained in:
parent
ea93f93b18
commit
422fd980e9
134
grpc.go
134
grpc.go
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
@ -13,11 +14,11 @@ import (
|
||||
|
||||
"github.com/micro/go-micro/v2/broker"
|
||||
"github.com/micro/go-micro/v2/client"
|
||||
"github.com/micro/go-micro/v2/client/selector"
|
||||
raw "github.com/micro/go-micro/v2/codec/bytes"
|
||||
"github.com/micro/go-micro/v2/errors"
|
||||
"github.com/micro/go-micro/v2/metadata"
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
"github.com/micro/go-micro/v2/router"
|
||||
"github.com/micro/go-micro/v2/selector"
|
||||
pnet "github.com/micro/go-micro/v2/util/net"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
@ -72,41 +73,59 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
|
||||
return grpc.WithInsecure()
|
||||
}
|
||||
|
||||
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
|
||||
service, address, _ := pnet.Proxy(request.Service(), opts.Address)
|
||||
|
||||
// return remote address
|
||||
if len(address) > 0 {
|
||||
return func() (*registry.Node, error) {
|
||||
return ®istry.Node{
|
||||
Address: address[0],
|
||||
}, nil
|
||||
// lookupRoute for a request using the router and then choose one using the selector
|
||||
func (g *grpcClient) lookupRoute(req client.Request, opts client.CallOptions) (*router.Route, error) {
|
||||
// check to see if the proxy has been set, if it has we don't need to lookup the routes; net.Proxy
|
||||
// returns a slice of addresses, so we'll use a random one. Eventually we should to use the
|
||||
// selector for this.
|
||||
service, addresses, _ := pnet.Proxy(req.Service(), opts.Address)
|
||||
if len(addresses) > 0 {
|
||||
return &router.Route{
|
||||
Service: service,
|
||||
Address: addresses[rand.Int()%len(addresses)],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// if the network was set, pass it to the selector
|
||||
sopts := opts.SelectOptions
|
||||
// construct the router query
|
||||
query := []router.QueryOption{router.QueryService(req.Service())}
|
||||
|
||||
// if a custom network was requested, pass this to the router. By default the router will use it's
|
||||
// own network, which is set during initialisation.
|
||||
if len(opts.Network) > 0 {
|
||||
sopts = append(sopts, selector.WithDomain(opts.Network))
|
||||
query = append(query, router.QueryNetwork(opts.Network))
|
||||
}
|
||||
|
||||
// get next nodes from the selector
|
||||
next, err := g.opts.Selector.Select(service, sopts...)
|
||||
if err != nil {
|
||||
if err == selector.ErrNotFound {
|
||||
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
|
||||
}
|
||||
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
|
||||
// use the router passed as a call option, or fallback to the grpc clients router
|
||||
if opts.Router == nil {
|
||||
opts.Router = g.opts.Router
|
||||
}
|
||||
|
||||
return next, nil
|
||||
// lookup the routes which can be used to execute the request
|
||||
routes, err := opts.Router.Lookup(query...)
|
||||
if err == router.ErrRouteNotFound {
|
||||
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
|
||||
}
|
||||
|
||||
func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
// use the selector passed as a call option, or fallback to the grpc clients selector
|
||||
if opts.Selector == nil {
|
||||
opts.Selector = g.opts.Selector
|
||||
}
|
||||
|
||||
// select the route to use for the request
|
||||
if route, err := opts.Selector.Select(routes); err == selector.ErrNoneAvailable {
|
||||
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", req.Service(), err.Error())
|
||||
} else {
|
||||
return route, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
var header map[string]string
|
||||
|
||||
address := node.Address
|
||||
|
||||
header = make(map[string]string)
|
||||
if md, ok := metadata.FromContext(ctx); ok {
|
||||
header = make(map[string]string, len(md))
|
||||
@ -137,7 +156,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
||||
|
||||
grpcDialOptions := []grpc.DialOption{
|
||||
grpc.WithTimeout(opts.DialTimeout),
|
||||
g.secure(address),
|
||||
g.secure(route.Address),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.MaxCallSendMsgSize(maxSendMsgSize),
|
||||
@ -148,13 +167,13 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||
}
|
||||
|
||||
cc, err := g.pool.getConn(address, grpcDialOptions...)
|
||||
cc, err := g.pool.getConn(route.Address, grpcDialOptions...)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||
}
|
||||
defer func() {
|
||||
// defer execution of release
|
||||
g.pool.release(address, cc, grr)
|
||||
g.pool.release(route.Address, cc, grr)
|
||||
}()
|
||||
|
||||
ch := make(chan error, 1)
|
||||
@ -180,11 +199,9 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
||||
return grr
|
||||
}
|
||||
|
||||
func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
var header map[string]string
|
||||
|
||||
address := node.Address
|
||||
|
||||
if md, ok := metadata.FromContext(ctx); ok {
|
||||
header = make(map[string]string, len(md))
|
||||
for k, v := range md {
|
||||
@ -222,14 +239,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
||||
|
||||
grpcDialOptions := []grpc.DialOption{
|
||||
grpc.WithTimeout(opts.DialTimeout),
|
||||
g.secure(address),
|
||||
g.secure(route.Address),
|
||||
}
|
||||
|
||||
if opts := g.getGrpcDialOptions(); opts != nil {
|
||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||
}
|
||||
|
||||
cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...)
|
||||
cc, err := grpc.DialContext(dialCtx, route.Address, grpcDialOptions...)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||
}
|
||||
@ -396,11 +413,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
opt(&callOpts)
|
||||
}
|
||||
|
||||
next, err := g.next(req, callOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if we already have a deadline
|
||||
d, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
@ -443,19 +455,19 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
time.Sleep(t)
|
||||
}
|
||||
|
||||
// select next node
|
||||
node, err := next()
|
||||
service := req.Service()
|
||||
// lookup the route to send the reques to
|
||||
route, err := g.lookupRoute(req, callOpts)
|
||||
if err != nil {
|
||||
if err == selector.ErrNotFound {
|
||||
return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
|
||||
}
|
||||
return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// make the call
|
||||
err = gcall(ctx, node, req, rsp, callOpts)
|
||||
g.opts.Selector.Mark(service, node, err)
|
||||
err = gcall(ctx, route, req, rsp, callOpts)
|
||||
|
||||
// record the result of the call to inform future routing decisions
|
||||
g.opts.Selector.Record(*route, err)
|
||||
|
||||
// try and transform the error to a go-micro error
|
||||
if verr, ok := err.(*errors.Error); ok {
|
||||
return verr
|
||||
}
|
||||
@ -503,11 +515,6 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
opt(&callOpts)
|
||||
}
|
||||
|
||||
next, err := g.next(req, callOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// #200 - streams shouldn't have a request timeout set on the context
|
||||
|
||||
// should we noop right here?
|
||||
@ -537,20 +544,25 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
time.Sleep(t)
|
||||
}
|
||||
|
||||
node, err := next()
|
||||
service := req.Service()
|
||||
// lookup the route to send the reques to
|
||||
route, err := g.lookupRoute(req, callOpts)
|
||||
if err != nil {
|
||||
if err == selector.ErrNotFound {
|
||||
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
|
||||
}
|
||||
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// make the call
|
||||
stream := &grpcStream{}
|
||||
err = g.stream(ctx, node, req, stream, callOpts)
|
||||
err = g.stream(ctx, route, req, stream, callOpts)
|
||||
|
||||
g.opts.Selector.Mark(service, node, err)
|
||||
// record the result of the call to inform future routing decisions
|
||||
g.opts.Selector.Record(*route, err)
|
||||
|
||||
// try and transform the error to a go-micro error
|
||||
if verr, ok := err.(*errors.Error); ok {
|
||||
return nil, verr
|
||||
}
|
||||
|
||||
g.opts.Selector.Record(*route, err)
|
||||
return stream, err
|
||||
}
|
||||
|
||||
@ -577,7 +589,7 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
return rsp.stream, nil
|
||||
}
|
||||
|
||||
retry, rerr := callOpts.Retry(ctx, req, i, err)
|
||||
retry, rerr := callOpts.Retry(ctx, req, i, grr)
|
||||
if rerr != nil {
|
||||
return nil, rerr
|
||||
}
|
||||
|
13
grpc_test.go
13
grpc_test.go
@ -6,10 +6,10 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/v2/client"
|
||||
"github.com/micro/go-micro/v2/client/selector"
|
||||
"github.com/micro/go-micro/v2/errors"
|
||||
"github.com/micro/go-micro/v2/registry"
|
||||
"github.com/micro/go-micro/v2/registry/memory"
|
||||
"github.com/micro/go-micro/v2/router"
|
||||
pgrpc "google.golang.org/grpc"
|
||||
pb "google.golang.org/grpc/examples/helloworld/helloworld"
|
||||
)
|
||||
@ -56,16 +56,11 @@ func TestGRPCClient(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
// create selector
|
||||
se := selector.NewSelector(
|
||||
selector.Registry(r),
|
||||
)
|
||||
// create router
|
||||
rtr := router.NewRouter(router.Registry(r))
|
||||
|
||||
// create client
|
||||
c := NewClient(
|
||||
client.Registry(r),
|
||||
client.Selector(se),
|
||||
)
|
||||
c := NewClient(client.Router(rtr))
|
||||
|
||||
testMethods := []string{
|
||||
"/helloworld.Greeter/SayHello",
|
||||
|
Loading…
Reference in New Issue
Block a user