client/{grpc,rpc}: fix previous breaking change with CallFunc (router.Route vs registry.Node) (#1781)

This commit is contained in:
ben-toogood 2020-07-02 17:26:45 +01:00 committed by Vasiliy Tolstov
parent fa9c531ad9
commit cb2fc6e24d

29
grpc.go
View File

@ -17,6 +17,7 @@ import (
raw "github.com/micro/go-micro/v2/codec/bytes" raw "github.com/micro/go-micro/v2/codec/bytes"
"github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/router" "github.com/micro/go-micro/v2/router"
"github.com/micro/go-micro/v2/selector" "github.com/micro/go-micro/v2/selector"
pnet "github.com/micro/go-micro/v2/util/net" pnet "github.com/micro/go-micro/v2/util/net"
@ -123,7 +124,7 @@ func (g *grpcClient) lookupRoute(req client.Request, opts client.CallOptions) (*
} }
} }
func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string var header map[string]string
header = make(map[string]string) header = make(map[string]string)
@ -156,7 +157,7 @@ func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.R
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout), grpc.WithTimeout(opts.DialTimeout),
g.secure(route.Address), g.secure(node.Address),
grpc.WithDefaultCallOptions( grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
@ -167,13 +168,13 @@ func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.R
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
cc, err := g.pool.getConn(route.Address, grpcDialOptions...) cc, err := g.pool.getConn(node.Address, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.release(route.Address, cc, grr) g.pool.release(node.Address, cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@ -199,7 +200,7 @@ func (g *grpcClient) call(ctx context.Context, route *router.Route, req client.R
return grr return grr
} }
func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string var header map[string]string
if md, ok := metadata.FromContext(ctx); ok { if md, ok := metadata.FromContext(ctx); ok {
@ -239,14 +240,14 @@ func (g *grpcClient) stream(ctx context.Context, route *router.Route, req client
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout), grpc.WithTimeout(opts.DialTimeout),
g.secure(route.Address), g.secure(node.Address),
} }
if opts := g.getGrpcDialOptions(); opts != nil { if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
cc, err := grpc.DialContext(dialCtx, route.Address, grpcDialOptions...) cc, err := grpc.DialContext(dialCtx, node.Address, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -461,8 +462,13 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
return err return err
} }
// pass a node to enable backwards compatability as changing the
// call func would be a breaking change.
// todo v3: change the call func to accept a route
node := &registry.Node{Address: route.Address}
// make the call // make the call
err = gcall(ctx, route, req, rsp, callOpts) err = gcall(ctx, node, req, rsp, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
g.opts.Selector.Record(*route, err) g.opts.Selector.Record(*route, err)
@ -550,9 +556,14 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, err return nil, err
} }
// pass a node to enable backwards compatability as changing the
// call func would be a breaking change.
// todo v3: change the call func to accept a route
node := &registry.Node{Address: route.Address}
// make the call // make the call
stream := &grpcStream{} stream := &grpcStream{}
err = g.stream(ctx, route, req, stream, callOpts) err = g.stream(ctx, node, req, stream, callOpts)
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
g.opts.Selector.Record(*route, err) g.opts.Selector.Record(*route, err)