Set CallOptions as struct in Options. Can then be overridden easily during Call/Stream

This commit is contained in:
Asim 2016-04-05 18:07:07 +01:00
parent c462d7776c
commit 56c6993eb8
3 changed files with 176 additions and 35 deletions

View File

@ -13,16 +13,21 @@ import (
) )
type Options struct { type Options struct {
ContentType string // Used to select codec
Broker broker.Broker ContentType string
Codecs map[string]codec.NewCodec
Registry registry.Registry // Plugged interfaces
Selector selector.Selector Broker broker.Broker
Transport transport.Transport Codecs map[string]codec.NewCodec
Wrappers []Wrapper Registry registry.Registry
Retries int Selector selector.Selector
RequestTimeout time.Duration Transport transport.Transport
DialTimeout time.Duration
// Middleware for client
Wrappers []Wrapper
// Default Call Options
CallOptions CallOptions
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
@ -32,6 +37,13 @@ type Options struct {
type CallOptions struct { type CallOptions struct {
SelectOptions []selector.SelectOption SelectOptions []selector.SelectOption
// Transport Dial Timeout
DialTimeout time.Duration
// Number of Call attempts
Retries int
// Request/Response timeout
RequestTimeout time.Duration
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
Context context.Context Context context.Context
@ -53,10 +65,12 @@ type RequestOptions struct {
func newOptions(options ...Option) Options { func newOptions(options ...Option) Options {
opts := Options{ opts := Options{
Codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.NewCodec),
Retries: DefaultRetries, CallOptions: CallOptions{
RequestTimeout: DefaultRequestTimeout, Retries: DefaultRetries,
DialTimeout: transport.DefaultDialTimeout, RequestTimeout: DefaultRequestTimeout,
DialTimeout: transport.DefaultDialTimeout,
},
} }
for _, o := range options { for _, o := range options {
@ -141,7 +155,7 @@ func Wrap(w Wrapper) Option {
// Should this be a Call Option? // Should this be a Call Option?
func Retries(i int) Option { func Retries(i int) Option {
return func(o *Options) { return func(o *Options) {
o.Retries = i o.CallOptions.Retries = i
} }
} }
@ -149,14 +163,14 @@ func Retries(i int) Option {
// Should this be a Call Option? // Should this be a Call Option?
func RequestTimeout(d time.Duration) Option { func RequestTimeout(d time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.RequestTimeout = d o.CallOptions.RequestTimeout = d
} }
} }
// Transport dial timeout // Transport dial timeout
func DialTimeout(d time.Duration) Option { func DialTimeout(d time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.DialTimeout = d o.CallOptions.DialTimeout = d
} }
} }
@ -168,6 +182,30 @@ func WithSelectOption(so selector.SelectOption) CallOption {
} }
} }
// WithRetries is a CallOption which overrides that which
// set in Options.CallOptions
func WithRetries(i int) CallOption {
return func(o *CallOptions) {
o.Retries = i
}
}
// WithRequestTimeout is a CallOption which overrides that which
// set in Options.CallOptions
func WithRequestTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.RequestTimeout = d
}
}
// WithDialTimeout is a CallOption which overrides that which
// set in Options.CallOptions
func WithDialTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.DialTimeout = d
}
}
// Request Options // Request Options
func StreamingRequest() RequestOption { func StreamingRequest() RequestOption {

85
client/options_test.go Normal file
View File

@ -0,0 +1,85 @@
package client
import (
"testing"
"time"
"github.com/micro/go-micro/transport"
)
func TestCallOptions(t *testing.T) {
testData := []struct {
set bool
retries int
rtimeout time.Duration
dtimeout time.Duration
}{
{false, DefaultRetries, DefaultRequestTimeout, transport.DefaultDialTimeout},
{true, 10, time.Second, time.Second * 2},
}
for _, d := range testData {
var opts Options
var cl Client
if d.set {
opts = newOptions(
Retries(d.retries),
RequestTimeout(d.rtimeout),
DialTimeout(d.dtimeout),
)
cl = NewClient(
Retries(d.retries),
RequestTimeout(d.rtimeout),
DialTimeout(d.dtimeout),
)
} else {
opts = newOptions()
cl = NewClient()
}
// test options and those set in client
for _, o := range []Options{opts, cl.Options()} {
if o.CallOptions.Retries != d.retries {
t.Fatalf("Expected retries %v got %v", d.retries, o.CallOptions.Retries)
}
if o.CallOptions.RequestTimeout != d.rtimeout {
t.Fatalf("Expected request timeout %v got %v", d.rtimeout, o.CallOptions.RequestTimeout)
}
if o.CallOptions.DialTimeout != d.dtimeout {
t.Fatalf("Expected %v got %v", d.dtimeout, o.CallOptions.DialTimeout)
}
// copy CallOptions
callOpts := o.CallOptions
// create new opts
cretries := WithRetries(o.CallOptions.Retries * 10)
crtimeout := WithRequestTimeout(o.CallOptions.RequestTimeout * (time.Second * 10))
cdtimeout := WithDialTimeout(o.CallOptions.DialTimeout * (time.Second * 10))
// set call options
for _, opt := range []CallOption{cretries, crtimeout, cdtimeout} {
opt(&callOpts)
}
// check call options
if e := o.CallOptions.Retries * 10; callOpts.Retries != e {
t.Fatalf("Expected retries %v got %v", e, callOpts.Retries)
}
if e := o.CallOptions.RequestTimeout * (time.Second * 10); callOpts.RequestTimeout != e {
t.Fatalf("Expected request timeout %v got %v", e, callOpts.RequestTimeout)
}
if e := o.CallOptions.DialTimeout * (time.Second * 10); callOpts.DialTimeout != e {
t.Fatalf("Expected %v got %v", e, callOpts.DialTimeout)
}
}
}
}

View File

@ -51,7 +51,7 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
} }
func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}) error { func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -70,7 +70,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
c, err := r.opts.Transport.Dial(address, transport.WithTimeout(r.opts.DialTimeout)) c, err := r.opts.Transport.Dial(address, transport.WithTimeout(opts.DialTimeout))
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -108,14 +108,14 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
select { select {
case err = <-ch: case err = <-ch:
case <-time.After(r.opts.RequestTimeout): case <-time.After(opts.RequestTimeout):
err = errors.New("go.micro.client", "request timeout", 408) err = errors.New("go.micro.client", "request timeout", 408)
} }
return err return err
} }
func (r *rpcClient) stream(ctx context.Context, address string, req Request) (Streamer, error) { func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -134,7 +134,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(r.opts.DialTimeout)) c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -156,7 +156,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St
select { select {
case err = <-ch: case err = <-ch:
case <-time.After(r.opts.RequestTimeout): case <-time.After(opts.RequestTimeout):
err = errors.New("go.micro.client", "request timeout", 408) err = errors.New("go.micro.client", "request timeout", 408)
} }
@ -175,16 +175,25 @@ func (r *rpcClient) Options() Options {
} }
func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
return r.call(ctx, address, request, response) // make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return r.call(ctx, address, request, response, callOpts)
} }
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
var copts CallOptions // make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
opt(&copts) opt(&callOpts)
} }
next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...) next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error()) return errors.NotFound("go.micro.client", err.Error())
} else if err != nil { } else if err != nil {
@ -193,7 +202,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
var grr error var grr error
for i := 0; i < r.opts.Retries; i++ { for i := 0; i < callOpts.Retries; i++ {
node, err := next() node, err := next()
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error()) return errors.NotFound("go.micro.client", err.Error())
@ -206,7 +215,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
address = fmt.Sprintf("%s:%d", address, node.Port) address = fmt.Sprintf("%s:%d", address, node.Port)
} }
grr = r.call(ctx, address, request, response) grr = r.call(ctx, address, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, grr) r.opts.Selector.Mark(request.Service(), node, grr)
// if the call succeeded lets bail early // if the call succeeded lets bail early
@ -219,16 +228,25 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
} }
func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return r.stream(ctx, address, request) // make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return r.stream(ctx, address, request, callOpts)
} }
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
var copts CallOptions // make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
opt(&copts) opt(&callOpts)
} }
next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...) next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error()) return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil { } else if err != nil {
@ -238,7 +256,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
var stream Streamer var stream Streamer
var grr error var grr error
for i := 0; i < r.opts.Retries; i++ { for i := 0; i < callOpts.Retries; i++ {
node, err := next() node, err := next()
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error()) return nil, errors.NotFound("go.micro.client", err.Error())
@ -251,7 +269,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
address = fmt.Sprintf("%s:%d", address, node.Port) address = fmt.Sprintf("%s:%d", address, node.Port)
} }
stream, grr = r.stream(ctx, address, request) stream, grr = r.stream(ctx, address, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, grr) r.opts.Selector.Mark(request.Service(), node, grr)
// bail early if succeeds // bail early if succeeds