check in cruft
This commit is contained in:
parent
502ac83bfe
commit
0ecc1d6197
38
grpc.go
38
grpc.go
@ -6,9 +6,9 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
@ -49,17 +49,17 @@ func (g *grpcClient) secure() grpc.DialOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
|
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
|
||||||
service := request.Service()
|
service := request.Service()
|
||||||
|
|
||||||
// get proxy
|
// get proxy
|
||||||
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
|
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
|
||||||
service = prx
|
service = prx
|
||||||
}
|
}
|
||||||
|
|
||||||
// get proxy address
|
// get proxy address
|
||||||
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
|
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
|
||||||
opts.Address = prx
|
opts.Address = prx
|
||||||
}
|
}
|
||||||
|
|
||||||
// return remote address
|
// return remote address
|
||||||
if len(opts.Address) > 0 {
|
if len(opts.Address) > 0 {
|
||||||
@ -112,7 +112,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
|||||||
|
|
||||||
var grr error
|
var grr error
|
||||||
|
|
||||||
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)),
|
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)),
|
||||||
grpc.WithTimeout(opts.DialTimeout), g.secure(),
|
grpc.WithTimeout(opts.DialTimeout), g.secure(),
|
||||||
grpc.WithDefaultCallOptions(
|
grpc.WithDefaultCallOptions(
|
||||||
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
||||||
@ -129,7 +129,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
|
|||||||
ch := make(chan error, 1)
|
ch := make(chan error, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.CallContentSubtype(cf.String()))
|
err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf))
|
||||||
ch <- microError(err)
|
ch <- microError(err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -194,17 +194,17 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
|||||||
}
|
}
|
||||||
|
|
||||||
rsp := &response{
|
rsp := &response{
|
||||||
conn: cc,
|
conn: cc,
|
||||||
stream: st,
|
stream: st,
|
||||||
codec: cf,
|
codec: cf,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &grpcStream{
|
return &grpcStream{
|
||||||
context: ctx,
|
context: ctx,
|
||||||
request: req,
|
request: req,
|
||||||
response: rsp,
|
response: rsp,
|
||||||
stream: st,
|
stream: st,
|
||||||
conn: cc,
|
conn: cc,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,7 +230,7 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
|
|||||||
return v.(int)
|
return v.(int)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) newGRPCCodec(contentType string) (grpc.Codec, error) {
|
func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
|
||||||
codecs := make(map[string]encoding.Codec)
|
codecs := make(map[string]encoding.Codec)
|
||||||
if g.opts.Context != nil {
|
if g.opts.Context != nil {
|
||||||
if v := g.opts.Context.Value(codecsKey{}); v != nil {
|
if v := g.opts.Context.Value(codecsKey{}); v != nil {
|
||||||
|
@ -15,7 +15,7 @@ type grpcRequest struct {
|
|||||||
contentType string
|
contentType string
|
||||||
request interface{}
|
request interface{}
|
||||||
opts client.RequestOptions
|
opts client.RequestOptions
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
func methodToGRPC(method string, request interface{}) string {
|
func methodToGRPC(method string, request interface{}) string {
|
||||||
|
@ -5,12 +5,13 @@ import (
|
|||||||
|
|
||||||
"github.com/micro/go-micro/codec"
|
"github.com/micro/go-micro/codec"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/encoding"
|
||||||
)
|
)
|
||||||
|
|
||||||
type response struct {
|
type response struct {
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
stream grpc.ClientStream
|
stream grpc.ClientStream
|
||||||
codec grpc.Codec
|
codec encoding.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the response
|
// Read the response
|
||||||
|
10
stream.go
10
stream.go
@ -12,12 +12,12 @@ import (
|
|||||||
// Implements the streamer interface
|
// Implements the streamer interface
|
||||||
type grpcStream struct {
|
type grpcStream struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
err error
|
err error
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
stream grpc.ClientStream
|
stream grpc.ClientStream
|
||||||
request client.Request
|
request client.Request
|
||||||
response client.Response
|
response client.Response
|
||||||
context context.Context
|
context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Context() context.Context {
|
func (g *grpcStream) Context() context.Context {
|
||||||
|
Loading…
Reference in New Issue
Block a user