Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
84e024d880 | |||
30826a6959 | |||
0501ba2bac |
93
consul.go
93
consul.go
@@ -3,12 +3,10 @@ package consul
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/imdario/mergo"
|
||||
"github.com/unistack-org/micro/v3/config"
|
||||
"github.com/unistack-org/micro/v3/util/jitter"
|
||||
rutil "github.com/unistack-org/micro/v3/util/reflect"
|
||||
)
|
||||
|
||||
@@ -183,97 +181,6 @@ func (c *consulConfig) Watch(ctx context.Context, opts ...config.WatchOption) (c
|
||||
return w, nil
|
||||
}
|
||||
|
||||
type consulWatcher struct {
|
||||
cli *api.Client
|
||||
path string
|
||||
opts config.Options
|
||||
wopts config.WatchOptions
|
||||
done chan struct{}
|
||||
vchan chan map[string]interface{}
|
||||
echan chan error
|
||||
}
|
||||
|
||||
func (w *consulWatcher) run() {
|
||||
ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
src := w.opts.Struct
|
||||
if w.wopts.Struct != nil {
|
||||
src = w.wopts.Struct
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
dst, err := rutil.Zero(src)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
pair, _, err := w.cli.KV().Get(w.path, nil)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
if pair == nil {
|
||||
w.echan <- fmt.Errorf("consul path %s not found", w.path)
|
||||
return
|
||||
}
|
||||
|
||||
err = w.opts.Codec.Unmarshal(pair.Value, dst)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
srcmp, err := rutil.StructFieldsMap(src)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
dstmp, err := rutil.StructFieldsMap(dst)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
for sk, sv := range srcmp {
|
||||
if reflect.DeepEqual(dstmp[sk], sv) {
|
||||
delete(dstmp, sk)
|
||||
}
|
||||
}
|
||||
|
||||
w.vchan <- dstmp
|
||||
src = dst
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *consulWatcher) Next() (map[string]interface{}, error) {
|
||||
select {
|
||||
case <-w.done:
|
||||
break
|
||||
case err := <-w.echan:
|
||||
return nil, err
|
||||
case v, ok := <-w.vchan:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
return nil, config.ErrWatcherStopped
|
||||
}
|
||||
|
||||
func (w *consulWatcher) Stop() error {
|
||||
close(w.done)
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewConfig(opts ...config.Option) config.Config {
|
||||
options := config.NewOptions(opts...)
|
||||
if len(options.StructTag) == 0 {
|
||||
|
2
go.mod
2
go.mod
@@ -9,5 +9,5 @@ require (
|
||||
github.com/hashicorp/golang-lru v0.5.3 // indirect
|
||||
github.com/imdario/mergo v0.3.12
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/unistack-org/micro/v3 v3.5.8
|
||||
github.com/unistack-org/micro/v3 v3.6.0
|
||||
)
|
||||
|
4
go.sum
4
go.sum
@@ -95,8 +95,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/unistack-org/micro/v3 v3.5.8 h1:7kuNW7Pih3LYUA0DuVt9l87LjtT4rsqDFyTYLrLtNNI=
|
||||
github.com/unistack-org/micro/v3 v3.5.8/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s=
|
||||
github.com/unistack-org/micro/v3 v3.6.0 h1:atxcH6C5JWVjXPDQiT8N9SALf1yWaVtpVvvxVdz7Y7s=
|
||||
github.com/unistack-org/micro/v3 v3.6.0/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s=
|
||||
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=
|
||||
|
17
logger.go
17
logger.go
@@ -27,7 +27,7 @@ func (l *consulLogger) Name() string {
|
||||
|
||||
func (l *consulLogger) With(args ...interface{}) hclog.Logger {
|
||||
fields := make(map[string]interface{}, int(len(args)/2))
|
||||
for i := 0; i < int(len(args)/2); i = i + 2 {
|
||||
for i := 0; i < int(len(args)/2); i += 2 {
|
||||
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
|
||||
}
|
||||
return &consulLogger{logger: l.logger.Fields(fields)}
|
||||
@@ -62,16 +62,25 @@ func (l *consulLogger) ImpliedArgs() []interface{} {
|
||||
|
||||
func (l *consulLogger) Named(name string) hclog.Logger {
|
||||
var newname string
|
||||
if oldname, ok := l.logger.Options().Fields["name"]; ok {
|
||||
var oldname string
|
||||
|
||||
fields := l.logger.Options().Fields
|
||||
for i := 0; i < len(fields); i += 2 {
|
||||
if fields[i].(string) == "name" {
|
||||
oldname = fields[i+1].(string)
|
||||
}
|
||||
}
|
||||
|
||||
if len(oldname) > 0 {
|
||||
newname = fmt.Sprintf("%s.%s", oldname, name)
|
||||
} else {
|
||||
newname = fmt.Sprintf("%s", name)
|
||||
}
|
||||
return &consulLogger{logger: l.logger.Fields(map[string]interface{}{"name": newname})}
|
||||
return &consulLogger{logger: l.logger.Fields("name", newname)}
|
||||
}
|
||||
|
||||
func (l *consulLogger) ResetNamed(name string) hclog.Logger {
|
||||
return &consulLogger{logger: l.logger.Fields(map[string]interface{}{"name": name})}
|
||||
return &consulLogger{logger: l.logger.Fields("name", name)}
|
||||
}
|
||||
|
||||
func (l *consulLogger) SetLevel(level hclog.Level) {
|
||||
|
103
watcher.go
Normal file
103
watcher.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/unistack-org/micro/v3/config"
|
||||
"github.com/unistack-org/micro/v3/util/jitter"
|
||||
rutil "github.com/unistack-org/micro/v3/util/reflect"
|
||||
)
|
||||
|
||||
type consulWatcher struct {
|
||||
cli *api.Client
|
||||
path string
|
||||
opts config.Options
|
||||
wopts config.WatchOptions
|
||||
done chan struct{}
|
||||
vchan chan map[string]interface{}
|
||||
echan chan error
|
||||
}
|
||||
|
||||
func (w *consulWatcher) run() {
|
||||
ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
src := w.opts.Struct
|
||||
if w.wopts.Struct != nil {
|
||||
src = w.wopts.Struct
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
dst, err := rutil.Zero(src)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
pair, _, err := w.cli.KV().Get(w.path, nil)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
if pair == nil {
|
||||
w.echan <- fmt.Errorf("consul path %s not found", w.path)
|
||||
return
|
||||
}
|
||||
|
||||
err = w.opts.Codec.Unmarshal(pair.Value, dst)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
srcmp, err := rutil.StructFieldsMap(src)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
dstmp, err := rutil.StructFieldsMap(dst)
|
||||
if err != nil {
|
||||
w.echan <- err
|
||||
return
|
||||
}
|
||||
|
||||
for sk, sv := range srcmp {
|
||||
if reflect.DeepEqual(dstmp[sk], sv) {
|
||||
delete(dstmp, sk)
|
||||
}
|
||||
}
|
||||
if len(dstmp) > 0 {
|
||||
w.vchan <- dstmp
|
||||
src = dst
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *consulWatcher) Next() (map[string]interface{}, error) {
|
||||
select {
|
||||
case <-w.done:
|
||||
break
|
||||
case err := <-w.echan:
|
||||
return nil, err
|
||||
case v, ok := <-w.vchan:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
return nil, config.ErrWatcherStopped
|
||||
}
|
||||
|
||||
func (w *consulWatcher) Stop() error {
|
||||
close(w.done)
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user