micro/config/default.go

295 lines
4.3 KiB
Go

package config
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/unistack-org/micro/v3/config/loader"
"github.com/unistack-org/micro/v3/config/reader"
"github.com/unistack-org/micro/v3/config/source"
)
type config struct {
exit chan bool
opts Options
sync.RWMutex
// the current snapshot
snap *loader.Snapshot
// the current values
vals reader.Values
}
type watcher struct {
lw loader.Watcher
rd reader.Reader
path []string
value reader.Value
}
func newConfig(opts ...Option) (Config, error) {
var c config
if err := c.Init(opts...); err != nil {
return nil, err
}
go c.run()
return &c, nil
}
func (c *config) Init(opts ...Option) error {
c.opts = Options{}
c.exit = make(chan bool)
for _, o := range opts {
o(&c.opts)
}
err := c.opts.Loader.Load(c.opts.Source...)
if err != nil {
return err
}
c.snap, err = c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
if err != nil {
return err
}
return nil
}
func (c *config) Options() Options {
return c.opts
}
func (c *config) run() {
watch := func(w loader.Watcher) error {
for {
// get changeset
snap, err := w.Next()
if err != nil {
return err
}
c.Lock()
if c.snap != nil && c.snap.Version >= snap.Version {
c.Unlock()
continue
}
// save
c.snap = snap
// set values
c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
c.Unlock()
}
}
for {
w, err := c.opts.Loader.Watch()
if err != nil {
time.Sleep(time.Second)
continue
}
done := make(chan bool)
// the stop watch func
go func() {
select {
case <-done:
case <-c.exit:
}
w.Stop()
}()
// block watch
if err := watch(w); err != nil {
// do something better
time.Sleep(time.Second)
}
// close done chan
close(done)
// if the config is closed exit
select {
case <-c.exit:
return
default:
}
}
}
func (c *config) Map() map[string]interface{} {
c.RLock()
defer c.RUnlock()
return c.vals.Map()
}
func (c *config) Scan(v interface{}) error {
c.RLock()
defer c.RUnlock()
return c.vals.Scan(v)
}
// sync loads all the sources, calls the parser and updates the config
func (c *config) Sync() error {
if err := c.opts.Loader.Sync(); err != nil {
return err
}
snap, err := c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.snap = snap
vals, err := c.opts.Reader.Values(snap.ChangeSet)
if err != nil {
return err
}
c.vals = vals
return nil
}
func (c *config) Close() error {
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (c *config) Get(path ...string) (reader.Value, error) {
c.RLock()
defer c.RUnlock()
// did sync actually work?
if c.vals != nil {
return c.vals.Get(path...)
}
// no value
return nil, fmt.Errorf("no value")
}
func (c *config) Set(val interface{}, path ...string) error {
c.Lock()
defer c.Unlock()
if c.vals != nil {
c.vals.Set(val, path...)
}
return nil
}
func (c *config) Del(path ...string) error {
c.Lock()
defer c.Unlock()
if c.vals != nil {
c.vals.Del(path...)
}
return nil
}
func (c *config) Bytes() []byte {
c.RLock()
defer c.RUnlock()
if c.vals == nil {
return []byte{}
}
return c.vals.Bytes()
}
func (c *config) Load(sources ...source.Source) error {
if err := c.opts.Loader.Load(sources...); err != nil {
return err
}
snap, err := c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.snap = snap
vals, err := c.opts.Reader.Values(snap.ChangeSet)
if err != nil {
return err
}
c.vals = vals
return nil
}
func (c *config) Watch(path ...string) (Watcher, error) {
value, err := c.Get(path...)
if err != nil {
return nil, err
}
w, err := c.opts.Loader.Watch(path...)
if err != nil {
return nil, err
}
return &watcher{
lw: w,
rd: c.opts.Reader,
path: path,
value: value,
}, nil
}
func (c *config) String() string {
return "config"
}
func (w *watcher) Next() (reader.Value, error) {
for {
s, err := w.lw.Next()
if err != nil {
return nil, err
}
// only process changes
if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
continue
}
v, err := w.rd.Values(s.ChangeSet)
if err != nil {
return nil, err
}
return v.Get()
}
}
func (w *watcher) Stop() error {
return w.lw.Stop()
}