jitter: add NewTickerContext #106

Merged
vtolstov merged 1 commits from jitter into v3 2022-03-26 18:01:31 +03:00
4 changed files with 123 additions and 3 deletions

View File

@ -259,7 +259,7 @@ func (s *service) Start() error {
s.RUnlock() s.RUnlock()
if config.Loggers[0].V(logger.InfoLevel) { if config.Loggers[0].V(logger.InfoLevel) {
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s-%s", s.Options().Name, s.Options().Version) config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version)
} }
for _, fn := range s.opts.BeforeStart { for _, fn := range s.opts.BeforeStart {

View File

@ -1,6 +1,7 @@
package jitter package jitter // import "go.unistack.org/micro/v3/util/jitter"
import ( import (
"context"
"time" "time"
"go.unistack.org/micro/v3/util/rand" "go.unistack.org/micro/v3/util/rand"
@ -10,13 +11,31 @@ import (
// the min and max duration values (stored internally as int64 nanosecond // the min and max duration values (stored internally as int64 nanosecond
// counts). // counts).
type Ticker struct { type Ticker struct {
C chan time.Time ctx context.Context
done chan chan struct{} done chan chan struct{}
C chan time.Time
min int64 min int64
max int64 max int64
exp int64
exit bool
rng rand.Rand rng rand.Rand
} }
// NewTickerContext returns a pointer to an initialized instance of the Ticker.
// It works like NewTicker except that it has ability to close via context.
// Also it works fine with context.WithTimeout to handle max time to run ticker.
func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
ticker := &Ticker{
C: make(chan time.Time),
done: make(chan chan struct{}),
min: min.Nanoseconds(),
max: max.Nanoseconds(),
ctx: ctx,
}
go ticker.run()
return ticker
}
// NewTicker returns a pointer to an initialized instance of the Ticker. // NewTicker returns a pointer to an initialized instance of the Ticker.
// Min and max are durations of the shortest and longest allowed // Min and max are durations of the shortest and longest allowed
// ticks. Ticker will run in a goroutine until explicitly stopped. // ticks. Ticker will run in a goroutine until explicitly stopped.
@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker {
done: make(chan chan struct{}), done: make(chan chan struct{}),
min: min.Nanoseconds(), min: min.Nanoseconds(),
max: max.Nanoseconds(), max: max.Nanoseconds(),
ctx: context.Background(),
} }
go ticker.run() go ticker.run()
return ticker return ticker
@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker {
// Stop terminates the ticker goroutine and closes the C channel. // Stop terminates the ticker goroutine and closes the C channel.
func (ticker *Ticker) Stop() { func (ticker *Ticker) Stop() {
if ticker.exit {
return
}
c := make(chan struct{}) c := make(chan struct{})
ticker.done <- c ticker.done <- c
<-c <-c
// close(ticker.C)
ticker.exit = true
} }
func (ticker *Ticker) run() { func (ticker *Ticker) run() {
@ -44,6 +69,8 @@ func (ticker *Ticker) run() {
for { for {
// either a stop signal or a timeout // either a stop signal or a timeout
select { select {
case <-ticker.ctx.Done():
t.Stop()
case c := <-ticker.done: case c := <-ticker.done:
t.Stop() t.Stop()
close(c) close(c)

View File

@ -0,0 +1,62 @@
package jitter
import (
"context"
"testing"
"time"
)
func TestNewTickerContext(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond)
defer ticker.Stop()
loop:
for {
select {
case <-ctx.Done():
ticker.Stop()
break loop
case v, ok := <-ticker.C:
if ok {
t.Fatalf("context must be closed %s", v)
}
break loop
}
}
}
func TestTicker(t *testing.T) {
t.Parallel()
min := time.Duration(10)
max := time.Duration(20)
// tick can take a little longer since we're not adjusting it to account for
// processing.
precision := time.Duration(4)
rt := NewTicker(min*time.Millisecond, max*time.Millisecond)
for i := 0; i < 5; i++ {
t0 := time.Now()
t1 := <-rt.C
td := t1.Sub(t0)
if td < min*time.Millisecond {
t.Fatalf("tick was shorter than expected: %s", td)
} else if td > (max+precision)*time.Millisecond {
t.Fatalf("tick was longer than expected: %s", td)
}
}
rt.Stop()
time.Sleep((max + precision) * time.Millisecond)
select {
case v, ok := <-rt.C:
if ok || !v.IsZero() {
t.Fatal("ticker did not shut down")
}
default:
t.Fatal("expected to receive close channel signal")
}
}

View File

@ -1,7 +1,11 @@
package register // import "go.unistack.org/micro/v3/util/register" package register // import "go.unistack.org/micro/v3/util/register"
import ( import (
"context"
"time"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/register"
jitter "go.unistack.org/micro/v3/util/jitter"
) )
func addNodes(old, neu []*register.Node) []*register.Node { func addNodes(old, neu []*register.Node) []*register.Node {
@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service {
return services return services
} }
// WaitService using register wait for service to appear with min/max interval for check and optional timeout.
// Timeout can be 0 to wait infinitive.
func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
ticker := jitter.NewTickerContext(ctx, min, max)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case _, ok := <-ticker.C:
if _, err := reg.LookupService(ctx, name, opts...); err == nil {
return nil
}
if ok {
return register.ErrNotFound
}
}
}
}