jitter: add NewTickerContext
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
297a80da84
commit
c6c2b0884e
@ -259,7 +259,7 @@ func (s *service) Start() error {
|
||||
s.RUnlock()
|
||||
|
||||
if config.Loggers[0].V(logger.InfoLevel) {
|
||||
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s-%s", s.Options().Name, s.Options().Version)
|
||||
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version)
|
||||
}
|
||||
|
||||
for _, fn := range s.opts.BeforeStart {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package jitter
|
||||
package jitter // import "go.unistack.org/micro/v3/util/jitter"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/util/rand"
|
||||
@ -10,13 +11,31 @@ import (
|
||||
// the min and max duration values (stored internally as int64 nanosecond
|
||||
// counts).
|
||||
type Ticker struct {
|
||||
C chan time.Time
|
||||
ctx context.Context
|
||||
done chan chan struct{}
|
||||
C chan time.Time
|
||||
min int64
|
||||
max int64
|
||||
exp int64
|
||||
exit bool
|
||||
rng rand.Rand
|
||||
}
|
||||
|
||||
// NewTickerContext returns a pointer to an initialized instance of the Ticker.
|
||||
// It works like NewTicker except that it has ability to close via context.
|
||||
// Also it works fine with context.WithTimeout to handle max time to run ticker.
|
||||
func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
|
||||
ticker := &Ticker{
|
||||
C: make(chan time.Time),
|
||||
done: make(chan chan struct{}),
|
||||
min: min.Nanoseconds(),
|
||||
max: max.Nanoseconds(),
|
||||
ctx: ctx,
|
||||
}
|
||||
go ticker.run()
|
||||
return ticker
|
||||
}
|
||||
|
||||
// NewTicker returns a pointer to an initialized instance of the Ticker.
|
||||
// Min and max are durations of the shortest and longest allowed
|
||||
// ticks. Ticker will run in a goroutine until explicitly stopped.
|
||||
@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker {
|
||||
done: make(chan chan struct{}),
|
||||
min: min.Nanoseconds(),
|
||||
max: max.Nanoseconds(),
|
||||
ctx: context.Background(),
|
||||
}
|
||||
go ticker.run()
|
||||
return ticker
|
||||
@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker {
|
||||
|
||||
// Stop terminates the ticker goroutine and closes the C channel.
|
||||
func (ticker *Ticker) Stop() {
|
||||
if ticker.exit {
|
||||
return
|
||||
}
|
||||
c := make(chan struct{})
|
||||
ticker.done <- c
|
||||
<-c
|
||||
// close(ticker.C)
|
||||
ticker.exit = true
|
||||
}
|
||||
|
||||
func (ticker *Ticker) run() {
|
||||
@ -44,6 +69,8 @@ func (ticker *Ticker) run() {
|
||||
for {
|
||||
// either a stop signal or a timeout
|
||||
select {
|
||||
case <-ticker.ctx.Done():
|
||||
t.Stop()
|
||||
case c := <-ticker.done:
|
||||
t.Stop()
|
||||
close(c)
|
||||
|
62
util/jitter/ticker_test.go
Normal file
62
util/jitter/ticker_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
package jitter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewTickerContext(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
break loop
|
||||
case v, ok := <-ticker.C:
|
||||
if ok {
|
||||
t.Fatalf("context must be closed %s", v)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTicker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
min := time.Duration(10)
|
||||
max := time.Duration(20)
|
||||
|
||||
// tick can take a little longer since we're not adjusting it to account for
|
||||
// processing.
|
||||
precision := time.Duration(4)
|
||||
|
||||
rt := NewTicker(min*time.Millisecond, max*time.Millisecond)
|
||||
for i := 0; i < 5; i++ {
|
||||
t0 := time.Now()
|
||||
t1 := <-rt.C
|
||||
td := t1.Sub(t0)
|
||||
if td < min*time.Millisecond {
|
||||
t.Fatalf("tick was shorter than expected: %s", td)
|
||||
} else if td > (max+precision)*time.Millisecond {
|
||||
t.Fatalf("tick was longer than expected: %s", td)
|
||||
}
|
||||
}
|
||||
rt.Stop()
|
||||
time.Sleep((max + precision) * time.Millisecond)
|
||||
select {
|
||||
case v, ok := <-rt.C:
|
||||
if ok || !v.IsZero() {
|
||||
t.Fatal("ticker did not shut down")
|
||||
}
|
||||
default:
|
||||
t.Fatal("expected to receive close channel signal")
|
||||
}
|
||||
}
|
@ -1,7 +1,11 @@
|
||||
package register // import "go.unistack.org/micro/v3/util/register"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/register"
|
||||
jitter "go.unistack.org/micro/v3/util/jitter"
|
||||
)
|
||||
|
||||
func addNodes(old, neu []*register.Node) []*register.Node {
|
||||
@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service {
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
// WaitService using register wait for service to appear with min/max interval for check and optional timeout.
|
||||
// Timeout can be 0 to wait infinitive.
|
||||
func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
|
||||
if timeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
ticker := jitter.NewTickerContext(ctx, min, max)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case _, ok := <-ticker.C:
|
||||
if _, err := reg.LookupService(ctx, name, opts...); err == nil {
|
||||
return nil
|
||||
}
|
||||
if ok {
|
||||
return register.ErrNotFound
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user