Fix config watch (#1670)

* add dirty overrite test case

* need version to figure out if config need update or not

* using nanosecond as version for two goroutine can run in same second

* config should check snapshot version when update

* set checksum of ChangeSet

Co-authored-by: Asim Aslam <asim@aslam.me>
This commit is contained in:
sunfuze 2020-06-17 00:10:52 +08:00 committed by GitHub
parent 6add74b4f6
commit 2efb459c66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 24 deletions

View File

@ -82,6 +82,11 @@ func (c *config) run() {
c.Lock() c.Lock()
if c.snap.Version >= snap.Version {
c.Unlock()
continue
}
// save // save
c.snap = snap c.snap = snap

View File

@ -4,12 +4,15 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/micro/go-micro/v2/config/source"
"github.com/micro/go-micro/v2/config/source/env" "github.com/micro/go-micro/v2/config/source/env"
"github.com/micro/go-micro/v2/config/source/file" "github.com/micro/go-micro/v2/config/source/file"
"github.com/micro/go-micro/v2/config/source/memory"
) )
func createFileForIssue18(t *testing.T, content string) *os.File { func createFileForIssue18(t *testing.T, content string) *os.File {
@ -127,3 +130,37 @@ func TestConfigMerge(t *testing.T) {
actualHost) actualHost)
} }
} }
func equalS(t *testing.T, actual, expect string) {
if actual != expect {
t.Errorf("Expected %s but got %s", actual, expect)
}
}
func TestConfigWatcherDirtyOverrite(t *testing.T) {
n := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(n)
runtime.GOMAXPROCS(1)
l := 100
ss := make([]source.Source, l, l)
for i := 0; i < l; i++ {
ss[i] = memory.NewSource(memory.WithJSON([]byte(fmt.Sprintf(`{"key%d": "val%d"}`, i, i))))
}
conf, _ := NewConfig()
for _, s := range ss {
_ = conf.Load(s)
}
runtime.Gosched()
for i, _ := range ss {
k := fmt.Sprintf("key%d", i)
v := fmt.Sprintf("val%d", i)
equalS(t, conf.Get(k).String(""), v)
}
}

View File

@ -32,19 +32,21 @@ type memory struct {
watchers *list.List watchers *list.List
} }
type updateValue struct {
version string
value reader.Value
}
type watcher struct { type watcher struct {
exit chan bool exit chan bool
path []string path []string
value reader.Value value reader.Value
reader reader.Reader reader reader.Reader
updates chan reader.Value version string
updates chan updateValue
} }
func (m *memory) watch(idx int, s source.Source) { func (m *memory) watch(idx int, s source.Source) {
m.Lock()
m.sets = append(m.sets, &source.ChangeSet{Source: s.String()})
m.Unlock()
// watches a source for changes // watches a source for changes
watch := func(idx int, s source.Watcher) error { watch := func(idx int, s source.Watcher) error {
for { for {
@ -70,7 +72,7 @@ func (m *memory) watch(idx int, s source.Source) {
m.vals, _ = m.opts.Reader.Values(set) m.vals, _ = m.opts.Reader.Values(set)
m.snap = &loader.Snapshot{ m.snap = &loader.Snapshot{
ChangeSet: set, ChangeSet: set,
Version: fmt.Sprintf("%d", time.Now().Unix()), Version: genVer(),
} }
m.Unlock() m.Unlock()
@ -141,7 +143,7 @@ func (m *memory) reload() error {
m.vals, _ = m.opts.Reader.Values(set) m.vals, _ = m.opts.Reader.Values(set)
m.snap = &loader.Snapshot{ m.snap = &loader.Snapshot{
ChangeSet: set, ChangeSet: set,
Version: fmt.Sprintf("%d", time.Now().Unix()), Version: genVer(),
} }
m.Unlock() m.Unlock()
@ -159,11 +161,23 @@ func (m *memory) update() {
for e := m.watchers.Front(); e != nil; e = e.Next() { for e := m.watchers.Front(); e != nil; e = e.Next() {
watchers = append(watchers, e.Value.(*watcher)) watchers = append(watchers, e.Value.(*watcher))
} }
vals := m.vals
snap := m.snap
m.RUnlock() m.RUnlock()
for _, w := range watchers { for _, w := range watchers {
if w.version >= snap.Version {
continue
}
uv := updateValue{
version: m.snap.Version,
value: vals.Get(w.path...),
}
select { select {
case w.updates <- m.vals.Get(w.path...): case w.updates <- uv:
default: default:
} }
} }
@ -226,7 +240,7 @@ func (m *memory) Sync() error {
m.vals = vals m.vals = vals
m.snap = &loader.Snapshot{ m.snap = &loader.Snapshot{
ChangeSet: set, ChangeSet: set,
Version: fmt.Sprintf("%d", time.Now().Unix()), Version: genVer(),
} }
m.Unlock() m.Unlock()
@ -285,6 +299,7 @@ func (m *memory) Get(path ...string) (reader.Value, error) {
} }
// ok we're going hardcore now // ok we're going hardcore now
return nil, errors.New("no values") return nil, errors.New("no values")
} }
@ -333,7 +348,8 @@ func (m *memory) Watch(path ...string) (loader.Watcher, error) {
path: path, path: path,
value: value, value: value,
reader: m.opts.Reader, reader: m.opts.Reader,
updates: make(chan reader.Value, 1), updates: make(chan updateValue, 1),
version: m.snap.Version,
} }
e := m.watchers.PushBack(w) e := m.watchers.PushBack(w)
@ -355,28 +371,43 @@ func (m *memory) String() string {
} }
func (w *watcher) Next() (*loader.Snapshot, error) { func (w *watcher) Next() (*loader.Snapshot, error) {
update := func(v reader.Value) *loader.Snapshot {
w.value = v
cs := &source.ChangeSet{
Data: v.Bytes(),
Format: w.reader.String(),
Source: "memory",
Timestamp: time.Now(),
}
cs.Checksum = cs.Sum()
return &loader.Snapshot{
ChangeSet: cs,
Version: w.version,
}
}
for { for {
select { select {
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, errors.New("watcher stopped")
case v := <-w.updates:
case uv := <-w.updates:
if uv.version <= w.version {
continue
}
v := uv.value
w.version = uv.version
if bytes.Equal(w.value.Bytes(), v.Bytes()) { if bytes.Equal(w.value.Bytes(), v.Bytes()) {
continue continue
} }
w.value = v
cs := &source.ChangeSet{ return update(v), nil
Data: v.Bytes(),
Format: w.reader.String(),
Source: "memory",
Timestamp: time.Now(),
}
cs.Sum()
return &loader.Snapshot{
ChangeSet: cs,
Version: fmt.Sprintf("%d", time.Now().Unix()),
}, nil
} }
} }
} }
@ -386,10 +417,16 @@ func (w *watcher) Stop() error {
case <-w.exit: case <-w.exit:
default: default:
close(w.exit) close(w.exit)
close(w.updates)
} }
return nil return nil
} }
func genVer() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
func NewLoader(opts ...loader.Option) loader.Loader { func NewLoader(opts ...loader.Option) loader.Loader {
options := loader.Options{ options := loader.Options{
Reader: json.NewReader(), Reader: json.NewReader(),
@ -406,7 +443,10 @@ func NewLoader(opts ...loader.Option) loader.Loader {
sources: options.Source, sources: options.Source,
} }
m.sets = make([]*source.ChangeSet, len(options.Source))
for i, s := range options.Source { for i, s := range options.Source {
m.sets[i] = &source.ChangeSet{Source: s.String()}
go m.watch(i, s) go m.watch(i, s)
} }

View File

@ -42,6 +42,7 @@ func (s *memory) Watch() (source.Watcher, error) {
} }
func (m *memory) Write(cs *source.ChangeSet) error { func (m *memory) Write(cs *source.ChangeSet) error {
m.Update(cs)
return nil return nil
} }