From c6c2b0884e71d3e027ce7af225ce0e006ba1aeb9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 26 Mar 2022 17:59:19 +0300 Subject: [PATCH] jitter: add NewTickerContext Signed-off-by: Vasiliy Tolstov --- service.go | 2 +- util/jitter/ticker.go | 31 +++++++++++++++++-- util/jitter/ticker_test.go | 62 ++++++++++++++++++++++++++++++++++++++ util/register/util.go | 31 +++++++++++++++++++ 4 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 util/jitter/ticker_test.go diff --git a/service.go b/service.go index 50478d00..bf19aa57 100644 --- a/service.go +++ b/service.go @@ -259,7 +259,7 @@ func (s *service) Start() error { s.RUnlock() if config.Loggers[0].V(logger.InfoLevel) { - config.Loggers[0].Infof(s.opts.Context, "starting [service] %s-%s", s.Options().Name, s.Options().Version) + config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version) } for _, fn := range s.opts.BeforeStart { diff --git a/util/jitter/ticker.go b/util/jitter/ticker.go index 0b78342b..84946556 100644 --- a/util/jitter/ticker.go +++ b/util/jitter/ticker.go @@ -1,6 +1,7 @@ -package jitter +package jitter // import "go.unistack.org/micro/v3/util/jitter" import ( + "context" "time" "go.unistack.org/micro/v3/util/rand" @@ -10,13 +11,31 @@ import ( // the min and max duration values (stored internally as int64 nanosecond // counts). type Ticker struct { - C chan time.Time + ctx context.Context done chan chan struct{} + C chan time.Time min int64 max int64 + exp int64 + exit bool rng rand.Rand } +// NewTickerContext returns a pointer to an initialized instance of the Ticker. +// It works like NewTicker except that it has ability to close via context. +// Also it works fine with context.WithTimeout to handle max time to run ticker. +func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker { + ticker := &Ticker{ + C: make(chan time.Time), + done: make(chan chan struct{}), + min: min.Nanoseconds(), + max: max.Nanoseconds(), + ctx: ctx, + } + go ticker.run() + return ticker +} + // NewTicker returns a pointer to an initialized instance of the Ticker. // Min and max are durations of the shortest and longest allowed // ticks. Ticker will run in a goroutine until explicitly stopped. @@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker { done: make(chan chan struct{}), min: min.Nanoseconds(), max: max.Nanoseconds(), + ctx: context.Background(), } go ticker.run() return ticker @@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker { // Stop terminates the ticker goroutine and closes the C channel. func (ticker *Ticker) Stop() { + if ticker.exit { + return + } c := make(chan struct{}) ticker.done <- c <-c + // close(ticker.C) + ticker.exit = true } func (ticker *Ticker) run() { @@ -44,6 +69,8 @@ func (ticker *Ticker) run() { for { // either a stop signal or a timeout select { + case <-ticker.ctx.Done(): + t.Stop() case c := <-ticker.done: t.Stop() close(c) diff --git a/util/jitter/ticker_test.go b/util/jitter/ticker_test.go new file mode 100644 index 00000000..a271dac0 --- /dev/null +++ b/util/jitter/ticker_test.go @@ -0,0 +1,62 @@ +package jitter + +import ( + "context" + "testing" + "time" +) + +func TestNewTickerContext(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond) + defer ticker.Stop() +loop: + for { + select { + case <-ctx.Done(): + ticker.Stop() + break loop + case v, ok := <-ticker.C: + if ok { + t.Fatalf("context must be closed %s", v) + } + break loop + } + } +} + +func TestTicker(t *testing.T) { + t.Parallel() + + min := time.Duration(10) + max := time.Duration(20) + + // tick can take a little longer since we're not adjusting it to account for + // processing. + precision := time.Duration(4) + + rt := NewTicker(min*time.Millisecond, max*time.Millisecond) + for i := 0; i < 5; i++ { + t0 := time.Now() + t1 := <-rt.C + td := t1.Sub(t0) + if td < min*time.Millisecond { + t.Fatalf("tick was shorter than expected: %s", td) + } else if td > (max+precision)*time.Millisecond { + t.Fatalf("tick was longer than expected: %s", td) + } + } + rt.Stop() + time.Sleep((max + precision) * time.Millisecond) + select { + case v, ok := <-rt.C: + if ok || !v.IsZero() { + t.Fatal("ticker did not shut down") + } + default: + t.Fatal("expected to receive close channel signal") + } +} diff --git a/util/register/util.go b/util/register/util.go index b3baf17a..e3867389 100644 --- a/util/register/util.go +++ b/util/register/util.go @@ -1,7 +1,11 @@ package register // import "go.unistack.org/micro/v3/util/register" import ( + "context" + "time" + "go.unistack.org/micro/v3/register" + jitter "go.unistack.org/micro/v3/util/jitter" ) func addNodes(old, neu []*register.Node) []*register.Node { @@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service { return services } + +// WaitService using register wait for service to appear with min/max interval for check and optional timeout. +// Timeout can be 0 to wait infinitive. +func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error { + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + ticker := jitter.NewTickerContext(ctx, min, max) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case _, ok := <-ticker.C: + if _, err := reg.LookupService(ctx, name, opts...); err == nil { + return nil + } + if ok { + return register.ErrNotFound + } + } + } +}