pass timeout headers in proper format #70

Merged
vtolstov merged 2 commits from headers into v3 2022-03-21 14:14:09 +03:00
2 changed files with 13 additions and 6 deletions
Showing only changes of commit 42b93ce57e - Show all commits

13
grpc.go
View File

@ -19,12 +19,13 @@ import (
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata" gmetadata "google.golang.org/grpc/metadata"
) )
const ( const (
defaultContentType = "application/grpc+proto" DefaultContentType = "application/grpc+proto"
) )
type grpcClient struct { type grpcClient struct {
@ -62,7 +63,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
} }
// other fallback to insecure // other fallback to insecure
return grpc.WithInsecure() return grpc.WithTransportCredentials(insecure.NewCredentials())
} }
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
@ -78,7 +79,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
// set timeout in nanoseconds // set timeout in nanoseconds
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
md := gmetadata.New(header) md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md) ctx = gmetadata.NewOutgoingContext(ctx, md)
@ -161,7 +163,8 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// set timeout in nanoseconds // set timeout in nanoseconds
if opts.StreamTimeout > time.Duration(0) { if opts.StreamTimeout > time.Duration(0) {
header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
} }
// set the content type for the request // set the content type for the request
header["x-content-type"] = req.ContentType() header["x-content-type"] = req.ContentType()
@ -756,7 +759,7 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
func NewClient(opts ...client.Option) client.Client { func NewClient(opts ...client.Option) client.Client {
options := client.NewOptions(opts...) options := client.NewOptions(opts...)
// default content type for grpc // default content type for grpc
options.ContentType = defaultContentType options.ContentType = DefaultContentType
rc := &grpcClient{ rc := &grpcClient{
opts: options, opts: options,

View File

@ -2,6 +2,7 @@ package grpc
import ( import (
"context" "context"
"strings"
"sync" "sync"
"time" "time"
@ -59,6 +60,9 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
} }
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:]
}
now := time.Now().Unix() now := time.Now().Unix()
p.Lock() p.Lock()
sp, ok := p.conns[addr] sp, ok := p.conns[addr]
@ -126,7 +130,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
} }
p.Unlock() p.Unlock()
// create new conn // create new conn)
cc, err := grpc.DialContext(ctx, addr, opts...) cc, err := grpc.DialContext(ctx, addr, opts...)
if err != nil { if err != nil {
return nil, err return nil, err