Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
c10f29ee74 | |||
03410c4ab1 | |||
3805d0f067 | |||
680ac11ef9 | |||
35ab6ae84e | |||
c6c2b0884e |
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/util/backoff"
|
||||
@@ -10,6 +11,20 @@ import (
|
||||
// BackoffFunc is the backoff call func
|
||||
type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)
|
||||
|
||||
func exponentialBackoff(ctx context.Context, req Request, attempts int) (time.Duration, error) {
|
||||
// BackoffExp using exponential backoff func
|
||||
func BackoffExp(_ context.Context, _ Request, attempts int) (time.Duration, error) {
|
||||
return backoff.Do(attempts), nil
|
||||
}
|
||||
|
||||
// BackoffInterval specifies randomization interval for backoff func
|
||||
func BackoffInterval(min time.Duration, max time.Duration) BackoffFunc {
|
||||
return func(_ context.Context, _ Request, attempts int) (time.Duration, error) {
|
||||
td := time.Duration(time.Duration(math.Pow(float64(attempts), math.E)) * time.Millisecond * 100)
|
||||
if td < min {
|
||||
return min, nil
|
||||
} else if td > max {
|
||||
return max, nil
|
||||
}
|
||||
return td, nil
|
||||
}
|
||||
}
|
||||
|
@@ -22,7 +22,7 @@ func TestBackoff(t *testing.T) {
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
d, err := exponentialBackoff(context.TODO(), r, i)
|
||||
d, err := BackoffExp(context.TODO(), r, i)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@@ -14,8 +14,8 @@ var (
|
||||
DefaultClient Client = NewClient()
|
||||
// DefaultContentType is the default content-type if not specified
|
||||
DefaultContentType = "application/json"
|
||||
// DefaultBackoff is the default backoff function for retries
|
||||
DefaultBackoff = exponentialBackoff
|
||||
// DefaultBackoff is the default backoff function for retries (minimum 10 millisecond and maximum 5 second)
|
||||
DefaultBackoff = BackoffInterval(10*time.Millisecond, 5*time.Second)
|
||||
// DefaultRetry is the default check-for-retry function for retries
|
||||
DefaultRetry = RetryNever
|
||||
// DefaultRetries is the default number of times a request is tried
|
||||
|
@@ -19,18 +19,32 @@ func RetryNever(ctx context.Context, req Request, retryCount int, err error) (bo
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// RetryOnError retries a request on a 500 or timeout error
|
||||
// RetryOnError retries a request on a 500 or 408 (timeout) error
|
||||
func RetryOnError(_ context.Context, _ Request, _ int, err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
me := errors.FromError(err)
|
||||
switch me.Code {
|
||||
// retry on timeout or internal server error
|
||||
case 408, 500:
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// RetryOnErrors retries a request on specified error codes
|
||||
func RetryOnErrors(codes ...int32) RetryFunc {
|
||||
return func(_ context.Context, _ Request, _ int, err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
me := errors.FromError(err)
|
||||
for _, code := range codes {
|
||||
if me.Code == code {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
34
router/context.go
Normal file
34
router/context.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type routerKey struct{}
|
||||
|
||||
// FromContext get router from context
|
||||
func FromContext(ctx context.Context) (Router, bool) {
|
||||
if ctx == nil {
|
||||
return nil, false
|
||||
}
|
||||
c, ok := ctx.Value(routerKey{}).(Router)
|
||||
return c, ok
|
||||
}
|
||||
|
||||
// NewContext put router in context
|
||||
func NewContext(ctx context.Context, c Router) context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
return context.WithValue(ctx, routerKey{}, c)
|
||||
}
|
||||
|
||||
// SetOption returns a function to setup a context with given value
|
||||
func SetOption(k, v interface{}) Option {
|
||||
return func(o *Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
@@ -259,7 +259,7 @@ func (s *service) Start() error {
|
||||
s.RUnlock()
|
||||
|
||||
if config.Loggers[0].V(logger.InfoLevel) {
|
||||
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s-%s", s.Options().Name, s.Options().Version)
|
||||
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version)
|
||||
}
|
||||
|
||||
for _, fn := range s.opts.BeforeStart {
|
||||
|
@@ -13,3 +13,8 @@ func Random(d time.Duration) time.Duration {
|
||||
v := rng.Float64() * float64(d.Nanoseconds())
|
||||
return time.Duration(v)
|
||||
}
|
||||
|
||||
func RandomInterval(min, max time.Duration) time.Duration {
|
||||
var rng rand.Rand
|
||||
return time.Duration(rng.Int63n(max.Nanoseconds()-min.Nanoseconds())+min.Nanoseconds()) * time.Nanosecond
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package jitter
|
||||
package jitter // import "go.unistack.org/micro/v3/util/jitter"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/util/rand"
|
||||
@@ -10,13 +11,31 @@ import (
|
||||
// the min and max duration values (stored internally as int64 nanosecond
|
||||
// counts).
|
||||
type Ticker struct {
|
||||
C chan time.Time
|
||||
ctx context.Context
|
||||
done chan chan struct{}
|
||||
C chan time.Time
|
||||
min int64
|
||||
max int64
|
||||
exp int64
|
||||
exit bool
|
||||
rng rand.Rand
|
||||
}
|
||||
|
||||
// NewTickerContext returns a pointer to an initialized instance of the Ticker.
|
||||
// It works like NewTicker except that it has ability to close via context.
|
||||
// Also it works fine with context.WithTimeout to handle max time to run ticker.
|
||||
func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
|
||||
ticker := &Ticker{
|
||||
C: make(chan time.Time),
|
||||
done: make(chan chan struct{}),
|
||||
min: min.Nanoseconds(),
|
||||
max: max.Nanoseconds(),
|
||||
ctx: ctx,
|
||||
}
|
||||
go ticker.run()
|
||||
return ticker
|
||||
}
|
||||
|
||||
// NewTicker returns a pointer to an initialized instance of the Ticker.
|
||||
// Min and max are durations of the shortest and longest allowed
|
||||
// ticks. Ticker will run in a goroutine until explicitly stopped.
|
||||
@@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker {
|
||||
done: make(chan chan struct{}),
|
||||
min: min.Nanoseconds(),
|
||||
max: max.Nanoseconds(),
|
||||
ctx: context.Background(),
|
||||
}
|
||||
go ticker.run()
|
||||
return ticker
|
||||
@@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker {
|
||||
|
||||
// Stop terminates the ticker goroutine and closes the C channel.
|
||||
func (ticker *Ticker) Stop() {
|
||||
if ticker.exit {
|
||||
return
|
||||
}
|
||||
c := make(chan struct{})
|
||||
ticker.done <- c
|
||||
<-c
|
||||
// close(ticker.C)
|
||||
ticker.exit = true
|
||||
}
|
||||
|
||||
func (ticker *Ticker) run() {
|
||||
@@ -44,6 +69,8 @@ func (ticker *Ticker) run() {
|
||||
for {
|
||||
// either a stop signal or a timeout
|
||||
select {
|
||||
case <-ticker.ctx.Done():
|
||||
t.Stop()
|
||||
case c := <-ticker.done:
|
||||
t.Stop()
|
||||
close(c)
|
||||
|
62
util/jitter/ticker_test.go
Normal file
62
util/jitter/ticker_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package jitter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewTickerContext(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
break loop
|
||||
case v, ok := <-ticker.C:
|
||||
if ok {
|
||||
t.Fatalf("context must be closed %s", v)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTicker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
min := time.Duration(10)
|
||||
max := time.Duration(20)
|
||||
|
||||
// tick can take a little longer since we're not adjusting it to account for
|
||||
// processing.
|
||||
precision := time.Duration(4)
|
||||
|
||||
rt := NewTicker(min*time.Millisecond, max*time.Millisecond)
|
||||
for i := 0; i < 5; i++ {
|
||||
t0 := time.Now()
|
||||
t1 := <-rt.C
|
||||
td := t1.Sub(t0)
|
||||
if td < min*time.Millisecond {
|
||||
t.Fatalf("tick was shorter than expected: %s", td)
|
||||
} else if td > (max+precision)*time.Millisecond {
|
||||
t.Fatalf("tick was longer than expected: %s", td)
|
||||
}
|
||||
}
|
||||
rt.Stop()
|
||||
time.Sleep((max + precision) * time.Millisecond)
|
||||
select {
|
||||
case v, ok := <-rt.C:
|
||||
if ok || !v.IsZero() {
|
||||
t.Fatal("ticker did not shut down")
|
||||
}
|
||||
default:
|
||||
t.Fatal("expected to receive close channel signal")
|
||||
}
|
||||
}
|
@@ -1,7 +1,11 @@
|
||||
package register // import "go.unistack.org/micro/v3/util/register"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/register"
|
||||
jitter "go.unistack.org/micro/v3/util/jitter"
|
||||
)
|
||||
|
||||
func addNodes(old, neu []*register.Node) []*register.Node {
|
||||
@@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service {
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
// WaitService using register wait for service to appear with min/max interval for check and optional timeout.
|
||||
// Timeout can be 0 to wait infinitive.
|
||||
func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
|
||||
if timeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
ticker := jitter.NewTickerContext(ctx, min, max)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case _, ok := <-ticker.C:
|
||||
if _, err := reg.LookupService(ctx, name, opts...); err == nil {
|
||||
return nil
|
||||
}
|
||||
if ok {
|
||||
return register.ErrNotFound
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user