From 2efb459c6686d226ee5220e1e116490fb0e6b5a3 Mon Sep 17 00:00:00 2001 From: sunfuze Date: Wed, 17 Jun 2020 00:10:52 +0800 Subject: [PATCH] Fix config watch (#1670) * add dirty overrite test case * need version to figure out if config need update or not * using nanosecond as version for two goroutine can run in same second * config should check snapshot version when update * set checksum of ChangeSet Co-authored-by: Asim Aslam --- config/default.go | 5 ++ config/default_test.go | 37 ++++++++++++++ config/loader/memory/memory.go | 88 ++++++++++++++++++++++++---------- config/source/memory/memory.go | 1 + 4 files changed, 107 insertions(+), 24 deletions(-) diff --git a/config/default.go b/config/default.go index 905ccad7..a774469f 100644 --- a/config/default.go +++ b/config/default.go @@ -82,6 +82,11 @@ func (c *config) run() { c.Lock() + if c.snap.Version >= snap.Version { + c.Unlock() + continue + } + // save c.snap = snap diff --git a/config/default_test.go b/config/default_test.go index 93048d56..0250a546 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -4,12 +4,15 @@ import ( "fmt" "os" "path/filepath" + "runtime" "strings" "testing" "time" + "github.com/micro/go-micro/v2/config/source" "github.com/micro/go-micro/v2/config/source/env" "github.com/micro/go-micro/v2/config/source/file" + "github.com/micro/go-micro/v2/config/source/memory" ) func createFileForIssue18(t *testing.T, content string) *os.File { @@ -127,3 +130,37 @@ func TestConfigMerge(t *testing.T) { actualHost) } } + +func equalS(t *testing.T, actual, expect string) { + if actual != expect { + t.Errorf("Expected %s but got %s", actual, expect) + } +} + +func TestConfigWatcherDirtyOverrite(t *testing.T) { + n := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(n) + + runtime.GOMAXPROCS(1) + + l := 100 + + ss := make([]source.Source, l, l) + + for i := 0; i < l; i++ { + ss[i] = memory.NewSource(memory.WithJSON([]byte(fmt.Sprintf(`{"key%d": "val%d"}`, i, i)))) + } + + conf, _ := NewConfig() + + for _, s := range ss { + _ = conf.Load(s) + } + runtime.Gosched() + + for i, _ := range ss { + k := fmt.Sprintf("key%d", i) + v := fmt.Sprintf("val%d", i) + equalS(t, conf.Get(k).String(""), v) + } +} diff --git a/config/loader/memory/memory.go b/config/loader/memory/memory.go index 65c586bf..5af7d5c4 100644 --- a/config/loader/memory/memory.go +++ b/config/loader/memory/memory.go @@ -32,19 +32,21 @@ type memory struct { watchers *list.List } +type updateValue struct { + version string + value reader.Value +} + type watcher struct { exit chan bool path []string value reader.Value reader reader.Reader - updates chan reader.Value + version string + updates chan updateValue } func (m *memory) watch(idx int, s source.Source) { - m.Lock() - m.sets = append(m.sets, &source.ChangeSet{Source: s.String()}) - m.Unlock() - // watches a source for changes watch := func(idx int, s source.Watcher) error { for { @@ -70,7 +72,7 @@ func (m *memory) watch(idx int, s source.Source) { m.vals, _ = m.opts.Reader.Values(set) m.snap = &loader.Snapshot{ ChangeSet: set, - Version: fmt.Sprintf("%d", time.Now().Unix()), + Version: genVer(), } m.Unlock() @@ -141,7 +143,7 @@ func (m *memory) reload() error { m.vals, _ = m.opts.Reader.Values(set) m.snap = &loader.Snapshot{ ChangeSet: set, - Version: fmt.Sprintf("%d", time.Now().Unix()), + Version: genVer(), } m.Unlock() @@ -159,11 +161,23 @@ func (m *memory) update() { for e := m.watchers.Front(); e != nil; e = e.Next() { watchers = append(watchers, e.Value.(*watcher)) } + + vals := m.vals + snap := m.snap m.RUnlock() for _, w := range watchers { + if w.version >= snap.Version { + continue + } + + uv := updateValue{ + version: m.snap.Version, + value: vals.Get(w.path...), + } + select { - case w.updates <- m.vals.Get(w.path...): + case w.updates <- uv: default: } } @@ -226,7 +240,7 @@ func (m *memory) Sync() error { m.vals = vals m.snap = &loader.Snapshot{ ChangeSet: set, - Version: fmt.Sprintf("%d", time.Now().Unix()), + Version: genVer(), } m.Unlock() @@ -285,6 +299,7 @@ func (m *memory) Get(path ...string) (reader.Value, error) { } // ok we're going hardcore now + return nil, errors.New("no values") } @@ -333,7 +348,8 @@ func (m *memory) Watch(path ...string) (loader.Watcher, error) { path: path, value: value, reader: m.opts.Reader, - updates: make(chan reader.Value, 1), + updates: make(chan updateValue, 1), + version: m.snap.Version, } e := m.watchers.PushBack(w) @@ -355,28 +371,43 @@ func (m *memory) String() string { } func (w *watcher) Next() (*loader.Snapshot, error) { + update := func(v reader.Value) *loader.Snapshot { + w.value = v + + cs := &source.ChangeSet{ + Data: v.Bytes(), + Format: w.reader.String(), + Source: "memory", + Timestamp: time.Now(), + } + cs.Checksum = cs.Sum() + + return &loader.Snapshot{ + ChangeSet: cs, + Version: w.version, + } + + } + for { select { case <-w.exit: return nil, errors.New("watcher stopped") - case v := <-w.updates: + + case uv := <-w.updates: + if uv.version <= w.version { + continue + } + + v := uv.value + + w.version = uv.version + if bytes.Equal(w.value.Bytes(), v.Bytes()) { continue } - w.value = v - cs := &source.ChangeSet{ - Data: v.Bytes(), - Format: w.reader.String(), - Source: "memory", - Timestamp: time.Now(), - } - cs.Sum() - - return &loader.Snapshot{ - ChangeSet: cs, - Version: fmt.Sprintf("%d", time.Now().Unix()), - }, nil + return update(v), nil } } } @@ -386,10 +417,16 @@ func (w *watcher) Stop() error { case <-w.exit: default: close(w.exit) + close(w.updates) } + return nil } +func genVer() string { + return fmt.Sprintf("%d", time.Now().UnixNano()) +} + func NewLoader(opts ...loader.Option) loader.Loader { options := loader.Options{ Reader: json.NewReader(), @@ -406,7 +443,10 @@ func NewLoader(opts ...loader.Option) loader.Loader { sources: options.Source, } + m.sets = make([]*source.ChangeSet, len(options.Source)) + for i, s := range options.Source { + m.sets[i] = &source.ChangeSet{Source: s.String()} go m.watch(i, s) } diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index bb3f2b73..0bb78d23 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -42,6 +42,7 @@ func (s *memory) Watch() (source.Watcher, error) { } func (m *memory) Write(cs *source.ChangeSet) error { + m.Update(cs) return nil }