From af5d7a342091d56e7ebe71cac5992cc58e1b6014 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 3 Oct 2019 11:22:35 +0100 Subject: [PATCH] Move the remaining consul cruft to go-plugins --- config/source/consul/README.md | 49 ------- config/source/consul/consul.go | 126 ----------------- config/source/consul/util.go | 89 ------------ config/source/consul/watcher.go | 95 ------------- config/source/etcd/README.md | 51 +++++++ config/source/etcd/etcd.go | 141 +++++++++++++++++++ config/source/{consul => etcd}/options.go | 39 +++--- config/source/etcd/util.go | 89 ++++++++++++ config/source/etcd/watcher.go | 113 ++++++++++++++++ sync/leader/consul/consul.go | 158 ---------------------- sync/leader/etcd/etcd.go | 145 ++++++++++++++++++++ 11 files changed, 556 insertions(+), 539 deletions(-) delete mode 100644 config/source/consul/README.md delete mode 100644 config/source/consul/consul.go delete mode 100644 config/source/consul/util.go delete mode 100644 config/source/consul/watcher.go create mode 100644 config/source/etcd/README.md create mode 100644 config/source/etcd/etcd.go rename config/source/{consul => etcd}/options.go (61%) create mode 100644 config/source/etcd/util.go create mode 100644 config/source/etcd/watcher.go delete mode 100644 sync/leader/consul/consul.go create mode 100644 sync/leader/etcd/etcd.go diff --git a/config/source/consul/README.md b/config/source/consul/README.md deleted file mode 100644 index 28006fc4..00000000 --- a/config/source/consul/README.md +++ /dev/null @@ -1,49 +0,0 @@ -# Consul Source - -The consul source reads config from consul key/values - -## Consul Format - -The consul source expects keys under the default prefix `/micro/config` - -Values are expected to be json - -``` -// set database -consul kv put micro/config/database '{"address": "10.0.0.1", "port": 3306}' -// set cache -consul kv put micro/config/cache '{"address": "10.0.0.2", "port": 6379}' -``` - -Keys are split on `/` so access becomes - -``` -conf.Get("micro", "config", "database") -``` - -## New Source - -Specify source with data - -```go -consulSource := consul.NewSource( - // optionally specify consul address; default to localhost:8500 - consul.WithAddress("10.0.0.10:8500"), - // optionally specify prefix; defaults to /micro/config - consul.WithPrefix("/my/prefix"), - // optionally strip the provided prefix from the keys, defaults to false - consul.StripPrefix(true), -) -``` - -## Load Source - -Load the source into config - -```go -// Create new config -conf := config.NewConfig() - -// Load consul source -conf.Load(consulSource) -``` diff --git a/config/source/consul/consul.go b/config/source/consul/consul.go deleted file mode 100644 index f5c3c695..00000000 --- a/config/source/consul/consul.go +++ /dev/null @@ -1,126 +0,0 @@ -package consul - -import ( - "fmt" - "net" - "time" - - "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/config/source" -) - -// Currently a single consul reader -type consul struct { - prefix string - stripPrefix string - addr string - opts source.Options - client *api.Client -} - -var ( - // DefaultPrefix is the prefix that consul keys will be assumed to have if you - // haven't specified one - DefaultPrefix = "/micro/config/" -) - -func (c *consul) Read() (*source.ChangeSet, error) { - kv, _, err := c.client.KV().List(c.prefix, nil) - if err != nil { - return nil, err - } - - if kv == nil || len(kv) == 0 { - return nil, fmt.Errorf("source not found: %s", c.prefix) - } - - data, err := makeMap(c.opts.Encoder, kv, c.stripPrefix) - if err != nil { - return nil, fmt.Errorf("error reading data: %v", err) - } - - b, err := c.opts.Encoder.Encode(data) - if err != nil { - return nil, fmt.Errorf("error reading source: %v", err) - } - - cs := &source.ChangeSet{ - Timestamp: time.Now(), - Format: c.opts.Encoder.String(), - Source: c.String(), - Data: b, - } - cs.Checksum = cs.Sum() - - return cs, nil -} - -func (c *consul) String() string { - return "consul" -} - -func (c *consul) Watch() (source.Watcher, error) { - w, err := newWatcher(c.prefix, c.addr, c.String(), c.stripPrefix, c.opts.Encoder) - if err != nil { - return nil, err - } - return w, nil -} - -// NewSource creates a new consul source -func NewSource(opts ...source.Option) source.Source { - options := source.NewOptions(opts...) - - // use default config - config := api.DefaultConfig() - - // use the consul config passed in the options if any - if co, ok := options.Context.Value(configKey{}).(*api.Config); ok { - config = co - } - - // check if there are any addrs - a, ok := options.Context.Value(addressKey{}).(string) - if ok { - addr, port, err := net.SplitHostPort(a) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "8500" - addr = a - config.Address = fmt.Sprintf("%s:%s", addr, port) - } else if err == nil { - config.Address = fmt.Sprintf("%s:%s", addr, port) - } - } - - dc, ok := options.Context.Value(dcKey{}).(string) - if ok { - config.Datacenter = dc - } - - token, ok := options.Context.Value(tokenKey{}).(string) - if ok { - config.Token = token - } - - // create the client - client, _ := api.NewClient(config) - - prefix := DefaultPrefix - sp := "" - f, ok := options.Context.Value(prefixKey{}).(string) - if ok { - prefix = f - } - - if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b { - sp = prefix - } - - return &consul{ - prefix: prefix, - stripPrefix: sp, - addr: config.Address, - opts: options, - client: client, - } -} diff --git a/config/source/consul/util.go b/config/source/consul/util.go deleted file mode 100644 index 1deacbb4..00000000 --- a/config/source/consul/util.go +++ /dev/null @@ -1,89 +0,0 @@ -package consul - -import ( - "fmt" - "strings" - - "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/config/encoder" -) - -type configValue interface { - Value() interface{} - Decode(encoder.Encoder, []byte) error -} -type configArrayValue struct { - v []interface{} -} - -func (a *configArrayValue) Value() interface{} { return a.v } -func (a *configArrayValue) Decode(e encoder.Encoder, b []byte) error { - return e.Decode(b, &a.v) -} - -type configMapValue struct { - v map[string]interface{} -} - -func (m *configMapValue) Value() interface{} { return m.v } -func (m *configMapValue) Decode(e encoder.Encoder, b []byte) error { - return e.Decode(b, &m.v) -} - -func makeMap(e encoder.Encoder, kv api.KVPairs, stripPrefix string) (map[string]interface{}, error) { - - data := make(map[string]interface{}) - - // consul guarantees lexicographic order, so no need to sort - for _, v := range kv { - pathString := strings.TrimPrefix(strings.TrimPrefix(v.Key, strings.TrimPrefix(stripPrefix, "/")), "/") - if pathString == "" { - continue - } - var val configValue - var err error - - // ensure a valid value is stored at this location - if len(v.Value) > 0 { - // try to decode into map value or array value - arrayV := &configArrayValue{v: []interface{}{}} - mapV := &configMapValue{v: map[string]interface{}{}} - switch { - case arrayV.Decode(e, v.Value) == nil: - val = arrayV - case mapV.Decode(e, v.Value) == nil: - val = mapV - default: - return nil, fmt.Errorf("faild decode value. path: %s, error: %s", pathString, err) - } - } - - // set target at the root - target := data - path := strings.Split(pathString, "/") - // find (or create) the leaf node we want to put this value at - for _, dir := range path[:len(path)-1] { - if _, ok := target[dir]; !ok { - target[dir] = make(map[string]interface{}) - } - target = target[dir].(map[string]interface{}) - } - - leafDir := path[len(path)-1] - - // copy over the keys from the value - switch val.(type) { - case *configArrayValue: - target[leafDir] = val.Value() - case *configMapValue: - target[leafDir] = make(map[string]interface{}) - target = target[leafDir].(map[string]interface{}) - mapv := val.Value().(map[string]interface{}) - for k := range mapv { - target[k] = mapv[k] - } - } - } - - return data, nil -} diff --git a/config/source/consul/watcher.go b/config/source/consul/watcher.go deleted file mode 100644 index a20c8f9b..00000000 --- a/config/source/consul/watcher.go +++ /dev/null @@ -1,95 +0,0 @@ -package consul - -import ( - "time" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/api/watch" - "github.com/micro/go-micro/config/encoder" - "github.com/micro/go-micro/config/source" -) - -type watcher struct { - e encoder.Encoder - name string - stripPrefix string - - wp *watch.Plan - ch chan *source.ChangeSet - exit chan bool -} - -func newWatcher(key, addr, name, stripPrefix string, e encoder.Encoder) (source.Watcher, error) { - w := &watcher{ - e: e, - name: name, - stripPrefix: stripPrefix, - ch: make(chan *source.ChangeSet), - exit: make(chan bool), - } - - wp, err := watch.Parse(map[string]interface{}{"type": "keyprefix", "prefix": key}) - if err != nil { - return nil, err - } - - wp.Handler = w.handle - - // wp.Run is a blocking call and will prevent newWatcher from returning - go wp.Run(addr) - - w.wp = wp - - return w, nil -} - -func (w *watcher) handle(idx uint64, data interface{}) { - if data == nil { - return - } - - kvs, ok := data.(api.KVPairs) - if !ok { - return - } - - d, err := makeMap(w.e, kvs, w.stripPrefix) - if err != nil { - return - } - - b, err := w.e.Encode(d) - if err != nil { - return - } - - cs := &source.ChangeSet{ - Timestamp: time.Now(), - Format: w.e.String(), - Source: w.name, - Data: b, - } - cs.Checksum = cs.Sum() - - w.ch <- cs -} - -func (w *watcher) Next() (*source.ChangeSet, error) { - select { - case cs := <-w.ch: - return cs, nil - case <-w.exit: - return nil, source.ErrWatcherStopped - } -} - -func (w *watcher) Stop() error { - select { - case <-w.exit: - return nil - default: - w.wp.Stop() - close(w.exit) - } - return nil -} diff --git a/config/source/etcd/README.md b/config/source/etcd/README.md new file mode 100644 index 00000000..a3025ad4 --- /dev/null +++ b/config/source/etcd/README.md @@ -0,0 +1,51 @@ +# Etcd Source + +The etcd source reads config from etcd key/values + +This source supports etcd version 3 and beyond. + +## Etcd Format + +The etcd source expects keys under the default prefix `/micro/config` (prefix can be changed) + +Values are expected to be JSON + +``` +// set database +etcdctl put /micro/config/database '{"address": "10.0.0.1", "port": 3306}' +// set cache +etcdctl put /micro/config/cache '{"address": "10.0.0.2", "port": 6379}' +``` + +Keys are split on `/` so access becomes + +``` +conf.Get("micro", "config", "database") +``` + +## New Source + +Specify source with data + +```go +etcdSource := etcd.NewSource( + // optionally specify etcd address; default to localhost:8500 + etcd.WithAddress("10.0.0.10:8500"), + // optionally specify prefix; defaults to /micro/config + etcd.WithPrefix("/my/prefix"), + // optionally strip the provided prefix from the keys, defaults to false + etcd.StripPrefix(true), +) +``` + +## Load Source + +Load the source into config + +```go +// Create new config +conf := config.NewConfig() + +// Load file source +conf.Load(etcdSource) +``` diff --git a/config/source/etcd/etcd.go b/config/source/etcd/etcd.go new file mode 100644 index 00000000..5cbe1286 --- /dev/null +++ b/config/source/etcd/etcd.go @@ -0,0 +1,141 @@ +package etcd + +import ( + "context" + "fmt" + "net" + "time" + + cetcd "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/micro/go-micro/config/source" +) + +// Currently a single etcd reader +type etcd struct { + prefix string + stripPrefix string + opts source.Options + client *cetcd.Client + cerr error +} + +var ( + DefaultPrefix = "/micro/config/" +) + +func (c *etcd) Read() (*source.ChangeSet, error) { + if c.cerr != nil { + return nil, c.cerr + } + + rsp, err := c.client.Get(context.Background(), c.prefix, cetcd.WithPrefix()) + if err != nil { + return nil, err + } + + if rsp == nil || len(rsp.Kvs) == 0 { + return nil, fmt.Errorf("source not found: %s", c.prefix) + } + + var kvs []*mvccpb.KeyValue + for _, v := range rsp.Kvs { + kvs = append(kvs, (*mvccpb.KeyValue)(v)) + } + + data := makeMap(c.opts.Encoder, kvs, c.stripPrefix) + + b, err := c.opts.Encoder.Encode(data) + if err != nil { + return nil, fmt.Errorf("error reading source: %v", err) + } + + cs := &source.ChangeSet{ + Timestamp: time.Now(), + Source: c.String(), + Data: b, + Format: c.opts.Encoder.String(), + } + cs.Checksum = cs.Sum() + + return cs, nil +} + +func (c *etcd) String() string { + return "etcd" +} + +func (c *etcd) Watch() (source.Watcher, error) { + if c.cerr != nil { + return nil, c.cerr + } + cs, err := c.Read() + if err != nil { + return nil, err + } + return newWatcher(c.prefix, c.stripPrefix, c.client.Watcher, cs, c.opts) +} + +func NewSource(opts ...source.Option) source.Source { + options := source.NewOptions(opts...) + + var endpoints []string + + // check if there are any addrs + addrs, ok := options.Context.Value(addressKey{}).([]string) + if ok { + for _, a := range addrs { + addr, port, err := net.SplitHostPort(a) + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + port = "2379" + addr = a + endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) + } else if err == nil { + endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port)) + } + } + } + + if len(endpoints) == 0 { + endpoints = []string{"localhost:2379"} + } + + // check dial timeout option + dialTimeout, ok := options.Context.Value(dialTimeoutKey{}).(time.Duration) + if !ok { + dialTimeout = 3 * time.Second // default dial timeout + } + + config := cetcd.Config{ + Endpoints: endpoints, + DialTimeout: dialTimeout, + } + + u, ok := options.Context.Value(authKey{}).(*authCreds) + if ok { + config.Username = u.Username + config.Password = u.Password + } + + // use default config + client, err := cetcd.New(config) + + prefix := DefaultPrefix + sp := "" + f, ok := options.Context.Value(prefixKey{}).(string) + if ok { + prefix = f + } + + if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b { + sp = prefix + } + + return &etcd{ + prefix: prefix, + stripPrefix: sp, + opts: options, + client: client, + cerr: err, + } +} diff --git a/config/source/consul/options.go b/config/source/etcd/options.go similarity index 61% rename from config/source/consul/options.go rename to config/source/etcd/options.go index 8bca6a66..325d2f46 100644 --- a/config/source/consul/options.go +++ b/config/source/etcd/options.go @@ -1,21 +1,25 @@ -package consul +package etcd import ( "context" + "time" - "github.com/hashicorp/consul/api" "github.com/micro/go-micro/config/source" ) type addressKey struct{} type prefixKey struct{} type stripPrefixKey struct{} -type dcKey struct{} -type tokenKey struct{} -type configKey struct{} +type authKey struct{} +type dialTimeoutKey struct{} -// WithAddress sets the consul address -func WithAddress(a string) source.Option { +type authCreds struct { + Username string + Password string +} + +// WithAddress sets the etcd address +func WithAddress(a ...string) source.Option { return func(o *source.Options) { if o.Context == nil { o.Context = context.Background() @@ -45,31 +49,22 @@ func StripPrefix(strip bool) source.Option { } } -func WithDatacenter(p string) source.Option { +// Auth allows you to specify username/password +func Auth(username, password string) source.Option { return func(o *source.Options) { if o.Context == nil { o.Context = context.Background() } - o.Context = context.WithValue(o.Context, dcKey{}, p) + o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password}) } } -// WithToken sets the key token to use -func WithToken(p string) source.Option { +// WithDialTimeout set the time out for dialing to etcd +func WithDialTimeout(timeout time.Duration) source.Option { return func(o *source.Options) { if o.Context == nil { o.Context = context.Background() } - o.Context = context.WithValue(o.Context, tokenKey{}, p) - } -} - -// WithConfig set consul-specific options -func WithConfig(c *api.Config) source.Option { - return func(o *source.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, configKey{}, c) + o.Context = context.WithValue(o.Context, dialTimeoutKey{}, timeout) } } diff --git a/config/source/etcd/util.go b/config/source/etcd/util.go new file mode 100644 index 00000000..31887fc9 --- /dev/null +++ b/config/source/etcd/util.go @@ -0,0 +1,89 @@ +package etcd + +import ( + "strings" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/micro/go-micro/config/encoder" +) + +func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} { + if data == nil { + data = make(map[string]interface{}) + } + + for _, v := range kv { + switch mvccpb.Event_EventType(v.Type) { + case mvccpb.DELETE: + data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "delete", stripPrefix) + default: + data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "insert", stripPrefix) + } + } + + return data +} + +func makeMap(e encoder.Encoder, kv []*mvccpb.KeyValue, stripPrefix string) map[string]interface{} { + data := make(map[string]interface{}) + + for _, v := range kv { + data = update(e, data, v, "put", stripPrefix) + } + + return data +} + +func update(e encoder.Encoder, data map[string]interface{}, v *mvccpb.KeyValue, action, stripPrefix string) map[string]interface{} { + // remove prefix if non empty, and ensure leading / is removed as well + vkey := strings.TrimPrefix(strings.TrimPrefix(string(v.Key), stripPrefix), "/") + // split on prefix + haveSplit := strings.Contains(vkey, "/") + keys := strings.Split(vkey, "/") + + var vals interface{} + e.Decode(v.Value, &vals) + + if !haveSplit && len(keys) == 1 { + switch action { + case "delete": + data = make(map[string]interface{}) + default: + v, ok := vals.(map[string]interface{}) + if ok { + data = v + } + } + return data + } + + // set data for first iteration + kvals := data + // iterate the keys and make maps + for i, k := range keys { + kval, ok := kvals[k].(map[string]interface{}) + if !ok { + // create next map + kval = make(map[string]interface{}) + // set it + kvals[k] = kval + } + + // last key: write vals + if l := len(keys) - 1; i == l { + switch action { + case "delete": + delete(kvals, k) + default: + kvals[k] = vals + } + break + } + + // set kvals for next iterator + kvals = kval + } + + return data +} diff --git a/config/source/etcd/watcher.go b/config/source/etcd/watcher.go new file mode 100644 index 00000000..2f9b3189 --- /dev/null +++ b/config/source/etcd/watcher.go @@ -0,0 +1,113 @@ +package etcd + +import ( + "context" + "errors" + "sync" + "time" + + cetcd "github.com/coreos/etcd/clientv3" + "github.com/micro/go-micro/config/source" +) + +type watcher struct { + opts source.Options + name string + stripPrefix string + + sync.RWMutex + cs *source.ChangeSet + + ch chan *source.ChangeSet + exit chan bool +} + +func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) { + w := &watcher{ + opts: opts, + name: "etcd", + stripPrefix: strip, + cs: cs, + ch: make(chan *source.ChangeSet), + exit: make(chan bool), + } + + ch := wc.Watch(context.Background(), key, cetcd.WithPrefix()) + + go w.run(wc, ch) + + return w, nil +} + +func (w *watcher) handle(evs []*cetcd.Event) { + w.RLock() + data := w.cs.Data + w.RUnlock() + + var vals map[string]interface{} + + // unpackage existing changeset + if err := w.opts.Encoder.Decode(data, &vals); err != nil { + return + } + + // update base changeset + d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix) + + // pack the changeset + b, err := w.opts.Encoder.Encode(d) + if err != nil { + return + } + + // create new changeset + cs := &source.ChangeSet{ + Timestamp: time.Now(), + Source: w.name, + Data: b, + Format: w.opts.Encoder.String(), + } + cs.Checksum = cs.Sum() + + // set base change set + w.Lock() + w.cs = cs + w.Unlock() + + // send update + w.ch <- cs +} + +func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) { + for { + select { + case rsp, ok := <-ch: + if !ok { + return + } + w.handle(rsp.Events) + case <-w.exit: + wc.Close() + return + } + } +} + +func (w *watcher) Next() (*source.ChangeSet, error) { + select { + case cs := <-w.ch: + return cs, nil + case <-w.exit: + return nil, errors.New("watcher stopped") + } +} + +func (w *watcher) Stop() error { + select { + case <-w.exit: + return nil + default: + close(w.exit) + } + return nil +} diff --git a/sync/leader/consul/consul.go b/sync/leader/consul/consul.go deleted file mode 100644 index b5b344c6..00000000 --- a/sync/leader/consul/consul.go +++ /dev/null @@ -1,158 +0,0 @@ -package consul - -import ( - "fmt" - "log" - "net" - "os" - "path" - "sync" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/api/watch" - "github.com/micro/go-micro/sync/leader" -) - -type consulLeader struct { - opts leader.Options - c *api.Client -} - -type consulElected struct { - c *api.Client - l *api.Lock - id string - key string - opts leader.ElectOptions - - mtx sync.RWMutex - rv <-chan struct{} -} - -func (c *consulLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) { - var options leader.ElectOptions - for _, o := range opts { - o(&options) - } - - key := path.Join("micro/leader", c.opts.Group) - - lc, err := c.c.LockOpts(&api.LockOptions{ - Key: key, - Value: []byte(id), - }) - if err != nil { - return nil, err - } - - rv, err := lc.Lock(nil) - if err != nil { - return nil, err - } - - return &consulElected{ - c: c.c, - key: key, - rv: rv, - id: id, - l: lc, - opts: options, - }, nil -} - -func (c *consulLeader) Follow() chan string { - ch := make(chan string, 1) - - key := path.Join("/micro/leader", c.opts.Group) - - p, err := watch.Parse(map[string]interface{}{ - "type": "key", - "key": key, - }) - if err != nil { - return ch - } - p.Handler = func(idx uint64, raw interface{}) { - if raw == nil { - return // ignore - } - v, ok := raw.(*api.KVPair) - if !ok || v == nil { - return // ignore - } - ch <- string(v.Value) - } - - go p.RunWithClientAndLogger(c.c, log.New(os.Stdout, "consul: ", log.Lshortfile)) - return ch -} - -func (c *consulLeader) String() string { - return "consul" -} - -func (c *consulElected) Id() string { - return c.id -} - -func (c *consulElected) Reelect() error { - rv, err := c.l.Lock(nil) - if err != nil { - return err - } - - c.mtx.Lock() - c.rv = rv - c.mtx.Unlock() - return nil -} - -func (c *consulElected) Revoked() chan bool { - ch := make(chan bool, 1) - c.mtx.RLock() - rv := c.rv - c.mtx.RUnlock() - - go func() { - <-rv - ch <- true - close(ch) - }() - - return ch -} - -func (c *consulElected) Resign() error { - return c.l.Unlock() -} - -func NewLeader(opts ...leader.Option) leader.Leader { - options := leader.Options{ - Group: "default", - } - for _, o := range opts { - o(&options) - } - - config := api.DefaultConfig() - - // set host - // config.Host something - // check if there are any addrs - if len(options.Nodes) > 0 { - addr, port, err := net.SplitHostPort(options.Nodes[0]) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "8500" - config.Address = fmt.Sprintf("%s:%s", addr, port) - } else if err == nil { - config.Address = fmt.Sprintf("%s:%s", addr, port) - } - } - - client, _ := api.NewClient(config) - - return &consulLeader{ - opts: options, - c: client, - } -} diff --git a/sync/leader/etcd/etcd.go b/sync/leader/etcd/etcd.go new file mode 100644 index 00000000..6179bf0b --- /dev/null +++ b/sync/leader/etcd/etcd.go @@ -0,0 +1,145 @@ +package etcd + +import ( + "context" + "log" + "path" + "strings" + + client "github.com/coreos/etcd/clientv3" + cc "github.com/coreos/etcd/clientv3/concurrency" + "github.com/micro/go-micro/sync/leader" +) + +type etcdLeader struct { + opts leader.Options + path string + client *client.Client +} + +type etcdElected struct { + s *cc.Session + e *cc.Election + id string +} + +func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) { + var options leader.ElectOptions + for _, o := range opts { + o(&options) + } + + // make path + path := path.Join(e.path, strings.Replace(id, "/", "-", -1)) + + s, err := cc.NewSession(e.client) + if err != nil { + return nil, err + } + + l := cc.NewElection(s, path) + + ctx, _ := context.WithCancel(context.Background()) + + if err := l.Campaign(ctx, id); err != nil { + return nil, err + } + + return &etcdElected{ + e: l, + id: id, + }, nil +} + +func (e *etcdLeader) Follow() chan string { + ch := make(chan string) + + s, err := cc.NewSession(e.client) + if err != nil { + return ch + } + + l := cc.NewElection(s, e.path) + ech := l.Observe(context.Background()) + + go func() { + for { + select { + case r, ok := <-ech: + if !ok { + return + } + ch <- string(r.Kvs[0].Value) + } + } + }() + + return ch +} + +func (e *etcdLeader) String() string { + return "etcd" +} + +func (e *etcdElected) Reelect() error { + ctx, _ := context.WithCancel(context.Background()) + return e.e.Campaign(ctx, e.id) +} + +func (e *etcdElected) Revoked() chan bool { + ch := make(chan bool, 1) + ech := e.e.Observe(context.Background()) + + go func() { + for r := range ech { + if string(r.Kvs[0].Value) != e.id { + ch <- true + close(ch) + return + } + } + }() + + return ch +} + +func (e *etcdElected) Resign() error { + return e.e.Resign(context.Background()) +} + +func (e *etcdElected) Id() string { + return e.id +} + +func NewLeader(opts ...leader.Option) leader.Leader { + var options leader.Options + for _, o := range opts { + o(&options) + } + + var endpoints []string + + for _, addr := range options.Nodes { + if len(addr) > 0 { + endpoints = append(endpoints, addr) + } + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &etcdLeader{ + path: "/micro/leader", + client: c, + opts: options, + } +}