pass timeout headers in proper format #70
13
grpc.go
13
grpc.go
@ -19,12 +19,13 @@ import (
|
|||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
gmetadata "google.golang.org/grpc/metadata"
|
gmetadata "google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultContentType = "application/grpc+proto"
|
DefaultContentType = "application/grpc+proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
type grpcClient struct {
|
type grpcClient struct {
|
||||||
@ -62,7 +63,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// other fallback to insecure
|
// other fallback to insecure
|
||||||
return grpc.WithInsecure()
|
return grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
@ -78,7 +79,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// set timeout in nanoseconds
|
// set timeout in nanoseconds
|
||||||
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
|
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
|
||||||
|
header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
|
||||||
|
|
||||||
md := gmetadata.New(header)
|
md := gmetadata.New(header)
|
||||||
ctx = gmetadata.NewOutgoingContext(ctx, md)
|
ctx = gmetadata.NewOutgoingContext(ctx, md)
|
||||||
@ -161,7 +163,8 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
|
|
||||||
// set timeout in nanoseconds
|
// set timeout in nanoseconds
|
||||||
if opts.StreamTimeout > time.Duration(0) {
|
if opts.StreamTimeout > time.Duration(0) {
|
||||||
header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout)
|
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
|
||||||
|
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
|
||||||
}
|
}
|
||||||
// set the content type for the request
|
// set the content type for the request
|
||||||
header["x-content-type"] = req.ContentType()
|
header["x-content-type"] = req.ContentType()
|
||||||
@ -756,7 +759,7 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
|
|||||||
func NewClient(opts ...client.Option) client.Client {
|
func NewClient(opts ...client.Option) client.Client {
|
||||||
options := client.NewOptions(opts...)
|
options := client.NewOptions(opts...)
|
||||||
// default content type for grpc
|
// default content type for grpc
|
||||||
options.ContentType = defaultContentType
|
options.ContentType = DefaultContentType
|
||||||
|
|
||||||
rc := &grpcClient{
|
rc := &grpcClient{
|
||||||
opts: options,
|
opts: options,
|
||||||
|
@ -2,6 +2,7 @@ package grpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -59,6 +60,9 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
||||||
|
if strings.HasPrefix(addr, "http") {
|
||||||
|
addr = addr[strings.Index(addr, ":")+3:]
|
||||||
|
}
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
p.Lock()
|
p.Lock()
|
||||||
sp, ok := p.conns[addr]
|
sp, ok := p.conns[addr]
|
||||||
@ -126,7 +130,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
}
|
}
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
// create new conn
|
// create new conn)
|
||||||
cc, err := grpc.DialContext(ctx, addr, opts...)
|
cc, err := grpc.DialContext(ctx, addr, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
Reference in New Issue
Block a user