diff --git a/config/source/etcd/README.md b/config/source/etcd/README.md deleted file mode 100644 index a3025ad4..00000000 --- a/config/source/etcd/README.md +++ /dev/null @@ -1,51 +0,0 @@ -# Etcd Source - -The etcd source reads config from etcd key/values - -This source supports etcd version 3 and beyond. - -## Etcd Format - -The etcd source expects keys under the default prefix `/micro/config` (prefix can be changed) - -Values are expected to be JSON - -``` -// set database -etcdctl put /micro/config/database '{"address": "10.0.0.1", "port": 3306}' -// set cache -etcdctl put /micro/config/cache '{"address": "10.0.0.2", "port": 6379}' -``` - -Keys are split on `/` so access becomes - -``` -conf.Get("micro", "config", "database") -``` - -## New Source - -Specify source with data - -```go -etcdSource := etcd.NewSource( - // optionally specify etcd address; default to localhost:8500 - etcd.WithAddress("10.0.0.10:8500"), - // optionally specify prefix; defaults to /micro/config - etcd.WithPrefix("/my/prefix"), - // optionally strip the provided prefix from the keys, defaults to false - etcd.StripPrefix(true), -) -``` - -## Load Source - -Load the source into config - -```go -// Create new config -conf := config.NewConfig() - -// Load file source -conf.Load(etcdSource) -``` diff --git a/config/source/etcd/etcd.go b/config/source/etcd/etcd.go deleted file mode 100644 index 5cbe1286..00000000 --- a/config/source/etcd/etcd.go +++ /dev/null @@ -1,141 +0,0 @@ -package etcd - -import ( - "context" - "fmt" - "net" - "time" - - cetcd "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/micro/go-micro/config/source" -) - -// Currently a single etcd reader -type etcd struct { - prefix string - stripPrefix string - opts source.Options - client *cetcd.Client - cerr error -} - -var ( - DefaultPrefix = "/micro/config/" -) - -func (c *etcd) Read() (*source.ChangeSet, error) { - if c.cerr != nil { - return nil, c.cerr - } - - rsp, err := c.client.Get(context.Background(), c.prefix, cetcd.WithPrefix()) - if err != nil { - return nil, err - } - - if rsp == nil || len(rsp.Kvs) == 0 { - return nil, fmt.Errorf("source not found: %s", c.prefix) - } - - var kvs []*mvccpb.KeyValue - for _, v := range rsp.Kvs { - kvs = append(kvs, (*mvccpb.KeyValue)(v)) - } - - data := makeMap(c.opts.Encoder, kvs, c.stripPrefix) - - b, err := c.opts.Encoder.Encode(data) - if err != nil { - return nil, fmt.Errorf("error reading source: %v", err) - } - - cs := &source.ChangeSet{ - Timestamp: time.Now(), - Source: c.String(), - Data: b, - Format: c.opts.Encoder.String(), - } - cs.Checksum = cs.Sum() - - return cs, nil -} - -func (c *etcd) String() string { - return "etcd" -} - -func (c *etcd) Watch() (source.Watcher, error) { - if c.cerr != nil { - return nil, c.cerr - } - cs, err := c.Read() - if err != nil { - return nil, err - } - return newWatcher(c.prefix, c.stripPrefix, c.client.Watcher, cs, c.opts) -} - -func NewSource(opts ...source.Option) source.Source { - options := source.NewOptions(opts...) - - var endpoints []string - - // check if there are any addrs - addrs, ok := options.Context.Value(addressKey{}).([]string) - if ok { - for _, a := range addrs { - addr, port, err := net.SplitHostPort(a) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "2379" - addr = a - endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) - } else if err == nil { - endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) - } - } - } - - if len(endpoints) == 0 { - endpoints = []string{"localhost:2379"} - } - - // check dial timeout option - dialTimeout, ok := options.Context.Value(dialTimeoutKey{}).(time.Duration) - if !ok { - dialTimeout = 3 * time.Second // default dial timeout - } - - config := cetcd.Config{ - Endpoints: endpoints, - DialTimeout: dialTimeout, - } - - u, ok := options.Context.Value(authKey{}).(*authCreds) - if ok { - config.Username = u.Username - config.Password = u.Password - } - - // use default config - client, err := cetcd.New(config) - - prefix := DefaultPrefix - sp := "" - f, ok := options.Context.Value(prefixKey{}).(string) - if ok { - prefix = f - } - - if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b { - sp = prefix - } - - return &etcd{ - prefix: prefix, - stripPrefix: sp, - opts: options, - client: client, - cerr: err, - } -} diff --git a/config/source/etcd/options.go b/config/source/etcd/options.go deleted file mode 100644 index 325d2f46..00000000 --- a/config/source/etcd/options.go +++ /dev/null @@ -1,70 +0,0 @@ -package etcd - -import ( - "context" - "time" - - "github.com/micro/go-micro/config/source" -) - -type addressKey struct{} -type prefixKey struct{} -type stripPrefixKey struct{} -type authKey struct{} -type dialTimeoutKey struct{} - -type authCreds struct { - Username string - Password string -} - -// WithAddress sets the etcd address -func WithAddress(a ...string) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, addressKey{}, a) - } -} - -// WithPrefix sets the key prefix to use -func WithPrefix(p string) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, prefixKey{}, p) - } -} - -// StripPrefix indicates whether to remove the prefix from config entries, or leave it in place. -func StripPrefix(strip bool) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - - o.Context = context.WithValue(o.Context, stripPrefixKey{}, strip) - } -} - -// Auth allows you to specify username/password -func Auth(username, password string) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password}) - } -} - -// WithDialTimeout set the time out for dialing to etcd -func WithDialTimeout(timeout time.Duration) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, dialTimeoutKey{}, timeout) - } -} diff --git a/config/source/etcd/util.go b/config/source/etcd/util.go deleted file mode 100644 index 31887fc9..00000000 --- a/config/source/etcd/util.go +++ /dev/null @@ -1,89 +0,0 @@ -package etcd - -import ( - "strings" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/micro/go-micro/config/encoder" -) - -func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} { - if data == nil { - data = make(map[string]interface{}) - } - - for _, v := range kv { - switch mvccpb.Event_EventType(v.Type) { - case mvccpb.DELETE: - data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "delete", stripPrefix) - default: - data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "insert", stripPrefix) - } - } - - return data -} - -func makeMap(e encoder.Encoder, kv []*mvccpb.KeyValue, stripPrefix string) map[string]interface{} { - data := make(map[string]interface{}) - - for _, v := range kv { - data = update(e, data, v, "put", stripPrefix) - } - - return data -} - -func update(e encoder.Encoder, data map[string]interface{}, v *mvccpb.KeyValue, action, stripPrefix string) map[string]interface{} { - // remove prefix if non empty, and ensure leading / is removed as well - vkey := strings.TrimPrefix(strings.TrimPrefix(string(v.Key), stripPrefix), "/") - // split on prefix - haveSplit := strings.Contains(vkey, "/") - keys := strings.Split(vkey, "/") - - var vals interface{} - e.Decode(v.Value, &vals) - - if !haveSplit && len(keys) == 1 { - switch action { - case "delete": - data = make(map[string]interface{}) - default: - v, ok := vals.(map[string]interface{}) - if ok { - data = v - } - } - return data - } - - // set data for first iteration - kvals := data - // iterate the keys and make maps - for i, k := range keys { - kval, ok := kvals[k].(map[string]interface{}) - if !ok { - // create next map - kval = make(map[string]interface{}) - // set it - kvals[k] = kval - } - - // last key: write vals - if l := len(keys) - 1; i == l { - switch action { - case "delete": - delete(kvals, k) - default: - kvals[k] = vals - } - break - } - - // set kvals for next iterator - kvals = kval - } - - return data -} diff --git a/config/source/etcd/watcher.go b/config/source/etcd/watcher.go deleted file mode 100644 index 2f9b3189..00000000 --- a/config/source/etcd/watcher.go +++ /dev/null @@ -1,113 +0,0 @@ -package etcd - -import ( - "context" - "errors" - "sync" - "time" - - cetcd "github.com/coreos/etcd/clientv3" - "github.com/micro/go-micro/config/source" -) - -type watcher struct { - opts source.Options - name string - stripPrefix string - - sync.RWMutex - cs *source.ChangeSet - - ch chan *source.ChangeSet - exit chan bool -} - -func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) { - w := &watcher{ - opts: opts, - name: "etcd", - stripPrefix: strip, - cs: cs, - ch: make(chan *source.ChangeSet), - exit: make(chan bool), - } - - ch := wc.Watch(context.Background(), key, cetcd.WithPrefix()) - - go w.run(wc, ch) - - return w, nil -} - -func (w *watcher) handle(evs []*cetcd.Event) { - w.RLock() - data := w.cs.Data - w.RUnlock() - - var vals map[string]interface{} - - // unpackage existing changeset - if err := w.opts.Encoder.Decode(data, &vals); err != nil { - return - } - - // update base changeset - d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix) - - // pack the changeset - b, err := w.opts.Encoder.Encode(d) - if err != nil { - return - } - - // create new changeset - cs := &source.ChangeSet{ - Timestamp: time.Now(), - Source: w.name, - Data: b, - Format: w.opts.Encoder.String(), - } - cs.Checksum = cs.Sum() - - // set base change set - w.Lock() - w.cs = cs - w.Unlock() - - // send update - w.ch <- cs -} - -func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) { - for { - select { - case rsp, ok := <-ch: - if !ok { - return - } - w.handle(rsp.Events) - case <-w.exit: - wc.Close() - return - } - } -} - -func (w *watcher) Next() (*source.ChangeSet, error) { - select { - case cs := <-w.ch: - return cs, nil - case <-w.exit: - return nil, errors.New("watcher stopped") - } -} - -func (w *watcher) Stop() error { - select { - case <-w.exit: - return nil - default: - close(w.exit) - } - return nil -}