Merge pull request #70 from unistack-org/headers

pass timeout headers in proper format
This commit is contained in:
Василий Толстов 2022-03-21 14:14:08 +03:00 committed by GitHub
commit 2b85cabe1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 6 deletions

13
grpc.go
View File

@ -19,12 +19,13 @@ import (
"go.unistack.org/micro/v3/metadata"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata"
)
const (
defaultContentType = "application/grpc+proto"
DefaultContentType = "application/grpc+proto"
)
type grpcClient struct {
@ -62,7 +63,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
}
// other fallback to insecure
return grpc.WithInsecure()
return grpc.WithTransportCredentials(insecure.NewCredentials())
}
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
@ -78,7 +79,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
}
// set timeout in nanoseconds
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
@ -161,7 +163,8 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// set timeout in nanoseconds
if opts.StreamTimeout > time.Duration(0) {
header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout)
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
}
// set the content type for the request
header["x-content-type"] = req.ContentType()
@ -756,7 +759,7 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
func NewClient(opts ...client.Option) client.Client {
options := client.NewOptions(opts...)
// default content type for grpc
options.ContentType = defaultContentType
options.ContentType = DefaultContentType
rc := &grpcClient{
opts: options,

View File

@ -2,6 +2,7 @@ package grpc
import (
"context"
"strings"
"sync"
"time"
@ -59,6 +60,9 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
}
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:]
}
now := time.Now().Unix()
p.Lock()
sp, ok := p.conns[addr]
@ -126,7 +130,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
}
p.Unlock()
// create new conn
// create new conn)
cc, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, err