diff --git a/config/config.go b/config/config.go index d2a0262d..32017708 100644 --- a/config/config.go +++ b/config/config.go @@ -10,8 +10,11 @@ import ( // DefaultConfig default config var DefaultConfig Config = NewConfig() -// DefaultWatcherInterval default interval for poll changes -var DefaultWatcherInterval = 5 * time.Second +// DefaultWatcherMinInterval default min interval for poll changes +var DefaultWatcherMinInterval = 5 * time.Second + +// DefaultWatcherMinInterval default max interval for poll changes +var DefaultWatcherMaxInterval = 9 * time.Second var ( // ErrCodecMissing is returned when codec needed and not specified diff --git a/config/default.go b/config/default.go index 3e9dec49..df5e2c34 100644 --- a/config/default.go +++ b/config/default.go @@ -5,9 +5,9 @@ import ( "reflect" "strconv" "strings" - "time" "github.com/imdario/mergo" + "github.com/unistack-org/micro/v3/util/jitter" rutil "github.com/unistack-org/micro/v3/util/reflect" ) @@ -302,7 +302,7 @@ type defaultWatcher struct { } func (w *defaultWatcher) run() { - ticker := time.NewTicker(w.wopts.Interval) + ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval) defer ticker.Stop() src := w.opts.Struct diff --git a/config/default_test.go b/config/default_test.go index 66e3b193..a21a021d 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -31,7 +31,7 @@ func TestWatch(t *testing.T) { t.Fatal(err) } - w, err := cfg.Watch(ctx, config.WatchInterval(500*time.Millisecond)) + w, err := cfg.Watch(ctx, config.WatchInterval(200*time.Millisecond, 500*time.Millisecond)) if err != nil { t.Fatal(err) } diff --git a/config/options.go b/config/options.go index a417f571..3f97f20f 100644 --- a/config/options.go +++ b/config/options.go @@ -211,8 +211,10 @@ type WatchOptions struct { Context context.Context // Coalesce multiple events to one Coalesce bool - // Interval to periodically pull changes if config source not supports async notify - Interval time.Duration + // MinInterval specifies the min time.Duration interval for poll changes + MinInterval time.Duration + // MaxInterval specifies the max time.Duration interval for poll changes + MaxInterval time.Duration // Struct for filling Struct interface{} } @@ -221,8 +223,9 @@ type WatchOption func(*WatchOptions) func NewWatchOptions(opts ...WatchOption) WatchOptions { options := WatchOptions{ - Context: context.Background(), - Interval: DefaultWatcherInterval, + Context: context.Background(), + MinInterval: DefaultWatcherMinInterval, + MaxInterval: DefaultWatcherMaxInterval, } for _, o := range opts { o(&options) @@ -244,10 +247,11 @@ func WatchCoalesce(b bool) WatchOption { } } -// WatchInterval specifies time.Duration for pulling changes -func WatchInterval(td time.Duration) WatchOption { +// WatchInterval specifies min and max time.Duration for pulling changes +func WatchInterval(min, max time.Duration) WatchOption { return func(o *WatchOptions) { - o.Interval = td + o.MinInterval = min + o.MaxInterval = max } } diff --git a/util/jitter/jitter.go b/util/jitter/random.go similarity index 66% rename from util/jitter/jitter.go rename to util/jitter/random.go index 28dde074..3131afc7 100644 --- a/util/jitter/jitter.go +++ b/util/jitter/random.go @@ -7,8 +7,8 @@ import ( "github.com/unistack-org/micro/v3/util/rand" ) -// Do returns a random time to jitter with max cap specified -func Do(d time.Duration) time.Duration { +// Random returns a random time to jitter with max cap specified +func Random(d time.Duration) time.Duration { var rng rand.Rand v := rng.Float64() * float64(d.Nanoseconds()) return time.Duration(v) diff --git a/util/jitter/ticker.go b/util/jitter/ticker.go new file mode 100644 index 00000000..7457d3d1 --- /dev/null +++ b/util/jitter/ticker.go @@ -0,0 +1,65 @@ +package jitter + +import ( + "time" + + "github.com/unistack-org/micro/v3/util/rand" +) + +// Ticker is similar to time.Ticker but ticks at random intervals between +// the min and max duration values (stored internally as int64 nanosecond +// counts). +type Ticker struct { + C chan time.Time + done chan chan struct{} + min int64 + max int64 + rng rand.Rand +} + +// NewTicker returns a pointer to an initialized instance of the Ticker. +// Min and max are durations of the shortest and longest allowed +// ticks. Ticker will run in a goroutine until explicitly stopped. +func NewTicker(min, max time.Duration) *Ticker { + ticker := &Ticker{ + C: make(chan time.Time), + done: make(chan chan struct{}), + min: min.Nanoseconds(), + max: max.Nanoseconds(), + } + go ticker.run() + return ticker +} + +// Stop terminates the ticker goroutine and closes the C channel. +func (ticker *Ticker) Stop() { + c := make(chan struct{}) + ticker.done <- c + <-c +} + +func (ticker *Ticker) run() { + defer close(ticker.C) + t := time.NewTimer(ticker.nextInterval()) + for { + // either a stop signal or a timeout + select { + case c := <-ticker.done: + t.Stop() + close(c) + return + case <-t.C: + select { + case ticker.C <- time.Now(): + t.Stop() + t = time.NewTimer(ticker.nextInterval()) + default: + // there could be noone receiving... + } + } + } +} + +func (ticker *Ticker) nextInterval() time.Duration { + return time.Duration(ticker.rng.Int63n(ticker.max-ticker.min)+ticker.min) * time.Nanosecond +}