diff --git a/consul.go b/consul.go index f17bf98..b4eae0c 100644 --- a/consul.go +++ b/consul.go @@ -3,10 +3,12 @@ package consul import ( "context" "fmt" + "reflect" "github.com/hashicorp/consul/api" "github.com/imdario/mergo" "github.com/unistack-org/micro/v3/config" + "github.com/unistack-org/micro/v3/util/jitter" rutil "github.com/unistack-org/micro/v3/util/reflect" ) @@ -104,11 +106,16 @@ func (c *consulConfig) Load(ctx context.Context, opts ...config.LoadOption) erro mopts = append(mopts, mergo.WithAppendSlice) } - src, err := rutil.Zero(c.opts.Struct) + dst := c.opts.Struct + if options.Struct != nil { + dst = options.Struct + } + + src, err := rutil.Zero(dst) if err == nil { err = c.opts.Codec.Unmarshal(pair.Value, src) if err == nil { - err = mergo.Merge(c.opts.Struct, src, mopts...) + err = mergo.Merge(dst, src, mopts...) } } @@ -160,6 +167,113 @@ func (c *consulConfig) Name() string { return c.opts.Name } +func (c *consulConfig) Watch(ctx context.Context, opts ...config.WatchOption) (config.Watcher, error) { + w := &consulWatcher{ + cli: c.cli, + path: c.path, + opts: c.opts, + wopts: config.NewWatchOptions(opts...), + done: make(chan struct{}), + vchan: make(chan map[string]interface{}), + echan: make(chan error), + } + + go w.run() + + return w, nil +} + +type consulWatcher struct { + cli *api.Client + path string + opts config.Options + wopts config.WatchOptions + done chan struct{} + vchan chan map[string]interface{} + echan chan error +} + +func (w *consulWatcher) run() { + ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval) + defer ticker.Stop() + + src := w.opts.Struct + if w.wopts.Struct != nil { + src = w.wopts.Struct + } + + for { + select { + case <-w.done: + return + case <-ticker.C: + dst, err := rutil.Zero(src) + if err != nil { + w.echan <- err + return + } + + pair, _, err := w.cli.KV().Get(w.path, nil) + if err != nil { + w.echan <- err + return + } + + if pair == nil { + w.echan <- fmt.Errorf("consul path %s not found", w.path) + return + } + + err = w.opts.Codec.Unmarshal(pair.Value, dst) + if err != nil { + w.echan <- err + return + } + + srcmp, err := rutil.StructFieldsMap(src) + if err != nil { + w.echan <- err + return + } + + dstmp, err := rutil.StructFieldsMap(dst) + if err != nil { + w.echan <- err + return + } + + for sk, sv := range srcmp { + if reflect.DeepEqual(dstmp[sk], sv) { + delete(dstmp, sk) + } + } + + w.vchan <- dstmp + src = dst + } + } +} + +func (w *consulWatcher) Next() (map[string]interface{}, error) { + select { + case <-w.done: + break + case err := <-w.echan: + return nil, err + case v, ok := <-w.vchan: + if !ok { + break + } + return v, nil + } + return nil, config.ErrWatcherStopped +} + +func (w *consulWatcher) Stop() error { + close(w.done) + return nil +} + func NewConfig(opts ...config.Option) config.Config { options := config.NewOptions(opts...) if len(options.StructTag) == 0 { diff --git a/go.mod b/go.mod index 0369c74..613b6cf 100644 --- a/go.mod +++ b/go.mod @@ -9,5 +9,5 @@ require ( github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/imdario/mergo v0.3.12 github.com/pkg/errors v0.9.1 // indirect - github.com/unistack-org/micro/v3 v3.4.0 + github.com/unistack-org/micro/v3 v3.5.8 ) diff --git a/go.sum b/go.sum index 21ab178..6fb2e39 100644 --- a/go.sum +++ b/go.sum @@ -7,15 +7,15 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/consul/api v1.8.1 h1:BOEQaMWoGMhmQ29fC26bi0qb7/rId9JzZP2V0Xmx7m8= github.com/hashicorp/consul/api v1.8.1/go.mod h1:sDjTOq0yUyv5G4h+BqSea7Fn6BU+XbolEz1952UB+mk= github.com/hashicorp/consul/sdk v0.7.0 h1:H6R9d008jDcHPQPAqPNuydAshJ4v5/8URdFnUvK/+sc= @@ -25,8 +25,6 @@ github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.12.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= -github.com/hashicorp/go-hclog v0.16.0 h1:uCeOEwSWGMwhJUdpUjk+1cVKIEfGu2/1nFXukimi2MU= -github.com/hashicorp/go-hclog v0.16.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= github.com/hashicorp/go-hclog v0.16.1 h1:IVQwpTGNRRIHafnTs2dQLIk4ENtneRIEEJWOVDqz99o= github.com/hashicorp/go-hclog v0.16.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= @@ -97,8 +95,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/unistack-org/micro/v3 v3.4.0 h1:z9F3lgAb2j4cZ1ib5qBj66JPYUAzR4sNIJqUDjVwyVQ= -github.com/unistack-org/micro/v3 v3.4.0/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= +github.com/unistack-org/micro/v3 v3.5.8 h1:7kuNW7Pih3LYUA0DuVt9l87LjtT4rsqDFyTYLrLtNNI= +github.com/unistack-org/micro/v3 v3.5.8/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=