diff --git a/consul.go b/consul.go index 9890533..9bb2246 100644 --- a/consul.go +++ b/consul.go @@ -3,12 +3,10 @@ package consul import ( "context" "fmt" - "reflect" "github.com/hashicorp/consul/api" "github.com/imdario/mergo" "github.com/unistack-org/micro/v3/config" - "github.com/unistack-org/micro/v3/util/jitter" rutil "github.com/unistack-org/micro/v3/util/reflect" ) @@ -183,98 +181,6 @@ func (c *consulConfig) Watch(ctx context.Context, opts ...config.WatchOption) (c return w, nil } -type consulWatcher struct { - cli *api.Client - path string - opts config.Options - wopts config.WatchOptions - done chan struct{} - vchan chan map[string]interface{} - echan chan error -} - -func (w *consulWatcher) run() { - ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval) - defer ticker.Stop() - - src := w.opts.Struct - if w.wopts.Struct != nil { - src = w.wopts.Struct - } - - for { - select { - case <-w.done: - return - case <-ticker.C: - dst, err := rutil.Zero(src) - if err != nil { - w.echan <- err - return - } - - pair, _, err := w.cli.KV().Get(w.path, nil) - if err != nil { - w.echan <- err - return - } - - if pair == nil { - w.echan <- fmt.Errorf("consul path %s not found", w.path) - return - } - - err = w.opts.Codec.Unmarshal(pair.Value, dst) - if err != nil { - w.echan <- err - return - } - - srcmp, err := rutil.StructFieldsMap(src) - if err != nil { - w.echan <- err - return - } - - dstmp, err := rutil.StructFieldsMap(dst) - if err != nil { - w.echan <- err - return - } - - for sk, sv := range srcmp { - if reflect.DeepEqual(dstmp[sk], sv) { - delete(dstmp, sk) - } - } - if len(dstmp) > 0 { - w.vchan <- dstmp - src = dst - } - } - } -} - -func (w *consulWatcher) Next() (map[string]interface{}, error) { - select { - case <-w.done: - break - case err := <-w.echan: - return nil, err - case v, ok := <-w.vchan: - if !ok { - break - } - return v, nil - } - return nil, config.ErrWatcherStopped -} - -func (w *consulWatcher) Stop() error { - close(w.done) - return nil -} - func NewConfig(opts ...config.Option) config.Config { options := config.NewOptions(opts...) if len(options.StructTag) == 0 { diff --git a/watcher.go b/watcher.go new file mode 100644 index 0000000..2e8df77 --- /dev/null +++ b/watcher.go @@ -0,0 +1,103 @@ +package consul + +import ( + "fmt" + "reflect" + + "github.com/hashicorp/consul/api" + "github.com/unistack-org/micro/v3/config" + "github.com/unistack-org/micro/v3/util/jitter" + rutil "github.com/unistack-org/micro/v3/util/reflect" +) + +type consulWatcher struct { + cli *api.Client + path string + opts config.Options + wopts config.WatchOptions + done chan struct{} + vchan chan map[string]interface{} + echan chan error +} + +func (w *consulWatcher) run() { + ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval) + defer ticker.Stop() + + src := w.opts.Struct + if w.wopts.Struct != nil { + src = w.wopts.Struct + } + + for { + select { + case <-w.done: + return + case <-ticker.C: + dst, err := rutil.Zero(src) + if err != nil { + w.echan <- err + return + } + + pair, _, err := w.cli.KV().Get(w.path, nil) + if err != nil { + w.echan <- err + return + } + + if pair == nil { + w.echan <- fmt.Errorf("consul path %s not found", w.path) + return + } + + err = w.opts.Codec.Unmarshal(pair.Value, dst) + if err != nil { + w.echan <- err + return + } + + srcmp, err := rutil.StructFieldsMap(src) + if err != nil { + w.echan <- err + return + } + + dstmp, err := rutil.StructFieldsMap(dst) + if err != nil { + w.echan <- err + return + } + + for sk, sv := range srcmp { + if reflect.DeepEqual(dstmp[sk], sv) { + delete(dstmp, sk) + } + } + if len(dstmp) > 0 { + w.vchan <- dstmp + src = dst + } + } + } +} + +func (w *consulWatcher) Next() (map[string]interface{}, error) { + select { + case <-w.done: + break + case err := <-w.echan: + return nil, err + case v, ok := <-w.vchan: + if !ok { + break + } + return v, nil + } + return nil, config.ErrWatcherStopped +} + +func (w *consulWatcher) Stop() error { + close(w.done) + return nil +}