config: add watcher interface

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-08-03 00:23:28 +03:00
parent 46d4461f6d
commit ca9f6fb5eb
4 changed files with 236 additions and 18 deletions

View File

@ -4,11 +4,15 @@ package config
import (
"context"
"errors"
"time"
)
// DefaultConfig default config
var DefaultConfig Config = NewConfig()
// DefaultWatcherInterval default interval for poll changes
var DefaultWatcherInterval = 5 * time.Second
var (
// ErrCodecMissing is returned when codec needed and not specified
ErrCodecMissing = errors.New("codec missing")
@ -30,15 +34,17 @@ type Config interface {
Load(context.Context, ...LoadOption) error
// Save config to sources
Save(context.Context, ...SaveOption) error
// Watch a value for changes
//Watch(context.Context) (Watcher, error)
// Watch a config for changes
Watch(context.Context, ...WatchOption) (Watcher, error)
// String returns config type name
String() string
}
// Watcher is the config watcher
type Watcher interface {
// Next() (, error)
// Next blocks until update happens or error returned
Next() (map[string]interface{}, error)
// Stop stops watcher
Stop() error
}

View File

@ -5,6 +5,7 @@ import (
"reflect"
"strconv"
"strings"
"time"
"github.com/imdario/mergo"
rutil "github.com/unistack-org/micro/v3/util/reflect"
@ -41,11 +42,15 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
mopts = append(mopts, mergo.WithAppendSlice)
}
src, err := rutil.Zero(c.opts.Struct)
dst := c.opts.Struct
if options.Struct != nil {
dst = options.Struct
}
src, err := rutil.Zero(dst)
if err == nil {
valueOf := reflect.ValueOf(src)
if err = c.fillValues(valueOf); err == nil {
err = mergo.Merge(c.opts.Struct, src, mopts...)
if err = fillValues(reflect.ValueOf(src), c.opts.StructTag); err == nil {
err = mergo.Merge(dst, src, mopts...)
}
}
@ -63,7 +68,7 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
}
//nolint:gocyclo
func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
func fillValue(value reflect.Value, val string) error {
if !rutil.IsEmpty(value) {
return nil
}
@ -80,10 +85,10 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
kv := strings.FieldsFunc(nval, func(c rune) bool { return c == '=' })
mkey := reflect.Indirect(reflect.New(kt))
mval := reflect.Indirect(reflect.New(et))
if err := c.fillValue(mkey, kv[0]); err != nil {
if err := fillValue(mkey, kv[0]); err != nil {
return err
}
if err := c.fillValue(mval, kv[1]); err != nil {
if err := fillValue(mval, kv[1]); err != nil {
return err
}
value.SetMapIndex(mkey, mval)
@ -93,7 +98,7 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
value.Set(reflect.MakeSlice(reflect.SliceOf(value.Type().Elem()), len(nvals), len(nvals)))
for idx, nval := range nvals {
nvalue := reflect.Indirect(reflect.New(value.Type().Elem()))
if err := c.fillValue(nvalue, nval); err != nil {
if err := fillValue(nvalue, nval); err != nil {
return err
}
value.Index(idx).Set(nvalue)
@ -182,7 +187,7 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
return nil
}
func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
func fillValues(valueOf reflect.Value, tname string) error {
var values reflect.Value
if valueOf.Kind() == reflect.Ptr {
@ -209,7 +214,7 @@ func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
switch value.Kind() {
case reflect.Struct:
value.Set(reflect.Indirect(reflect.New(value.Type())))
if err := c.fillValues(value); err != nil {
if err := fillValues(value, tname); err != nil {
return err
}
continue
@ -223,17 +228,17 @@ func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
value.Set(reflect.New(value.Type().Elem()))
}
value = value.Elem()
if err := c.fillValues(value); err != nil {
if err := fillValues(value, tname); err != nil {
return err
}
continue
}
tag, ok := field.Tag.Lookup(c.opts.StructTag)
tag, ok := field.Tag.Lookup(tname)
if !ok {
continue
}
if err := c.fillValue(value, tag); err != nil {
if err := fillValue(value, tag); err != nil {
return err
}
}
@ -265,6 +270,20 @@ func (c *defaultConfig) Name() string {
return c.opts.Name
}
func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
w := &defaultWatcher{
opts: c.opts,
wopts: NewWatchOptions(opts...),
done: make(chan bool),
vchan: make(chan map[string]interface{}),
echan: make(chan error),
}
go w.run()
return w, nil
}
// NewConfig returns new default config source
func NewConfig(opts ...Option) Config {
options := NewOptions(opts...)
@ -273,3 +292,73 @@ func NewConfig(opts ...Option) Config {
}
return &defaultConfig{opts: options}
}
type defaultWatcher struct {
opts Options
wopts WatchOptions
done chan bool
ticker *time.Ticker
vchan chan map[string]interface{}
echan chan error
}
func (w *defaultWatcher) run() {
ticker := time.NewTicker(w.wopts.Interval)
defer ticker.Stop()
src := w.opts.Struct
if w.wopts.Struct != nil {
src = w.wopts.Struct
}
for {
select {
case <-w.done:
return
case <-ticker.C:
dst, err := rutil.Zero(src)
if err == nil {
err = fillValues(reflect.ValueOf(dst), w.opts.StructTag)
}
if err != nil {
w.echan <- err
return
}
srcmp, err := rutil.StructFieldsMap(src)
if err != nil {
w.echan <- err
return
}
dstmp, err := rutil.StructFieldsMap(dst)
if err != nil {
w.echan <- err
return
}
for sk, sv := range srcmp {
if reflect.DeepEqual(dstmp[sk], sv) {
delete(dstmp, sk)
}
}
w.vchan <- dstmp
src = dst
}
}
}
func (w *defaultWatcher) Next() (map[string]interface{}, error) {
select {
case <-w.done:
break
case v, ok := <-w.vchan:
if !ok {
break
}
return v, nil
}
return nil, ErrWatcherStopped
}
func (w *defaultWatcher) Stop() error {
close(w.done)
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/unistack-org/micro/v3/config"
)
@ -17,6 +18,57 @@ type Cfg struct {
IntValue int `default:"99"`
}
func TestWatch(t *testing.T) {
ctx := context.Background()
conf := &Cfg{IntValue: 10}
cfg := config.NewConfig(config.Struct(conf))
if err := cfg.Init(); err != nil {
t.Fatal(err)
}
if err := cfg.Load(ctx); err != nil {
t.Fatal(err)
}
w, err := cfg.Watch(ctx, config.WatchInterval(500*time.Millisecond))
if err != nil {
t.Fatal(err)
}
defer func() {
_ = w.Stop()
}()
done := make(chan struct{})
go func() {
for {
mp, err := w.Next()
if err != nil && err != config.ErrWatcherStopped {
t.Fatal(err)
} else if err == config.ErrWatcherStopped {
return
}
if len(mp) != 1 {
t.Fatal(fmt.Errorf("default watcher err: %v", mp))
}
v, ok := mp["IntValue"]
if !ok {
t.Fatal(fmt.Errorf("default watcher err: %v", v))
}
if nv, ok := v.(int); !ok || nv != 99 {
t.Fatal(fmt.Errorf("default watcher err: %v", v))
}
close(done)
return
}
}()
<-done
}
func TestDefault(t *testing.T) {
ctx := context.Background()
conf := &Cfg{IntValue: 10}
@ -47,6 +99,6 @@ func TestDefault(t *testing.T) {
if conf.StringValue != "after_load" {
t.Fatal("AfterLoad option not working")
}
t.Logf("%#+v\n", conf)
_ = conf
//t.Logf("%#+v\n", conf)
}

View File

@ -2,6 +2,7 @@ package config
import (
"context"
"time"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
@ -62,6 +63,7 @@ type LoadOption func(o *LoadOptions)
// LoadOptions struct
type LoadOptions struct {
Struct interface{}
Override bool
Append bool
}
@ -88,13 +90,29 @@ func LoadAppend(b bool) LoadOption {
}
}
// LoadStruct override struct for loading
func LoadStruct(src interface{}) LoadOption {
return func(o *LoadOptions) {
o.Struct = src
}
}
// SaveOption function signature
type SaveOption func(o *SaveOptions)
// SaveOptions struct
type SaveOptions struct {
Struct interface{}
}
// SaveStruct override struct for save to config
func SaveStruct(src interface{}) SaveOption {
return func(o *SaveOptions) {
o.Struct = src
}
}
// NewSaveOptions fill SaveOptions struct
func NewSaveOptions(opts ...SaveOption) SaveOptions {
options := SaveOptions{}
for _, o := range opts {
@ -186,3 +204,56 @@ func Name(n string) Option {
o.Name = n
}
}
// WatchOptions struuct
type WatchOptions struct {
// Context used by non default options
Context context.Context
// Coalesce multiple events to one
Coalesce bool
// Interval to periodically pull changes if config source not supports async notify
Interval time.Duration
// Struct for filling
Struct interface{}
}
type WatchOption func(*WatchOptions)
func NewWatchOptions(opts ...WatchOption) WatchOptions {
options := WatchOptions{
Context: context.Background(),
Interval: DefaultWatcherInterval,
}
for _, o := range opts {
o(&options)
}
return options
}
// WatchContext pass context
func WatchContext(ctx context.Context) WatchOption {
return func(o *WatchOptions) {
o.Context = ctx
}
}
// WatchCoalesce controls watch event combining
func WatchCoalesce(b bool) WatchOption {
return func(o *WatchOptions) {
o.Coalesce = b
}
}
// WatchInterval specifies time.Duration for pulling changes
func WatchInterval(td time.Duration) WatchOption {
return func(o *WatchOptions) {
o.Interval = td
}
}
// WatchStruct overrides struct for fill
func WatchStruct(src interface{}) WatchOption {
return func(o *WatchOptions) {
o.Struct = src
}
}