jitter: add NewTickerContext #106
| @@ -259,7 +259,7 @@ func (s *service) Start() error { | |||||||
| 	s.RUnlock() | 	s.RUnlock() | ||||||
|  |  | ||||||
| 	if config.Loggers[0].V(logger.InfoLevel) { | 	if config.Loggers[0].V(logger.InfoLevel) { | ||||||
| 		config.Loggers[0].Infof(s.opts.Context, "starting [service] %s-%s", s.Options().Name, s.Options().Version) | 		config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, fn := range s.opts.BeforeStart { | 	for _, fn := range s.opts.BeforeStart { | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| package jitter | package jitter // import "go.unistack.org/micro/v3/util/jitter" | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v3/util/rand" | 	"go.unistack.org/micro/v3/util/rand" | ||||||
| @@ -10,13 +11,31 @@ import ( | |||||||
| // the min and max duration values (stored internally as int64 nanosecond | // the min and max duration values (stored internally as int64 nanosecond | ||||||
| // counts). | // counts). | ||||||
| type Ticker struct { | type Ticker struct { | ||||||
| 	C    chan time.Time | 	ctx  context.Context | ||||||
| 	done chan chan struct{} | 	done chan chan struct{} | ||||||
|  | 	C    chan time.Time | ||||||
| 	min  int64 | 	min  int64 | ||||||
| 	max  int64 | 	max  int64 | ||||||
|  | 	exp  int64 | ||||||
|  | 	exit bool | ||||||
| 	rng  rand.Rand | 	rng  rand.Rand | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // NewTickerContext returns a pointer to an initialized instance of the Ticker. | ||||||
|  | // It works like NewTicker except that it has ability to close via context. | ||||||
|  | // Also it works fine with context.WithTimeout to handle max time to run ticker. | ||||||
|  | func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker { | ||||||
|  | 	ticker := &Ticker{ | ||||||
|  | 		C:    make(chan time.Time), | ||||||
|  | 		done: make(chan chan struct{}), | ||||||
|  | 		min:  min.Nanoseconds(), | ||||||
|  | 		max:  max.Nanoseconds(), | ||||||
|  | 		ctx:  ctx, | ||||||
|  | 	} | ||||||
|  | 	go ticker.run() | ||||||
|  | 	return ticker | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewTicker returns a pointer to an initialized instance of the Ticker. | // NewTicker returns a pointer to an initialized instance of the Ticker. | ||||||
| // Min and max are durations of the shortest and longest allowed | // Min and max are durations of the shortest and longest allowed | ||||||
| // ticks. Ticker will run in a goroutine until explicitly stopped. | // ticks. Ticker will run in a goroutine until explicitly stopped. | ||||||
| @@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker { | |||||||
| 		done: make(chan chan struct{}), | 		done: make(chan chan struct{}), | ||||||
| 		min:  min.Nanoseconds(), | 		min:  min.Nanoseconds(), | ||||||
| 		max:  max.Nanoseconds(), | 		max:  max.Nanoseconds(), | ||||||
|  | 		ctx:  context.Background(), | ||||||
| 	} | 	} | ||||||
| 	go ticker.run() | 	go ticker.run() | ||||||
| 	return ticker | 	return ticker | ||||||
| @@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker { | |||||||
|  |  | ||||||
| // Stop terminates the ticker goroutine and closes the C channel. | // Stop terminates the ticker goroutine and closes the C channel. | ||||||
| func (ticker *Ticker) Stop() { | func (ticker *Ticker) Stop() { | ||||||
|  | 	if ticker.exit { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
| 	c := make(chan struct{}) | 	c := make(chan struct{}) | ||||||
| 	ticker.done <- c | 	ticker.done <- c | ||||||
| 	<-c | 	<-c | ||||||
|  | 	// close(ticker.C) | ||||||
|  | 	ticker.exit = true | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ticker *Ticker) run() { | func (ticker *Ticker) run() { | ||||||
| @@ -44,6 +69,8 @@ func (ticker *Ticker) run() { | |||||||
| 	for { | 	for { | ||||||
| 		// either a stop signal or a timeout | 		// either a stop signal or a timeout | ||||||
| 		select { | 		select { | ||||||
|  | 		case <-ticker.ctx.Done(): | ||||||
|  | 			t.Stop() | ||||||
| 		case c := <-ticker.done: | 		case c := <-ticker.done: | ||||||
| 			t.Stop() | 			t.Stop() | ||||||
| 			close(c) | 			close(c) | ||||||
|   | |||||||
							
								
								
									
										62
									
								
								util/jitter/ticker_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										62
									
								
								util/jitter/ticker_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,62 @@ | |||||||
|  | package jitter | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestNewTickerContext(t *testing.T) { | ||||||
|  | 	t.Parallel() | ||||||
|  | 	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) | ||||||
|  | 	defer cancel() | ||||||
|  |  | ||||||
|  | 	ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond) | ||||||
|  | 	defer ticker.Stop() | ||||||
|  | loop: | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			ticker.Stop() | ||||||
|  | 			break loop | ||||||
|  | 		case v, ok := <-ticker.C: | ||||||
|  | 			if ok { | ||||||
|  | 				t.Fatalf("context must be closed %s", v) | ||||||
|  | 			} | ||||||
|  | 			break loop | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestTicker(t *testing.T) { | ||||||
|  | 	t.Parallel() | ||||||
|  |  | ||||||
|  | 	min := time.Duration(10) | ||||||
|  | 	max := time.Duration(20) | ||||||
|  |  | ||||||
|  | 	// tick can take a little longer since we're not adjusting it to account for | ||||||
|  | 	// processing. | ||||||
|  | 	precision := time.Duration(4) | ||||||
|  |  | ||||||
|  | 	rt := NewTicker(min*time.Millisecond, max*time.Millisecond) | ||||||
|  | 	for i := 0; i < 5; i++ { | ||||||
|  | 		t0 := time.Now() | ||||||
|  | 		t1 := <-rt.C | ||||||
|  | 		td := t1.Sub(t0) | ||||||
|  | 		if td < min*time.Millisecond { | ||||||
|  | 			t.Fatalf("tick was shorter than expected: %s", td) | ||||||
|  | 		} else if td > (max+precision)*time.Millisecond { | ||||||
|  | 			t.Fatalf("tick was longer than expected: %s", td) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	rt.Stop() | ||||||
|  | 	time.Sleep((max + precision) * time.Millisecond) | ||||||
|  | 	select { | ||||||
|  | 	case v, ok := <-rt.C: | ||||||
|  | 		if ok || !v.IsZero() { | ||||||
|  | 			t.Fatal("ticker did not shut down") | ||||||
|  | 		} | ||||||
|  | 	default: | ||||||
|  | 		t.Fatal("expected to receive close channel signal") | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -1,7 +1,11 @@ | |||||||
| package register // import "go.unistack.org/micro/v3/util/register" | package register // import "go.unistack.org/micro/v3/util/register" | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v3/register" | 	"go.unistack.org/micro/v3/register" | ||||||
|  | 	jitter "go.unistack.org/micro/v3/util/jitter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func addNodes(old, neu []*register.Node) []*register.Node { | func addNodes(old, neu []*register.Node) []*register.Node { | ||||||
| @@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service { | |||||||
|  |  | ||||||
| 	return services | 	return services | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // WaitService using register wait for service to appear with min/max interval for check and optional timeout. | ||||||
|  | // Timeout can be 0 to wait infinitive. | ||||||
|  | func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error { | ||||||
|  | 	if timeout > 0 { | ||||||
|  | 		var cancel context.CancelFunc | ||||||
|  | 		ctx, cancel = context.WithTimeout(ctx, timeout) | ||||||
|  | 		defer cancel() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ticker := jitter.NewTickerContext(ctx, min, max) | ||||||
|  | 	defer ticker.Stop() | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			return ctx.Err() | ||||||
|  | 		case _, ok := <-ticker.C: | ||||||
|  | 			if _, err := reg.LookupService(ctx, name, opts...); err == nil { | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  | 			if ok { | ||||||
|  | 				return register.ErrNotFound | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user