fix stream timeout

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-04-29 22:38:38 +03:00
parent 05add422d1
commit 3cbc879769

22
http.go
View File

@ -181,9 +181,10 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
if len(opts.ContentType) > 0 { if len(opts.ContentType) > 0 {
ct = opts.ContentType ct = opts.ContentType
} }
// set timeout in nanoseconds // set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout)) if opts.StreamTimeout > time.Duration(0) {
header.Set("Timeout", fmt.Sprintf("%d", opts.StreamTimeout))
}
// set the content type for the request // set the content type for the request
header.Set("Content-Type", ct) header.Set("Content-Type", ct)
@ -399,22 +400,22 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts // make a copy of call opts
callOpts := h.opts.CallOptions callOpts := h.opts.CallOptions
for _, opt := range opts { for _, o := range opts {
opt(&callOpts) o(&callOpts)
} }
// check if we already have a deadline // check if we already have a deadline
d, ok := ctx.Deadline() d, ok := ctx.Deadline()
if !ok { if !ok && callOpts.StreamTimeout > time.Duration(0) {
var cancel context.CancelFunc var cancel context.CancelFunc
// no deadline so we create a new one // no deadline so we create a new one
ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout)
defer cancel() defer cancel()
} else { } else {
// got a deadline so no need to setup context // got a deadline so no need to setup context
// but we need to set the timeout we pass along // but we need to set the timeout we pass along
opt := client.WithRequestTimeout(time.Until(d)) o := client.WithStreamTimeout(time.Until(d))
opt(&callOpts) o(&callOpts)
} }
// should we noop right here? // should we noop right here?
@ -426,10 +427,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
/* /*
// make copy of call method // make copy of call method
hstream, err := h.stream() hstream := h.stream
if err != nil {
return nil, err
}
// wrap the call in reverse // wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- { for i := len(callOpts.CallWrappers); i > 0; i-- {
hstream = callOpts.CallWrappers[i-1](hstream) hstream = callOpts.CallWrappers[i-1](hstream)