diff --git a/grpc.go b/grpc.go index de19c45..d5c0c03 100644 --- a/grpc.go +++ b/grpc.go @@ -19,12 +19,13 @@ import ( "go.unistack.org/micro/v3/metadata" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding" gmetadata "google.golang.org/grpc/metadata" ) const ( - defaultContentType = "application/grpc+proto" + DefaultContentType = "application/grpc+proto" ) type grpcClient struct { @@ -62,7 +63,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption { } // other fallback to insecure - return grpc.WithInsecure() + return grpc.WithTransportCredentials(insecure.NewCredentials()) } func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { @@ -78,7 +79,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, } // set timeout in nanoseconds - header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout) + header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout) md := gmetadata.New(header) ctx = gmetadata.NewOutgoingContext(ctx, md) @@ -161,7 +163,8 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request // set timeout in nanoseconds if opts.StreamTimeout > time.Duration(0) { - header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout) + header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout) } // set the content type for the request header["x-content-type"] = req.ContentType() @@ -756,7 +759,7 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { func NewClient(opts ...client.Option) client.Client { options := client.NewOptions(opts...) // default content type for grpc - options.ContentType = defaultContentType + options.ContentType = DefaultContentType rc := &grpcClient{ opts: options, diff --git a/grpc_pool.go b/grpc_pool.go index ab1ae79..f8da979 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -2,6 +2,7 @@ package grpc import ( "context" + "strings" "sync" "time" @@ -59,6 +60,9 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { } func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { + if strings.HasPrefix(addr, "http") { + addr = addr[strings.Index(addr, ":")+3:] + } now := time.Now().Unix() p.Lock() sp, ok := p.conns[addr] @@ -126,7 +130,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption } p.Unlock() - // create new conn + // create new conn) cc, err := grpc.DialContext(ctx, addr, opts...) if err != nil { return nil, err