Merge pull request #70 from micro/backoff

Add Backoff func option
This commit is contained in:
Asim Aslam 2016-04-06 17:43:39 +01:00
commit 5fb1051e0b
5 changed files with 86 additions and 0 deletions

18
client/backoff.go Normal file
View File

@ -0,0 +1,18 @@
package client
import (
"math"
"time"
"golang.org/x/net/context"
)
type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)
// exponential backoff
func exponentialBackoff(ctx context.Context, req Request, attempts int) (time.Duration, error) {
if attempts == 0 {
return time.Duration(0), nil
}
return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond, nil
}

26
client/backoff_test.go Normal file
View File

@ -0,0 +1,26 @@
package client
import (
"math"
"testing"
"time"
"golang.org/x/net/context"
)
func TestBackoff(t *testing.T) {
delta := time.Duration(0)
for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i)
if err != nil {
t.Fatal(err)
}
if d < delta {
t.Fatalf("Expected greater than %v, got %v", delta, d)
}
delta = time.Millisecond * time.Duration(math.Pow(10, float64(i+1)))
}
}

View File

@ -77,6 +77,7 @@ type RequestOption func(*RequestOptions)
var (
DefaultClient Client = newRpcClient()
DefaultBackoff = exponentialBackoff
DefaultRetries = 1
DefaultRequestTimeout = time.Second * 5
)

View File

@ -37,6 +37,8 @@ type Options struct {
type CallOptions struct {
SelectOptions []selector.SelectOption
// Backoff func
Backoff BackoffFunc
// Transport Dial Timeout
DialTimeout time.Duration
// Number of Call attempts
@ -67,6 +69,7 @@ func newOptions(options ...Option) Options {
opts := Options{
Codecs: make(map[string]codec.NewCodec),
CallOptions: CallOptions{
Backoff: DefaultBackoff,
Retries: DefaultRetries,
RequestTimeout: DefaultRequestTimeout,
DialTimeout: transport.DefaultDialTimeout,
@ -151,6 +154,14 @@ func Wrap(w Wrapper) Option {
}
}
// Backoff is used to set the backoff function used
// when retrying Calls
func Backoff(fn BackoffFunc) Option {
return func(o *Options) {
o.CallOptions.Backoff = fn
}
}
// Number of retries when making the request.
// Should this be a Call Option?
func Retries(i int) Option {
@ -182,6 +193,14 @@ func WithSelectOption(so selector.SelectOption) CallOption {
}
}
// WithBackoff is a CallOption which overrides that which
// set in Options.CallOptions
func WithBackoff(fn BackoffFunc) CallOption {
return func(o *CallOptions) {
o.Backoff = fn
}
}
// WithRetries is a CallOption which overrides that which
// set in Options.CallOptions
func WithRetries(i int) CallOption {

View File

@ -203,6 +203,17 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
var grr error
for i := 0; i < callOpts.Retries; i++ {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
@ -257,6 +268,17 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
var grr error
for i := 0; i < callOpts.Retries; i++ {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())