Move the remaining consul cruft to go-plugins

This commit is contained in:
Asim Aslam 2019-10-03 11:22:35 +01:00
parent b5f33b2aaa
commit af5d7a3420
11 changed files with 556 additions and 539 deletions

View File

@ -1,49 +0,0 @@
# Consul Source
The consul source reads config from consul key/values
## Consul Format
The consul source expects keys under the default prefix `/micro/config`
Values are expected to be json
```
// set database
consul kv put micro/config/database '{"address": "10.0.0.1", "port": 3306}'
// set cache
consul kv put micro/config/cache '{"address": "10.0.0.2", "port": 6379}'
```
Keys are split on `/` so access becomes
```
conf.Get("micro", "config", "database")
```
## New Source
Specify source with data
```go
consulSource := consul.NewSource(
// optionally specify consul address; default to localhost:8500
consul.WithAddress("10.0.0.10:8500"),
// optionally specify prefix; defaults to /micro/config
consul.WithPrefix("/my/prefix"),
// optionally strip the provided prefix from the keys, defaults to false
consul.StripPrefix(true),
)
```
## Load Source
Load the source into config
```go
// Create new config
conf := config.NewConfig()
// Load consul source
conf.Load(consulSource)
```

View File

@ -1,126 +0,0 @@
package consul
import (
"fmt"
"net"
"time"
"github.com/hashicorp/consul/api"
"github.com/micro/go-micro/config/source"
)
// Currently a single consul reader
type consul struct {
prefix string
stripPrefix string
addr string
opts source.Options
client *api.Client
}
var (
// DefaultPrefix is the prefix that consul keys will be assumed to have if you
// haven't specified one
DefaultPrefix = "/micro/config/"
)
func (c *consul) Read() (*source.ChangeSet, error) {
kv, _, err := c.client.KV().List(c.prefix, nil)
if err != nil {
return nil, err
}
if kv == nil || len(kv) == 0 {
return nil, fmt.Errorf("source not found: %s", c.prefix)
}
data, err := makeMap(c.opts.Encoder, kv, c.stripPrefix)
if err != nil {
return nil, fmt.Errorf("error reading data: %v", err)
}
b, err := c.opts.Encoder.Encode(data)
if err != nil {
return nil, fmt.Errorf("error reading source: %v", err)
}
cs := &source.ChangeSet{
Timestamp: time.Now(),
Format: c.opts.Encoder.String(),
Source: c.String(),
Data: b,
}
cs.Checksum = cs.Sum()
return cs, nil
}
func (c *consul) String() string {
return "consul"
}
func (c *consul) Watch() (source.Watcher, error) {
w, err := newWatcher(c.prefix, c.addr, c.String(), c.stripPrefix, c.opts.Encoder)
if err != nil {
return nil, err
}
return w, nil
}
// NewSource creates a new consul source
func NewSource(opts ...source.Option) source.Source {
options := source.NewOptions(opts...)
// use default config
config := api.DefaultConfig()
// use the consul config passed in the options if any
if co, ok := options.Context.Value(configKey{}).(*api.Config); ok {
config = co
}
// check if there are any addrs
a, ok := options.Context.Value(addressKey{}).(string)
if ok {
addr, port, err := net.SplitHostPort(a)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
addr = a
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
dc, ok := options.Context.Value(dcKey{}).(string)
if ok {
config.Datacenter = dc
}
token, ok := options.Context.Value(tokenKey{}).(string)
if ok {
config.Token = token
}
// create the client
client, _ := api.NewClient(config)
prefix := DefaultPrefix
sp := ""
f, ok := options.Context.Value(prefixKey{}).(string)
if ok {
prefix = f
}
if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b {
sp = prefix
}
return &consul{
prefix: prefix,
stripPrefix: sp,
addr: config.Address,
opts: options,
client: client,
}
}

View File

@ -1,89 +0,0 @@
package consul
import (
"fmt"
"strings"
"github.com/hashicorp/consul/api"
"github.com/micro/go-micro/config/encoder"
)
type configValue interface {
Value() interface{}
Decode(encoder.Encoder, []byte) error
}
type configArrayValue struct {
v []interface{}
}
func (a *configArrayValue) Value() interface{} { return a.v }
func (a *configArrayValue) Decode(e encoder.Encoder, b []byte) error {
return e.Decode(b, &a.v)
}
type configMapValue struct {
v map[string]interface{}
}
func (m *configMapValue) Value() interface{} { return m.v }
func (m *configMapValue) Decode(e encoder.Encoder, b []byte) error {
return e.Decode(b, &m.v)
}
func makeMap(e encoder.Encoder, kv api.KVPairs, stripPrefix string) (map[string]interface{}, error) {
data := make(map[string]interface{})
// consul guarantees lexicographic order, so no need to sort
for _, v := range kv {
pathString := strings.TrimPrefix(strings.TrimPrefix(v.Key, strings.TrimPrefix(stripPrefix, "/")), "/")
if pathString == "" {
continue
}
var val configValue
var err error
// ensure a valid value is stored at this location
if len(v.Value) > 0 {
// try to decode into map value or array value
arrayV := &configArrayValue{v: []interface{}{}}
mapV := &configMapValue{v: map[string]interface{}{}}
switch {
case arrayV.Decode(e, v.Value) == nil:
val = arrayV
case mapV.Decode(e, v.Value) == nil:
val = mapV
default:
return nil, fmt.Errorf("faild decode value. path: %s, error: %s", pathString, err)
}
}
// set target at the root
target := data
path := strings.Split(pathString, "/")
// find (or create) the leaf node we want to put this value at
for _, dir := range path[:len(path)-1] {
if _, ok := target[dir]; !ok {
target[dir] = make(map[string]interface{})
}
target = target[dir].(map[string]interface{})
}
leafDir := path[len(path)-1]
// copy over the keys from the value
switch val.(type) {
case *configArrayValue:
target[leafDir] = val.Value()
case *configMapValue:
target[leafDir] = make(map[string]interface{})
target = target[leafDir].(map[string]interface{})
mapv := val.Value().(map[string]interface{})
for k := range mapv {
target[k] = mapv[k]
}
}
}
return data, nil
}

View File

@ -1,95 +0,0 @@
package consul
import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/micro/go-micro/config/encoder"
"github.com/micro/go-micro/config/source"
)
type watcher struct {
e encoder.Encoder
name string
stripPrefix string
wp *watch.Plan
ch chan *source.ChangeSet
exit chan bool
}
func newWatcher(key, addr, name, stripPrefix string, e encoder.Encoder) (source.Watcher, error) {
w := &watcher{
e: e,
name: name,
stripPrefix: stripPrefix,
ch: make(chan *source.ChangeSet),
exit: make(chan bool),
}
wp, err := watch.Parse(map[string]interface{}{"type": "keyprefix", "prefix": key})
if err != nil {
return nil, err
}
wp.Handler = w.handle
// wp.Run is a blocking call and will prevent newWatcher from returning
go wp.Run(addr)
w.wp = wp
return w, nil
}
func (w *watcher) handle(idx uint64, data interface{}) {
if data == nil {
return
}
kvs, ok := data.(api.KVPairs)
if !ok {
return
}
d, err := makeMap(w.e, kvs, w.stripPrefix)
if err != nil {
return
}
b, err := w.e.Encode(d)
if err != nil {
return
}
cs := &source.ChangeSet{
Timestamp: time.Now(),
Format: w.e.String(),
Source: w.name,
Data: b,
}
cs.Checksum = cs.Sum()
w.ch <- cs
}
func (w *watcher) Next() (*source.ChangeSet, error) {
select {
case cs := <-w.ch:
return cs, nil
case <-w.exit:
return nil, source.ErrWatcherStopped
}
}
func (w *watcher) Stop() error {
select {
case <-w.exit:
return nil
default:
w.wp.Stop()
close(w.exit)
}
return nil
}

View File

@ -0,0 +1,51 @@
# Etcd Source
The etcd source reads config from etcd key/values
This source supports etcd version 3 and beyond.
## Etcd Format
The etcd source expects keys under the default prefix `/micro/config` (prefix can be changed)
Values are expected to be JSON
```
// set database
etcdctl put /micro/config/database '{"address": "10.0.0.1", "port": 3306}'
// set cache
etcdctl put /micro/config/cache '{"address": "10.0.0.2", "port": 6379}'
```
Keys are split on `/` so access becomes
```
conf.Get("micro", "config", "database")
```
## New Source
Specify source with data
```go
etcdSource := etcd.NewSource(
// optionally specify etcd address; default to localhost:8500
etcd.WithAddress("10.0.0.10:8500"),
// optionally specify prefix; defaults to /micro/config
etcd.WithPrefix("/my/prefix"),
// optionally strip the provided prefix from the keys, defaults to false
etcd.StripPrefix(true),
)
```
## Load Source
Load the source into config
```go
// Create new config
conf := config.NewConfig()
// Load file source
conf.Load(etcdSource)
```

141
config/source/etcd/etcd.go Normal file
View File

@ -0,0 +1,141 @@
package etcd
import (
"context"
"fmt"
"net"
"time"
cetcd "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/micro/go-micro/config/source"
)
// Currently a single etcd reader
type etcd struct {
prefix string
stripPrefix string
opts source.Options
client *cetcd.Client
cerr error
}
var (
DefaultPrefix = "/micro/config/"
)
func (c *etcd) Read() (*source.ChangeSet, error) {
if c.cerr != nil {
return nil, c.cerr
}
rsp, err := c.client.Get(context.Background(), c.prefix, cetcd.WithPrefix())
if err != nil {
return nil, err
}
if rsp == nil || len(rsp.Kvs) == 0 {
return nil, fmt.Errorf("source not found: %s", c.prefix)
}
var kvs []*mvccpb.KeyValue
for _, v := range rsp.Kvs {
kvs = append(kvs, (*mvccpb.KeyValue)(v))
}
data := makeMap(c.opts.Encoder, kvs, c.stripPrefix)
b, err := c.opts.Encoder.Encode(data)
if err != nil {
return nil, fmt.Errorf("error reading source: %v", err)
}
cs := &source.ChangeSet{
Timestamp: time.Now(),
Source: c.String(),
Data: b,
Format: c.opts.Encoder.String(),
}
cs.Checksum = cs.Sum()
return cs, nil
}
func (c *etcd) String() string {
return "etcd"
}
func (c *etcd) Watch() (source.Watcher, error) {
if c.cerr != nil {
return nil, c.cerr
}
cs, err := c.Read()
if err != nil {
return nil, err
}
return newWatcher(c.prefix, c.stripPrefix, c.client.Watcher, cs, c.opts)
}
func NewSource(opts ...source.Option) source.Source {
options := source.NewOptions(opts...)
var endpoints []string
// check if there are any addrs
addrs, ok := options.Context.Value(addressKey{}).([]string)
if ok {
for _, a := range addrs {
addr, port, err := net.SplitHostPort(a)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "2379"
addr = a
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
} else if err == nil {
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
}
}
}
if len(endpoints) == 0 {
endpoints = []string{"localhost:2379"}
}
// check dial timeout option
dialTimeout, ok := options.Context.Value(dialTimeoutKey{}).(time.Duration)
if !ok {
dialTimeout = 3 * time.Second // default dial timeout
}
config := cetcd.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
}
u, ok := options.Context.Value(authKey{}).(*authCreds)
if ok {
config.Username = u.Username
config.Password = u.Password
}
// use default config
client, err := cetcd.New(config)
prefix := DefaultPrefix
sp := ""
f, ok := options.Context.Value(prefixKey{}).(string)
if ok {
prefix = f
}
if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b {
sp = prefix
}
return &etcd{
prefix: prefix,
stripPrefix: sp,
opts: options,
client: client,
cerr: err,
}
}

View File

@ -1,21 +1,25 @@
package consul package etcd
import ( import (
"context" "context"
"time"
"github.com/hashicorp/consul/api"
"github.com/micro/go-micro/config/source" "github.com/micro/go-micro/config/source"
) )
type addressKey struct{} type addressKey struct{}
type prefixKey struct{} type prefixKey struct{}
type stripPrefixKey struct{} type stripPrefixKey struct{}
type dcKey struct{} type authKey struct{}
type tokenKey struct{} type dialTimeoutKey struct{}
type configKey struct{}
// WithAddress sets the consul address type authCreds struct {
func WithAddress(a string) source.Option { Username string
Password string
}
// WithAddress sets the etcd address
func WithAddress(a ...string) source.Option {
return func(o *source.Options) { return func(o *source.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
@ -45,31 +49,22 @@ func StripPrefix(strip bool) source.Option {
} }
} }
func WithDatacenter(p string) source.Option { // Auth allows you to specify username/password
func Auth(username, password string) source.Option {
return func(o *source.Options) { return func(o *source.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }
o.Context = context.WithValue(o.Context, dcKey{}, p) o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password})
} }
} }
// WithToken sets the key token to use // WithDialTimeout set the time out for dialing to etcd
func WithToken(p string) source.Option { func WithDialTimeout(timeout time.Duration) source.Option {
return func(o *source.Options) { return func(o *source.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }
o.Context = context.WithValue(o.Context, tokenKey{}, p) o.Context = context.WithValue(o.Context, dialTimeoutKey{}, timeout)
}
}
// WithConfig set consul-specific options
func WithConfig(c *api.Config) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, configKey{}, c)
} }
} }

View File

@ -0,0 +1,89 @@
package etcd
import (
"strings"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/micro/go-micro/config/encoder"
)
func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} {
if data == nil {
data = make(map[string]interface{})
}
for _, v := range kv {
switch mvccpb.Event_EventType(v.Type) {
case mvccpb.DELETE:
data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "delete", stripPrefix)
default:
data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "insert", stripPrefix)
}
}
return data
}
func makeMap(e encoder.Encoder, kv []*mvccpb.KeyValue, stripPrefix string) map[string]interface{} {
data := make(map[string]interface{})
for _, v := range kv {
data = update(e, data, v, "put", stripPrefix)
}
return data
}
func update(e encoder.Encoder, data map[string]interface{}, v *mvccpb.KeyValue, action, stripPrefix string) map[string]interface{} {
// remove prefix if non empty, and ensure leading / is removed as well
vkey := strings.TrimPrefix(strings.TrimPrefix(string(v.Key), stripPrefix), "/")
// split on prefix
haveSplit := strings.Contains(vkey, "/")
keys := strings.Split(vkey, "/")
var vals interface{}
e.Decode(v.Value, &vals)
if !haveSplit && len(keys) == 1 {
switch action {
case "delete":
data = make(map[string]interface{})
default:
v, ok := vals.(map[string]interface{})
if ok {
data = v
}
}
return data
}
// set data for first iteration
kvals := data
// iterate the keys and make maps
for i, k := range keys {
kval, ok := kvals[k].(map[string]interface{})
if !ok {
// create next map
kval = make(map[string]interface{})
// set it
kvals[k] = kval
}
// last key: write vals
if l := len(keys) - 1; i == l {
switch action {
case "delete":
delete(kvals, k)
default:
kvals[k] = vals
}
break
}
// set kvals for next iterator
kvals = kval
}
return data
}

View File

@ -0,0 +1,113 @@
package etcd
import (
"context"
"errors"
"sync"
"time"
cetcd "github.com/coreos/etcd/clientv3"
"github.com/micro/go-micro/config/source"
)
type watcher struct {
opts source.Options
name string
stripPrefix string
sync.RWMutex
cs *source.ChangeSet
ch chan *source.ChangeSet
exit chan bool
}
func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) {
w := &watcher{
opts: opts,
name: "etcd",
stripPrefix: strip,
cs: cs,
ch: make(chan *source.ChangeSet),
exit: make(chan bool),
}
ch := wc.Watch(context.Background(), key, cetcd.WithPrefix())
go w.run(wc, ch)
return w, nil
}
func (w *watcher) handle(evs []*cetcd.Event) {
w.RLock()
data := w.cs.Data
w.RUnlock()
var vals map[string]interface{}
// unpackage existing changeset
if err := w.opts.Encoder.Decode(data, &vals); err != nil {
return
}
// update base changeset
d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix)
// pack the changeset
b, err := w.opts.Encoder.Encode(d)
if err != nil {
return
}
// create new changeset
cs := &source.ChangeSet{
Timestamp: time.Now(),
Source: w.name,
Data: b,
Format: w.opts.Encoder.String(),
}
cs.Checksum = cs.Sum()
// set base change set
w.Lock()
w.cs = cs
w.Unlock()
// send update
w.ch <- cs
}
func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) {
for {
select {
case rsp, ok := <-ch:
if !ok {
return
}
w.handle(rsp.Events)
case <-w.exit:
wc.Close()
return
}
}
}
func (w *watcher) Next() (*source.ChangeSet, error) {
select {
case cs := <-w.ch:
return cs, nil
case <-w.exit:
return nil, errors.New("watcher stopped")
}
}
func (w *watcher) Stop() error {
select {
case <-w.exit:
return nil
default:
close(w.exit)
}
return nil
}

View File

@ -1,158 +0,0 @@
package consul
import (
"fmt"
"log"
"net"
"os"
"path"
"sync"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/micro/go-micro/sync/leader"
)
type consulLeader struct {
opts leader.Options
c *api.Client
}
type consulElected struct {
c *api.Client
l *api.Lock
id string
key string
opts leader.ElectOptions
mtx sync.RWMutex
rv <-chan struct{}
}
func (c *consulLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
var options leader.ElectOptions
for _, o := range opts {
o(&options)
}
key := path.Join("micro/leader", c.opts.Group)
lc, err := c.c.LockOpts(&api.LockOptions{
Key: key,
Value: []byte(id),
})
if err != nil {
return nil, err
}
rv, err := lc.Lock(nil)
if err != nil {
return nil, err
}
return &consulElected{
c: c.c,
key: key,
rv: rv,
id: id,
l: lc,
opts: options,
}, nil
}
func (c *consulLeader) Follow() chan string {
ch := make(chan string, 1)
key := path.Join("/micro/leader", c.opts.Group)
p, err := watch.Parse(map[string]interface{}{
"type": "key",
"key": key,
})
if err != nil {
return ch
}
p.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.(*api.KVPair)
if !ok || v == nil {
return // ignore
}
ch <- string(v.Value)
}
go p.RunWithClientAndLogger(c.c, log.New(os.Stdout, "consul: ", log.Lshortfile))
return ch
}
func (c *consulLeader) String() string {
return "consul"
}
func (c *consulElected) Id() string {
return c.id
}
func (c *consulElected) Reelect() error {
rv, err := c.l.Lock(nil)
if err != nil {
return err
}
c.mtx.Lock()
c.rv = rv
c.mtx.Unlock()
return nil
}
func (c *consulElected) Revoked() chan bool {
ch := make(chan bool, 1)
c.mtx.RLock()
rv := c.rv
c.mtx.RUnlock()
go func() {
<-rv
ch <- true
close(ch)
}()
return ch
}
func (c *consulElected) Resign() error {
return c.l.Unlock()
}
func NewLeader(opts ...leader.Option) leader.Leader {
options := leader.Options{
Group: "default",
}
for _, o := range opts {
o(&options)
}
config := api.DefaultConfig()
// set host
// config.Host something
// check if there are any addrs
if len(options.Nodes) > 0 {
addr, port, err := net.SplitHostPort(options.Nodes[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
client, _ := api.NewClient(config)
return &consulLeader{
opts: options,
c: client,
}
}

145
sync/leader/etcd/etcd.go Normal file
View File

@ -0,0 +1,145 @@
package etcd
import (
"context"
"log"
"path"
"strings"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/sync/leader"
)
type etcdLeader struct {
opts leader.Options
path string
client *client.Client
}
type etcdElected struct {
s *cc.Session
e *cc.Election
id string
}
func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
var options leader.ElectOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(id, "/", "-", -1))
s, err := cc.NewSession(e.client)
if err != nil {
return nil, err
}
l := cc.NewElection(s, path)
ctx, _ := context.WithCancel(context.Background())
if err := l.Campaign(ctx, id); err != nil {
return nil, err
}
return &etcdElected{
e: l,
id: id,
}, nil
}
func (e *etcdLeader) Follow() chan string {
ch := make(chan string)
s, err := cc.NewSession(e.client)
if err != nil {
return ch
}
l := cc.NewElection(s, e.path)
ech := l.Observe(context.Background())
go func() {
for {
select {
case r, ok := <-ech:
if !ok {
return
}
ch <- string(r.Kvs[0].Value)
}
}
}()
return ch
}
func (e *etcdLeader) String() string {
return "etcd"
}
func (e *etcdElected) Reelect() error {
ctx, _ := context.WithCancel(context.Background())
return e.e.Campaign(ctx, e.id)
}
func (e *etcdElected) Revoked() chan bool {
ch := make(chan bool, 1)
ech := e.e.Observe(context.Background())
go func() {
for r := range ech {
if string(r.Kvs[0].Value) != e.id {
ch <- true
close(ch)
return
}
}
}()
return ch
}
func (e *etcdElected) Resign() error {
return e.e.Resign(context.Background())
}
func (e *etcdElected) Id() string {
return e.id
}
func NewLeader(opts ...leader.Option) leader.Leader {
var options leader.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdLeader{
path: "/micro/leader",
client: c,
opts: options,
}
}