diff --git a/client/client.go b/client/client.go index 7c1c940e..259e717a 100644 --- a/client/client.go +++ b/client/client.go @@ -22,6 +22,8 @@ supported. package client import ( + "time" + "golang.org/x/net/context" ) @@ -71,6 +73,9 @@ type RequestOption func(*RequestOptions) var ( DefaultClient Client = newRpcClient() + + DefaultRetries = 1 + DefaultRequestTimeout = time.Second * 5 ) // Makes a synchronous call to a service using the default client diff --git a/client/options.go b/client/options.go index bcf4fdf7..76871a63 100644 --- a/client/options.go +++ b/client/options.go @@ -1,6 +1,8 @@ package client import ( + "time" + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" @@ -9,14 +11,15 @@ import ( ) type Options struct { - ContentType string - Broker broker.Broker - Codecs map[string]codec.NewCodec - Registry registry.Registry - Selector selector.Selector - Transport transport.Transport - Wrappers []Wrapper - Retries int + ContentType string + Broker broker.Broker + Codecs map[string]codec.NewCodec + Registry registry.Registry + Selector selector.Selector + Transport transport.Transport + Wrappers []Wrapper + Retries int + RequestTimeout time.Duration // Other options to be used by client implementations Options map[string]string @@ -43,17 +46,15 @@ type RequestOptions struct { func newOptions(options ...Option) Options { opts := Options{ - Codecs: make(map[string]codec.NewCodec), + Codecs: make(map[string]codec.NewCodec), + Retries: DefaultRetries, + RequestTimeout: DefaultRequestTimeout, } for _, o := range options { o(&opts) } - if opts.Retries == 0 { - opts.Retries = 1 - } - if len(opts.ContentType) == 0 { opts.ContentType = defaultContentType } @@ -128,13 +129,22 @@ func Wrap(w Wrapper) Option { } } -// Number of retries when making the request +// Number of retries when making the request. +// Should this be a Call Option? func Retries(i int) Option { return func(o *Options) { o.Retries = i } } +// The request timeout. +// Should this be a Call Option? +func RequestTimeout(d time.Duration) Option { + return func(o *Options) { + o.RequestTimeout = d + } +} + // Call Options func WithSelectOption(so selector.SelectOption) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 97219576..0fbab585 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "sync" + "time" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" @@ -76,11 +77,24 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r defer c.Close() client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) - err = client.Call(ctx, request.Service(), request.Method(), request.Request(), response) - if err != nil { - return err + defer client.Close() + + ch := make(chan error, 1) + + go func() { + select { + case ch <- client.Call(ctx, request.Service(), request.Method(), request.Request(), response): + default: + } + }() + + select { + case err = <-ch: + case <-time.After(r.opts.RequestTimeout): + err = errors.New("go.micro.client", "request timeout", 408) } - return client.Close() + + return err } func (r *rpcClient) stream(ctx context.Context, address string, req Request) (Streamer, error) { @@ -116,7 +130,21 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St codec: newRpcPlusCodec(msg, c, cf), } - err = stream.Send(req.Request()) + ch := make(chan error, 1) + + go func() { + select { + case ch <- stream.Send(req.Request()): + default: + } + }() + + select { + case err = <-ch: + case <-time.After(r.opts.RequestTimeout): + err = errors.New("go.micro.client", "request timeout", 408) + } + return stream, err } @@ -148,6 +176,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return errors.InternalServerError("go.micro.client", err.Error()) } + var grr error + for i := 0; i < r.opts.Retries; i++ { node, err := next() if err != nil && err == selector.ErrNotFound { @@ -161,16 +191,16 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac address = fmt.Sprintf("%s:%d", address, node.Port) } - err = r.call(ctx, address, request, response) - r.opts.Selector.Mark(request.Service(), node, err) + grr = r.call(ctx, address, request, response) + r.opts.Selector.Mark(request.Service(), node, grr) // if the call succeeded lets bail early - if err == nil { + if grr == nil { return nil } } - return err + return grr } func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { @@ -191,6 +221,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt } var stream Streamer + var grr error for i := 0; i < r.opts.Retries; i++ { node, err := next() @@ -205,16 +236,16 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt address = fmt.Sprintf("%s:%d", address, node.Port) } - stream, err = r.stream(ctx, address, request) - r.opts.Selector.Mark(request.Service(), node, err) + stream, grr = r.stream(ctx, address, request) + r.opts.Selector.Mark(request.Service(), node, grr) // bail early if succeeds - if err == nil { + if grr == nil { return stream, nil } } - return stream, err + return stream, grr } func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error {