remove etcd source
This commit is contained in:
parent
d030c78d1c
commit
aec1ca6635
@ -1,51 +0,0 @@
|
|||||||
# Etcd Source
|
|
||||||
|
|
||||||
The etcd source reads config from etcd key/values
|
|
||||||
|
|
||||||
This source supports etcd version 3 and beyond.
|
|
||||||
|
|
||||||
## Etcd Format
|
|
||||||
|
|
||||||
The etcd source expects keys under the default prefix `/micro/config` (prefix can be changed)
|
|
||||||
|
|
||||||
Values are expected to be JSON
|
|
||||||
|
|
||||||
```
|
|
||||||
// set database
|
|
||||||
etcdctl put /micro/config/database '{"address": "10.0.0.1", "port": 3306}'
|
|
||||||
// set cache
|
|
||||||
etcdctl put /micro/config/cache '{"address": "10.0.0.2", "port": 6379}'
|
|
||||||
```
|
|
||||||
|
|
||||||
Keys are split on `/` so access becomes
|
|
||||||
|
|
||||||
```
|
|
||||||
conf.Get("micro", "config", "database")
|
|
||||||
```
|
|
||||||
|
|
||||||
## New Source
|
|
||||||
|
|
||||||
Specify source with data
|
|
||||||
|
|
||||||
```go
|
|
||||||
etcdSource := etcd.NewSource(
|
|
||||||
// optionally specify etcd address; default to localhost:8500
|
|
||||||
etcd.WithAddress("10.0.0.10:8500"),
|
|
||||||
// optionally specify prefix; defaults to /micro/config
|
|
||||||
etcd.WithPrefix("/my/prefix"),
|
|
||||||
// optionally strip the provided prefix from the keys, defaults to false
|
|
||||||
etcd.StripPrefix(true),
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Load Source
|
|
||||||
|
|
||||||
Load the source into config
|
|
||||||
|
|
||||||
```go
|
|
||||||
// Create new config
|
|
||||||
conf := config.NewConfig()
|
|
||||||
|
|
||||||
// Load file source
|
|
||||||
conf.Load(etcdSource)
|
|
||||||
```
|
|
@ -1,141 +0,0 @@
|
|||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
cetcd "github.com/coreos/etcd/clientv3"
|
|
||||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
|
||||||
"github.com/micro/go-micro/config/source"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Currently a single etcd reader
|
|
||||||
type etcd struct {
|
|
||||||
prefix string
|
|
||||||
stripPrefix string
|
|
||||||
opts source.Options
|
|
||||||
client *cetcd.Client
|
|
||||||
cerr error
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
DefaultPrefix = "/micro/config/"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *etcd) Read() (*source.ChangeSet, error) {
|
|
||||||
if c.cerr != nil {
|
|
||||||
return nil, c.cerr
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp, err := c.client.Get(context.Background(), c.prefix, cetcd.WithPrefix())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if rsp == nil || len(rsp.Kvs) == 0 {
|
|
||||||
return nil, fmt.Errorf("source not found: %s", c.prefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
var kvs []*mvccpb.KeyValue
|
|
||||||
for _, v := range rsp.Kvs {
|
|
||||||
kvs = append(kvs, (*mvccpb.KeyValue)(v))
|
|
||||||
}
|
|
||||||
|
|
||||||
data := makeMap(c.opts.Encoder, kvs, c.stripPrefix)
|
|
||||||
|
|
||||||
b, err := c.opts.Encoder.Encode(data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error reading source: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cs := &source.ChangeSet{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
Source: c.String(),
|
|
||||||
Data: b,
|
|
||||||
Format: c.opts.Encoder.String(),
|
|
||||||
}
|
|
||||||
cs.Checksum = cs.Sum()
|
|
||||||
|
|
||||||
return cs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *etcd) String() string {
|
|
||||||
return "etcd"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *etcd) Watch() (source.Watcher, error) {
|
|
||||||
if c.cerr != nil {
|
|
||||||
return nil, c.cerr
|
|
||||||
}
|
|
||||||
cs, err := c.Read()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return newWatcher(c.prefix, c.stripPrefix, c.client.Watcher, cs, c.opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSource(opts ...source.Option) source.Source {
|
|
||||||
options := source.NewOptions(opts...)
|
|
||||||
|
|
||||||
var endpoints []string
|
|
||||||
|
|
||||||
// check if there are any addrs
|
|
||||||
addrs, ok := options.Context.Value(addressKey{}).([]string)
|
|
||||||
if ok {
|
|
||||||
for _, a := range addrs {
|
|
||||||
addr, port, err := net.SplitHostPort(a)
|
|
||||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
|
||||||
port = "2379"
|
|
||||||
addr = a
|
|
||||||
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
|
|
||||||
} else if err == nil {
|
|
||||||
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(endpoints) == 0 {
|
|
||||||
endpoints = []string{"localhost:2379"}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check dial timeout option
|
|
||||||
dialTimeout, ok := options.Context.Value(dialTimeoutKey{}).(time.Duration)
|
|
||||||
if !ok {
|
|
||||||
dialTimeout = 3 * time.Second // default dial timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
config := cetcd.Config{
|
|
||||||
Endpoints: endpoints,
|
|
||||||
DialTimeout: dialTimeout,
|
|
||||||
}
|
|
||||||
|
|
||||||
u, ok := options.Context.Value(authKey{}).(*authCreds)
|
|
||||||
if ok {
|
|
||||||
config.Username = u.Username
|
|
||||||
config.Password = u.Password
|
|
||||||
}
|
|
||||||
|
|
||||||
// use default config
|
|
||||||
client, err := cetcd.New(config)
|
|
||||||
|
|
||||||
prefix := DefaultPrefix
|
|
||||||
sp := ""
|
|
||||||
f, ok := options.Context.Value(prefixKey{}).(string)
|
|
||||||
if ok {
|
|
||||||
prefix = f
|
|
||||||
}
|
|
||||||
|
|
||||||
if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b {
|
|
||||||
sp = prefix
|
|
||||||
}
|
|
||||||
|
|
||||||
return &etcd{
|
|
||||||
prefix: prefix,
|
|
||||||
stripPrefix: sp,
|
|
||||||
opts: options,
|
|
||||||
client: client,
|
|
||||||
cerr: err,
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,70 +0,0 @@
|
|||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/config/source"
|
|
||||||
)
|
|
||||||
|
|
||||||
type addressKey struct{}
|
|
||||||
type prefixKey struct{}
|
|
||||||
type stripPrefixKey struct{}
|
|
||||||
type authKey struct{}
|
|
||||||
type dialTimeoutKey struct{}
|
|
||||||
|
|
||||||
type authCreds struct {
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithAddress sets the etcd address
|
|
||||||
func WithAddress(a ...string) source.Option {
|
|
||||||
return func(o *source.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, addressKey{}, a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithPrefix sets the key prefix to use
|
|
||||||
func WithPrefix(p string) source.Option {
|
|
||||||
return func(o *source.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, prefixKey{}, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// StripPrefix indicates whether to remove the prefix from config entries, or leave it in place.
|
|
||||||
func StripPrefix(strip bool) source.Option {
|
|
||||||
return func(o *source.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
|
|
||||||
o.Context = context.WithValue(o.Context, stripPrefixKey{}, strip)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Auth allows you to specify username/password
|
|
||||||
func Auth(username, password string) source.Option {
|
|
||||||
return func(o *source.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithDialTimeout set the time out for dialing to etcd
|
|
||||||
func WithDialTimeout(timeout time.Duration) source.Option {
|
|
||||||
return func(o *source.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, dialTimeoutKey{}, timeout)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,89 +0,0 @@
|
|||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
|
||||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
|
||||||
"github.com/micro/go-micro/config/encoder"
|
|
||||||
)
|
|
||||||
|
|
||||||
func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} {
|
|
||||||
if data == nil {
|
|
||||||
data = make(map[string]interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, v := range kv {
|
|
||||||
switch mvccpb.Event_EventType(v.Type) {
|
|
||||||
case mvccpb.DELETE:
|
|
||||||
data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "delete", stripPrefix)
|
|
||||||
default:
|
|
||||||
data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "insert", stripPrefix)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return data
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeMap(e encoder.Encoder, kv []*mvccpb.KeyValue, stripPrefix string) map[string]interface{} {
|
|
||||||
data := make(map[string]interface{})
|
|
||||||
|
|
||||||
for _, v := range kv {
|
|
||||||
data = update(e, data, v, "put", stripPrefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
return data
|
|
||||||
}
|
|
||||||
|
|
||||||
func update(e encoder.Encoder, data map[string]interface{}, v *mvccpb.KeyValue, action, stripPrefix string) map[string]interface{} {
|
|
||||||
// remove prefix if non empty, and ensure leading / is removed as well
|
|
||||||
vkey := strings.TrimPrefix(strings.TrimPrefix(string(v.Key), stripPrefix), "/")
|
|
||||||
// split on prefix
|
|
||||||
haveSplit := strings.Contains(vkey, "/")
|
|
||||||
keys := strings.Split(vkey, "/")
|
|
||||||
|
|
||||||
var vals interface{}
|
|
||||||
e.Decode(v.Value, &vals)
|
|
||||||
|
|
||||||
if !haveSplit && len(keys) == 1 {
|
|
||||||
switch action {
|
|
||||||
case "delete":
|
|
||||||
data = make(map[string]interface{})
|
|
||||||
default:
|
|
||||||
v, ok := vals.(map[string]interface{})
|
|
||||||
if ok {
|
|
||||||
data = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return data
|
|
||||||
}
|
|
||||||
|
|
||||||
// set data for first iteration
|
|
||||||
kvals := data
|
|
||||||
// iterate the keys and make maps
|
|
||||||
for i, k := range keys {
|
|
||||||
kval, ok := kvals[k].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
// create next map
|
|
||||||
kval = make(map[string]interface{})
|
|
||||||
// set it
|
|
||||||
kvals[k] = kval
|
|
||||||
}
|
|
||||||
|
|
||||||
// last key: write vals
|
|
||||||
if l := len(keys) - 1; i == l {
|
|
||||||
switch action {
|
|
||||||
case "delete":
|
|
||||||
delete(kvals, k)
|
|
||||||
default:
|
|
||||||
kvals[k] = vals
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// set kvals for next iterator
|
|
||||||
kvals = kval
|
|
||||||
}
|
|
||||||
|
|
||||||
return data
|
|
||||||
}
|
|
@ -1,113 +0,0 @@
|
|||||||
package etcd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
cetcd "github.com/coreos/etcd/clientv3"
|
|
||||||
"github.com/micro/go-micro/config/source"
|
|
||||||
)
|
|
||||||
|
|
||||||
type watcher struct {
|
|
||||||
opts source.Options
|
|
||||||
name string
|
|
||||||
stripPrefix string
|
|
||||||
|
|
||||||
sync.RWMutex
|
|
||||||
cs *source.ChangeSet
|
|
||||||
|
|
||||||
ch chan *source.ChangeSet
|
|
||||||
exit chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) {
|
|
||||||
w := &watcher{
|
|
||||||
opts: opts,
|
|
||||||
name: "etcd",
|
|
||||||
stripPrefix: strip,
|
|
||||||
cs: cs,
|
|
||||||
ch: make(chan *source.ChangeSet),
|
|
||||||
exit: make(chan bool),
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := wc.Watch(context.Background(), key, cetcd.WithPrefix())
|
|
||||||
|
|
||||||
go w.run(wc, ch)
|
|
||||||
|
|
||||||
return w, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) handle(evs []*cetcd.Event) {
|
|
||||||
w.RLock()
|
|
||||||
data := w.cs.Data
|
|
||||||
w.RUnlock()
|
|
||||||
|
|
||||||
var vals map[string]interface{}
|
|
||||||
|
|
||||||
// unpackage existing changeset
|
|
||||||
if err := w.opts.Encoder.Decode(data, &vals); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// update base changeset
|
|
||||||
d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix)
|
|
||||||
|
|
||||||
// pack the changeset
|
|
||||||
b, err := w.opts.Encoder.Encode(d)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// create new changeset
|
|
||||||
cs := &source.ChangeSet{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
Source: w.name,
|
|
||||||
Data: b,
|
|
||||||
Format: w.opts.Encoder.String(),
|
|
||||||
}
|
|
||||||
cs.Checksum = cs.Sum()
|
|
||||||
|
|
||||||
// set base change set
|
|
||||||
w.Lock()
|
|
||||||
w.cs = cs
|
|
||||||
w.Unlock()
|
|
||||||
|
|
||||||
// send update
|
|
||||||
w.ch <- cs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case rsp, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w.handle(rsp.Events)
|
|
||||||
case <-w.exit:
|
|
||||||
wc.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) Next() (*source.ChangeSet, error) {
|
|
||||||
select {
|
|
||||||
case cs := <-w.ch:
|
|
||||||
return cs, nil
|
|
||||||
case <-w.exit:
|
|
||||||
return nil, errors.New("watcher stopped")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) Stop() error {
|
|
||||||
select {
|
|
||||||
case <-w.exit:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
close(w.exit)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user