Fix config watch (#1670)

* add dirty overrite test case

* need version to figure out if config need update or not

* using nanosecond as version for two goroutine can run in same second

* config should check snapshot version when update

* set checksum of ChangeSet

Co-authored-by: Asim Aslam <asim@aslam.me>
This commit is contained in:
sunfuze 2020-06-17 00:10:52 +08:00 committed by Dominic Wong
parent c28f625cd4
commit 81aa8e0231
4 changed files with 107 additions and 24 deletions

View File

@ -82,6 +82,11 @@ func (c *config) run() {
c.Lock()
if c.snap.Version >= snap.Version {
c.Unlock()
continue
}
// save
c.snap = snap

View File

@ -4,12 +4,15 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"github.com/micro/go-micro/v2/config/source"
"github.com/micro/go-micro/v2/config/source/env"
"github.com/micro/go-micro/v2/config/source/file"
"github.com/micro/go-micro/v2/config/source/memory"
)
func createFileForIssue18(t *testing.T, content string) *os.File {
@ -127,3 +130,37 @@ func TestConfigMerge(t *testing.T) {
actualHost)
}
}
func equalS(t *testing.T, actual, expect string) {
if actual != expect {
t.Errorf("Expected %s but got %s", actual, expect)
}
}
func TestConfigWatcherDirtyOverrite(t *testing.T) {
n := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(n)
runtime.GOMAXPROCS(1)
l := 100
ss := make([]source.Source, l, l)
for i := 0; i < l; i++ {
ss[i] = memory.NewSource(memory.WithJSON([]byte(fmt.Sprintf(`{"key%d": "val%d"}`, i, i))))
}
conf, _ := NewConfig()
for _, s := range ss {
_ = conf.Load(s)
}
runtime.Gosched()
for i, _ := range ss {
k := fmt.Sprintf("key%d", i)
v := fmt.Sprintf("val%d", i)
equalS(t, conf.Get(k).String(""), v)
}
}

View File

@ -32,19 +32,21 @@ type memory struct {
watchers *list.List
}
type updateValue struct {
version string
value reader.Value
}
type watcher struct {
exit chan bool
path []string
value reader.Value
reader reader.Reader
updates chan reader.Value
version string
updates chan updateValue
}
func (m *memory) watch(idx int, s source.Source) {
m.Lock()
m.sets = append(m.sets, &source.ChangeSet{Source: s.String()})
m.Unlock()
// watches a source for changes
watch := func(idx int, s source.Watcher) error {
for {
@ -70,7 +72,7 @@ func (m *memory) watch(idx int, s source.Source) {
m.vals, _ = m.opts.Reader.Values(set)
m.snap = &loader.Snapshot{
ChangeSet: set,
Version: fmt.Sprintf("%d", time.Now().Unix()),
Version: genVer(),
}
m.Unlock()
@ -141,7 +143,7 @@ func (m *memory) reload() error {
m.vals, _ = m.opts.Reader.Values(set)
m.snap = &loader.Snapshot{
ChangeSet: set,
Version: fmt.Sprintf("%d", time.Now().Unix()),
Version: genVer(),
}
m.Unlock()
@ -159,11 +161,23 @@ func (m *memory) update() {
for e := m.watchers.Front(); e != nil; e = e.Next() {
watchers = append(watchers, e.Value.(*watcher))
}
vals := m.vals
snap := m.snap
m.RUnlock()
for _, w := range watchers {
if w.version >= snap.Version {
continue
}
uv := updateValue{
version: m.snap.Version,
value: vals.Get(w.path...),
}
select {
case w.updates <- m.vals.Get(w.path...):
case w.updates <- uv:
default:
}
}
@ -226,7 +240,7 @@ func (m *memory) Sync() error {
m.vals = vals
m.snap = &loader.Snapshot{
ChangeSet: set,
Version: fmt.Sprintf("%d", time.Now().Unix()),
Version: genVer(),
}
m.Unlock()
@ -285,6 +299,7 @@ func (m *memory) Get(path ...string) (reader.Value, error) {
}
// ok we're going hardcore now
return nil, errors.New("no values")
}
@ -333,7 +348,8 @@ func (m *memory) Watch(path ...string) (loader.Watcher, error) {
path: path,
value: value,
reader: m.opts.Reader,
updates: make(chan reader.Value, 1),
updates: make(chan updateValue, 1),
version: m.snap.Version,
}
e := m.watchers.PushBack(w)
@ -355,28 +371,43 @@ func (m *memory) String() string {
}
func (w *watcher) Next() (*loader.Snapshot, error) {
update := func(v reader.Value) *loader.Snapshot {
w.value = v
cs := &source.ChangeSet{
Data: v.Bytes(),
Format: w.reader.String(),
Source: "memory",
Timestamp: time.Now(),
}
cs.Checksum = cs.Sum()
return &loader.Snapshot{
ChangeSet: cs,
Version: w.version,
}
}
for {
select {
case <-w.exit:
return nil, errors.New("watcher stopped")
case v := <-w.updates:
case uv := <-w.updates:
if uv.version <= w.version {
continue
}
v := uv.value
w.version = uv.version
if bytes.Equal(w.value.Bytes(), v.Bytes()) {
continue
}
w.value = v
cs := &source.ChangeSet{
Data: v.Bytes(),
Format: w.reader.String(),
Source: "memory",
Timestamp: time.Now(),
}
cs.Sum()
return &loader.Snapshot{
ChangeSet: cs,
Version: fmt.Sprintf("%d", time.Now().Unix()),
}, nil
return update(v), nil
}
}
}
@ -386,10 +417,16 @@ func (w *watcher) Stop() error {
case <-w.exit:
default:
close(w.exit)
close(w.updates)
}
return nil
}
func genVer() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
func NewLoader(opts ...loader.Option) loader.Loader {
options := loader.Options{
Reader: json.NewReader(),
@ -406,7 +443,10 @@ func NewLoader(opts ...loader.Option) loader.Loader {
sources: options.Source,
}
m.sets = make([]*source.ChangeSet, len(options.Source))
for i, s := range options.Source {
m.sets[i] = &source.ChangeSet{Source: s.String()}
go m.watch(i, s)
}

View File

@ -42,6 +42,7 @@ func (s *memory) Watch() (source.Watcher, error) {
}
func (m *memory) Write(cs *source.ChangeSet) error {
m.Update(cs)
return nil
}