87
grpc.go
87
grpc.go
@@ -8,14 +8,15 @@ import (
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v3/broker"
|
||||
"github.com/micro/go-micro/v3/client"
|
||||
raw "github.com/micro/go-micro/v3/codec/bytes"
|
||||
"github.com/micro/go-micro/v3/errors"
|
||||
"github.com/micro/go-micro/v3/metadata"
|
||||
raw "github.com/unistack-org/micro-codec-bytes"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
@@ -24,9 +25,11 @@ import (
|
||||
)
|
||||
|
||||
type grpcClient struct {
|
||||
opts client.Options
|
||||
pool *pool
|
||||
once atomic.Value
|
||||
opts client.Options
|
||||
codecs map[string]encoding.Codec
|
||||
pool *pool
|
||||
once atomic.Value
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -100,8 +103,16 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
||||
|
||||
var grr error
|
||||
|
||||
var dialCtx context.Context
|
||||
var cancel context.CancelFunc
|
||||
if opts.DialTimeout >= 0 {
|
||||
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
|
||||
} else {
|
||||
dialCtx, cancel = context.WithCancel(ctx)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
grpcDialOptions := []grpc.DialOption{
|
||||
grpc.WithTimeout(opts.DialTimeout),
|
||||
g.secure(addr),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
||||
@@ -113,7 +124,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||
}
|
||||
|
||||
cc, err := g.pool.getConn(addr, grpcDialOptions...)
|
||||
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||
}
|
||||
@@ -172,10 +183,18 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
var dialCtx context.Context
|
||||
var cancel context.CancelFunc
|
||||
if opts.DialTimeout >= 0 {
|
||||
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
|
||||
} else {
|
||||
dialCtx, cancel = context.WithCancel(ctx)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
wc := wrapCodec{cf}
|
||||
|
||||
grpcDialOptions := []grpc.DialOption{
|
||||
grpc.WithTimeout(opts.DialTimeout),
|
||||
g.secure(addr),
|
||||
}
|
||||
|
||||
@@ -183,7 +202,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||
}
|
||||
|
||||
cc, err := g.pool.getConn(addr, grpcDialOptions...)
|
||||
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||
}
|
||||
@@ -202,10 +221,10 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
||||
grpcCallOptions = append(grpcCallOptions, opts...)
|
||||
}
|
||||
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithCancel(ctx)
|
||||
// create a new cancelling context
|
||||
newCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
|
||||
st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
|
||||
if err != nil {
|
||||
// we need to cleanup as we dialled and created a context
|
||||
// cancel the context
|
||||
@@ -300,16 +319,10 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
|
||||
}
|
||||
|
||||
func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
|
||||
codecs := make(map[string]encoding.Codec)
|
||||
if g.opts.Context != nil {
|
||||
if v := g.opts.Context.Value(codecsKey{}); v != nil {
|
||||
codecs = v.(map[string]encoding.Codec)
|
||||
}
|
||||
}
|
||||
if c, ok := codecs[contentType]; ok {
|
||||
return wrapCodec{c}, nil
|
||||
}
|
||||
if c, ok := defaultGRPCCodecs[contentType]; ok {
|
||||
g.RLock()
|
||||
defer g.RUnlock()
|
||||
|
||||
if c, ok := g.codecs[contentType]; ok {
|
||||
return wrapCodec{c}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
||||
@@ -435,7 +448,9 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
err = gcall(ctx, node, req, rsp, callOpts)
|
||||
|
||||
// record the result of the call to inform future routing decisions
|
||||
g.opts.Selector.Record(node, err)
|
||||
if verr := g.opts.Selector.Record(node, err); verr != nil {
|
||||
return verr
|
||||
}
|
||||
|
||||
// try and transform the error to a go-micro error
|
||||
if verr, ok := err.(*errors.Error); ok {
|
||||
@@ -550,7 +565,9 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
err = g.stream(ctx, node, req, stream, callOpts)
|
||||
|
||||
// record the result of the call to inform future routing decisions
|
||||
g.opts.Selector.Record(node, err)
|
||||
if verr := g.opts.Selector.Record(node, err); verr != nil {
|
||||
return nil, verr
|
||||
}
|
||||
|
||||
// try and transform the error to a go-micro error
|
||||
if verr, ok := err.(*errors.Error); ok {
|
||||
@@ -720,6 +737,22 @@ func newClient(opts ...client.Option) client.Client {
|
||||
c = options.Wrappers[i-1](c)
|
||||
}
|
||||
|
||||
rc.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs))
|
||||
for k, v := range defaultGRPCCodecs {
|
||||
rc.codecs[k] = v
|
||||
}
|
||||
|
||||
var codecs map[string]encoding.Codec
|
||||
if rc.opts.Context != nil {
|
||||
if v := rc.opts.Context.Value(codecsKey{}); v != nil {
|
||||
codecs = v.(map[string]encoding.Codec)
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range codecs {
|
||||
rc.codecs[k] = v
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user