diff --git a/config/README.md b/config/README.md new file mode 100644 index 00000000..f1265f51 --- /dev/null +++ b/config/README.md @@ -0,0 +1,29 @@ +# Config [![GoDoc](https://godoc.org/github.com/micro/go-micro/config?status.svg)](https://godoc.org/github.com/micro/go-micro/config) + +Go Config is a pluggable dynamic config library. + +Most config in applications are statically configured or include complex logic to load from multiple sources. +Go Config makes this easy, pluggable and mergeable. You'll never have to deal with config in the same way again. + +## Features + +- **Dynamic Loading** - Load configuration from multiple source as and when needed. Go Config manages watching config sources +in the background and automatically merges and updates an in memory view. + +- **Pluggable Sources** - Choose from any number of sources to load and merge config. The backend source is abstracted away into +a standard format consumed internally and decoded via encoders. Sources can be env vars, flags, file, etcd, k8s configmap, etc. + +- **Mergeable Config** - If you specify multiple sources of config, regardless of format, they will be merged and presented in +a single view. This massively simplifies priority order loading and changes based on environment. + +- **Observe Changes** - Optionally watch the config for changes to specific values. Hot reload your app using Go Config's watcher. +You don't have to handle ad-hoc hup reloading or whatever else, just keep reading the config and watch for changes if you need +to be notified. + +- **Sane Defaults** - In case config loads badly or is completely wiped away for some unknown reason, you can specify fallback +values when accessing any config values directly. This ensures you'll always be reading some sane default in the event of a problem. + +## Getting Started + +For detailed information or architecture, installation and general usage see the [docs](https://micro.mu/docs/go-config.html) + diff --git a/config/config.go b/config/config.go new file mode 100644 index 00000000..020c71d5 --- /dev/null +++ b/config/config.go @@ -0,0 +1,94 @@ +// Package config is an interface for dynamic configuration. +package config + +import ( + "context" + + "github.com/micro/go-micro/config/loader" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/source" + "github.com/micro/go-micro/config/source/file" +) + +// Config is an interface abstraction for dynamic configuration +type Config interface { + // provide the reader.Values interface + reader.Values + // Stop the config loader/watcher + Close() error + // Load config sources + Load(source ...source.Source) error + // Force a source changeset sync + Sync() error + // Watch a value for changes + Watch(path ...string) (Watcher, error) +} + +// Watcher is the config watcher +type Watcher interface { + Next() (reader.Value, error) + Stop() error +} + +type Options struct { + Loader loader.Loader + Reader reader.Reader + Source []source.Source + + // for alternative data + Context context.Context +} + +type Option func(o *Options) + +var ( + // Default Config Manager + DefaultConfig = NewConfig() +) + +// NewConfig returns new config +func NewConfig(opts ...Option) Config { + return newConfig(opts...) +} + +// Return config as raw json +func Bytes() []byte { + return DefaultConfig.Bytes() +} + +// Return config as a map +func Map() map[string]interface{} { + return DefaultConfig.Map() +} + +// Scan values to a go type +func Scan(v interface{}) error { + return DefaultConfig.Scan(v) +} + +// Force a source changeset sync +func Sync() error { + return DefaultConfig.Sync() +} + +// Get a value from the config +func Get(path ...string) reader.Value { + return DefaultConfig.Get(path...) +} + +// Load config sources +func Load(source ...source.Source) error { + return DefaultConfig.Load(source...) +} + +// Watch a value for changes +func Watch(path ...string) (Watcher, error) { + return DefaultConfig.Watch(path...) +} + +// LoadFile is short hand for creating a file source and loading it +func LoadFile(path string) error { + return Load(file.NewSource( + file.WithPath(path), + )) +} diff --git a/config/default.go b/config/default.go new file mode 100644 index 00000000..aa9bf365 --- /dev/null +++ b/config/default.go @@ -0,0 +1,253 @@ +package config + +import ( + "bytes" + "sync" + "time" + + "github.com/micro/go-micro/config/loader" + "github.com/micro/go-micro/config/loader/memory" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/reader/json" + "github.com/micro/go-micro/config/source" +) + +type config struct { + exit chan bool + opts Options + + sync.RWMutex + // the current snapshot + snap *loader.Snapshot + // the current values + vals reader.Values +} + +type watcher struct { + lw loader.Watcher + rd reader.Reader + path []string + value reader.Value +} + +func newConfig(opts ...Option) Config { + options := Options{ + Loader: memory.NewLoader(), + Reader: json.NewReader(), + } + + for _, o := range opts { + o(&options) + } + + options.Loader.Load(options.Source...) + snap, _ := options.Loader.Snapshot() + vals, _ := options.Reader.Values(snap.ChangeSet) + + c := &config{ + exit: make(chan bool), + opts: options, + snap: snap, + vals: vals, + } + + go c.run() + + return c +} + +func (c *config) run() { + watch := func(w loader.Watcher) error { + for { + // get changeset + snap, err := w.Next() + if err != nil { + return err + } + + c.Lock() + + // save + c.snap = snap + + // set values + c.vals, _ = c.opts.Reader.Values(snap.ChangeSet) + + c.Unlock() + } + } + + for { + w, err := c.opts.Loader.Watch() + if err != nil { + time.Sleep(time.Second) + continue + } + + done := make(chan bool) + + // the stop watch func + go func() { + select { + case <-done: + case <-c.exit: + } + w.Stop() + }() + + // block watch + if err := watch(w); err != nil { + // do something better + time.Sleep(time.Second) + } + + // close done chan + close(done) + + // if the config is closed exit + select { + case <-c.exit: + return + default: + } + } +} + +func (c *config) Map() map[string]interface{} { + c.RLock() + defer c.RUnlock() + return c.vals.Map() +} + +func (c *config) Scan(v interface{}) error { + c.RLock() + defer c.RUnlock() + return c.vals.Scan(v) +} + +// sync loads all the sources, calls the parser and updates the config +func (c *config) Sync() error { + if err := c.opts.Loader.Sync(); err != nil { + return err + } + + snap, err := c.opts.Loader.Snapshot() + if err != nil { + return err + } + + c.Lock() + defer c.Unlock() + + c.snap = snap + vals, err := c.opts.Reader.Values(snap.ChangeSet) + if err != nil { + return err + } + c.vals = vals + + return nil +} + +func (c *config) Close() error { + select { + case <-c.exit: + return nil + default: + close(c.exit) + } + return nil +} + +func (c *config) Get(path ...string) reader.Value { + c.RLock() + defer c.RUnlock() + + // did sync actually work? + if c.vals != nil { + return c.vals.Get(path...) + } + + // no value + return newValue() +} + +func (c *config) Bytes() []byte { + c.RLock() + defer c.RUnlock() + + if c.vals == nil { + return []byte{} + } + + return c.vals.Bytes() +} + +func (c *config) Load(sources ...source.Source) error { + if err := c.opts.Loader.Load(sources...); err != nil { + return err + } + + snap, err := c.opts.Loader.Snapshot() + if err != nil { + return err + } + + c.Lock() + defer c.Unlock() + + c.snap = snap + vals, err := c.opts.Reader.Values(snap.ChangeSet) + if err != nil { + return err + } + c.vals = vals + + return nil +} + +func (c *config) Watch(path ...string) (Watcher, error) { + value := c.Get(path...) + + w, err := c.opts.Loader.Watch(path...) + if err != nil { + return nil, err + } + + return &watcher{ + lw: w, + rd: c.opts.Reader, + path: path, + value: value, + }, nil +} + +func (c *config) String() string { + return "config" +} + +func (w *watcher) Next() (reader.Value, error) { + for { + s, err := w.lw.Next() + if err != nil { + return nil, err + } + + // only process changes + if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) { + continue + } + + v, err := w.rd.Values(s.ChangeSet) + if err != nil { + return nil, err + } + + w.value = v.Get() + return w.value, nil + } +} + +func (w *watcher) Stop() error { + return w.lw.Stop() +} diff --git a/config/default_test.go b/config/default_test.go new file mode 100644 index 00000000..9d1bd03e --- /dev/null +++ b/config/default_test.go @@ -0,0 +1,101 @@ +package config + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/micro/go-micro/config/source/file" +) + +func createFileForTest(t *testing.T) *os.File { + data := []byte(`{"foo": "bar"}`) + path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) + fh, err := os.Create(path) + if err != nil { + t.Error(err) + } + _, err = fh.Write(data) + if err != nil { + t.Error(err) + } + + return fh +} + +func TestLoadWithGoodFile(t *testing.T) { + fh := createFileForTest(t) + path := fh.Name() + defer func() { + fh.Close() + os.Remove(path) + }() + + // Create new config + conf := NewConfig() + // Load file source + if err := conf.Load(file.NewSource( + file.WithPath(path), + )); err != nil { + t.Fatalf("Expected no error but got %v", err) + } +} + +func TestLoadWithInvalidFile(t *testing.T) { + fh := createFileForTest(t) + path := fh.Name() + defer func() { + fh.Close() + os.Remove(path) + }() + + // Create new config + conf := NewConfig() + // Load file source + err := conf.Load(file.NewSource( + file.WithPath(path), + file.WithPath("/i/do/not/exists.json"), + )) + + if err == nil { + t.Fatal("Expected error but none !") + } + if !strings.Contains(fmt.Sprintf("%v", err), "/i/do/not/exists.json") { + t.Fatalf("Expected error to contain the unexisting file but got %v", err) + } +} + +func TestConsul(t *testing.T) { + /*consulSource := consul.NewSource( + // optionally specify consul address; default to localhost:8500 + consul.WithAddress("131.150.38.111:8500"), + // optionally specify prefix; defaults to /micro/config + consul.WithPrefix("/project"), + // optionally strip the provided prefix from the keys, defaults to false + consul.StripPrefix(true), + consul.WithDatacenter("dc1"), + consul.WithToken("xxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"), + ) + + // Create new config + conf := NewConfig() + + // Load file source + err := conf.Load(consulSource) + if err != nil { + t.Error(err) + return + } + + m := conf.Map() + t.Log("m: ", m) + + v := conf.Get("project", "dc111", "port") + + t.Log("v: ", v.Int(13))*/ + + t.Log("OK") +} diff --git a/config/encoder/encoder.go b/config/encoder/encoder.go new file mode 100644 index 00000000..0ef0654a --- /dev/null +++ b/config/encoder/encoder.go @@ -0,0 +1,8 @@ +// Package encoder handles source encoding formats +package encoder + +type Encoder interface { + Encode(interface{}) ([]byte, error) + Decode([]byte, interface{}) error + String() string +} diff --git a/config/encoder/hcl/hcl.go b/config/encoder/hcl/hcl.go new file mode 100644 index 00000000..7fa02b2c --- /dev/null +++ b/config/encoder/hcl/hcl.go @@ -0,0 +1,26 @@ +package hcl + +import ( + "encoding/json" + + "github.com/hashicorp/hcl" + "github.com/micro/go-micro/config/encoder" +) + +type hclEncoder struct{} + +func (h hclEncoder) Encode(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (h hclEncoder) Decode(d []byte, v interface{}) error { + return hcl.Unmarshal(d, v) +} + +func (h hclEncoder) String() string { + return "hcl" +} + +func NewEncoder() encoder.Encoder { + return hclEncoder{} +} diff --git a/config/encoder/json/json.go b/config/encoder/json/json.go new file mode 100644 index 00000000..a5b3557b --- /dev/null +++ b/config/encoder/json/json.go @@ -0,0 +1,25 @@ +package json + +import ( + "encoding/json" + + "github.com/micro/go-micro/config/encoder" +) + +type jsonEncoder struct{} + +func (j jsonEncoder) Encode(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (j jsonEncoder) Decode(d []byte, v interface{}) error { + return json.Unmarshal(d, v) +} + +func (j jsonEncoder) String() string { + return "json" +} + +func NewEncoder() encoder.Encoder { + return jsonEncoder{} +} diff --git a/config/encoder/toml/toml.go b/config/encoder/toml/toml.go new file mode 100644 index 00000000..f9688966 --- /dev/null +++ b/config/encoder/toml/toml.go @@ -0,0 +1,32 @@ +package toml + +import ( + "bytes" + + "github.com/BurntSushi/toml" + "github.com/micro/go-micro/config/encoder" +) + +type tomlEncoder struct{} + +func (t tomlEncoder) Encode(v interface{}) ([]byte, error) { + b := bytes.NewBuffer(nil) + defer b.Reset() + err := toml.NewEncoder(b).Encode(v) + if err != nil { + return nil, err + } + return b.Bytes(), nil +} + +func (t tomlEncoder) Decode(d []byte, v interface{}) error { + return toml.Unmarshal(d, v) +} + +func (t tomlEncoder) String() string { + return "toml" +} + +func NewEncoder() encoder.Encoder { + return tomlEncoder{} +} diff --git a/config/encoder/xml/xml.go b/config/encoder/xml/xml.go new file mode 100644 index 00000000..e970d8f1 --- /dev/null +++ b/config/encoder/xml/xml.go @@ -0,0 +1,25 @@ +package xml + +import ( + "encoding/xml" + + "github.com/micro/go-micro/config/encoder" +) + +type xmlEncoder struct{} + +func (x xmlEncoder) Encode(v interface{}) ([]byte, error) { + return xml.Marshal(v) +} + +func (x xmlEncoder) Decode(d []byte, v interface{}) error { + return xml.Unmarshal(d, v) +} + +func (x xmlEncoder) String() string { + return "xml" +} + +func NewEncoder() encoder.Encoder { + return xmlEncoder{} +} diff --git a/config/encoder/yaml/yaml.go b/config/encoder/yaml/yaml.go new file mode 100644 index 00000000..efecbf0c --- /dev/null +++ b/config/encoder/yaml/yaml.go @@ -0,0 +1,24 @@ +package yaml + +import ( + "github.com/ghodss/yaml" + "github.com/micro/go-micro/config/encoder" +) + +type yamlEncoder struct{} + +func (y yamlEncoder) Encode(v interface{}) ([]byte, error) { + return yaml.Marshal(v) +} + +func (y yamlEncoder) Decode(d []byte, v interface{}) error { + return yaml.Unmarshal(d, v) +} + +func (y yamlEncoder) String() string { + return "yaml" +} + +func NewEncoder() encoder.Encoder { + return yamlEncoder{} +} diff --git a/config/issue18_test.go b/config/issue18_test.go new file mode 100644 index 00000000..5fed22bf --- /dev/null +++ b/config/issue18_test.go @@ -0,0 +1,60 @@ +package config + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/micro/go-micro/config/source/env" + "github.com/micro/go-micro/config/source/file" +) + +func createFileForIssue18(t *testing.T, content string) *os.File { + data := []byte(content) + path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) + fh, err := os.Create(path) + if err != nil { + t.Error(err) + } + _, err = fh.Write(data) + if err != nil { + t.Error(err) + } + + return fh +} + +func TestIssue18(t *testing.T) { + fh := createFileForIssue18(t, `{ + "amqp": { + "host": "rabbit.platform", + "port": 80 + }, + "handler": { + "exchange": "springCloudBus" + } +}`) + path := fh.Name() + defer func() { + fh.Close() + os.Remove(path) + }() + os.Setenv("AMQP_HOST", "rabbit.testing.com") + + conf := NewConfig() + conf.Load( + file.NewSource( + file.WithPath(path), + ), + env.NewSource(), + ) + + actualHost := conf.Get("amqp", "host").String("backup") + if actualHost != "rabbit.testing.com" { + t.Fatalf("Expected %v but got %v", + "rabbit.testing.com", + actualHost) + } +} diff --git a/config/loader/loader.go b/config/loader/loader.go new file mode 100644 index 00000000..93029243 --- /dev/null +++ b/config/loader/loader.go @@ -0,0 +1,63 @@ +// package loader manages loading from multiple sources +package loader + +import ( + "context" + + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/source" +) + +// Loader manages loading sources +type Loader interface { + // Stop the loader + Close() error + // Load the sources + Load(...source.Source) error + // A Snapshot of loaded config + Snapshot() (*Snapshot, error) + // Force sync of sources + Sync() error + // Watch for changes + Watch(...string) (Watcher, error) + // Name of loader + String() string +} + +// Watcher lets you watch sources and returns a merged ChangeSet +type Watcher interface { + // First call to next may return the current Snapshot + // If you are watching a path then only the data from + // that path is returned. + Next() (*Snapshot, error) + // Stop watching for changes + Stop() error +} + +// Snapshot is a merged ChangeSet +type Snapshot struct { + // The merged ChangeSet + ChangeSet *source.ChangeSet + // Deterministic and comparable version of the snapshot + Version string +} + +type Options struct { + Reader reader.Reader + Source []source.Source + + // for alternative data + Context context.Context +} + +type Option func(o *Options) + +// Copy snapshot +func Copy(s *Snapshot) *Snapshot { + cs := *(s.ChangeSet) + + return &Snapshot{ + ChangeSet: &cs, + Version: s.Version, + } +} diff --git a/config/loader/memory/memory.go b/config/loader/memory/memory.go new file mode 100644 index 00000000..6d5b27cb --- /dev/null +++ b/config/loader/memory/memory.go @@ -0,0 +1,415 @@ +package memory + +import ( + "bytes" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/micro/go-micro/config/loader" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/reader/json" + "github.com/micro/go-micro/config/source" +) + +type memory struct { + exit chan bool + opts loader.Options + + sync.RWMutex + // the current snapshot + snap *loader.Snapshot + // the current values + vals reader.Values + // all the sets + sets []*source.ChangeSet + // all the sources + sources []source.Source + + idx int + watchers map[int]*watcher +} + +type watcher struct { + exit chan bool + path []string + value reader.Value + reader reader.Reader + updates chan reader.Value +} + +func (m *memory) watch(idx int, s source.Source) { + m.Lock() + m.sets = append(m.sets, &source.ChangeSet{Source: s.String()}) + m.Unlock() + + // watches a source for changes + watch := func(idx int, s source.Watcher) error { + for { + // get changeset + cs, err := s.Next() + if err != nil { + return err + } + + m.Lock() + + // save + m.sets[idx] = cs + + // merge sets + set, err := m.opts.Reader.Merge(m.sets...) + if err != nil { + m.Unlock() + return err + } + + // set values + m.vals, _ = m.opts.Reader.Values(set) + m.snap = &loader.Snapshot{ + ChangeSet: set, + Version: fmt.Sprintf("%d", time.Now().Unix()), + } + m.Unlock() + + // send watch updates + m.update() + } + } + + for { + // watch the source + w, err := s.Watch() + if err != nil { + time.Sleep(time.Second) + continue + } + + done := make(chan bool) + + // the stop watch func + go func() { + select { + case <-done: + case <-m.exit: + } + w.Stop() + }() + + // block watch + if err := watch(idx, w); err != nil { + // do something better + time.Sleep(time.Second) + } + + // close done chan + close(done) + + // if the config is closed exit + select { + case <-m.exit: + return + default: + } + } +} + +func (m *memory) loaded() bool { + var loaded bool + m.RLock() + if m.vals != nil { + loaded = true + } + m.RUnlock() + return loaded +} + +// reload reads the sets and creates new values +func (m *memory) reload() error { + m.Lock() + + // merge sets + set, err := m.opts.Reader.Merge(m.sets...) + if err != nil { + m.Unlock() + return err + } + + // set values + m.vals, _ = m.opts.Reader.Values(set) + m.snap = &loader.Snapshot{ + ChangeSet: set, + Version: fmt.Sprintf("%d", time.Now().Unix()), + } + + m.Unlock() + + // update watchers + m.update() + + return nil +} + +func (m *memory) update() { + var watchers []*watcher + + m.RLock() + for _, w := range m.watchers { + watchers = append(watchers, w) + } + m.RUnlock() + + for _, w := range watchers { + select { + case w.updates <- m.vals.Get(w.path...): + default: + } + } +} + +// Snapshot returns a snapshot of the current loaded config +func (m *memory) Snapshot() (*loader.Snapshot, error) { + if m.loaded() { + m.RLock() + snap := loader.Copy(m.snap) + m.RUnlock() + return snap, nil + } + + // not loaded, sync + if err := m.Sync(); err != nil { + return nil, err + } + + // make copy + m.RLock() + snap := loader.Copy(m.snap) + m.RUnlock() + + return snap, nil +} + +// Sync loads all the sources, calls the parser and updates the config +func (m *memory) Sync() error { + var sets []*source.ChangeSet + + m.Lock() + + // read the source + var gerr []string + + for _, source := range m.sources { + ch, err := source.Read() + if err != nil { + gerr = append(gerr, err.Error()) + continue + } + sets = append(sets, ch) + } + + // merge sets + set, err := m.opts.Reader.Merge(sets...) + if err != nil { + m.Unlock() + return err + } + + // set values + vals, err := m.opts.Reader.Values(set) + if err != nil { + m.Unlock() + return err + } + m.vals = vals + m.snap = &loader.Snapshot{ + ChangeSet: set, + Version: fmt.Sprintf("%d", time.Now().Unix()), + } + + m.Unlock() + + // update watchers + m.update() + + if len(gerr) > 0 { + return fmt.Errorf("source loading errors: %s", strings.Join(gerr, "\n")) + } + + return nil +} + +func (m *memory) Close() error { + select { + case <-m.exit: + return nil + default: + close(m.exit) + } + return nil +} + +func (m *memory) Get(path ...string) (reader.Value, error) { + if !m.loaded() { + if err := m.Sync(); err != nil { + return nil, err + } + } + + m.Lock() + defer m.Unlock() + + // did sync actually work? + if m.vals != nil { + return m.vals.Get(path...), nil + } + + // assuming vals is nil + // create new vals + + ch := m.snap.ChangeSet + + // we are truly screwed, trying to load in a hacked way + v, err := m.opts.Reader.Values(ch) + if err != nil { + return nil, err + } + + // lets set it just because + m.vals = v + + if m.vals != nil { + return m.vals.Get(path...), nil + } + + // ok we're going hardcore now + return nil, errors.New("no values") +} + +func (m *memory) Load(sources ...source.Source) error { + var gerrors []string + + for _, source := range sources { + set, err := source.Read() + if err != nil { + gerrors = append(gerrors, + fmt.Sprintf("error loading source %s: %v", + source, + err)) + // continue processing + continue + } + m.Lock() + m.sources = append(m.sources, source) + m.sets = append(m.sets, set) + idx := len(m.sets) - 1 + m.Unlock() + go m.watch(idx, source) + } + + if err := m.reload(); err != nil { + gerrors = append(gerrors, err.Error()) + } + + // Return errors + if len(gerrors) != 0 { + return errors.New(strings.Join(gerrors, "\n")) + } + return nil +} + +func (m *memory) Watch(path ...string) (loader.Watcher, error) { + value, err := m.Get(path...) + if err != nil { + return nil, err + } + + m.Lock() + + w := &watcher{ + exit: make(chan bool), + path: path, + value: value, + reader: m.opts.Reader, + updates: make(chan reader.Value, 1), + } + + id := m.idx + m.watchers[id] = w + m.idx++ + + m.Unlock() + + go func() { + <-w.exit + m.Lock() + delete(m.watchers, id) + m.Unlock() + }() + + return w, nil +} + +func (m *memory) String() string { + return "memory" +} + +func (w *watcher) Next() (*loader.Snapshot, error) { + for { + select { + case <-w.exit: + return nil, errors.New("watcher stopped") + case v := <-w.updates: + if bytes.Equal(w.value.Bytes(), v.Bytes()) { + continue + } + w.value = v + + cs := &source.ChangeSet{ + Data: v.Bytes(), + Format: w.reader.String(), + Source: "memory", + Timestamp: time.Now(), + } + cs.Sum() + + return &loader.Snapshot{ + ChangeSet: cs, + Version: fmt.Sprintf("%d", time.Now().Unix()), + }, nil + } + } +} + +func (w *watcher) Stop() error { + select { + case <-w.exit: + default: + close(w.exit) + } + return nil +} + +func NewLoader(opts ...loader.Option) loader.Loader { + options := loader.Options{ + Reader: json.NewReader(), + } + + for _, o := range opts { + o(&options) + } + + m := &memory{ + exit: make(chan bool), + opts: options, + watchers: make(map[int]*watcher), + sources: options.Source, + } + + for i, s := range options.Source { + go m.watch(i, s) + } + + return m +} diff --git a/config/loader/memory/options.go b/config/loader/memory/options.go new file mode 100644 index 00000000..5d778d66 --- /dev/null +++ b/config/loader/memory/options.go @@ -0,0 +1,21 @@ +package memory + +import ( + "github.com/micro/go-micro/config/loader" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/source" +) + +// WithSource appends a source to list of sources +func WithSource(s source.Source) loader.Option { + return func(o *loader.Options) { + o.Source = append(o.Source, s) + } +} + +// WithReader sets the config reader +func WithReader(r reader.Reader) loader.Option { + return func(o *loader.Options) { + o.Reader = r + } +} diff --git a/config/options.go b/config/options.go new file mode 100644 index 00000000..a21cf945 --- /dev/null +++ b/config/options.go @@ -0,0 +1,28 @@ +package config + +import ( + "github.com/micro/go-micro/config/loader" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/source" +) + +// WithLoader sets the loader for manager config +func WithLoader(l loader.Loader) Option { + return func(o *Options) { + o.Loader = l + } +} + +// WithSource appends a source to list of sources +func WithSource(s source.Source) Option { + return func(o *Options) { + o.Source = append(o.Source, s) + } +} + +// WithReader sets the config reader +func WithReader(r reader.Reader) Option { + return func(o *Options) { + o.Reader = r + } +} diff --git a/config/reader/json/json.go b/config/reader/json/json.go new file mode 100644 index 00000000..7f3058f8 --- /dev/null +++ b/config/reader/json/json.go @@ -0,0 +1,83 @@ +package json + +import ( + "errors" + "time" + + "github.com/imdario/mergo" + "github.com/micro/go-micro/config/encoder" + "github.com/micro/go-micro/config/encoder/json" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/source" +) + +type jsonReader struct { + opts reader.Options + json encoder.Encoder +} + +func (j *jsonReader) Merge(changes ...*source.ChangeSet) (*source.ChangeSet, error) { + var merged map[string]interface{} + + for _, m := range changes { + if m == nil { + continue + } + + if len(m.Data) == 0 { + continue + } + + codec, ok := j.opts.Encoding[m.Format] + if !ok { + // fallback + codec = j.json + } + + var data map[string]interface{} + if err := codec.Decode(m.Data, &data); err != nil { + return nil, err + } + if err := mergo.Map(&merged, data, mergo.WithOverride); err != nil { + return nil, err + } + } + + b, err := j.json.Encode(merged) + if err != nil { + return nil, err + } + + cs := &source.ChangeSet{ + Timestamp: time.Now(), + Data: b, + Source: "json", + Format: j.json.String(), + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func (j *jsonReader) Values(ch *source.ChangeSet) (reader.Values, error) { + if ch == nil { + return nil, errors.New("changeset is nil") + } + if ch.Format != "json" { + return nil, errors.New("unsupported format") + } + return newValues(ch) +} + +func (j *jsonReader) String() string { + return "json" +} + +// NewReader creates a json reader +func NewReader(opts ...reader.Option) reader.Reader { + options := reader.NewOptions(opts...) + return &jsonReader{ + json: json.NewEncoder(), + opts: options, + } +} diff --git a/config/reader/json/json_test.go b/config/reader/json/json_test.go new file mode 100644 index 00000000..965e346d --- /dev/null +++ b/config/reader/json/json_test.go @@ -0,0 +1,43 @@ +package json + +import ( + "testing" + + "github.com/micro/go-micro/config/source" +) + +func TestReader(t *testing.T) { + data := []byte(`{"foo": "bar", "baz": {"bar": "cat"}}`) + + testData := []struct { + path []string + value string + }{ + { + []string{"foo"}, + "bar", + }, + { + []string{"baz", "bar"}, + "cat", + }, + } + + r := NewReader() + + c, err := r.Merge(&source.ChangeSet{Data: data}, &source.ChangeSet{}) + if err != nil { + t.Fatal(err) + } + + values, err := r.Values(c) + if err != nil { + t.Fatal(err) + } + + for _, test := range testData { + if v := values.Get(test.path...).String(""); v != test.value { + t.Fatalf("Expected %s got %s for path %v", test.value, v, test.path) + } + } +} diff --git a/config/reader/json/values.go b/config/reader/json/values.go new file mode 100644 index 00000000..e955e455 --- /dev/null +++ b/config/reader/json/values.go @@ -0,0 +1,208 @@ +package json + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + simple "github.com/bitly/go-simplejson" + "github.com/micro/go-micro/config/reader" + "github.com/micro/go-micro/config/source" +) + +type jsonValues struct { + ch *source.ChangeSet + sj *simple.Json +} + +type jsonValue struct { + *simple.Json +} + +func newValues(ch *source.ChangeSet) (reader.Values, error) { + sj := simple.New() + data, _ := reader.ReplaceEnvVars(ch.Data) + if err := sj.UnmarshalJSON(data); err != nil { + sj.SetPath(nil, string(ch.Data)) + } + return &jsonValues{ch, sj}, nil +} + +func newValue(s *simple.Json) reader.Value { + if s == nil { + s = simple.New() + } + return &jsonValue{s} +} + +func (j *jsonValues) Get(path ...string) reader.Value { + return &jsonValue{j.sj.GetPath(path...)} +} + +func (j *jsonValues) Del(path ...string) { + // delete the tree? + if len(path) == 0 { + j.sj = simple.New() + return + } + + if len(path) == 1 { + j.sj.Del(path[0]) + return + } + + vals := j.sj.GetPath(path[:len(path)-1]...) + vals.Del(path[len(path)-1]) + j.sj.SetPath(path[:len(path)-1], vals.Interface()) + return +} + +func (j *jsonValues) Set(val interface{}, path ...string) { + j.sj.SetPath(path, val) +} + +func (j *jsonValues) Bytes() []byte { + b, _ := j.sj.MarshalJSON() + return b +} + +func (j *jsonValues) Map() map[string]interface{} { + m, _ := j.sj.Map() + return m +} + +func (j *jsonValues) Scan(v interface{}) error { + b, err := j.sj.MarshalJSON() + if err != nil { + return err + } + return json.Unmarshal(b, v) +} + +func (j *jsonValues) String() string { + return "json" +} + +func (j *jsonValue) Bool(def bool) bool { + b, err := j.Json.Bool() + if err == nil { + return b + } + + str, ok := j.Interface().(string) + if !ok { + return def + } + + b, err = strconv.ParseBool(str) + if err != nil { + return def + } + + return b +} + +func (j *jsonValue) Int(def int) int { + i, err := j.Json.Int() + if err == nil { + return i + } + + str, ok := j.Interface().(string) + if !ok { + return def + } + + i, err = strconv.Atoi(str) + if err != nil { + return def + } + + return i +} + +func (j *jsonValue) String(def string) string { + return j.Json.MustString(def) +} + +func (j *jsonValue) Float64(def float64) float64 { + f, err := j.Json.Float64() + if err == nil { + return f + } + + str, ok := j.Interface().(string) + if !ok { + return def + } + + f, err = strconv.ParseFloat(str, 64) + if err != nil { + return def + } + + return f +} + +func (j *jsonValue) Duration(def time.Duration) time.Duration { + v, err := j.Json.String() + if err != nil { + return def + } + + value, err := time.ParseDuration(v) + if err != nil { + return def + } + + return value +} + +func (j *jsonValue) StringSlice(def []string) []string { + v, err := j.Json.String() + if err == nil { + sl := strings.Split(v, ",") + if len(sl) > 1 { + return sl + } + } + return j.Json.MustStringArray(def) +} + +func (j *jsonValue) StringMap(def map[string]string) map[string]string { + m, err := j.Json.Map() + if err != nil { + return def + } + + res := map[string]string{} + + for k, v := range m { + res[k] = fmt.Sprintf("%v", v) + } + + return res +} + +func (j *jsonValue) Scan(v interface{}) error { + b, err := j.Json.MarshalJSON() + if err != nil { + return err + } + return json.Unmarshal(b, v) +} + +func (j *jsonValue) Bytes() []byte { + b, err := j.Json.Bytes() + if err != nil { + // try return marshalled + b, err = j.Json.MarshalJSON() + if err != nil { + return []byte{} + } + return b + } + return b +} diff --git a/config/reader/json/values_test.go b/config/reader/json/values_test.go new file mode 100644 index 00000000..516199a6 --- /dev/null +++ b/config/reader/json/values_test.go @@ -0,0 +1,39 @@ +package json + +import ( + "testing" + + "github.com/micro/go-micro/config/source" +) + +func TestValues(t *testing.T) { + data := []byte(`{"foo": "bar", "baz": {"bar": "cat"}}`) + + testData := []struct { + path []string + value string + }{ + { + []string{"foo"}, + "bar", + }, + { + []string{"baz", "bar"}, + "cat", + }, + } + + values, err := newValues(&source.ChangeSet{ + Data: data, + }) + + if err != nil { + t.Fatal(err) + } + + for _, test := range testData { + if v := values.Get(test.path...).String(""); v != test.value { + t.Fatalf("Expected %s got %s for path %v", test.value, v, test.path) + } + } +} diff --git a/config/reader/options.go b/config/reader/options.go new file mode 100644 index 00000000..71b525b9 --- /dev/null +++ b/config/reader/options.go @@ -0,0 +1,42 @@ +package reader + +import ( + "github.com/micro/go-micro/config/encoder" + "github.com/micro/go-micro/config/encoder/hcl" + "github.com/micro/go-micro/config/encoder/json" + "github.com/micro/go-micro/config/encoder/toml" + "github.com/micro/go-micro/config/encoder/xml" + "github.com/micro/go-micro/config/encoder/yaml" +) + +type Options struct { + Encoding map[string]encoder.Encoder +} + +type Option func(o *Options) + +func NewOptions(opts ...Option) Options { + options := Options{ + Encoding: map[string]encoder.Encoder{ + "json": json.NewEncoder(), + "yaml": yaml.NewEncoder(), + "toml": toml.NewEncoder(), + "xml": xml.NewEncoder(), + "hcl": hcl.NewEncoder(), + "yml": yaml.NewEncoder(), + }, + } + for _, o := range opts { + o(&options) + } + return options +} + +func WithEncoder(e encoder.Encoder) Option { + return func(o *Options) { + if o.Encoding == nil { + o.Encoding = make(map[string]encoder.Encoder) + } + o.Encoding[e.String()] = e + } +} diff --git a/config/reader/preprocessor.go b/config/reader/preprocessor.go new file mode 100644 index 00000000..2895be4f --- /dev/null +++ b/config/reader/preprocessor.go @@ -0,0 +1,23 @@ +package reader + +import ( + "os" + "regexp" +) + +func ReplaceEnvVars(raw []byte) ([]byte, error) { + re := regexp.MustCompile(`\$\{([A-Za-z0-9_]+)\}`) + if re.Match(raw) { + dataS := string(raw) + res := re.ReplaceAllStringFunc(dataS, replaceEnvVars) + return []byte(res), nil + } else { + return raw, nil + } +} + +func replaceEnvVars(element string) string { + v := element[2 : len(element)-1] + el := os.Getenv(v) + return el +} diff --git a/config/reader/preprocessor_test.go b/config/reader/preprocessor_test.go new file mode 100644 index 00000000..ba5485fe --- /dev/null +++ b/config/reader/preprocessor_test.go @@ -0,0 +1,73 @@ +package reader + +import ( + "os" + "strings" + "testing" +) + +func TestReplaceEnvVars(t *testing.T) { + os.Setenv("myBar", "cat") + os.Setenv("MYBAR", "cat") + os.Setenv("my_Bar", "cat") + os.Setenv("myBar_", "cat") + + testData := []struct { + expected string + data []byte + }{ + // Right use cases + { + `{"foo": "bar", "baz": {"bar": "cat"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${myBar}"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "cat"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${MYBAR}"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "cat"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${my_Bar}"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "cat"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${myBar_}"}}`), + }, + // Wrong use cases + { + `{"foo": "bar", "baz": {"bar": "${myBar-}"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${myBar-}"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "${}"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${}"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "$sss}"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "$sss}"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "${sss"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "${sss"}}`), + }, + { + `{"foo": "bar", "baz": {"bar": "{something}"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "{something}"}}`), + }, + // Use cases without replace env vars + { + `{"foo": "bar", "baz": {"bar": "cat"}}`, + []byte(`{"foo": "bar", "baz": {"bar": "cat"}}`), + }, + } + + for _, test := range testData { + res, err := ReplaceEnvVars(test.data) + if err != nil { + t.Fatal(err) + } + if strings.Compare(test.expected, string(res)) != 0 { + t.Fatalf("Expected %s got %s", test.expected, res) + } + } +} diff --git a/config/reader/reader.go b/config/reader/reader.go new file mode 100644 index 00000000..eee333e3 --- /dev/null +++ b/config/reader/reader.go @@ -0,0 +1,36 @@ +// Package reader parses change sets and provides config values +package reader + +import ( + "time" + + "github.com/micro/go-micro/config/source" +) + +// Reader is an interface for merging changesets +type Reader interface { + Merge(...*source.ChangeSet) (*source.ChangeSet, error) + Values(*source.ChangeSet) (Values, error) + String() string +} + +// Values is returned by the reader +type Values interface { + Bytes() []byte + Get(path ...string) Value + Map() map[string]interface{} + Scan(v interface{}) error +} + +// Value represents a value of any type +type Value interface { + Bool(def bool) bool + Int(def int) int + String(def string) string + Float64(def float64) float64 + Duration(def time.Duration) time.Duration + StringSlice(def []string) []string + StringMap(def map[string]string) map[string]string + Scan(val interface{}) error + Bytes() []byte +} diff --git a/config/source/changeset.go b/config/source/changeset.go new file mode 100644 index 00000000..9958f61d --- /dev/null +++ b/config/source/changeset.go @@ -0,0 +1,13 @@ +package source + +import ( + "crypto/md5" + "fmt" +) + +// Sum returns the md5 checksum of the ChangeSet data +func (c *ChangeSet) Sum() string { + h := md5.New() + h.Write(c.Data) + return fmt.Sprintf("%x", h.Sum(nil)) +} diff --git a/config/source/cli/README.md b/config/source/cli/README.md new file mode 100644 index 00000000..0b482d58 --- /dev/null +++ b/config/source/cli/README.md @@ -0,0 +1,71 @@ +# cli Source + +The cli source reads config from parsed flags via a cli.Context. + +## Format + +We expect the use of the `micro/cli` package. Upper case flags will be lower cased. Dashes will be used as delimiters for nesting. + +### Example + +```go +micro.Flags( + cli.StringFlag{ + Name: "database-address", + Value: "127.0.0.1", + Usage: "the db address", + }, + cli.IntFlag{ + Name: "database-port", + Value: 3306, + Usage: "the db port", + }, +) +``` + +Becomes + +```json +{ + "database": { + "address": "127.0.0.1", + "port": 3306 + } +} +``` + +## New and Load Source + +Because a cli.Context is needed to retrieve the flags and their values, it is recommended to build your source from within a cli.Action. + +```go + +func main() { + // New Service + service := micro.NewService( + micro.Name("example"), + micro.Flags( + cli.StringFlag{ + Name: "database-address", + Value: "127.0.0.1", + Usage: "the db address", + }, + ), + ) + + var clisrc source.Source + + service.Init( + micro.Action(func(c *cli.Context) { + clisrc = cli.NewSource( + cli.Context(c), + ) + // Alternatively, just setup your config right here + }), + ) + + // ... Load and use that source ... + conf := config.NewConfig() + conf.Load(clisrc) +} +``` diff --git a/config/source/cli/cli.go b/config/source/cli/cli.go new file mode 100644 index 00000000..86b27f10 --- /dev/null +++ b/config/source/cli/cli.go @@ -0,0 +1,146 @@ +package cli + +import ( + "flag" + "io/ioutil" + "os" + "strings" + "time" + + "github.com/imdario/mergo" + "github.com/micro/cli" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/config/source" +) + +type cliSource struct { + opts source.Options + ctx *cli.Context +} + +func (c *cliSource) Read() (*source.ChangeSet, error) { + var changes map[string]interface{} + + for _, name := range c.ctx.GlobalFlagNames() { + tmp := toEntry(name, c.ctx.GlobalGeneric(name)) + mergo.Map(&changes, tmp) // need to sort error handling + } + + for _, name := range c.ctx.FlagNames() { + tmp := toEntry(name, c.ctx.Generic(name)) + mergo.Map(&changes, tmp) // need to sort error handling + } + + b, err := c.opts.Encoder.Encode(changes) + if err != nil { + return nil, err + } + + cs := &source.ChangeSet{ + Format: c.opts.Encoder.String(), + Data: b, + Timestamp: time.Now(), + Source: c.String(), + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func toEntry(name string, v interface{}) map[string]interface{} { + n := strings.ToLower(name) + keys := strings.FieldsFunc(n, split) + reverse(keys) + tmp := make(map[string]interface{}) + for i, k := range keys { + if i == 0 { + tmp[k] = v + continue + } + + tmp = map[string]interface{}{k: tmp} + } + return tmp +} + +func reverse(ss []string) { + for i := len(ss)/2 - 1; i >= 0; i-- { + opp := len(ss) - 1 - i + ss[i], ss[opp] = ss[opp], ss[i] + } +} + +func split(r rune) bool { + return r == '-' || r == '_' +} + +func (c *cliSource) Watch() (source.Watcher, error) { + return source.NewNoopWatcher() +} + +func (c *cliSource) String() string { + return "cli" +} + +// NewSource returns a config source for integrating parsed flags from a micro/cli.Context. +// Hyphens are delimiters for nesting, and all keys are lowercased. The assumption is that +// command line flags have already been parsed. +// +// Example: +// cli.StringFlag{Name: "db-host"}, +// +// +// { +// "database": { +// "host": "localhost" +// } +// } +func NewSource(opts ...source.Option) source.Source { + options := source.NewOptions(opts...) + + var ctx *cli.Context + + c, ok := options.Context.Value(contextKey{}).(*cli.Context) + if ok { + ctx = c + } + + // no context + if ctx == nil { + // get the default app/flags + app := cmd.App() + flags := app.Flags + + // create flagset + set := flag.NewFlagSet(app.Name, flag.ContinueOnError) + + // apply flags to set + for _, f := range flags { + f.Apply(set) + } + + // parse flags + set.SetOutput(ioutil.Discard) + set.Parse(os.Args[1:]) + + // normalise flags + normalizeFlags(app.Flags, set) + + // create context + ctx = cli.NewContext(app, set, nil) + } + + return &cliSource{ + ctx: ctx, + opts: options, + } +} + +// WithContext returns a new source with the context specified. +// The assumption is that Context is retrieved within an app.Action function. +func WithContext(ctx *cli.Context, opts ...source.Option) source.Source { + return &cliSource{ + ctx: ctx, + opts: source.NewOptions(opts...), + } +} diff --git a/config/source/cli/cli_test.go b/config/source/cli/cli_test.go new file mode 100644 index 00000000..ada1a0d6 --- /dev/null +++ b/config/source/cli/cli_test.go @@ -0,0 +1,65 @@ +package cli + +import ( + "encoding/json" + "os" + "testing" + + "github.com/micro/cli" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/config/source" +) + +func test(t *testing.T, withContext bool) { + var src source.Source + + // setup app + app := cmd.App() + app.Name = "testapp" + app.Flags = []cli.Flag{ + cli.StringFlag{Name: "db-host"}, + } + + // with context + if withContext { + // set action + app.Action = func(c *cli.Context) { + src = WithContext(c) + } + + // run app + app.Run([]string{"run", "-db-host", "localhost"}) + // no context + } else { + // set args + os.Args = []string{"run", "-db-host", "localhost"} + src = NewSource() + } + + // test config + c, err := src.Read() + if err != nil { + t.Error(err) + } + + var actual map[string]interface{} + if err := json.Unmarshal(c.Data, &actual); err != nil { + t.Error(err) + } + + actualDB := actual["db"].(map[string]interface{}) + if actualDB["host"] != "localhost" { + t.Errorf("expected localhost, got %v", actualDB["name"]) + } + +} + +func TestCliSource(t *testing.T) { + // without context + test(t, false) +} + +func TestCliSourceWithContext(t *testing.T) { + // with context + test(t, true) +} diff --git a/config/source/cli/options.go b/config/source/cli/options.go new file mode 100644 index 00000000..0ee463c7 --- /dev/null +++ b/config/source/cli/options.go @@ -0,0 +1,20 @@ +package cli + +import ( + "context" + + "github.com/micro/cli" + "github.com/micro/go-micro/config/source" +) + +type contextKey struct{} + +// Context sets the cli context +func Context(c *cli.Context) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, contextKey{}, c) + } +} diff --git a/config/source/cli/util.go b/config/source/cli/util.go new file mode 100644 index 00000000..c6274068 --- /dev/null +++ b/config/source/cli/util.go @@ -0,0 +1,50 @@ +package cli + +import ( + "errors" + "flag" + "strings" + + "github.com/micro/cli" +) + +func copyFlag(name string, ff *flag.Flag, set *flag.FlagSet) { + switch ff.Value.(type) { + case *cli.StringSlice: + default: + set.Set(name, ff.Value.String()) + } +} + +func normalizeFlags(flags []cli.Flag, set *flag.FlagSet) error { + visited := make(map[string]bool) + set.Visit(func(f *flag.Flag) { + visited[f.Name] = true + }) + for _, f := range flags { + parts := strings.Split(f.GetName(), ",") + if len(parts) == 1 { + continue + } + var ff *flag.Flag + for _, name := range parts { + name = strings.Trim(name, " ") + if visited[name] { + if ff != nil { + return errors.New("Cannot use two forms of the same flag: " + name + " " + ff.Name) + } + ff = set.Lookup(name) + } + } + if ff == nil { + continue + } + for _, name := range parts { + name = strings.Trim(name, " ") + if !visited[name] { + copyFlag(name, ff, set) + } + } + } + return nil +} diff --git a/config/source/consul/README.md b/config/source/consul/README.md new file mode 100644 index 00000000..2ba45e5b --- /dev/null +++ b/config/source/consul/README.md @@ -0,0 +1,49 @@ +# Consul Source + +The consul source reads config from consul key/values + +## Consul Format + +The consul source expects keys under the default prefix `/micro/config` + +Values are expected to be json + +``` +// set database +consul kv put micro/config/database '{"address": "10.0.0.1", "port": 3306}' +// set cache +consul kv put micro/config/cache '{"address": "10.0.0.2", "port": 6379}' +``` + +Keys are split on `/` so access becomes + +``` +conf.Get("micro", "config", "database") +``` + +## New Source + +Specify source with data + +```go +consulSource := consul.NewSource( + // optionally specify consul address; default to localhost:8500 + consul.WithAddress("10.0.0.10:8500"), + // optionally specify prefix; defaults to /micro/config + consul.WithPrefix("/my/prefix"), + // optionally strip the provided prefix from the keys, defaults to false + consul.StripPrefix(true), +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load file source +conf.Load(consulSource) +``` diff --git a/config/source/consul/consul.go b/config/source/consul/consul.go new file mode 100644 index 00000000..16e85332 --- /dev/null +++ b/config/source/consul/consul.go @@ -0,0 +1,121 @@ +package consul + +import ( + "fmt" + "net" + "time" + + "github.com/hashicorp/consul/api" + "github.com/micro/go-micro/config/source" +) + +// Currently a single consul reader +type consul struct { + prefix string + stripPrefix string + addr string + opts source.Options + client *api.Client +} + +var ( + // DefaultPrefix is the prefix that consul keys will be assumed to have if you + // haven't specified one + DefaultPrefix = "/micro/config/" +) + +func (c *consul) Read() (*source.ChangeSet, error) { + kv, _, err := c.client.KV().List(c.prefix, nil) + if err != nil { + return nil, err + } + + if kv == nil || len(kv) == 0 { + return nil, fmt.Errorf("source not found: %s", c.prefix) + } + + data, err := makeMap(c.opts.Encoder, kv, c.stripPrefix) + if err != nil { + return nil, fmt.Errorf("error reading data: %v", err) + } + + b, err := c.opts.Encoder.Encode(data) + if err != nil { + return nil, fmt.Errorf("error reading source: %v", err) + } + + cs := &source.ChangeSet{ + Timestamp: time.Now(), + Format: c.opts.Encoder.String(), + Source: c.String(), + Data: b, + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func (c *consul) String() string { + return "consul" +} + +func (c *consul) Watch() (source.Watcher, error) { + w, err := newWatcher(c.prefix, c.addr, c.String(), c.stripPrefix, c.opts.Encoder) + if err != nil { + return nil, err + } + return w, nil +} + +// NewSource creates a new consul source +func NewSource(opts ...source.Option) source.Source { + options := source.NewOptions(opts...) + + // use default config + config := api.DefaultConfig() + + // check if there are any addrs + a, ok := options.Context.Value(addressKey{}).(string) + if ok { + addr, port, err := net.SplitHostPort(a) + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + port = "8500" + addr = a + config.Address = fmt.Sprintf("%s:%s", addr, port) + } else if err == nil { + config.Address = fmt.Sprintf("%s:%s", addr, port) + } + } + + dc, ok := options.Context.Value(dcKey{}).(string) + if ok { + config.Datacenter = dc + } + + token, ok := options.Context.Value(tokenKey{}).(string) + if ok { + config.Token = token + } + + // create the client + client, _ := api.NewClient(config) + + prefix := DefaultPrefix + sp := "" + f, ok := options.Context.Value(prefixKey{}).(string) + if ok { + prefix = f + } + + if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b { + sp = prefix + } + + return &consul{ + prefix: prefix, + stripPrefix: sp, + addr: config.Address, + opts: options, + client: client, + } +} diff --git a/config/source/consul/options.go b/config/source/consul/options.go new file mode 100644 index 00000000..9420a803 --- /dev/null +++ b/config/source/consul/options.go @@ -0,0 +1,63 @@ +package consul + +import ( + "context" + + "github.com/micro/go-micro/config/source" +) + +type addressKey struct{} +type prefixKey struct{} +type stripPrefixKey struct{} +type dcKey struct{} +type tokenKey struct{} + +// WithAddress sets the consul address +func WithAddress(a string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, addressKey{}, a) + } +} + +// WithPrefix sets the key prefix to use +func WithPrefix(p string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, prefixKey{}, p) + } +} + +// StripPrefix indicates whether to remove the prefix from config entries, or leave it in place. +func StripPrefix(strip bool) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + + o.Context = context.WithValue(o.Context, stripPrefixKey{}, strip) + } +} + +func WithDatacenter(p string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, dcKey{}, p) + } +} + +// WithToken sets the key token to use +func WithToken(p string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, tokenKey{}, p) + } +} diff --git a/config/source/consul/util.go b/config/source/consul/util.go new file mode 100644 index 00000000..db6f708d --- /dev/null +++ b/config/source/consul/util.go @@ -0,0 +1,49 @@ +package consul + +import ( + "fmt" + "strings" + + "github.com/hashicorp/consul/api" + "github.com/micro/go-micro/config/encoder" +) + +func makeMap(e encoder.Encoder, kv api.KVPairs, stripPrefix string) (map[string]interface{}, error) { + data := make(map[string]interface{}) + + // consul guarantees lexicographic order, so no need to sort + for _, v := range kv { + pathString := strings.TrimPrefix(strings.TrimPrefix(v.Key, stripPrefix), "/") + var val map[string]interface{} + + // ensure a valid value is stored at this location + if len(v.Value) > 0 { + if err := e.Decode(v.Value, &val); err != nil { + return nil, fmt.Errorf("faild decode value. path: %s, error: %s", pathString, err) + } + } + + // set target at the root + target := data + + // then descend to the target location, creating as we go, if need be + if pathString != "" { + path := strings.Split(pathString, "/") + // find (or create) the location we want to put this value at + for _, dir := range path { + if _, ok := target[dir]; !ok { + target[dir] = make(map[string]interface{}) + } + target = target[dir].(map[string]interface{}) + } + + } + + // copy over the keys from the value + for k := range val { + target[k] = val[k] + } + } + + return data, nil +} diff --git a/config/source/consul/watcher.go b/config/source/consul/watcher.go new file mode 100644 index 00000000..e6993d8b --- /dev/null +++ b/config/source/consul/watcher.go @@ -0,0 +1,96 @@ +package consul + +import ( + "errors" + "time" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "github.com/micro/go-micro/config/encoder" + "github.com/micro/go-micro/config/source" +) + +type watcher struct { + e encoder.Encoder + name string + stripPrefix string + + wp *watch.Plan + ch chan *source.ChangeSet + exit chan bool +} + +func newWatcher(key, addr, name, stripPrefix string, e encoder.Encoder) (source.Watcher, error) { + w := &watcher{ + e: e, + name: name, + stripPrefix: stripPrefix, + ch: make(chan *source.ChangeSet), + exit: make(chan bool), + } + + wp, err := watch.Parse(map[string]interface{}{"type": "keyprefix", "prefix": key}) + if err != nil { + return nil, err + } + + wp.Handler = w.handle + + // wp.Run is a blocking call and will prevent newWatcher from returning + go wp.Run(addr) + + w.wp = wp + + return w, nil +} + +func (w *watcher) handle(idx uint64, data interface{}) { + if data == nil { + return + } + + kvs, ok := data.(api.KVPairs) + if !ok { + return + } + + d, err := makeMap(w.e, kvs, w.stripPrefix) + if err != nil { + return + } + + b, err := w.e.Encode(d) + if err != nil { + return + } + + cs := &source.ChangeSet{ + Timestamp: time.Now(), + Format: w.e.String(), + Source: w.name, + Data: b, + } + cs.Checksum = cs.Sum() + + w.ch <- cs +} + +func (w *watcher) Next() (*source.ChangeSet, error) { + select { + case cs := <-w.ch: + return cs, nil + case <-w.exit: + return nil, errors.New("watcher stopped") + } +} + +func (w *watcher) Stop() error { + select { + case <-w.exit: + return nil + default: + w.wp.Stop() + close(w.exit) + } + return nil +} diff --git a/config/source/env/README.md b/config/source/env/README.md new file mode 100644 index 00000000..25bfa07a --- /dev/null +++ b/config/source/env/README.md @@ -0,0 +1,96 @@ +# Env Source + +The env source reads config from environment variables + +## Format + +We expect environment variables to be in the standard format of FOO=bar + +Keys are converted to lowercase and split on underscore. + + +### Example + +``` +DATABASE_ADDRESS=127.0.0.1 +DATABASE_PORT=3306 +``` + +Becomes + +```json +{ + "database": { + "address": "127.0.0.1", + "port": 3306 + } +} +``` + +## Prefixes + +Environment variables can be namespaced so we only have access to a subset. Two options are available: + +``` +WithPrefix(p ...string) +WithStrippedPrefix(p ...string) +``` + +The former will preserve the prefix and make it a top level key in the config. The latter eliminates the prefix, reducing the nesting by one. + +#### Example: + +Given ENVs of: + +``` +APP_DATABASE_ADDRESS=127.0.0.1 +APP_DATABASE_PORT=3306 +VAULT_ADDR=vault:1337 +``` + +and a source initialized as follows: + +``` +src := env.NewSource( + env.WithPrefix("VAULT"), + env.WithStrippedPrefix("APP"), +) +``` + +The resulting config will be: + +``` +{ + "database": { + "address": "127.0.0.1", + "port": 3306 + }, + "vault": { + "addr": "vault:1337" + } +} +``` + + +## New Source + +Specify source with data + +```go +src := env.NewSource( + // optionally specify prefix + env.WithPrefix("MICRO"), +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load file source +conf.Load(src) +``` diff --git a/config/source/env/env.go b/config/source/env/env.go new file mode 100644 index 00000000..62dbf8b2 --- /dev/null +++ b/config/source/env/env.go @@ -0,0 +1,142 @@ +package env + +import ( + "os" + "strconv" + "strings" + "time" + + "github.com/imdario/mergo" + "github.com/micro/go-micro/config/source" +) + +var ( + DefaultPrefixes = []string{} +) + +type env struct { + prefixes []string + strippedPrefixes []string + opts source.Options +} + +func (e *env) Read() (*source.ChangeSet, error) { + var changes map[string]interface{} + + for _, env := range os.Environ() { + + if len(e.prefixes) > 0 || len(e.strippedPrefixes) > 0 { + notFound := true + + if _, ok := matchPrefix(e.prefixes, env); ok { + notFound = false + } + + if match, ok := matchPrefix(e.strippedPrefixes, env); ok { + env = strings.TrimPrefix(env, match) + notFound = false + } + + if notFound { + continue + } + } + + pair := strings.SplitN(env, "=", 2) + value := pair[1] + keys := strings.Split(strings.ToLower(pair[0]), "_") + reverse(keys) + + tmp := make(map[string]interface{}) + for i, k := range keys { + if i == 0 { + if intValue, err := strconv.Atoi(value); err == nil { + tmp[k] = intValue + } else if boolValue, err := strconv.ParseBool(value); err == nil { + tmp[k] = boolValue + } else { + tmp[k] = value + } + continue + } + + tmp = map[string]interface{}{k: tmp} + } + + if err := mergo.Map(&changes, tmp); err != nil { + return nil, err + } + } + + b, err := e.opts.Encoder.Encode(changes) + if err != nil { + return nil, err + } + + cs := &source.ChangeSet{ + Format: e.opts.Encoder.String(), + Data: b, + Timestamp: time.Now(), + Source: e.String(), + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func matchPrefix(pre []string, s string) (string, bool) { + for _, p := range pre { + if strings.HasPrefix(s, p) { + return p, true + } + } + + return "", false +} + +func reverse(ss []string) { + for i := len(ss)/2 - 1; i >= 0; i-- { + opp := len(ss) - 1 - i + ss[i], ss[opp] = ss[opp], ss[i] + } +} + +func (e *env) Watch() (source.Watcher, error) { + return newWatcher() +} + +func (e *env) String() string { + return "env" +} + +// NewSource returns a config source for parsing ENV variables. +// Underscores are delimiters for nesting, and all keys are lowercased. +// +// Example: +// "DATABASE_SERVER_HOST=localhost" will convert to +// +// { +// "database": { +// "server": { +// "host": "localhost" +// } +// } +// } +func NewSource(opts ...source.Option) source.Source { + options := source.NewOptions(opts...) + + var sp []string + var pre []string + if p, ok := options.Context.Value(strippedPrefixKey{}).([]string); ok { + sp = p + } + + if p, ok := options.Context.Value(prefixKey{}).([]string); ok { + pre = p + } + + if len(sp) > 0 || len(pre) > 0 { + pre = append(pre, DefaultPrefixes...) + } + return &env{prefixes: pre, strippedPrefixes: sp, opts: options} +} diff --git a/config/source/env/env_test.go b/config/source/env/env_test.go new file mode 100644 index 00000000..891d8d8b --- /dev/null +++ b/config/source/env/env_test.go @@ -0,0 +1,112 @@ +package env + +import ( + "encoding/json" + "os" + "testing" + "time" + + "github.com/micro/go-micro/config/source" +) + +func TestEnv_Read(t *testing.T) { + expected := map[string]map[string]string{ + "database": { + "host": "localhost", + "password": "password", + "datasource": "user:password@tcp(localhost:port)/db?charset=utf8mb4&parseTime=True&loc=Local", + }, + } + + os.Setenv("DATABASE_HOST", "localhost") + os.Setenv("DATABASE_PASSWORD", "password") + os.Setenv("DATABASE_DATASOURCE", "user:password@tcp(localhost:port)/db?charset=utf8mb4&parseTime=True&loc=Local") + + source := NewSource() + c, err := source.Read() + if err != nil { + t.Error(err) + } + + var actual map[string]interface{} + if err := json.Unmarshal(c.Data, &actual); err != nil { + t.Error(err) + } + + actualDB := actual["database"].(map[string]interface{}) + + for k, v := range expected["database"] { + a := actualDB[k] + + if a != v { + t.Errorf("expected %v got %v", v, a) + } + } +} + +func TestEnvvar_Prefixes(t *testing.T) { + os.Setenv("APP_DATABASE_HOST", "localhost") + os.Setenv("APP_DATABASE_PASSWORD", "password") + os.Setenv("VAULT_ADDR", "vault:1337") + os.Setenv("MICRO_REGISTRY", "mdns") + + var prefixtests = []struct { + prefixOpts []source.Option + expectedKeys []string + }{ + {[]source.Option{WithPrefix("APP", "MICRO")}, []string{"app", "micro"}}, + {[]source.Option{WithPrefix("MICRO"), WithStrippedPrefix("APP")}, []string{"database", "micro"}}, + {[]source.Option{WithPrefix("MICRO"), WithStrippedPrefix("APP")}, []string{"database", "micro"}}, + } + + for _, pt := range prefixtests { + source := NewSource(pt.prefixOpts...) + + c, err := source.Read() + if err != nil { + t.Error(err) + } + + var actual map[string]interface{} + if err := json.Unmarshal(c.Data, &actual); err != nil { + t.Error(err) + } + + // assert other prefixes ignored + if l := len(actual); l != len(pt.expectedKeys) { + t.Errorf("expected %v top keys, got %v", len(pt.expectedKeys), l) + } + + for _, k := range pt.expectedKeys { + if !containsKey(actual, k) { + t.Errorf("expected key %v, not found", k) + } + } + } +} + +func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) { + source := NewSource(WithStrippedPrefix("GOMICRO_")) + w, err := source.Watch() + if err != nil { + t.Error(err) + } + + go func() { + time.Sleep(50 * time.Millisecond) + w.Stop() + }() + + if _, err := w.Next(); err.Error() != "watcher stopped" { + t.Errorf("expected watcher stopped error, got %v", err) + } +} + +func containsKey(m map[string]interface{}, s string) bool { + for k := range m { + if k == s { + return true + } + } + return false +} diff --git a/config/source/env/options.go b/config/source/env/options.go new file mode 100644 index 00000000..112a7db2 --- /dev/null +++ b/config/source/env/options.go @@ -0,0 +1,49 @@ +package env + +import ( + "context" + + "strings" + + "github.com/micro/go-micro/config/source" +) + +type strippedPrefixKey struct{} +type prefixKey struct{} + +// WithStrippedPrefix sets the environment variable prefixes to scope to. +// These prefixes will be removed from the actual config entries. +func WithStrippedPrefix(p ...string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + + o.Context = context.WithValue(o.Context, strippedPrefixKey{}, appendUnderscore(p)) + } +} + +// WithPrefix sets the environment variable prefixes to scope to. +// These prefixes will not be removed. Each prefix will be considered a top level config entry. +func WithPrefix(p ...string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, prefixKey{}, appendUnderscore(p)) + } +} + +func appendUnderscore(prefixes []string) []string { + var result []string + for _, p := range prefixes { + if !strings.HasSuffix(p, "_") { + result = append(result, p+"_") + continue + } + + result = append(result, p) + } + + return result +} diff --git a/config/source/env/watcher.go b/config/source/env/watcher.go new file mode 100644 index 00000000..5dd3ef34 --- /dev/null +++ b/config/source/env/watcher.go @@ -0,0 +1,26 @@ +package env + +import ( + "errors" + + "github.com/micro/go-micro/config/source" +) + +type watcher struct { + exit chan struct{} +} + +func (w *watcher) Next() (*source.ChangeSet, error) { + <-w.exit + + return nil, errors.New("watcher stopped") +} + +func (w *watcher) Stop() error { + close(w.exit) + return nil +} + +func newWatcher() (source.Watcher, error) { + return &watcher{exit: make(chan struct{})}, nil +} diff --git a/config/source/file/README.md b/config/source/file/README.md new file mode 100644 index 00000000..89c0930c --- /dev/null +++ b/config/source/file/README.md @@ -0,0 +1,70 @@ +# File Source + +The file source reads config from a file. + +It uses the File extension to determine the Format e.g `config.yaml` has the yaml format. +It does not make use of encoders or interpet the file data. If a file extension is not present +the source Format will default to the Encoder in options. + +## Example + +A config file format in json + +```json +{ + "hosts": { + "database": { + "address": "10.0.0.1", + "port": 3306 + }, + "cache": { + "address": "10.0.0.2", + "port": 6379 + } + } +} +``` + +## New Source + +Specify file source with path to file. Path is optional and will default to `config.json` + +```go +fileSource := file.NewSource( + file.WithPath("/tmp/config.json"), +) +``` + +## File Format + +To load different file formats e.g yaml, toml, xml simply specify them with their extension + +``` +fileSource := file.NewSource( + file.WithPath("/tmp/config.yaml"), +) +``` + +If you want to specify a file without extension, ensure you set the encoder to the same format + +``` +e := toml.NewEncoder() + +fileSource := file.NewSource( + file.WithPath("/tmp/config"), + source.WithEncoder(e), +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load file source +conf.Load(fileSource) +``` + diff --git a/config/source/file/file.go b/config/source/file/file.go new file mode 100644 index 00000000..8a4b720d --- /dev/null +++ b/config/source/file/file.go @@ -0,0 +1,65 @@ +// Package file is a file source. Expected format is json +package file + +import ( + "io/ioutil" + "os" + + "github.com/micro/go-micro/config/source" +) + +type file struct { + path string + opts source.Options +} + +var ( + DefaultPath = "config.json" +) + +func (f *file) Read() (*source.ChangeSet, error) { + fh, err := os.Open(f.path) + if err != nil { + return nil, err + } + defer fh.Close() + b, err := ioutil.ReadAll(fh) + if err != nil { + return nil, err + } + info, err := fh.Stat() + if err != nil { + return nil, err + } + + cs := &source.ChangeSet{ + Format: format(f.path, f.opts.Encoder), + Source: f.String(), + Timestamp: info.ModTime(), + Data: b, + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func (f *file) String() string { + return "file" +} + +func (f *file) Watch() (source.Watcher, error) { + if _, err := os.Stat(f.path); err != nil { + return nil, err + } + return newWatcher(f) +} + +func NewSource(opts ...source.Option) source.Source { + options := source.NewOptions(opts...) + path := DefaultPath + f, ok := options.Context.Value(filePathKey{}).(string) + if ok { + path = f + } + return &file{opts: options, path: path} +} diff --git a/config/source/file/file_test.go b/config/source/file/file_test.go new file mode 100644 index 00000000..f144a387 --- /dev/null +++ b/config/source/file/file_test.go @@ -0,0 +1,37 @@ +package file + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func TestFile(t *testing.T) { + data := []byte(`{"foo": "bar"}`) + path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) + fh, err := os.Create(path) + if err != nil { + t.Error(err) + } + defer func() { + fh.Close() + os.Remove(path) + }() + + _, err = fh.Write(data) + if err != nil { + t.Error(err) + } + + f := NewSource(WithPath(path)) + c, err := f.Read() + if err != nil { + t.Error(err) + } + t.Logf("%+v", c) + if string(c.Data) != string(data) { + t.Error("data from file does not match") + } +} diff --git a/config/source/file/format.go b/config/source/file/format.go new file mode 100644 index 00000000..63f51bd9 --- /dev/null +++ b/config/source/file/format.go @@ -0,0 +1,15 @@ +package file + +import ( + "strings" + + "github.com/micro/go-micro/config/encoder" +) + +func format(p string, e encoder.Encoder) string { + parts := strings.Split(p, ".") + if len(parts) > 1 { + return parts[len(parts)-1] + } + return e.String() +} diff --git a/config/source/file/format_test.go b/config/source/file/format_test.go new file mode 100644 index 00000000..47f09836 --- /dev/null +++ b/config/source/file/format_test.go @@ -0,0 +1,31 @@ +package file + +import ( + "testing" + + "github.com/micro/go-micro/config/source" +) + +func TestFormat(t *testing.T) { + opts := source.NewOptions() + e := opts.Encoder + + testCases := []struct { + p string + f string + }{ + {"/foo/bar.json", "json"}, + {"/foo/bar.yaml", "yaml"}, + {"/foo/bar.xml", "xml"}, + {"/foo/bar.conf.ini", "ini"}, + {"conf", e.String()}, + } + + for _, d := range testCases { + f := format(d.p, e) + if f != d.f { + t.Fatalf("%s: expected %s got %s", d.p, d.f, f) + } + } + +} diff --git a/config/source/file/options.go b/config/source/file/options.go new file mode 100644 index 00000000..e9b16e90 --- /dev/null +++ b/config/source/file/options.go @@ -0,0 +1,19 @@ +package file + +import ( + "context" + + "github.com/micro/go-micro/config/source" +) + +type filePathKey struct{} + +// WithPath sets the path to file +func WithPath(p string) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, filePathKey{}, p) + } +} diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go new file mode 100644 index 00000000..de28b07b --- /dev/null +++ b/config/source/file/watcher.go @@ -0,0 +1,66 @@ +package file + +import ( + "errors" + "os" + + "github.com/fsnotify/fsnotify" + "github.com/micro/go-micro/config/source" +) + +type watcher struct { + f *file + + fw *fsnotify.Watcher + exit chan bool +} + +func newWatcher(f *file) (source.Watcher, error) { + fw, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + fw.Add(f.path) + + return &watcher{ + f: f, + fw: fw, + exit: make(chan bool), + }, nil +} + +func (w *watcher) Next() (*source.ChangeSet, error) { + // is it closed? + select { + case <-w.exit: + return nil, errors.New("watcher stopped") + default: + } + + // try get the event + select { + case event, _ := <-w.fw.Events: + if event.Op == fsnotify.Rename { + // check existence of file, and add watch again + _, err := os.Stat(event.Name) + if err == nil || os.IsExist(err) { + w.fw.Add(event.Name) + } + } + + c, err := w.f.Read() + if err != nil { + return nil, err + } + return c, nil + case err := <-w.fw.Errors: + return nil, err + case <-w.exit: + return nil, errors.New("watcher stopped") + } +} + +func (w *watcher) Stop() error { + return w.fw.Close() +} diff --git a/config/source/flag/README.md b/config/source/flag/README.md new file mode 100644 index 00000000..fb78bae2 --- /dev/null +++ b/config/source/flag/README.md @@ -0,0 +1,47 @@ +# Flag Source + +The flag source reads config from flags + +## Format + +We expect the use of the `flag` package. Upper case flags will be lower cased. Dashes will be used as delimiters. + +### Example + +``` +dbAddress := flag.String("database_address", "127.0.0.1", "the db address") +dbPort := flag.Int("database_port", 3306, "the db port) +``` + +Becomes + +```json +{ + "database": { + "address": "127.0.0.1", + "port": 3306 + } +} +``` + +## New Source + +```go +flagSource := flag.NewSource( + // optionally enable reading of unset flags and their default + // values into config, defaults to false + IncludeUnset(true) +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load file source +conf.Load(flagSource) +``` diff --git a/config/source/flag/flag.go b/config/source/flag/flag.go new file mode 100644 index 00000000..04483d5b --- /dev/null +++ b/config/source/flag/flag.go @@ -0,0 +1,97 @@ +package flag + +import ( + "errors" + "flag" + "github.com/imdario/mergo" + "github.com/micro/go-micro/config/source" + "strings" + "time" +) + +type flagsrc struct { + opts source.Options +} + +func (fs *flagsrc) Read() (*source.ChangeSet, error) { + if !flag.Parsed() { + return nil, errors.New("flags not parsed") + } + + var changes map[string]interface{} + + visitFn := func(f *flag.Flag) { + n := strings.ToLower(f.Name) + keys := strings.FieldsFunc(n, split) + reverse(keys) + + tmp := make(map[string]interface{}) + for i, k := range keys { + if i == 0 { + tmp[k] = f.Value + continue + } + + tmp = map[string]interface{}{k: tmp} + } + + mergo.Map(&changes, tmp) // need to sort error handling + return + } + + unset, ok := fs.opts.Context.Value(includeUnsetKey{}).(bool) + if ok && unset { + flag.VisitAll(visitFn) + } else { + flag.Visit(visitFn) + } + + b, err := fs.opts.Encoder.Encode(changes) + if err != nil { + return nil, err + } + + cs := &source.ChangeSet{ + Format: fs.opts.Encoder.String(), + Data: b, + Timestamp: time.Now(), + Source: fs.String(), + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func split(r rune) bool { + return r == '-' || r == '_' +} + +func reverse(ss []string) { + for i := len(ss)/2 - 1; i >= 0; i-- { + opp := len(ss) - 1 - i + ss[i], ss[opp] = ss[opp], ss[i] + } +} + +func (fs *flagsrc) Watch() (source.Watcher, error) { + return source.NewNoopWatcher() +} + +func (fs *flagsrc) String() string { + return "flag" +} + +// NewSource returns a config source for integrating parsed flags. +// Hyphens are delimiters for nesting, and all keys are lowercased. +// +// Example: +// dbhost := flag.String("database-host", "localhost", "the db host name") +// +// { +// "database": { +// "host": "localhost" +// } +// } +func NewSource(opts ...source.Option) source.Source { + return &flagsrc{opts: source.NewOptions(opts...)} +} diff --git a/config/source/flag/flag_test.go b/config/source/flag/flag_test.go new file mode 100644 index 00000000..1b07ac81 --- /dev/null +++ b/config/source/flag/flag_test.go @@ -0,0 +1,66 @@ +package flag + +import ( + "encoding/json" + "flag" + "testing" +) + +var ( + dbuser = flag.String("database-user", "default", "db user") + dbhost = flag.String("database-host", "", "db host") + dbpw = flag.String("database-password", "", "db pw") +) + +func init() { + flag.Set("database-host", "localhost") + flag.Set("database-password", "some-password") + flag.Parse() +} + +func TestFlagsrc_Read(t *testing.T) { + source := NewSource() + c, err := source.Read() + if err != nil { + t.Error(err) + } + + var actual map[string]interface{} + if err := json.Unmarshal(c.Data, &actual); err != nil { + t.Error(err) + } + + actualDB := actual["database"].(map[string]interface{}) + if actualDB["host"] != *dbhost { + t.Errorf("expected %v got %v", *dbhost, actualDB["host"]) + } + + if actualDB["password"] != *dbpw { + t.Errorf("expected %v got %v", *dbpw, actualDB["password"]) + } + + // unset flags should not be loaded + if actualDB["user"] != nil { + t.Errorf("expected %v got %v", nil, actualDB["user"]) + } +} + +func TestFlagsrc_ReadAll(t *testing.T) { + source := NewSource(IncludeUnset(true)) + c, err := source.Read() + if err != nil { + t.Error(err) + } + + var actual map[string]interface{} + if err := json.Unmarshal(c.Data, &actual); err != nil { + t.Error(err) + } + + actualDB := actual["database"].(map[string]interface{}) + + // unset flag defaults should be loaded + if actualDB["user"] != *dbuser { + t.Errorf("expected %v got %v", *dbuser, actualDB["user"]) + } +} diff --git a/config/source/flag/options.go b/config/source/flag/options.go new file mode 100644 index 00000000..369cccb6 --- /dev/null +++ b/config/source/flag/options.go @@ -0,0 +1,20 @@ +package flag + +import ( + "context" + + "github.com/micro/go-micro/config/source" +) + +type includeUnsetKey struct{} + +// IncludeUnset toggles the loading of unset flags and their respective default values. +// Default behavior is to ignore any unset flags. +func IncludeUnset(b bool) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, includeUnsetKey{}, true) + } +} diff --git a/config/source/memory/README.md b/config/source/memory/README.md new file mode 100644 index 00000000..2c8038d8 --- /dev/null +++ b/config/source/memory/README.md @@ -0,0 +1,44 @@ +# Memory Source + +The memory source provides in-memory data as a source + +## Memory Format + +The expected data format is json + +```json +data := []byte(`{ + "hosts": { + "database": { + "address": "10.0.0.1", + "port": 3306 + }, + "cache": { + "address": "10.0.0.2", + "port": 6379 + } + } +}`) +``` + +## New Source + +Specify source with data + +```go +memorySource := memory.NewSource( + memory.WithData(data), +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load file source +conf.Load(memorySource) +``` diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go new file mode 100644 index 00000000..50a509fd --- /dev/null +++ b/config/source/memory/memory.go @@ -0,0 +1,93 @@ +// Package memory is a memory source +package memory + +import ( + "sync" + "time" + + "github.com/micro/go-micro/config/source" + "github.com/pborman/uuid" +) + +type memory struct { + sync.RWMutex + ChangeSet *source.ChangeSet + Watchers map[string]*watcher +} + +func (s *memory) Read() (*source.ChangeSet, error) { + s.RLock() + cs := &source.ChangeSet{ + Timestamp: s.ChangeSet.Timestamp, + Data: s.ChangeSet.Data, + Checksum: s.ChangeSet.Checksum, + Source: s.ChangeSet.Source, + } + s.RUnlock() + return cs, nil +} + +func (s *memory) Watch() (source.Watcher, error) { + w := &watcher{ + Id: uuid.NewUUID().String(), + Updates: make(chan *source.ChangeSet, 100), + Source: s, + } + + s.Lock() + s.Watchers[w.Id] = w + s.Unlock() + return w, nil +} + +// Update allows manual updates of the config data. +func (s *memory) Update(c *source.ChangeSet) { + // don't process nil + if c == nil { + return + } + + // hash the file + s.Lock() + // update changeset + s.ChangeSet = &source.ChangeSet{ + Data: c.Data, + Format: c.Format, + Source: "memory", + Timestamp: time.Now(), + } + s.ChangeSet.Checksum = s.ChangeSet.Sum() + + // update watchers + for _, w := range s.Watchers { + select { + case w.Updates <- s.ChangeSet: + default: + } + } + s.Unlock() +} + +func (s *memory) String() string { + return "memory" +} + +func NewSource(opts ...source.Option) source.Source { + var options source.Options + for _, o := range opts { + o(&options) + } + + s := &memory{ + Watchers: make(map[string]*watcher), + } + + if options.Context != nil { + c, ok := options.Context.Value(changeSetKey{}).(*source.ChangeSet) + if ok { + s.Update(c) + } + } + + return s +} diff --git a/config/source/memory/options.go b/config/source/memory/options.go new file mode 100644 index 00000000..5b1e6c57 --- /dev/null +++ b/config/source/memory/options.go @@ -0,0 +1,32 @@ +package memory + +import ( + "context" + + "github.com/micro/go-micro/config/source" +) + +type changeSetKey struct{} + +// WithChangeSet allows a changeset to be set +func WithChangeSet(cs *source.ChangeSet) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, changeSetKey{}, cs) + } +} + +// WithData allows the source data to be set +func WithData(d []byte) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, changeSetKey{}, &source.ChangeSet{ + Data: d, + Format: "json", + }) + } +} diff --git a/config/source/memory/watcher.go b/config/source/memory/watcher.go new file mode 100644 index 00000000..4cdc09e1 --- /dev/null +++ b/config/source/memory/watcher.go @@ -0,0 +1,23 @@ +package memory + +import ( + "github.com/micro/go-micro/config/source" +) + +type watcher struct { + Id string + Updates chan *source.ChangeSet + Source *memory +} + +func (w *watcher) Next() (*source.ChangeSet, error) { + cs := <-w.Updates + return cs, nil +} + +func (w *watcher) Stop() error { + w.Source.Lock() + delete(w.Source.Watchers, w.Id) + w.Source.Unlock() + return nil +} diff --git a/config/source/noop.go b/config/source/noop.go new file mode 100644 index 00000000..fc444411 --- /dev/null +++ b/config/source/noop.go @@ -0,0 +1,25 @@ +package source + +import ( + "errors" +) + +type noopWatcher struct { + exit chan struct{} +} + +func (w *noopWatcher) Next() (*ChangeSet, error) { + <-w.exit + + return nil, errors.New("noopWatcher stopped") +} + +func (w *noopWatcher) Stop() error { + close(w.exit) + return nil +} + +// NewNoopWatcher returns a watcher that blocks on Next() until Stop() is called. +func NewNoopWatcher() (Watcher, error) { + return &noopWatcher{exit: make(chan struct{})}, nil +} diff --git a/config/source/options.go b/config/source/options.go new file mode 100644 index 00000000..0f8f7943 --- /dev/null +++ b/config/source/options.go @@ -0,0 +1,38 @@ +package source + +import ( + "context" + + "github.com/micro/go-micro/config/encoder" + "github.com/micro/go-micro/config/encoder/json" +) + +type Options struct { + // Encoder + Encoder encoder.Encoder + + // for alternative data + Context context.Context +} + +type Option func(o *Options) + +func NewOptions(opts ...Option) Options { + options := Options{ + Encoder: json.NewEncoder(), + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + return options +} + +// WithEncoder sets the source encoder +func WithEncoder(e encoder.Encoder) Option { + return func(o *Options) { + o.Encoder = e + } +} diff --git a/config/source/source.go b/config/source/source.go new file mode 100644 index 00000000..828c8ad2 --- /dev/null +++ b/config/source/source.go @@ -0,0 +1,28 @@ +// Package source is the interface for sources +package source + +import ( + "time" +) + +// Source is the source from which config is loaded +type Source interface { + Read() (*ChangeSet, error) + Watch() (Watcher, error) + String() string +} + +// ChangeSet represents a set of changes from a source +type ChangeSet struct { + Data []byte + Checksum string + Format string + Source string + Timestamp time.Time +} + +// Watcher watches a source for changes +type Watcher interface { + Next() (*ChangeSet, error) + Stop() error +} diff --git a/config/value.go b/config/value.go new file mode 100644 index 00000000..929d671b --- /dev/null +++ b/config/value.go @@ -0,0 +1,49 @@ +package config + +import ( + "time" + + "github.com/micro/go-micro/config/reader" +) + +type value struct{} + +func newValue() reader.Value { + return new(value) +} + +func (v *value) Bool(def bool) bool { + return false +} + +func (v *value) Int(def int) int { + return 0 +} + +func (v *value) String(def string) string { + return "" +} + +func (v *value) Float64(def float64) float64 { + return 0.0 +} + +func (v *value) Duration(def time.Duration) time.Duration { + return time.Duration(0) +} + +func (v *value) StringSlice(def []string) []string { + return nil +} + +func (v *value) StringMap(def map[string]string) map[string]string { + return map[string]string{} +} + +func (v *value) Scan(val interface{}) error { + return nil +} + +func (v *value) Bytes() []byte { + return nil +}