From 4ea27517b5d1247946157d36989bc19b317785ee Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 20 Aug 2019 22:32:47 +0100 Subject: [PATCH] Source watcher ErrStoppedWatcher and fixed test --- config/default_test.go | 27 ++++++++++++++++++++++----- config/source/consul/watcher.go | 3 +-- config/source/env/env_test.go | 6 +++--- config/source/env/watcher.go | 4 +--- config/source/file/watcher.go | 5 ++--- config/source/file/watcher_linux.go | 5 ++--- config/source/source.go | 6 ++++++ 7 files changed, 37 insertions(+), 19 deletions(-) diff --git a/config/default_test.go b/config/default_test.go index fd1933f4..87128150 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -145,16 +145,26 @@ func TestFileChange(t *testing.T) { if err != nil { t.Error(err) } + changeTimes := 0 + done := make(chan bool) + go func() { + defer func() { + close(done) + }() + for { v, err := watcher.Next() if err != nil { - t.Error(err) + if err.Error() != "watcher stopped" { + t.Error(err) + return + } return } changeTimes++ - t.Logf("file change,%s", string(v.Bytes())) + t.Logf("file change,%s: %d", string(v.Bytes()), changeTimes) } }() @@ -169,10 +179,17 @@ func TestFileChange(t *testing.T) { t.Error(err) } - time.Sleep(time.Second) + time.Sleep(500 * time.Millisecond) } - if changeTimes != 4 { - t.Error(fmt.Errorf("watcher error: change times %d is not enough", changeTimes)) + if err := watcher.Stop(); err != nil { + t.Fatalf("failed to stop watcher: %s", err) + } + + // wait for the watcher to finish + <-done + + if changeTimes != 5 { + t.Errorf("watcher error: change times %d is not enough", changeTimes) } } diff --git a/config/source/consul/watcher.go b/config/source/consul/watcher.go index e6993d8b..a20c8f9b 100644 --- a/config/source/consul/watcher.go +++ b/config/source/consul/watcher.go @@ -1,7 +1,6 @@ package consul import ( - "errors" "time" "github.com/hashicorp/consul/api" @@ -80,7 +79,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { case cs := <-w.ch: return cs, nil case <-w.exit: - return nil, errors.New("watcher stopped") + return nil, source.ErrWatcherStopped } } diff --git a/config/source/env/env_test.go b/config/source/env/env_test.go index 891d8d8b..4c22122e 100644 --- a/config/source/env/env_test.go +++ b/config/source/env/env_test.go @@ -86,8 +86,8 @@ func TestEnvvar_Prefixes(t *testing.T) { } func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) { - source := NewSource(WithStrippedPrefix("GOMICRO_")) - w, err := source.Watch() + src := NewSource(WithStrippedPrefix("GOMICRO_")) + w, err := src.Watch() if err != nil { t.Error(err) } @@ -97,7 +97,7 @@ func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) { w.Stop() }() - if _, err := w.Next(); err.Error() != "watcher stopped" { + if _, err := w.Next(); err != source.ErrWatcherStopped { t.Errorf("expected watcher stopped error, got %v", err) } } diff --git a/config/source/env/watcher.go b/config/source/env/watcher.go index 5dd3ef34..4ffe783c 100644 --- a/config/source/env/watcher.go +++ b/config/source/env/watcher.go @@ -1,8 +1,6 @@ package env import ( - "errors" - "github.com/micro/go-micro/config/source" ) @@ -13,7 +11,7 @@ type watcher struct { func (w *watcher) Next() (*source.ChangeSet, error) { <-w.exit - return nil, errors.New("watcher stopped") + return nil, source.ErrWatcherStopped } func (w *watcher) Stop() error { diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go index 76ecd260..d9595e27 100644 --- a/config/source/file/watcher.go +++ b/config/source/file/watcher.go @@ -3,7 +3,6 @@ package file import ( - "errors" "os" "github.com/fsnotify/fsnotify" @@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { // is it closed? select { case <-w.exit: - return nil, errors.New("watcher stopped") + return nil, source.ErrWatcherStopped default: } @@ -59,7 +58,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { case err := <-w.fw.Errors: return nil, err case <-w.exit: - return nil, errors.New("watcher stopped") + return nil, source.ErrWatcherStopped } } diff --git a/config/source/file/watcher_linux.go b/config/source/file/watcher_linux.go index 3f48a00b..82d45154 100644 --- a/config/source/file/watcher_linux.go +++ b/config/source/file/watcher_linux.go @@ -3,7 +3,6 @@ package file import ( - "errors" "os" "github.com/fsnotify/fsnotify" @@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { // is it closed? select { case <-w.exit: - return nil, errors.New("watcher stopped") + return nil, source.ErrWatcherStopped default: } @@ -63,7 +62,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { case err := <-w.fw.Errors: return nil, err case <-w.exit: - return nil, errors.New("watcher stopped") + return nil, source.ErrWatcherStopped } } diff --git a/config/source/source.go b/config/source/source.go index 828c8ad2..c6d961be 100644 --- a/config/source/source.go +++ b/config/source/source.go @@ -2,9 +2,15 @@ package source import ( + "errors" "time" ) +var ( + // ErrWatcherStopped is returned when source watcher has been stopped + ErrWatcherStopped = errors.New("watcher stopped") +) + // Source is the source from which config is loaded type Source interface { Read() (*ChangeSet, error)