Add dialoptions and calloptions

This commit is contained in:
orb li 2019-10-09 14:45:51 +08:00
parent 44473f954f
commit 88ef785127
2 changed files with 99 additions and 7 deletions

View File

@ -109,13 +109,22 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
maxSendMsgSize := g.maxSendMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue()
var grr error var grr error
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout), g.secure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)),
grpc.WithTimeout(opts.DialTimeout),
g.secure(),
grpc.WithDefaultCallOptions( grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
)) ),
}
if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
cc, err := g.pool.getConn(address, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -127,7 +136,11 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.CallContentSubtype(cf.Name())) grpcCallOptions := []grpc.CallOption{grpc.CallContentSubtype(cf.Name())}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpcCallOptions...)
ch <- microError(err) ch <- microError(err)
}() }()
@ -174,8 +187,17 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
defer cancel() defer cancel()
wc := wrapCodec{cf} wc := wrapCodec{cf}
grpcDialOptions := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)),
g.secure(),
}
if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure()) cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -186,7 +208,11 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true, ServerStreams: true,
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint())) grpcCallOptions := []grpc.CallOption{}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@ -514,6 +540,46 @@ func (g *grpcClient) String() string {
return "grpc" return "grpc"
} }
func (g *grpcClient) getGrpcDialOptions() []grpc.DialOption {
if g.opts.Context == nil {
return nil
}
v := g.opts.Context.Value(grpcDialOptions{})
if v == nil {
return nil
}
opts, ok := v.([]grpc.DialOption)
if !ok {
return nil
}
return opts
}
func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
if g.opts.CallOptions.Context == nil {
return nil
}
v := g.opts.CallOptions.Context.Value(grpcCallOptions{})
if v == nil {
return nil
}
opts, ok := v.([]grpc.CallOption)
if !ok {
return nil
}
return opts
}
func newClient(opts ...client.Option) client.Client { func newClient(opts ...client.Option) client.Client {
options := client.Options{ options := client.Options{
Codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.NewCodec),

View File

@ -23,6 +23,8 @@ type codecsKey struct{}
type tlsAuth struct{} type tlsAuth struct{}
type maxRecvMsgSizeKey struct{} type maxRecvMsgSizeKey struct{}
type maxSendMsgSizeKey struct{} type maxSendMsgSizeKey struct{}
type grpcDialOptions struct{}
type grpcCallOptions struct{}
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) client.Option { func Codec(contentType string, c encoding.Codec) client.Option {
@ -72,3 +74,27 @@ func MaxSendMsgSize(s int) client.Option {
o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s) o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s)
} }
} }
//
// DialOptions to be used to configure gRPC dial options
//
func DialOptions(opts ...grpc.DialOption) client.Option {
return func(o *client.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, grpcDialOptions{}, opts)
}
}
//
// CallOptions to be used to configure gRPC call options
//
func CallOptions(opts ...grpc.CallOption) client.Option {
return func(o *client.Options) {
if o.CallOptions.Context == nil {
o.CallOptions.Context = context.Background()
}
o.CallOptions.Context = context.WithValue(o.CallOptions.Context, grpcCallOptions{}, opts)
}
}