From 90a9df9b8cc346cf56021bfdf40b0a2be5823698 Mon Sep 17 00:00:00 2001 From: outshow Date: Tue, 11 Jun 2019 16:18:37 +0800 Subject: [PATCH 1/3] 1. use github.com/coreos instead of go.etcd.io in etcd related import path; 2. add dialtimeout to etcd client --- config/source/etcd/etcd.go | 13 ++++++++++--- config/source/etcd/options.go | 14 +++++++++++++- config/source/etcd/util.go | 4 ++-- config/source/etcd/watcher.go | 2 +- go.mod | 2 +- 5 files changed, 27 insertions(+), 8 deletions(-) diff --git a/config/source/etcd/etcd.go b/config/source/etcd/etcd.go index 873c2de7..5cbe1286 100644 --- a/config/source/etcd/etcd.go +++ b/config/source/etcd/etcd.go @@ -6,9 +6,9 @@ import ( "net" "time" + cetcd "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" "github.com/micro/go-micro/config/source" - cetcd "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/mvcc/mvccpb" ) // Currently a single etcd reader @@ -100,8 +100,15 @@ func NewSource(opts ...source.Option) source.Source { endpoints = []string{"localhost:2379"} } + // check dial timeout option + dialTimeout, ok := options.Context.Value(dialTimeoutKey{}).(time.Duration) + if !ok { + dialTimeout = 3 * time.Second // default dial timeout + } + config := cetcd.Config{ - Endpoints: endpoints, + Endpoints: endpoints, + DialTimeout: dialTimeout, } u, ok := options.Context.Value(authKey{}).(*authCreds) diff --git a/config/source/etcd/options.go b/config/source/etcd/options.go index 87eff8ef..325d2f46 100644 --- a/config/source/etcd/options.go +++ b/config/source/etcd/options.go @@ -2,6 +2,7 @@ package etcd import ( "context" + "time" "github.com/micro/go-micro/config/source" ) @@ -10,13 +11,14 @@ type addressKey struct{} type prefixKey struct{} type stripPrefixKey struct{} type authKey struct{} +type dialTimeoutKey struct{} type authCreds struct { Username string Password string } -// WithAddress sets the consul address +// WithAddress sets the etcd address func WithAddress(a ...string) source.Option { return func(o *source.Options) { if o.Context == nil { @@ -56,3 +58,13 @@ func Auth(username, password string) source.Option { o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password}) } } + +// WithDialTimeout set the time out for dialing to etcd +func WithDialTimeout(timeout time.Duration) source.Option { + return func(o *source.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, dialTimeoutKey{}, timeout) + } +} diff --git a/config/source/etcd/util.go b/config/source/etcd/util.go index e57475e4..31887fc9 100644 --- a/config/source/etcd/util.go +++ b/config/source/etcd/util.go @@ -3,9 +3,9 @@ package etcd import ( "strings" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" "github.com/micro/go-micro/config/encoder" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/mvcc/mvccpb" ) func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} { diff --git a/config/source/etcd/watcher.go b/config/source/etcd/watcher.go index 1066c899..2f9b3189 100644 --- a/config/source/etcd/watcher.go +++ b/config/source/etcd/watcher.go @@ -6,8 +6,8 @@ import ( "sync" "time" + cetcd "github.com/coreos/etcd/clientv3" "github.com/micro/go-micro/config/source" - cetcd "go.etcd.io/etcd/clientv3" ) type watcher struct { diff --git a/go.mod b/go.mod index b2cb5e95..def78418 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/beevik/ntp v0.2.0 github.com/bitly/go-simplejson v0.5.0 github.com/bwmarrin/discordgo v0.19.0 - github.com/coreos/etcd v3.3.13+incompatible // indirect + github.com/coreos/etcd v3.3.13+incompatible github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c github.com/fsnotify/fsnotify v1.4.7 github.com/fsouza/go-dockerclient v1.4.1 From aec1ca66352ab751a603fea3afe6850b5dad914e Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 11 Jun 2019 09:53:06 +0100 Subject: [PATCH 2/3] remove etcd source --- config/source/etcd/README.md | 51 ------------ config/source/etcd/etcd.go | 141 ---------------------------------- config/source/etcd/options.go | 70 ----------------- config/source/etcd/util.go | 89 --------------------- config/source/etcd/watcher.go | 113 --------------------------- 5 files changed, 464 deletions(-) delete mode 100644 config/source/etcd/README.md delete mode 100644 config/source/etcd/etcd.go delete mode 100644 config/source/etcd/options.go delete mode 100644 config/source/etcd/util.go delete mode 100644 config/source/etcd/watcher.go diff --git a/config/source/etcd/README.md b/config/source/etcd/README.md deleted file mode 100644 index a3025ad4..00000000 --- a/config/source/etcd/README.md +++ /dev/null @@ -1,51 +0,0 @@ -# Etcd Source - -The etcd source reads config from etcd key/values - -This source supports etcd version 3 and beyond. - -## Etcd Format - -The etcd source expects keys under the default prefix `/micro/config` (prefix can be changed) - -Values are expected to be JSON - -``` -// set database -etcdctl put /micro/config/database '{"address": "10.0.0.1", "port": 3306}' -// set cache -etcdctl put /micro/config/cache '{"address": "10.0.0.2", "port": 6379}' -``` - -Keys are split on `/` so access becomes - -``` -conf.Get("micro", "config", "database") -``` - -## New Source - -Specify source with data - -```go -etcdSource := etcd.NewSource( - // optionally specify etcd address; default to localhost:8500 - etcd.WithAddress("10.0.0.10:8500"), - // optionally specify prefix; defaults to /micro/config - etcd.WithPrefix("/my/prefix"), - // optionally strip the provided prefix from the keys, defaults to false - etcd.StripPrefix(true), -) -``` - -## Load Source - -Load the source into config - -```go -// Create new config -conf := config.NewConfig() - -// Load file source -conf.Load(etcdSource) -``` diff --git a/config/source/etcd/etcd.go b/config/source/etcd/etcd.go deleted file mode 100644 index 5cbe1286..00000000 --- a/config/source/etcd/etcd.go +++ /dev/null @@ -1,141 +0,0 @@ -package etcd - -import ( - "context" - "fmt" - "net" - "time" - - cetcd "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/micro/go-micro/config/source" -) - -// Currently a single etcd reader -type etcd struct { - prefix string - stripPrefix string - opts source.Options - client *cetcd.Client - cerr error -} - -var ( - DefaultPrefix = "/micro/config/" -) - -func (c *etcd) Read() (*source.ChangeSet, error) { - if c.cerr != nil { - return nil, c.cerr - } - - rsp, err := c.client.Get(context.Background(), c.prefix, cetcd.WithPrefix()) - if err != nil { - return nil, err - } - - if rsp == nil || len(rsp.Kvs) == 0 { - return nil, fmt.Errorf("source not found: %s", c.prefix) - } - - var kvs []*mvccpb.KeyValue - for _, v := range rsp.Kvs { - kvs = append(kvs, (*mvccpb.KeyValue)(v)) - } - - data := makeMap(c.opts.Encoder, kvs, c.stripPrefix) - - b, err := c.opts.Encoder.Encode(data) - if err != nil { - return nil, fmt.Errorf("error reading source: %v", err) - } - - cs := &source.ChangeSet{ - Timestamp: time.Now(), - Source: c.String(), - Data: b, - Format: c.opts.Encoder.String(), - } - cs.Checksum = cs.Sum() - - return cs, nil -} - -func (c *etcd) String() string { - return "etcd" -} - -func (c *etcd) Watch() (source.Watcher, error) { - if c.cerr != nil { - return nil, c.cerr - } - cs, err := c.Read() - if err != nil { - return nil, err - } - return newWatcher(c.prefix, c.stripPrefix, c.client.Watcher, cs, c.opts) -} - -func NewSource(opts ...source.Option) source.Source { - options := source.NewOptions(opts...) - - var endpoints []string - - // check if there are any addrs - addrs, ok := options.Context.Value(addressKey{}).([]string) - if ok { - for _, a := range addrs { - addr, port, err := net.SplitHostPort(a) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "2379" - addr = a - endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) - } else if err == nil { - endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) - } - } - } - - if len(endpoints) == 0 { - endpoints = []string{"localhost:2379"} - } - - // check dial timeout option - dialTimeout, ok := options.Context.Value(dialTimeoutKey{}).(time.Duration) - if !ok { - dialTimeout = 3 * time.Second // default dial timeout - } - - config := cetcd.Config{ - Endpoints: endpoints, - DialTimeout: dialTimeout, - } - - u, ok := options.Context.Value(authKey{}).(*authCreds) - if ok { - config.Username = u.Username - config.Password = u.Password - } - - // use default config - client, err := cetcd.New(config) - - prefix := DefaultPrefix - sp := "" - f, ok := options.Context.Value(prefixKey{}).(string) - if ok { - prefix = f - } - - if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b { - sp = prefix - } - - return &etcd{ - prefix: prefix, - stripPrefix: sp, - opts: options, - client: client, - cerr: err, - } -} diff --git a/config/source/etcd/options.go b/config/source/etcd/options.go deleted file mode 100644 index 325d2f46..00000000 --- a/config/source/etcd/options.go +++ /dev/null @@ -1,70 +0,0 @@ -package etcd - -import ( - "context" - "time" - - "github.com/micro/go-micro/config/source" -) - -type addressKey struct{} -type prefixKey struct{} -type stripPrefixKey struct{} -type authKey struct{} -type dialTimeoutKey struct{} - -type authCreds struct { - Username string - Password string -} - -// WithAddress sets the etcd address -func WithAddress(a ...string) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, addressKey{}, a) - } -} - -// WithPrefix sets the key prefix to use -func WithPrefix(p string) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, prefixKey{}, p) - } -} - -// StripPrefix indicates whether to remove the prefix from config entries, or leave it in place. -func StripPrefix(strip bool) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - - o.Context = context.WithValue(o.Context, stripPrefixKey{}, strip) - } -} - -// Auth allows you to specify username/password -func Auth(username, password string) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password}) - } -} - -// WithDialTimeout set the time out for dialing to etcd -func WithDialTimeout(timeout time.Duration) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, dialTimeoutKey{}, timeout) - } -} diff --git a/config/source/etcd/util.go b/config/source/etcd/util.go deleted file mode 100644 index 31887fc9..00000000 --- a/config/source/etcd/util.go +++ /dev/null @@ -1,89 +0,0 @@ -package etcd - -import ( - "strings" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/micro/go-micro/config/encoder" -) - -func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} { - if data == nil { - data = make(map[string]interface{}) - } - - for _, v := range kv { - switch mvccpb.Event_EventType(v.Type) { - case mvccpb.DELETE: - data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "delete", stripPrefix) - default: - data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "insert", stripPrefix) - } - } - - return data -} - -func makeMap(e encoder.Encoder, kv []*mvccpb.KeyValue, stripPrefix string) map[string]interface{} { - data := make(map[string]interface{}) - - for _, v := range kv { - data = update(e, data, v, "put", stripPrefix) - } - - return data -} - -func update(e encoder.Encoder, data map[string]interface{}, v *mvccpb.KeyValue, action, stripPrefix string) map[string]interface{} { - // remove prefix if non empty, and ensure leading / is removed as well - vkey := strings.TrimPrefix(strings.TrimPrefix(string(v.Key), stripPrefix), "/") - // split on prefix - haveSplit := strings.Contains(vkey, "/") - keys := strings.Split(vkey, "/") - - var vals interface{} - e.Decode(v.Value, &vals) - - if !haveSplit && len(keys) == 1 { - switch action { - case "delete": - data = make(map[string]interface{}) - default: - v, ok := vals.(map[string]interface{}) - if ok { - data = v - } - } - return data - } - - // set data for first iteration - kvals := data - // iterate the keys and make maps - for i, k := range keys { - kval, ok := kvals[k].(map[string]interface{}) - if !ok { - // create next map - kval = make(map[string]interface{}) - // set it - kvals[k] = kval - } - - // last key: write vals - if l := len(keys) - 1; i == l { - switch action { - case "delete": - delete(kvals, k) - default: - kvals[k] = vals - } - break - } - - // set kvals for next iterator - kvals = kval - } - - return data -} diff --git a/config/source/etcd/watcher.go b/config/source/etcd/watcher.go deleted file mode 100644 index 2f9b3189..00000000 --- a/config/source/etcd/watcher.go +++ /dev/null @@ -1,113 +0,0 @@ -package etcd - -import ( - "context" - "errors" - "sync" - "time" - - cetcd "github.com/coreos/etcd/clientv3" - "github.com/micro/go-micro/config/source" -) - -type watcher struct { - opts source.Options - name string - stripPrefix string - - sync.RWMutex - cs *source.ChangeSet - - ch chan *source.ChangeSet - exit chan bool -} - -func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) { - w := &watcher{ - opts: opts, - name: "etcd", - stripPrefix: strip, - cs: cs, - ch: make(chan *source.ChangeSet), - exit: make(chan bool), - } - - ch := wc.Watch(context.Background(), key, cetcd.WithPrefix()) - - go w.run(wc, ch) - - return w, nil -} - -func (w *watcher) handle(evs []*cetcd.Event) { - w.RLock() - data := w.cs.Data - w.RUnlock() - - var vals map[string]interface{} - - // unpackage existing changeset - if err := w.opts.Encoder.Decode(data, &vals); err != nil { - return - } - - // update base changeset - d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix) - - // pack the changeset - b, err := w.opts.Encoder.Encode(d) - if err != nil { - return - } - - // create new changeset - cs := &source.ChangeSet{ - Timestamp: time.Now(), - Source: w.name, - Data: b, - Format: w.opts.Encoder.String(), - } - cs.Checksum = cs.Sum() - - // set base change set - w.Lock() - w.cs = cs - w.Unlock() - - // send update - w.ch <- cs -} - -func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) { - for { - select { - case rsp, ok := <-ch: - if !ok { - return - } - w.handle(rsp.Events) - case <-w.exit: - wc.Close() - return - } - } -} - -func (w *watcher) Next() (*source.ChangeSet, error) { - select { - case cs := <-w.ch: - return cs, nil - case <-w.exit: - return nil, errors.New("watcher stopped") - } -} - -func (w *watcher) Stop() error { - select { - case <-w.exit: - return nil - default: - close(w.exit) - } - return nil -} From 6d06ee8078bcaabcdd867a3b224efde0b9f37c82 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 11 Jun 2019 11:40:37 +0100 Subject: [PATCH 3/3] Update go.mod to strip etcd --- go.mod | 4 +--- go.sum | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index def78418..a9590f1d 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/beevik/ntp v0.2.0 github.com/bitly/go-simplejson v0.5.0 github.com/bwmarrin/discordgo v0.19.0 - github.com/coreos/etcd v3.3.13+incompatible github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c github.com/fsnotify/fsnotify v1.4.7 github.com/fsouza/go-dockerclient v1.4.1 @@ -36,9 +35,8 @@ require ( github.com/nlopes/slack v0.5.0 github.com/pkg/errors v0.8.1 github.com/technoweenie/multipartstreamer v1.0.1 // indirect - go.etcd.io/etcd v3.3.13+incompatible golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 - golang.org/x/net v0.0.0-20190606173856-1492cefac77f + golang.org/x/net v0.0.0-20190607181551-461777fb6f67 google.golang.org/grpc v1.21.1 gopkg.in/go-playground/validator.v9 v9.29.0 gopkg.in/src-d/go-git.v4 v4.11.0 diff --git a/go.sum b/go.sum index 1e4d5b40..380c6df5 100644 --- a/go.sum +++ b/go.sum @@ -21,8 +21,6 @@ github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wX github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 h1:4BX8f882bXEDKfWIf0wa8HRvpnBoPszJJXL+TVbBw4M= github.com/containerd/continuity v0.0.0-20181203112020-004b46473808/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= -github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= -github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docker/docker v0.7.3-0.20190309235953-33c3200e0d16 h1:dmUn0SuGx7unKFwxyeQ/oLUHhEfZosEDrpmYM+6MTuc= @@ -194,8 +192,6 @@ github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQ github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= github.com/xanzy/ssh-agent v0.2.0 h1:Adglfbi5p9Z0BmK2oKU9nTG+zKfniSfnaMYB+ULd+Ro= github.com/xanzy/ssh-agent v0.2.0/go.mod h1:0NyE30eGUDliuLEHJgYte/zncp2zdTStcOnWhgSqHD8= -go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= -go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -212,8 +208,8 @@ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190606173856-1492cefac77f h1:IWHgpgFqnL5AhBUBZSgBdjl2vkQUEzcY+JNKWfcgAU0= -golang.org/x/net v0.0.0-20190606173856-1492cefac77f/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190607181551-461777fb6f67 h1:rJJxsykSlULwd2P2+pg/rtnwN2FrWp4IuCxOSyS0V00= +golang.org/x/net v0.0.0-20190607181551-461777fb6f67/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=