From 56c6993eb846896b44f2cbaaf5ee3b6cc4a8c573 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 5 Apr 2016 18:07:07 +0100 Subject: [PATCH] Set CallOptions as struct in Options. Can then be overridden easily during Call/Stream --- client/options.go | 72 ++++++++++++++++++++++++++--------- client/options_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++ client/rpc_client.go | 54 ++++++++++++++++++--------- 3 files changed, 176 insertions(+), 35 deletions(-) create mode 100644 client/options_test.go diff --git a/client/options.go b/client/options.go index 25f88041..9d310943 100644 --- a/client/options.go +++ b/client/options.go @@ -13,16 +13,21 @@ import ( ) type Options struct { - ContentType string - Broker broker.Broker - Codecs map[string]codec.NewCodec - Registry registry.Registry - Selector selector.Selector - Transport transport.Transport - Wrappers []Wrapper - Retries int - RequestTimeout time.Duration - DialTimeout time.Duration + // Used to select codec + ContentType string + + // Plugged interfaces + Broker broker.Broker + Codecs map[string]codec.NewCodec + Registry registry.Registry + Selector selector.Selector + Transport transport.Transport + + // Middleware for client + Wrappers []Wrapper + + // Default Call Options + CallOptions CallOptions // Other options for implementations of the interface // can be stored in a context @@ -32,6 +37,13 @@ type Options struct { type CallOptions struct { SelectOptions []selector.SelectOption + // Transport Dial Timeout + DialTimeout time.Duration + // Number of Call attempts + Retries int + // Request/Response timeout + RequestTimeout time.Duration + // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -53,10 +65,12 @@ type RequestOptions struct { func newOptions(options ...Option) Options { opts := Options{ - Codecs: make(map[string]codec.NewCodec), - Retries: DefaultRetries, - RequestTimeout: DefaultRequestTimeout, - DialTimeout: transport.DefaultDialTimeout, + Codecs: make(map[string]codec.NewCodec), + CallOptions: CallOptions{ + Retries: DefaultRetries, + RequestTimeout: DefaultRequestTimeout, + DialTimeout: transport.DefaultDialTimeout, + }, } for _, o := range options { @@ -141,7 +155,7 @@ func Wrap(w Wrapper) Option { // Should this be a Call Option? func Retries(i int) Option { return func(o *Options) { - o.Retries = i + o.CallOptions.Retries = i } } @@ -149,14 +163,14 @@ func Retries(i int) Option { // Should this be a Call Option? func RequestTimeout(d time.Duration) Option { return func(o *Options) { - o.RequestTimeout = d + o.CallOptions.RequestTimeout = d } } // Transport dial timeout func DialTimeout(d time.Duration) Option { return func(o *Options) { - o.DialTimeout = d + o.CallOptions.DialTimeout = d } } @@ -168,6 +182,30 @@ func WithSelectOption(so selector.SelectOption) CallOption { } } +// WithRetries is a CallOption which overrides that which +// set in Options.CallOptions +func WithRetries(i int) CallOption { + return func(o *CallOptions) { + o.Retries = i + } +} + +// WithRequestTimeout is a CallOption which overrides that which +// set in Options.CallOptions +func WithRequestTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.RequestTimeout = d + } +} + +// WithDialTimeout is a CallOption which overrides that which +// set in Options.CallOptions +func WithDialTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.DialTimeout = d + } +} + // Request Options func StreamingRequest() RequestOption { diff --git a/client/options_test.go b/client/options_test.go new file mode 100644 index 00000000..9252994c --- /dev/null +++ b/client/options_test.go @@ -0,0 +1,85 @@ +package client + +import ( + "testing" + "time" + + "github.com/micro/go-micro/transport" +) + +func TestCallOptions(t *testing.T) { + testData := []struct { + set bool + retries int + rtimeout time.Duration + dtimeout time.Duration + }{ + {false, DefaultRetries, DefaultRequestTimeout, transport.DefaultDialTimeout}, + {true, 10, time.Second, time.Second * 2}, + } + + for _, d := range testData { + var opts Options + var cl Client + + if d.set { + opts = newOptions( + Retries(d.retries), + RequestTimeout(d.rtimeout), + DialTimeout(d.dtimeout), + ) + + cl = NewClient( + Retries(d.retries), + RequestTimeout(d.rtimeout), + DialTimeout(d.dtimeout), + ) + } else { + opts = newOptions() + cl = NewClient() + } + + // test options and those set in client + for _, o := range []Options{opts, cl.Options()} { + if o.CallOptions.Retries != d.retries { + t.Fatalf("Expected retries %v got %v", d.retries, o.CallOptions.Retries) + } + + if o.CallOptions.RequestTimeout != d.rtimeout { + t.Fatalf("Expected request timeout %v got %v", d.rtimeout, o.CallOptions.RequestTimeout) + } + + if o.CallOptions.DialTimeout != d.dtimeout { + t.Fatalf("Expected %v got %v", d.dtimeout, o.CallOptions.DialTimeout) + } + + // copy CallOptions + callOpts := o.CallOptions + + // create new opts + cretries := WithRetries(o.CallOptions.Retries * 10) + crtimeout := WithRequestTimeout(o.CallOptions.RequestTimeout * (time.Second * 10)) + cdtimeout := WithDialTimeout(o.CallOptions.DialTimeout * (time.Second * 10)) + + // set call options + for _, opt := range []CallOption{cretries, crtimeout, cdtimeout} { + opt(&callOpts) + } + + // check call options + if e := o.CallOptions.Retries * 10; callOpts.Retries != e { + t.Fatalf("Expected retries %v got %v", e, callOpts.Retries) + } + + if e := o.CallOptions.RequestTimeout * (time.Second * 10); callOpts.RequestTimeout != e { + t.Fatalf("Expected request timeout %v got %v", e, callOpts.RequestTimeout) + } + + if e := o.CallOptions.DialTimeout * (time.Second * 10); callOpts.DialTimeout != e { + t.Fatalf("Expected %v got %v", e, callOpts.DialTimeout) + } + + } + + } +} diff --git a/client/rpc_client.go b/client/rpc_client.go index d9628697..d51dfc2f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -51,7 +51,7 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } -func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}) error { +func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error { msg := &transport.Message{ Header: make(map[string]string), } @@ -70,7 +70,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp return errors.InternalServerError("go.micro.client", err.Error()) } - c, err := r.opts.Transport.Dial(address, transport.WithTimeout(r.opts.DialTimeout)) + c, err := r.opts.Transport.Dial(address, transport.WithTimeout(opts.DialTimeout)) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -108,14 +108,14 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp select { case err = <-ch: - case <-time.After(r.opts.RequestTimeout): + case <-time.After(opts.RequestTimeout): err = errors.New("go.micro.client", "request timeout", 408) } return err } -func (r *rpcClient) stream(ctx context.Context, address string, req Request) (Streamer, error) { +func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) { msg := &transport.Message{ Header: make(map[string]string), } @@ -134,7 +134,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St return nil, errors.InternalServerError("go.micro.client", err.Error()) } - c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(r.opts.DialTimeout)) + c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout)) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -156,7 +156,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St select { case err = <-ch: - case <-time.After(r.opts.RequestTimeout): + case <-time.After(opts.RequestTimeout): err = errors.New("go.micro.client", "request timeout", 408) } @@ -175,16 +175,25 @@ func (r *rpcClient) Options() Options { } func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { - return r.call(ctx, address, request, response) + // make a copy of call opts + callOpts := r.opts.CallOptions + + for _, opt := range opts { + opt(&callOpts) + } + + return r.call(ctx, address, request, response, callOpts) } func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { - var copts CallOptions + // make a copy of call opts + callOpts := r.opts.CallOptions + for _, opt := range opts { - opt(&copts) + opt(&callOpts) } - next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...) + next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { @@ -193,7 +202,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac var grr error - for i := 0; i < r.opts.Retries; i++ { + for i := 0; i < callOpts.Retries; i++ { node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) @@ -206,7 +215,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac address = fmt.Sprintf("%s:%d", address, node.Port) } - grr = r.call(ctx, address, request, response) + grr = r.call(ctx, address, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, grr) // if the call succeeded lets bail early @@ -219,16 +228,25 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac } func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { - return r.stream(ctx, address, request) + // make a copy of call opts + callOpts := r.opts.CallOptions + + for _, opt := range opts { + opt(&callOpts) + } + + return r.stream(ctx, address, request, callOpts) } func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { - var copts CallOptions + // make a copy of call opts + callOpts := r.opts.CallOptions + for _, opt := range opts { - opt(&copts) + opt(&callOpts) } - next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...) + next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { @@ -238,7 +256,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt var stream Streamer var grr error - for i := 0; i < r.opts.Retries; i++ { + for i := 0; i < callOpts.Retries; i++ { node, err := next() if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) @@ -251,7 +269,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt address = fmt.Sprintf("%s:%d", address, node.Port) } - stream, grr = r.stream(ctx, address, request) + stream, grr = r.stream(ctx, address, request, callOpts) r.opts.Selector.Mark(request.Service(), node, grr) // bail early if succeeds