Merge pull request #686 from milosgajdos83/config-watcher

Introduce ErrStoppedWatcher for source.Source Watchers and fixed test
This commit is contained in:
Asim Aslam 2019-08-21 11:06:14 +01:00 committed by GitHub
commit 0aea8e3163
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 19 deletions

View File

@ -145,16 +145,26 @@ func TestFileChange(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
changeTimes := 0 changeTimes := 0
done := make(chan bool)
go func() { go func() {
defer func() {
close(done)
}()
for { for {
v, err := watcher.Next() v, err := watcher.Next()
if err != nil { if err != nil {
t.Error(err) if err.Error() != "watcher stopped" {
t.Error(err)
return
}
return return
} }
changeTimes++ changeTimes++
t.Logf("file change%s", string(v.Bytes())) t.Logf("file change%s: %d", string(v.Bytes()), changeTimes)
} }
}() }()
@ -169,10 +179,17 @@ func TestFileChange(t *testing.T) {
t.Error(err) t.Error(err)
} }
time.Sleep(time.Second) time.Sleep(500 * time.Millisecond)
} }
if changeTimes != 4 { if err := watcher.Stop(); err != nil {
t.Error(fmt.Errorf("watcher error: change times %d is not enough", changeTimes)) t.Fatalf("failed to stop watcher: %s", err)
}
// wait for the watcher to finish
<-done
if changeTimes != 5 {
t.Errorf("watcher error: change times %d is not enough", changeTimes)
} }
} }

View File

@ -1,7 +1,6 @@
package consul package consul
import ( import (
"errors"
"time" "time"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -80,7 +79,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
case cs := <-w.ch: case cs := <-w.ch:
return cs, nil return cs, nil
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
} }

View File

@ -86,8 +86,8 @@ func TestEnvvar_Prefixes(t *testing.T) {
} }
func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) { func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) {
source := NewSource(WithStrippedPrefix("GOMICRO_")) src := NewSource(WithStrippedPrefix("GOMICRO_"))
w, err := source.Watch() w, err := src.Watch()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -97,7 +97,7 @@ func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) {
w.Stop() w.Stop()
}() }()
if _, err := w.Next(); err.Error() != "watcher stopped" { if _, err := w.Next(); err != source.ErrWatcherStopped {
t.Errorf("expected watcher stopped error, got %v", err) t.Errorf("expected watcher stopped error, got %v", err)
} }
} }

View File

@ -1,8 +1,6 @@
package env package env
import ( import (
"errors"
"github.com/micro/go-micro/config/source" "github.com/micro/go-micro/config/source"
) )
@ -13,7 +11,7 @@ type watcher struct {
func (w *watcher) Next() (*source.ChangeSet, error) { func (w *watcher) Next() (*source.ChangeSet, error) {
<-w.exit <-w.exit
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
func (w *watcher) Stop() error { func (w *watcher) Stop() error {

View File

@ -3,7 +3,6 @@
package file package file
import ( import (
"errors"
"os" "os"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
// is it closed? // is it closed?
select { select {
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
default: default:
} }
@ -59,7 +58,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
case err := <-w.fw.Errors: case err := <-w.fw.Errors:
return nil, err return nil, err
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
} }

View File

@ -3,7 +3,6 @@
package file package file
import ( import (
"errors"
"os" "os"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
// is it closed? // is it closed?
select { select {
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
default: default:
} }
@ -63,7 +62,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
case err := <-w.fw.Errors: case err := <-w.fw.Errors:
return nil, err return nil, err
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
} }

View File

@ -2,9 +2,15 @@
package source package source
import ( import (
"errors"
"time" "time"
) )
var (
// ErrWatcherStopped is returned when source watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
)
// Source is the source from which config is loaded // Source is the source from which config is loaded
type Source interface { type Source interface {
Read() (*ChangeSet, error) Read() (*ChangeSet, error)