diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 96973c2f..c2dbd9a5 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -190,6 +190,21 @@ var ( EnvVar: "MICRO_SELECTOR", Usage: "Selector used to pick nodes for querying", }, + cli.StringFlag{ + Name: "store", + EnvVar: "MICRO_STORE", + Usage: "Store used for key-value storage", + }, + cli.StringFlag{ + Name: "store_address", + EnvVar: "MICRO_STORE_ADDRESS", + Usage: "Comma-separated list of store addresses", + }, + cli.StringFlag{ + Name: "store_namespace", + EnvVar: "MICRO_STORE_NAMESPACE", + Usage: "Namespace for store data", + }, cli.StringFlag{ Name: "transport", EnvVar: "MICRO_TRANSPORT", @@ -462,6 +477,18 @@ func (c *cmd) Before(ctx *cli.Context) error { } } + if len(ctx.String("store_address")) > 0 { + if err := (*c.opts.Store).Init(store.Nodes(strings.Split(ctx.String("store_address"), ",")...)); err != nil { + log.Fatalf("Error configuring store: %v", err) + } + } + + if len(ctx.String("store_namespace")) > 0 { + if err := (*c.opts.Store).Init(store.Namespace(ctx.String("store_address"))); err != nil { + log.Fatalf("Error configuring store: %v", err) + } + } + if len(ctx.String("server_name")) > 0 { serverOpts = append(serverOpts, server.Name(ctx.String("server_name"))) } diff --git a/store/cloudflare/cloudflare.go b/store/cloudflare/cloudflare.go index 20ce2bac..03e32298 100644 --- a/store/cloudflare/cloudflare.go +++ b/store/cloudflare/cloudflare.go @@ -96,6 +96,16 @@ func validateOptions(account, token, namespace string) { } } +func (w *workersKV) Init(opts ...store.Option) error { + for _, o := range opts { + o(&w.options) + } + if len(w.options.Namespace) > 0 { + w.namespace = w.options.Namespace + } + return nil +} + // In the cloudflare workers KV implemention, List() doesn't guarantee // anything as the workers API is eventually consistent. func (w *workersKV) List() ([]*store.Record, error) { @@ -308,7 +318,7 @@ func NewStore(opts ...store.Option) store.Store { } if len(namespace) == 0 { - namespace = getNamespace(options.Context) + namespace = options.Namespace } // validate options are not blank or log.Fatal diff --git a/store/cloudflare/options.go b/store/cloudflare/options.go index d9670ff4..be6bf6f6 100644 --- a/store/cloudflare/options.go +++ b/store/cloudflare/options.go @@ -25,10 +25,6 @@ func getAccount(ctx context.Context) string { return getOption(ctx, "CF_ACCOUNT_ID") } -func getNamespace(ctx context.Context) string { - return getOption(ctx, "KV_NAMESPACE_ID") -} - // Token sets the cloudflare api token func Token(t string) store.Option { return func(o *store.Options) { @@ -52,9 +48,6 @@ func Account(id string) store.Option { // Namespace sets the KV namespace func Namespace(ns string) store.Option { return func(o *store.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, "KV_NAMESPACE_ID", ns) + o.Namespace = ns } } diff --git a/store/cockroach/cockroach.go b/store/cockroach/cockroach.go index 7979d501..d321dc06 100644 --- a/store/cockroach/cockroach.go +++ b/store/cockroach/cockroach.go @@ -30,6 +30,14 @@ type sqlStore struct { options store.Options } +func (s *sqlStore) Init(opts ...store.Option) error { + for _, o := range opts { + o(&s.options) + } + // reconfigure + return s.configure() +} + // List all the known records func (s *sqlStore) List() ([]*store.Record, error) { rows, err := s.db.Query(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table)) @@ -147,16 +155,16 @@ func (s *sqlStore) Delete(keys ...string) error { return nil } -func (s *sqlStore) initDB() { +func (s *sqlStore) initDB() error { // Create the namespace's database _, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database)) if err != nil { - log.Fatal(err) + return err } _, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database)) if err != nil { - log.Fatal(errors.Wrap(err, "Couldn't set database")) + return errors.Wrap(err, "Couldn't set database") } // Create a table for the namespace's prefix @@ -168,8 +176,60 @@ func (s *sqlStore) initDB() { CONSTRAINT %s_pkey PRIMARY KEY (key) );`, s.table, s.table)) if err != nil { - log.Fatal(errors.Wrap(err, "Couldn't create table")) + return errors.Wrap(err, "Couldn't create table") } + + return nil +} + +func (s *sqlStore) configure() error { + nodes := s.options.Nodes + if len(nodes) == 0 { + nodes = []string{"localhost:26257"} + } + + namespace := s.options.Namespace + if len(namespace) == 0 { + namespace = DefaultNamespace + } + + prefix := s.options.Prefix + if len(prefix) == 0 { + prefix = DefaultPrefix + } + + for _, r := range namespace { + if !unicode.IsLetter(r) { + return errors.New("store.namespace must only contain letters") + } + } + + source := nodes[0] + if !strings.Contains(source, " ") { + source = fmt.Sprintf("host=%s", source) + } + + // create source from first node + db, err := sql.Open("postgres", source) + if err != nil { + return err + } + + if err := db.Ping(); err != nil { + return err + } + + if s.db != nil { + s.db.Close() + } + + // save the values + s.db = db + s.database = namespace + s.table = prefix + + // initialise the database + return s.initDB() } // New returns a new micro Store backed by sql @@ -179,46 +239,14 @@ func NewStore(opts ...store.Option) store.Store { o(&options) } - nodes := options.Nodes - if len(nodes) == 0 { - nodes = []string{"localhost:26257"} - } + // new store + s := new(sqlStore) - namespace := options.Namespace - if len(namespace) == 0 { - namespace = DefaultNamespace - } - - prefix := options.Prefix - if len(prefix) == 0 { - prefix = DefaultPrefix - } - - for _, r := range namespace { - if !unicode.IsLetter(r) { - log.Fatal("store.namespace must only contain letters") - } - } - - source := nodes[0] - if !strings.Contains(source, " ") { - source = fmt.Sprintf("host=%s", source) - } - // create source from first node - db, err := sql.Open("postgres", source) - if err != nil { + // configure the store + if err := s.configure(); err != nil { log.Fatal(err) } - if err := db.Ping(); err != nil { - log.Fatal(err) - } - - s := &sqlStore{ - db: db, - database: namespace, - table: prefix, - } - s.initDB() + // return store return s } diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go index c9bc4047..5eac1e50 100644 --- a/store/etcd/etcd.go +++ b/store/etcd/etcd.go @@ -15,6 +15,13 @@ type ekv struct { kv client.KV } +func (e *ekv) Init(opts ...store.Option) error { + for _, o := range opts { + o(&e.options) + } + return nil +} + func (e *ekv) Read(keys ...string) ([]*store.Record, error) { //nolint:prealloc var values []*mvccpb.KeyValue diff --git a/store/memory/memory.go b/store/memory/memory.go index 78e85e82..5715f41e 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -20,6 +20,13 @@ type memoryRecord struct { c time.Time } +func (m *memoryStore) Init(opts ...store.Option) error { + for _, o := range opts { + o(&m.options) + } + return nil +} + func (m *memoryStore) List() ([]*store.Record, error) { m.RLock() defer m.RUnlock() diff --git a/store/mock/store.go b/store/mock/store.go index fe296b02..c9908400 100644 --- a/store/mock/store.go +++ b/store/mock/store.go @@ -10,6 +10,10 @@ type Store struct { mock.Mock } +func (_m *Store) Init(...store.Option) error { + return nil +} + // Delete provides a mock function with given fields: key func (_m *Store) Delete(key ...string) error { _va := make([]interface{}, len(key)) diff --git a/store/service/service.go b/store/service/service.go index 37a7a030..e364f794 100644 --- a/store/service/service.go +++ b/store/service/service.go @@ -28,6 +28,17 @@ type serviceStore struct { Client pb.StoreService } +func (s *serviceStore) Init(opts ...store.Option) error { + for _, o := range opts { + o(&s.options) + } + s.Namespace = s.options.Namespace + s.Prefix = s.options.Prefix + s.Nodes = s.options.Nodes + + return nil +} + func (s *serviceStore) Context() context.Context { ctx := context.Background() diff --git a/store/store.go b/store/store.go index 9fb97328..d8c44c62 100644 --- a/store/store.go +++ b/store/store.go @@ -15,6 +15,8 @@ var ( // Store is a data storage interface type Store interface { + // Initialise store options + Init(...Option) error // List all the known records List() ([]*Record, error) // Read records with keys @@ -34,6 +36,10 @@ type Record struct { type noop struct{} +func (n *noop) Init(...Option) error { + return nil +} + func (n *noop) List() ([]*Record, error) { return nil, nil }