From 68b0238a5d37fd48d70be14162bd6b33c4528e54 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 31 Mar 2020 23:22:11 +0100 Subject: [PATCH] add stream timeout option which defaults to 0 (#1456) * add stream timeout option which defaults to 0 * fix option --- client/grpc/grpc.go | 4 +++- client/options.go | 16 ++++++++++++++++ client/rpc_client.go | 4 +++- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 5f14c25d..71a89f9c 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -221,7 +221,9 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client } // set timeout in nanoseconds - header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + if opts.StreamTimeout > time.Duration(0) { + header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } // set the content type for the request header["x-content-type"] = req.ContentType() diff --git a/client/options.go b/client/options.go index 378957ed..da3d592e 100644 --- a/client/options.go +++ b/client/options.go @@ -57,6 +57,8 @@ type CallOptions struct { Retries int // Request/Response timeout RequestTimeout time.Duration + // Stream timeout for the stream + StreamTimeout time.Duration // Use the services own auth token ServiceToken bool @@ -227,6 +229,13 @@ func RequestTimeout(d time.Duration) Option { } } +// StreamTimeout sets the stream timeout +func StreamTimeout(d time.Duration) Option { + return func(o *Options) { + o.CallOptions.StreamTimeout = d + } +} + // Transport dial timeout func DialTimeout(d time.Duration) Option { return func(o *Options) { @@ -295,6 +304,13 @@ func WithRequestTimeout(d time.Duration) CallOption { } } +// WithStreamTimeout sets the stream timeout +func WithStreamTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.StreamTimeout = d + } +} + // WithDialTimeout is a CallOption which overrides that which // set in Options.CallOptions func WithDialTimeout(d time.Duration) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 8b4b806b..ea68bfb2 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -198,7 +198,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request } // set timeout in nanoseconds - msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + if opts.StreamTimeout > time.Duration(0) { + msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } // set the content type for the request msg.Header["Content-Type"] = req.ContentType() // set the accept header