support ability to set store, address and namespace via flags and env vars (#1092)
This commit is contained in:
parent
0b8ff3a8bb
commit
048065fe96
@ -190,6 +190,21 @@ var (
|
||||
EnvVar: "MICRO_SELECTOR",
|
||||
Usage: "Selector used to pick nodes for querying",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "store",
|
||||
EnvVar: "MICRO_STORE",
|
||||
Usage: "Store used for key-value storage",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "store_address",
|
||||
EnvVar: "MICRO_STORE_ADDRESS",
|
||||
Usage: "Comma-separated list of store addresses",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "store_namespace",
|
||||
EnvVar: "MICRO_STORE_NAMESPACE",
|
||||
Usage: "Namespace for store data",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "transport",
|
||||
EnvVar: "MICRO_TRANSPORT",
|
||||
@ -462,6 +477,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.String("store_address")) > 0 {
|
||||
if err := (*c.opts.Store).Init(store.Nodes(strings.Split(ctx.String("store_address"), ",")...)); err != nil {
|
||||
log.Fatalf("Error configuring store: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.String("store_namespace")) > 0 {
|
||||
if err := (*c.opts.Store).Init(store.Namespace(ctx.String("store_address"))); err != nil {
|
||||
log.Fatalf("Error configuring store: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.String("server_name")) > 0 {
|
||||
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
|
||||
}
|
||||
|
@ -96,6 +96,16 @@ func validateOptions(account, token, namespace string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *workersKV) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&w.options)
|
||||
}
|
||||
if len(w.options.Namespace) > 0 {
|
||||
w.namespace = w.options.Namespace
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// In the cloudflare workers KV implemention, List() doesn't guarantee
|
||||
// anything as the workers API is eventually consistent.
|
||||
func (w *workersKV) List() ([]*store.Record, error) {
|
||||
@ -308,7 +318,7 @@ func NewStore(opts ...store.Option) store.Store {
|
||||
}
|
||||
|
||||
if len(namespace) == 0 {
|
||||
namespace = getNamespace(options.Context)
|
||||
namespace = options.Namespace
|
||||
}
|
||||
|
||||
// validate options are not blank or log.Fatal
|
||||
|
@ -25,10 +25,6 @@ func getAccount(ctx context.Context) string {
|
||||
return getOption(ctx, "CF_ACCOUNT_ID")
|
||||
}
|
||||
|
||||
func getNamespace(ctx context.Context) string {
|
||||
return getOption(ctx, "KV_NAMESPACE_ID")
|
||||
}
|
||||
|
||||
// Token sets the cloudflare api token
|
||||
func Token(t string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
@ -52,9 +48,6 @@ func Account(id string) store.Option {
|
||||
// Namespace sets the KV namespace
|
||||
func Namespace(ns string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "KV_NAMESPACE_ID", ns)
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,14 @@ type sqlStore struct {
|
||||
options store.Options
|
||||
}
|
||||
|
||||
func (s *sqlStore) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&s.options)
|
||||
}
|
||||
// reconfigure
|
||||
return s.configure()
|
||||
}
|
||||
|
||||
// List all the known records
|
||||
func (s *sqlStore) List() ([]*store.Record, error) {
|
||||
rows, err := s.db.Query(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
|
||||
@ -147,16 +155,16 @@ func (s *sqlStore) Delete(keys ...string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sqlStore) initDB() {
|
||||
func (s *sqlStore) initDB() error {
|
||||
// Create the namespace's database
|
||||
_, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database))
|
||||
if err != nil {
|
||||
log.Fatal(errors.Wrap(err, "Couldn't set database"))
|
||||
return errors.Wrap(err, "Couldn't set database")
|
||||
}
|
||||
|
||||
// Create a table for the namespace's prefix
|
||||
@ -168,8 +176,60 @@ func (s *sqlStore) initDB() {
|
||||
CONSTRAINT %s_pkey PRIMARY KEY (key)
|
||||
);`, s.table, s.table))
|
||||
if err != nil {
|
||||
log.Fatal(errors.Wrap(err, "Couldn't create table"))
|
||||
return errors.Wrap(err, "Couldn't create table")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sqlStore) configure() error {
|
||||
nodes := s.options.Nodes
|
||||
if len(nodes) == 0 {
|
||||
nodes = []string{"localhost:26257"}
|
||||
}
|
||||
|
||||
namespace := s.options.Namespace
|
||||
if len(namespace) == 0 {
|
||||
namespace = DefaultNamespace
|
||||
}
|
||||
|
||||
prefix := s.options.Prefix
|
||||
if len(prefix) == 0 {
|
||||
prefix = DefaultPrefix
|
||||
}
|
||||
|
||||
for _, r := range namespace {
|
||||
if !unicode.IsLetter(r) {
|
||||
return errors.New("store.namespace must only contain letters")
|
||||
}
|
||||
}
|
||||
|
||||
source := nodes[0]
|
||||
if !strings.Contains(source, " ") {
|
||||
source = fmt.Sprintf("host=%s", source)
|
||||
}
|
||||
|
||||
// create source from first node
|
||||
db, err := sql.Open("postgres", source)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.db != nil {
|
||||
s.db.Close()
|
||||
}
|
||||
|
||||
// save the values
|
||||
s.db = db
|
||||
s.database = namespace
|
||||
s.table = prefix
|
||||
|
||||
// initialise the database
|
||||
return s.initDB()
|
||||
}
|
||||
|
||||
// New returns a new micro Store backed by sql
|
||||
@ -179,46 +239,14 @@ func NewStore(opts ...store.Option) store.Store {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
nodes := options.Nodes
|
||||
if len(nodes) == 0 {
|
||||
nodes = []string{"localhost:26257"}
|
||||
}
|
||||
// new store
|
||||
s := new(sqlStore)
|
||||
|
||||
namespace := options.Namespace
|
||||
if len(namespace) == 0 {
|
||||
namespace = DefaultNamespace
|
||||
}
|
||||
|
||||
prefix := options.Prefix
|
||||
if len(prefix) == 0 {
|
||||
prefix = DefaultPrefix
|
||||
}
|
||||
|
||||
for _, r := range namespace {
|
||||
if !unicode.IsLetter(r) {
|
||||
log.Fatal("store.namespace must only contain letters")
|
||||
}
|
||||
}
|
||||
|
||||
source := nodes[0]
|
||||
if !strings.Contains(source, " ") {
|
||||
source = fmt.Sprintf("host=%s", source)
|
||||
}
|
||||
// create source from first node
|
||||
db, err := sql.Open("postgres", source)
|
||||
if err != nil {
|
||||
// configure the store
|
||||
if err := s.configure(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
s := &sqlStore{
|
||||
db: db,
|
||||
database: namespace,
|
||||
table: prefix,
|
||||
}
|
||||
s.initDB()
|
||||
// return store
|
||||
return s
|
||||
}
|
||||
|
@ -15,6 +15,13 @@ type ekv struct {
|
||||
kv client.KV
|
||||
}
|
||||
|
||||
func (e *ekv) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&e.options)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ekv) Read(keys ...string) ([]*store.Record, error) {
|
||||
//nolint:prealloc
|
||||
var values []*mvccpb.KeyValue
|
||||
|
@ -20,6 +20,13 @@ type memoryRecord struct {
|
||||
c time.Time
|
||||
}
|
||||
|
||||
func (m *memoryStore) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.options)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryStore) List() ([]*store.Record, error) {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
@ -10,6 +10,10 @@ type Store struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (_m *Store) Init(...store.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete provides a mock function with given fields: key
|
||||
func (_m *Store) Delete(key ...string) error {
|
||||
_va := make([]interface{}, len(key))
|
||||
|
@ -28,6 +28,17 @@ type serviceStore struct {
|
||||
Client pb.StoreService
|
||||
}
|
||||
|
||||
func (s *serviceStore) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&s.options)
|
||||
}
|
||||
s.Namespace = s.options.Namespace
|
||||
s.Prefix = s.options.Prefix
|
||||
s.Nodes = s.options.Nodes
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *serviceStore) Context() context.Context {
|
||||
ctx := context.Background()
|
||||
|
||||
|
@ -15,6 +15,8 @@ var (
|
||||
|
||||
// Store is a data storage interface
|
||||
type Store interface {
|
||||
// Initialise store options
|
||||
Init(...Option) error
|
||||
// List all the known records
|
||||
List() ([]*Record, error)
|
||||
// Read records with keys
|
||||
@ -34,6 +36,10 @@ type Record struct {
|
||||
|
||||
type noop struct{}
|
||||
|
||||
func (n *noop) Init(...Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noop) List() ([]*Record, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user