Compare commits

...

2 Commits

Author SHA1 Message Date
84e024d880 logger: change logger interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 02:58:19 +03:00
30826a6959 config: minor changes to split config and watcher files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 13:53:22 +03:00
5 changed files with 119 additions and 101 deletions

View File

@@ -3,12 +3,10 @@ package consul
import (
"context"
"fmt"
"reflect"
"github.com/hashicorp/consul/api"
"github.com/imdario/mergo"
"github.com/unistack-org/micro/v3/config"
"github.com/unistack-org/micro/v3/util/jitter"
rutil "github.com/unistack-org/micro/v3/util/reflect"
)
@@ -183,98 +181,6 @@ func (c *consulConfig) Watch(ctx context.Context, opts ...config.WatchOption) (c
return w, nil
}
type consulWatcher struct {
cli *api.Client
path string
opts config.Options
wopts config.WatchOptions
done chan struct{}
vchan chan map[string]interface{}
echan chan error
}
func (w *consulWatcher) run() {
ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval)
defer ticker.Stop()
src := w.opts.Struct
if w.wopts.Struct != nil {
src = w.wopts.Struct
}
for {
select {
case <-w.done:
return
case <-ticker.C:
dst, err := rutil.Zero(src)
if err != nil {
w.echan <- err
return
}
pair, _, err := w.cli.KV().Get(w.path, nil)
if err != nil {
w.echan <- err
return
}
if pair == nil {
w.echan <- fmt.Errorf("consul path %s not found", w.path)
return
}
err = w.opts.Codec.Unmarshal(pair.Value, dst)
if err != nil {
w.echan <- err
return
}
srcmp, err := rutil.StructFieldsMap(src)
if err != nil {
w.echan <- err
return
}
dstmp, err := rutil.StructFieldsMap(dst)
if err != nil {
w.echan <- err
return
}
for sk, sv := range srcmp {
if reflect.DeepEqual(dstmp[sk], sv) {
delete(dstmp, sk)
}
}
if len(dstmp) > 0 {
w.vchan <- dstmp
src = dst
}
}
}
}
func (w *consulWatcher) Next() (map[string]interface{}, error) {
select {
case <-w.done:
break
case err := <-w.echan:
return nil, err
case v, ok := <-w.vchan:
if !ok {
break
}
return v, nil
}
return nil, config.ErrWatcherStopped
}
func (w *consulWatcher) Stop() error {
close(w.done)
return nil
}
func NewConfig(opts ...config.Option) config.Config {
options := config.NewOptions(opts...)
if len(options.StructTag) == 0 {

2
go.mod
View File

@@ -9,5 +9,5 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.12
github.com/pkg/errors v0.9.1 // indirect
github.com/unistack-org/micro/v3 v3.5.8
github.com/unistack-org/micro/v3 v3.6.0
)

4
go.sum
View File

@@ -95,8 +95,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/unistack-org/micro/v3 v3.5.8 h1:7kuNW7Pih3LYUA0DuVt9l87LjtT4rsqDFyTYLrLtNNI=
github.com/unistack-org/micro/v3 v3.5.8/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s=
github.com/unistack-org/micro/v3 v3.6.0 h1:atxcH6C5JWVjXPDQiT8N9SALf1yWaVtpVvvxVdz7Y7s=
github.com/unistack-org/micro/v3 v3.6.0/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=

View File

@@ -27,7 +27,7 @@ func (l *consulLogger) Name() string {
func (l *consulLogger) With(args ...interface{}) hclog.Logger {
fields := make(map[string]interface{}, int(len(args)/2))
for i := 0; i < int(len(args)/2); i = i + 2 {
for i := 0; i < int(len(args)/2); i += 2 {
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
}
return &consulLogger{logger: l.logger.Fields(fields)}
@@ -62,16 +62,25 @@ func (l *consulLogger) ImpliedArgs() []interface{} {
func (l *consulLogger) Named(name string) hclog.Logger {
var newname string
if oldname, ok := l.logger.Options().Fields["name"]; ok {
var oldname string
fields := l.logger.Options().Fields
for i := 0; i < len(fields); i += 2 {
if fields[i].(string) == "name" {
oldname = fields[i+1].(string)
}
}
if len(oldname) > 0 {
newname = fmt.Sprintf("%s.%s", oldname, name)
} else {
newname = fmt.Sprintf("%s", name)
}
return &consulLogger{logger: l.logger.Fields(map[string]interface{}{"name": newname})}
return &consulLogger{logger: l.logger.Fields("name", newname)}
}
func (l *consulLogger) ResetNamed(name string) hclog.Logger {
return &consulLogger{logger: l.logger.Fields(map[string]interface{}{"name": name})}
return &consulLogger{logger: l.logger.Fields("name", name)}
}
func (l *consulLogger) SetLevel(level hclog.Level) {

103
watcher.go Normal file
View File

@@ -0,0 +1,103 @@
package consul
import (
"fmt"
"reflect"
"github.com/hashicorp/consul/api"
"github.com/unistack-org/micro/v3/config"
"github.com/unistack-org/micro/v3/util/jitter"
rutil "github.com/unistack-org/micro/v3/util/reflect"
)
type consulWatcher struct {
cli *api.Client
path string
opts config.Options
wopts config.WatchOptions
done chan struct{}
vchan chan map[string]interface{}
echan chan error
}
func (w *consulWatcher) run() {
ticker := jitter.NewTicker(w.wopts.MinInterval, w.wopts.MaxInterval)
defer ticker.Stop()
src := w.opts.Struct
if w.wopts.Struct != nil {
src = w.wopts.Struct
}
for {
select {
case <-w.done:
return
case <-ticker.C:
dst, err := rutil.Zero(src)
if err != nil {
w.echan <- err
return
}
pair, _, err := w.cli.KV().Get(w.path, nil)
if err != nil {
w.echan <- err
return
}
if pair == nil {
w.echan <- fmt.Errorf("consul path %s not found", w.path)
return
}
err = w.opts.Codec.Unmarshal(pair.Value, dst)
if err != nil {
w.echan <- err
return
}
srcmp, err := rutil.StructFieldsMap(src)
if err != nil {
w.echan <- err
return
}
dstmp, err := rutil.StructFieldsMap(dst)
if err != nil {
w.echan <- err
return
}
for sk, sv := range srcmp {
if reflect.DeepEqual(dstmp[sk], sv) {
delete(dstmp, sk)
}
}
if len(dstmp) > 0 {
w.vchan <- dstmp
src = dst
}
}
}
}
func (w *consulWatcher) Next() (map[string]interface{}, error) {
select {
case <-w.done:
break
case err := <-w.echan:
return nil, err
case v, ok := <-w.vchan:
if !ok {
break
}
return v, nil
}
return nil, config.ErrWatcherStopped
}
func (w *consulWatcher) Stop() error {
close(w.done)
return nil
}