diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 1613506e..9ed206c6 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -6,9 +6,9 @@ import ( "context" "crypto/tls" "fmt" + "os" "sync" "time" - "os" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" @@ -49,17 +49,17 @@ func (g *grpcClient) secure() grpc.DialOption { } func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { - service := request.Service() + service := request.Service() - // get proxy - if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { - service = prx - } + // get proxy + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + service = prx + } - // get proxy address - if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { - opts.Address = prx - } + // get proxy address + if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { + opts.Address = prx + } // return remote address if len(opts.Address) > 0 { @@ -112,7 +112,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R var grr error - cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), + cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), grpc.WithTimeout(opts.DialTimeout), g.secure(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), @@ -129,7 +129,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R ch := make(chan error, 1) go func() { - err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.CallContentSubtype(cf.String())) + err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf)) ch <- microError(err) }() @@ -194,17 +194,17 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client } rsp := &response{ - conn: cc, + conn: cc, stream: st, - codec: cf, + codec: cf, } return &grpcStream{ - context: ctx, - request: req, + context: ctx, + request: req, response: rsp, - stream: st, - conn: cc, + stream: st, + conn: cc, }, nil } @@ -230,7 +230,7 @@ func (g *grpcClient) maxSendMsgSizeValue() int { return v.(int) } -func (g *grpcClient) newGRPCCodec(contentType string) (grpc.Codec, error) { +func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) { codecs := make(map[string]encoding.Codec) if g.opts.Context != nil { if v := g.opts.Context.Value(codecsKey{}); v != nil { diff --git a/client/grpc/request.go b/client/grpc/request.go index cbb8fc53..cb14a330 100644 --- a/client/grpc/request.go +++ b/client/grpc/request.go @@ -15,7 +15,7 @@ type grpcRequest struct { contentType string request interface{} opts client.RequestOptions - codec codec.Codec + codec codec.Codec } func methodToGRPC(method string, request interface{}) string { diff --git a/client/grpc/response.go b/client/grpc/response.go index 23c452f7..7ef72241 100644 --- a/client/grpc/response.go +++ b/client/grpc/response.go @@ -5,12 +5,13 @@ import ( "github.com/micro/go-micro/codec" "google.golang.org/grpc" + "google.golang.org/grpc/encoding" ) type response struct { - conn *grpc.ClientConn + conn *grpc.ClientConn stream grpc.ClientStream - codec grpc.Codec + codec encoding.Codec } // Read the response diff --git a/client/grpc/stream.go b/client/grpc/stream.go index d9aae958..af919e46 100644 --- a/client/grpc/stream.go +++ b/client/grpc/stream.go @@ -12,12 +12,12 @@ import ( // Implements the streamer interface type grpcStream struct { sync.RWMutex - err error - conn *grpc.ClientConn - stream grpc.ClientStream - request client.Request + err error + conn *grpc.ClientConn + stream grpc.ClientStream + request client.Request response client.Response - context context.Context + context context.Context } func (g *grpcStream) Context() context.Context {