jitter: add NewTickerContext #106
@ -259,7 +259,7 @@ func (s *service) Start() error {
|
|||||||
s.RUnlock()
|
s.RUnlock()
|
||||||
|
|
||||||
if config.Loggers[0].V(logger.InfoLevel) {
|
if config.Loggers[0].V(logger.InfoLevel) {
|
||||||
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s-%s", s.Options().Name, s.Options().Version)
|
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fn := range s.opts.BeforeStart {
|
for _, fn := range s.opts.BeforeStart {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package jitter
|
package jitter // import "go.unistack.org/micro/v3/util/jitter"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/util/rand"
|
"go.unistack.org/micro/v3/util/rand"
|
||||||
@ -10,13 +11,31 @@ import (
|
|||||||
// the min and max duration values (stored internally as int64 nanosecond
|
// the min and max duration values (stored internally as int64 nanosecond
|
||||||
// counts).
|
// counts).
|
||||||
type Ticker struct {
|
type Ticker struct {
|
||||||
C chan time.Time
|
ctx context.Context
|
||||||
done chan chan struct{}
|
done chan chan struct{}
|
||||||
|
C chan time.Time
|
||||||
min int64
|
min int64
|
||||||
max int64
|
max int64
|
||||||
|
exp int64
|
||||||
|
exit bool
|
||||||
rng rand.Rand
|
rng rand.Rand
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewTickerContext returns a pointer to an initialized instance of the Ticker.
|
||||||
|
// It works like NewTicker except that it has ability to close via context.
|
||||||
|
// Also it works fine with context.WithTimeout to handle max time to run ticker.
|
||||||
|
func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
|
||||||
|
ticker := &Ticker{
|
||||||
|
C: make(chan time.Time),
|
||||||
|
done: make(chan chan struct{}),
|
||||||
|
min: min.Nanoseconds(),
|
||||||
|
max: max.Nanoseconds(),
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
go ticker.run()
|
||||||
|
return ticker
|
||||||
|
}
|
||||||
|
|
||||||
// NewTicker returns a pointer to an initialized instance of the Ticker.
|
// NewTicker returns a pointer to an initialized instance of the Ticker.
|
||||||
// Min and max are durations of the shortest and longest allowed
|
// Min and max are durations of the shortest and longest allowed
|
||||||
// ticks. Ticker will run in a goroutine until explicitly stopped.
|
// ticks. Ticker will run in a goroutine until explicitly stopped.
|
||||||
@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker {
|
|||||||
done: make(chan chan struct{}),
|
done: make(chan chan struct{}),
|
||||||
min: min.Nanoseconds(),
|
min: min.Nanoseconds(),
|
||||||
max: max.Nanoseconds(),
|
max: max.Nanoseconds(),
|
||||||
|
ctx: context.Background(),
|
||||||
}
|
}
|
||||||
go ticker.run()
|
go ticker.run()
|
||||||
return ticker
|
return ticker
|
||||||
@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker {
|
|||||||
|
|
||||||
// Stop terminates the ticker goroutine and closes the C channel.
|
// Stop terminates the ticker goroutine and closes the C channel.
|
||||||
func (ticker *Ticker) Stop() {
|
func (ticker *Ticker) Stop() {
|
||||||
|
if ticker.exit {
|
||||||
|
return
|
||||||
|
}
|
||||||
c := make(chan struct{})
|
c := make(chan struct{})
|
||||||
ticker.done <- c
|
ticker.done <- c
|
||||||
<-c
|
<-c
|
||||||
|
// close(ticker.C)
|
||||||
|
ticker.exit = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ticker *Ticker) run() {
|
func (ticker *Ticker) run() {
|
||||||
@ -44,6 +69,8 @@ func (ticker *Ticker) run() {
|
|||||||
for {
|
for {
|
||||||
// either a stop signal or a timeout
|
// either a stop signal or a timeout
|
||||||
select {
|
select {
|
||||||
|
case <-ticker.ctx.Done():
|
||||||
|
t.Stop()
|
||||||
case c := <-ticker.done:
|
case c := <-ticker.done:
|
||||||
t.Stop()
|
t.Stop()
|
||||||
close(c)
|
close(c)
|
||||||
|
62
util/jitter/ticker_test.go
Normal file
62
util/jitter/ticker_test.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
package jitter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewTickerContext(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
ticker.Stop()
|
||||||
|
break loop
|
||||||
|
case v, ok := <-ticker.C:
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("context must be closed %s", v)
|
||||||
|
}
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTicker(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
min := time.Duration(10)
|
||||||
|
max := time.Duration(20)
|
||||||
|
|
||||||
|
// tick can take a little longer since we're not adjusting it to account for
|
||||||
|
// processing.
|
||||||
|
precision := time.Duration(4)
|
||||||
|
|
||||||
|
rt := NewTicker(min*time.Millisecond, max*time.Millisecond)
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
t0 := time.Now()
|
||||||
|
t1 := <-rt.C
|
||||||
|
td := t1.Sub(t0)
|
||||||
|
if td < min*time.Millisecond {
|
||||||
|
t.Fatalf("tick was shorter than expected: %s", td)
|
||||||
|
} else if td > (max+precision)*time.Millisecond {
|
||||||
|
t.Fatalf("tick was longer than expected: %s", td)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rt.Stop()
|
||||||
|
time.Sleep((max + precision) * time.Millisecond)
|
||||||
|
select {
|
||||||
|
case v, ok := <-rt.C:
|
||||||
|
if ok || !v.IsZero() {
|
||||||
|
t.Fatal("ticker did not shut down")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatal("expected to receive close channel signal")
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,11 @@
|
|||||||
package register // import "go.unistack.org/micro/v3/util/register"
|
package register // import "go.unistack.org/micro/v3/util/register"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v3/register"
|
||||||
|
jitter "go.unistack.org/micro/v3/util/jitter"
|
||||||
)
|
)
|
||||||
|
|
||||||
func addNodes(old, neu []*register.Node) []*register.Node {
|
func addNodes(old, neu []*register.Node) []*register.Node {
|
||||||
@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service {
|
|||||||
|
|
||||||
return services
|
return services
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitService using register wait for service to appear with min/max interval for check and optional timeout.
|
||||||
|
// Timeout can be 0 to wait infinitive.
|
||||||
|
func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
|
||||||
|
if timeout > 0 {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := jitter.NewTickerContext(ctx, min, max)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case _, ok := <-ticker.C:
|
||||||
|
if _, err := reg.LookupService(ctx, name, opts...); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
return register.ErrNotFound
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user