add vault/etcd

This commit is contained in:
Asim Aslam 2019-05-31 12:38:49 +01:00
parent f8c880c39e
commit ef9c223ac8
12 changed files with 911 additions and 0 deletions

View File

@ -0,0 +1,51 @@
# Etcd Source
The etcd source reads config from etcd key/values
This source supports etcd version 3 and beyond.
## Etcd Format
The etcd source expects keys under the default prefix `/micro/config` (prefix can be changed)
Values are expected to be JSON
// set database
etcdctl put /micro/config/database '{"address": "", "port": 3306}'
// set cache
etcdctl put /micro/config/cache '{"address": "", "port": 6379}'
Keys are split on `/` so access becomes
conf.Get("micro", "config", "database")
## New Source
Specify source with data
etcdSource := etcd.NewSource(
// optionally specify etcd address; default to localhost:8500
// optionally specify prefix; defaults to /micro/config
// optionally strip the provided prefix from the keys, defaults to false
## Load Source
Load the source into config
// Create new config
conf := config.NewConfig()
// Load file source

config/source/etcd/etcd.go Normal file
View File

@ -0,0 +1,134 @@
package etcd
import (
cetcd ""
// Currently a single etcd reader
type etcd struct {
prefix string
stripPrefix string
opts source.Options
client *cetcd.Client
cerr error
var (
DefaultPrefix = "/micro/config/"
func (c *etcd) Read() (*source.ChangeSet, error) {
if c.cerr != nil {
return nil, c.cerr
rsp, err := c.client.Get(context.Background(), c.prefix, cetcd.WithPrefix())
if err != nil {
return nil, err
if rsp == nil || len(rsp.Kvs) == 0 {
return nil, fmt.Errorf("source not found: %s", c.prefix)
var kvs []*mvccpb.KeyValue
for _, v := range rsp.Kvs {
kvs = append(kvs, (*mvccpb.KeyValue)(v))
data := makeMap(c.opts.Encoder, kvs, c.stripPrefix)
b, err := c.opts.Encoder.Encode(data)
if err != nil {
return nil, fmt.Errorf("error reading source: %v", err)
cs := &source.ChangeSet{
Timestamp: time.Now(),
Source: c.String(),
Data: b,
Format: c.opts.Encoder.String(),
cs.Checksum = cs.Sum()
return cs, nil
func (c *etcd) String() string {
return "etcd"
func (c *etcd) Watch() (source.Watcher, error) {
if c.cerr != nil {
return nil, c.cerr
cs, err := c.Read()
if err != nil {
return nil, err
return newWatcher(c.prefix, c.stripPrefix, c.client.Watcher, cs, c.opts)
func NewSource(opts ...source.Option) source.Source {
options := source.NewOptions(opts...)
var endpoints []string
// check if there are any addrs
addrs, ok := options.Context.Value(addressKey{}).([]string)
if ok {
for _, a := range addrs {
addr, port, err := net.SplitHostPort(a)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "2379"
addr = a
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
} else if err == nil {
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
if len(endpoints) == 0 {
endpoints = []string{"localhost:2379"}
config := cetcd.Config{
Endpoints: endpoints,
u, ok := options.Context.Value(authKey{}).(*authCreds)
if ok {
config.Username = u.Username
config.Password = u.Password
// use default config
client, err := cetcd.New(config)
prefix := DefaultPrefix
sp := ""
f, ok := options.Context.Value(prefixKey{}).(string)
if ok {
prefix = f
if b, ok := options.Context.Value(stripPrefixKey{}).(bool); ok && b {
sp = prefix
return &etcd{
prefix: prefix,
stripPrefix: sp,
opts: options,
client: client,
cerr: err,

View File

@ -0,0 +1,58 @@
package etcd
import (
type addressKey struct{}
type prefixKey struct{}
type stripPrefixKey struct{}
type authKey struct{}
type authCreds struct {
Username string
Password string
// WithAddress sets the consul address
func WithAddress(a ...string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, addressKey{}, a)
// WithPrefix sets the key prefix to use
func WithPrefix(p string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, prefixKey{}, p)
// StripPrefix indicates whether to remove the prefix from config entries, or leave it in place.
func StripPrefix(strip bool) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, stripPrefixKey{}, strip)
// Auth allows you to specify username/password
func Auth(username, password string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password})

View File

@ -0,0 +1,89 @@
package etcd
import (
func makeEvMap(e encoder.Encoder, data map[string]interface{}, kv []*clientv3.Event, stripPrefix string) map[string]interface{} {
if data == nil {
data = make(map[string]interface{})
for _, v := range kv {
switch mvccpb.Event_EventType(v.Type) {
case mvccpb.DELETE:
data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "delete", stripPrefix)
data = update(e, data, (*mvccpb.KeyValue)(v.Kv), "insert", stripPrefix)
return data
func makeMap(e encoder.Encoder, kv []*mvccpb.KeyValue, stripPrefix string) map[string]interface{} {
data := make(map[string]interface{})
for _, v := range kv {
data = update(e, data, v, "put", stripPrefix)
return data
func update(e encoder.Encoder, data map[string]interface{}, v *mvccpb.KeyValue, action, stripPrefix string) map[string]interface{} {
// remove prefix if non empty, and ensure leading / is removed as well
vkey := strings.TrimPrefix(strings.TrimPrefix(string(v.Key), stripPrefix), "/")
// split on prefix
haveSplit := strings.Contains(vkey, "/")
keys := strings.Split(vkey, "/")
var vals interface{}
e.Decode(v.Value, &vals)
if !haveSplit && len(keys) == 1 {
switch action {
case "delete":
data = make(map[string]interface{})
v, ok := vals.(map[string]interface{})
if ok {
data = v
return data
// set data for first iteration
kvals := data
// iterate the keys and make maps
for i, k := range keys {
kval, ok := kvals[k].(map[string]interface{})
if !ok {
// create next map
kval = make(map[string]interface{})
// set it
kvals[k] = kval
// last key: write vals
if l := len(keys) - 1; i == l {
switch action {
case "delete":
delete(kvals, k)
kvals[k] = vals
// set kvals for next iterator
kvals = kval
return data

View File

@ -0,0 +1,113 @@
package etcd
import (
cetcd ""
type watcher struct {
opts source.Options
name string
stripPrefix string
cs *source.ChangeSet
ch chan *source.ChangeSet
exit chan bool
func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) {
w := &watcher{
opts: opts,
name: "etcd",
stripPrefix: strip,
cs: cs,
ch: make(chan *source.ChangeSet),
exit: make(chan bool),
ch := wc.Watch(context.Background(), key, cetcd.WithPrefix())
go, ch)
return w, nil
func (w *watcher) handle(evs []*cetcd.Event) {
data := w.cs.Data
var vals map[string]interface{}
// unpackage existing changeset
if err := w.opts.Encoder.Decode(data, &vals); err != nil {
// update base changeset
d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix)
// pack the changeset
b, err := w.opts.Encoder.Encode(d)
if err != nil {
// create new changeset
cs := &source.ChangeSet{
Timestamp: time.Now(),
Data: b,
Format: w.opts.Encoder.String(),
cs.Checksum = cs.Sum()
// set base change set
w.cs = cs
// send update <- cs
func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) {
for {
select {
case rsp, ok := <-ch:
if !ok {
case <-w.exit:
func (w *watcher) Next() (*source.ChangeSet, error) {
select {
case cs := <
return cs, nil
case <-w.exit:
return nil, errors.New("watcher stopped")
func (w *watcher) Stop() error {
select {
case <-w.exit:
return nil
return nil

View File

@ -0,0 +1,43 @@
# Vault Source
The vault source reads config from different secret engines in a Vault server. For example:
kv: secret/data/<my/secret>
database credentials: database/creds/<my-db-role>
## New Source
Specify source with data
vaultSource := vault.NewSource(
// mandatory: it specifies server address.
// It could have different formats:
// ->
// ->
// mandatory: it specifies a resource to been access
// mandatory: it specifies a resource to been access
// optional: path to store my secret.
// By default use resourcePath value
// optional: namespace.
## Load Source
Load the source into config
// Create new config
conf := config.NewConfig()
// Load file source

View File

@ -0,0 +1,63 @@
package vault
import (
type addressKey struct{}
type resourcePath struct{}
type nameSpace struct{}
type tokenKey struct{}
type secretName struct{}
// WithAddress sets the server address
func WithAddress(a string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, addressKey{}, a)
// WithResourcePath sets the resource that will be access
func WithResourcePath(p string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, resourcePath{}, p)
// WithNameSpace sets the namespace that its going to be access
func WithNameSpace(n string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, nameSpace{}, n)
// WithToken sets the key token to use
func WithToken(t string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, tokenKey{}, t)
// WithSecretName sets the name of the secret to wrap in on a map
func WithSecretName(t string) source.Option {
return func(o *source.Options) {
if o.Context == nil {
o.Context = context.Background()
o.Context = context.WithValue(o.Context, secretName{}, t)

View File

@ -0,0 +1 @@
vault kv put secret/data/db/auth user=myuser password=mypassword2 host= port=3307

View File

@ -0,0 +1,98 @@
package vault
import (
func makeMap(kv map[string]interface{}, secretName string) (map[string]interface{}, error) {
data := make(map[string]interface{})
// if secret version included
if kv["data"] != nil && kv["metadata"] != nil {
kv = kv["data"].(map[string]interface{})
target := data
// if secretName defined, wrap secrets under a map
if secretName != "" {
path := strings.Split(secretName, "/")
// find (or create) the location we want to put this value at
for i, dir := range path {
if _, ok := target[dir]; !ok {
target[dir] = make(map[string]interface{})
if i < len(path)-1 {
target = target[dir].(map[string]interface{})
} else {
target[dir] = kv
return data, nil
func getAddress(options source.Options) string {
// check if there are any addrs
a, ok := options.Context.Value(addressKey{}).(string)
if ok {
// check if http protocol is defined
if a[0] != 'h' {
addr, port, err := net.SplitHostPort(a)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8200"
addr = a
return fmt.Sprintf("https://%s:%s", addr, port)
} else if err == nil {
return fmt.Sprintf("https://%s:%s", addr, port)
} else {
u, _ := url.Parse(a)
if host, port, _ := net.SplitHostPort(u.Host); host == "" {
port = "8200"
return fmt.Sprintf("%s://%s:%s", u.Scheme, u.Host, port)
} else {
return fmt.Sprintf("%s://%s", u.Scheme, u.Host)
return ""
func getToken(options source.Options) string {
token, ok := options.Context.Value(tokenKey{}).(string)
if ok {
return token
return ""
func getResourcePath(options source.Options) string {
path, ok := options.Context.Value(resourcePath{}).(string)
if ok {
return path
return ""
func getNameSpace(options source.Options) string {
ns, ok := options.Context.Value(nameSpace{}).(string)
if ok {
return ns
return ""
func getSecretName(options source.Options) string {
ns, ok := options.Context.Value(secretName{}).(string)
if ok {
return ns
return ""

View File

@ -0,0 +1,96 @@
package vault
import (
// Currently a single vault reader
type vault struct {
secretPath string
secretName string
opts source.Options
client *api.Client
func (c *vault) Read() (*source.ChangeSet, error) {
secret, err := c.client.Logical().Read(c.secretPath)
if err != nil {
return nil, err
if secret == nil {
return nil, fmt.Errorf("source not found: %s", c.secretPath)
if secret.Data == nil && secret.Warnings != nil {
return nil, fmt.Errorf("source: %s errors: %v", c.secretPath, secret.Warnings)
data, err := makeMap(secret.Data, c.secretName)
if err != nil {
return nil, fmt.Errorf("error reading data: %v", err)
b, err := c.opts.Encoder.Encode(data)
if err != nil {
return nil, fmt.Errorf("error reading source: %v", err)
cs := &source.ChangeSet{
Timestamp: time.Now(),
Format: c.opts.Encoder.String(),
Source: c.String(),
Data: b,
cs.Checksum = cs.Sum()
return cs, nil
//return nil, nil
func (c *vault) String() string {
return "vault"
func (c *vault) Watch() (source.Watcher, error) {
w := newWatcher(c.client)
return w, nil
// NewSource creates a new vault source
func NewSource(opts ...source.Option) source.Source {
options := source.NewOptions(opts...)
// create the client
client, _ := api.NewClient(api.DefaultConfig())
// get and set options
if address := getAddress(options); address != "" {
_ = client.SetAddress(address)
if nameSpace := getNameSpace(options); nameSpace != "" {
if token := getToken(options); token != "" {
path := getResourcePath(options)
name := getSecretName(options)
if name == "" {
name = path
return &vault{
opts: options,
client: client,
secretPath: path,
secretName: name,

View File

@ -0,0 +1,133 @@
package vault
import (
func TestVaultMakeMap(t *testing.T) {
tt := []struct {
name string
expected []byte
input []byte
secretName string
name: "simple valid data 1",
secretName: "my/secret",
input: []byte(`{"data":{"bar":"bazz", "tar":"par"}, "metadata":{"version":1, "destroyed": false}}`),
expected: []byte(`{"my":{"secret":{"bar":"bazz", "tar":"par"}}}`),
name: "simple valid data 2",
secretName: "my/secret",
input: []byte(`{"bar":"bazz", "tar":"par"}`),
expected: []byte(`{"my":{"secret":{"bar":"bazz", "tar":"par"}}}`),
for _, tc := range tt {
t.Run(, func(t *testing.T) {
var input map[string]interface{}
var expected map[string]interface{}
_ = json.Unmarshal(tc.input, &input)
_ = json.Unmarshal(tc.expected, &expected)
out, _ := makeMap(input, tc.secretName)
if eq := reflect.DeepEqual(out, expected); !eq {
t.Fatalf("expected %v and got %v", expected, out)
func TestVault_Read(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
var (
address = ""
resource = "secret/data/db/auth"
token = "s.Q4Zi0CSowXZl7sh0z96ijcT4"
data := []byte(`{"secret":{"data":{"db":{"auth":{"host":"","password":"mypassword","port":"3306","user":"myuser"}}}}}`)
tt := []struct {
name string
addr string
resource string
token string
{name: "read data basic", addr: address, resource: resource, token: token},
{name: "read data without token", addr: address, resource: resource, token: ""},
{name: "read data full address format", addr: "", resource: resource, token: token},
{name: "read data wrong resource path", addr: address, resource: "secrets/data/db/auth", token: token},
for _, tc := range tt {
t.Run(, func(t *testing.T) {
source := NewSource(
r, err := source.Read()
if err != nil {
if tc.token == "" {
} else if strings.Compare(err.Error(), "source not found: secrets/data/db/auth") == 0 {
t.Errorf("%s: not able to read the config values because: %v",, err)
if string(r.Data) != string(data) {
t.Logf("data expected: %v", string(data))
t.Logf("data got from configmap: %v", string(r.Data))
t.Errorf("data from configmap does not match.")
func TestVault_String(t *testing.T) {
source := NewSource()
if source.String() != "vault" {
t.Errorf("expecting to get %v and instead got %v", "vault", source)
func TestVaultNewSource(t *testing.T) {
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
conf := config.NewConfig()
_ = conf.Load(NewSource(
if user := conf.Get("secret", "data", "db", "auth", "user").String("user"); user != "myuser" {
t.Errorf("expected %v and got %v", "myuser", user)
if addr := conf.Get("secret", "data", "db", "auth", "host").String("host"); addr != "" {
t.Errorf("expected %v and got %v", "", addr)

View File

@ -0,0 +1,32 @@
package vault
import (
type watcher struct {
c *api.Client
exit chan bool
func newWatcher(c *api.Client) *watcher {
return &watcher{
c: c,
exit: make(chan bool),
func (w *watcher) Next() (*source.ChangeSet, error) {
return nil, errors.New("url watcher stopped")
func (w *watcher) Stop() error {
select {
case <-w.exit:
return nil