Add retries

This commit is contained in:
Asim 2016-01-02 23:16:15 +00:00
parent b11e5789fe
commit 1037663acf
2 changed files with 89 additions and 54 deletions

View File

@ -16,6 +16,7 @@ type Options struct {
Selector selector.Selector Selector selector.Selector
Transport transport.Transport Transport transport.Transport
Wrappers []Wrapper Wrappers []Wrapper
Retries int
// Other options to be used by client implementations // Other options to be used by client implementations
Options map[string]string Options map[string]string
@ -40,6 +41,44 @@ type RequestOptions struct {
Options map[string]string Options map[string]string
} }
func newOptions(options ...Option) Options {
opts := Options{
Codecs: make(map[string]codec.NewCodec),
}
for _, o := range options {
o(&opts)
}
if opts.Retries == 0 {
opts.Retries = 1
}
if len(opts.ContentType) == 0 {
opts.ContentType = defaultContentType
}
if opts.Broker == nil {
opts.Broker = broker.DefaultBroker
}
if opts.Registry == nil {
opts.Registry = registry.DefaultRegistry
}
if opts.Selector == nil {
opts.Selector = selector.NewSelector(
selector.Registry(opts.Registry),
)
}
if opts.Transport == nil {
opts.Transport = transport.DefaultTransport
}
return opts
}
// Broker to be used for pub/sub // Broker to be used for pub/sub
func Broker(b broker.Broker) Option { func Broker(b broker.Broker) Option {
return func(o *Options) { return func(o *Options) {
@ -89,6 +128,13 @@ func Wrap(w Wrapper) Option {
} }
} }
// Number of retries when making the request
func Retries(i int) Option {
return func(o *Options) {
o.Retries = i
}
}
// Call Options // Call Options
func WithSelectOption(so selector.SelectOption) CallOption { func WithSelectOption(so selector.SelectOption) CallOption {

View File

@ -9,7 +9,6 @@ import (
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
c "github.com/micro/go-micro/context" c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
@ -24,35 +23,7 @@ type rpcClient struct {
func newRpcClient(opt ...Option) Client { func newRpcClient(opt ...Option) Client {
var once sync.Once var once sync.Once
opts := Options{ opts := newOptions(opt...)
Codecs: make(map[string]codec.NewCodec),
}
for _, o := range opt {
o(&opts)
}
if len(opts.ContentType) == 0 {
opts.ContentType = defaultContentType
}
if opts.Broker == nil {
opts.Broker = broker.DefaultBroker
}
if opts.Registry == nil {
opts.Registry = registry.DefaultRegistry
}
if opts.Selector == nil {
opts.Selector = selector.NewSelector(
selector.Registry(opts.Registry),
)
}
if opts.Transport == nil {
opts.Transport = transport.DefaultTransport
}
rc := &rpcClient{ rc := &rpcClient{
once: once, once: once,
@ -177,20 +148,28 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
node, err := next() for i := 0; i < r.opts.Retries; i++ {
if err != nil && err == selector.ErrNotFound { node, err := next()
return errors.NotFound("go.micro.client", err.Error()) if err != nil && err == selector.ErrNotFound {
} else if err != nil { return errors.NotFound("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", err.Error()) } else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
err = r.call(ctx, address, request, response)
r.opts.Selector.Mark(request.Service(), node, err)
// if the call succeeded lets bail early
if err == nil {
return nil
}
} }
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
err = r.call(ctx, address, request, response)
r.opts.Selector.Mark(request.Service(), node, err)
return err return err
} }
@ -211,20 +190,30 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
node, err := next() var stream Streamer
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error()) for i := 0; i < r.opts.Retries; i++ {
} else if err != nil { node, err := next()
return nil, errors.InternalServerError("go.micro.client", err.Error()) if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
stream, err = r.stream(ctx, address, request)
r.opts.Selector.Mark(request.Service(), node, err)
// bail early if succeeds
if err == nil {
return stream, nil
}
} }
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
stream, err := r.stream(ctx, address, request)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err return stream, err
} }