diff --git a/file.go b/file.go index dda87c6..6684ea7 100644 --- a/file.go +++ b/file.go @@ -134,6 +134,21 @@ func (c *fileConfig) Name() string { return c.opts.Name } +func (c *fileConfig) Watch(ctx context.Context, opts ...config.WatchOption) (config.Watcher, error) { + w := &fileWatcher{ + path: c.path, + opts: c.opts, + wopts: config.NewWatchOptions(opts...), + done: make(chan struct{}), + vchan: make(chan map[string]interface{}), + echan: make(chan error), + } + + go w.run() + + return w, nil +} + func NewConfig(opts ...config.Option) config.Config { options := config.NewOptions(opts...) if len(options.StructTag) == 0 { diff --git a/go.mod b/go.mod index 23cf5c8..e77059d 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,5 @@ go 1.16 require ( github.com/imdario/mergo v0.3.12 - github.com/unistack-org/micro/v3 v3.4.0 + github.com/unistack-org/micro/v3 v3.5.9 ) diff --git a/go.sum b/go.sum index 3e10b38..2ff7a1b 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,12 @@ -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -github.com/unistack-org/micro/v3 v3.4.0 h1:z9F3lgAb2j4cZ1ib5qBj66JPYUAzR4sNIJqUDjVwyVQ= -github.com/unistack-org/micro/v3 v3.4.0/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= +github.com/unistack-org/micro/v3 v3.5.9 h1:9iIxGZ56bVME7E9hqKIHeSHXkn69M9KFyJfaUzi7B9k= +github.com/unistack-org/micro/v3 v3.5.9/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/watcher.go b/watcher.go new file mode 100644 index 0000000..266d0ac --- /dev/null +++ b/watcher.go @@ -0,0 +1,103 @@ +package file + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/config" + "github.com/unistack-org/micro/v3/util/jitter" + rutil "github.com/unistack-org/micro/v3/util/reflect" +) + +type fileWatcher struct { + path string + opts config.Options + wopts config.WatchOptions + done chan struct{} + vchan chan map[string]interface{} + echan chan error +} + +func (w *fileWatcher) run() { + ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval) + defer ticker.Stop() + + src := w.opts.Struct + if w.wopts.Struct != nil { + src = w.wopts.Struct + } + + for { + select { + case <-w.done: + return + case <-ticker.C: + dst, err := rutil.Zero(src) + if err == nil { + var fp *os.File + if fp, err = os.OpenFile(w.path, os.O_RDONLY, os.FileMode(0400)); err != nil { + w.echan <- fmt.Errorf("failed to open: %s, error: %w", w.path, err) + return + } + var buf []byte + buf, err = ioutil.ReadAll(io.LimitReader(fp, int64(codec.DefaultMaxMsgSize))) + if err == nil { + err = w.opts.Codec.Unmarshal(buf, dst) + } + if err != nil { + _ = fp.Close() + w.echan <- err + return + } + err = fp.Close() + } + if err != nil { + w.echan <- err + return + } + srcmp, err := rutil.StructFieldsMap(src) + if err != nil { + w.echan <- err + return + } + dstmp, err := rutil.StructFieldsMap(dst) + if err != nil { + w.echan <- err + return + } + for sk, sv := range srcmp { + if reflect.DeepEqual(dstmp[sk], sv) { + delete(dstmp, sk) + } + } + if len(dstmp) > 0 { + w.vchan <- dstmp + src = dst + } + } + } +} + +func (w *fileWatcher) Next() (map[string]interface{}, error) { + select { + case <-w.done: + break + case err := <-w.echan: + return nil, err + case v, ok := <-w.vchan: + if !ok { + break + } + return v, nil + } + return nil, config.ErrWatcherStopped +} + +func (w *fileWatcher) Stop() error { + close(w.done) + return nil +}