change store options

This commit is contained in:
Asim Aslam
2019-12-16 14:38:51 +00:00
parent e8e112144f
commit 59751c02e6
8 changed files with 156 additions and 179 deletions

View File

@@ -4,15 +4,13 @@ package postgresql
import (
"database/sql"
"fmt"
"strings"
"time"
"unicode"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
"github.com/micro/go-micro/util/log"
"github.com/pkg/errors"
)
// DefaultNamespace is the namespace that the sql store
@@ -27,7 +25,8 @@ type sqlStore struct {
database string
table string
options.Options
options store.Options
}
// List all the known records
@@ -151,38 +150,16 @@ func (s *sqlStore) Delete(keys ...string) error {
return nil
}
func (s *sqlStore) initDB(options options.Options) error {
// Get the store.namespace option, or use sql.DefaultNamespace
namespaceOpt, found := options.Values().Get("store.namespace")
if !found {
s.database = DefaultNamespace
} else {
if namespace, ok := namespaceOpt.(string); ok {
s.database = namespace
} else {
return errors.New("store.namespace option must be a string")
}
}
// Get the store.namespace option, or use sql.DefaultNamespace
prefixOpt, found := options.Values().Get("store.prefix")
if !found {
s.table = DefaultPrefix
} else {
if prefix, ok := prefixOpt.(string); ok {
s.table = prefix
} else {
return errors.New("store.namespace option must be a string")
}
}
func (s *sqlStore) initDB() {
// Create "micro" schema
schema, err := s.db.Prepare(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database))
if err != nil {
return err
log.Fatal(err)
}
_, err = schema.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create database")
log.Fatal(errors.Wrap(err, "Couldn't create database"))
}
// Create a table for the Store namespace
@@ -194,73 +171,59 @@ func (s *sqlStore) initDB(options options.Options) error {
CONSTRAINT %s_pkey PRIMARY KEY (key)
);`, s.database, s.table, s.table))
if err != nil {
return errors.Wrap(err, "SQL statement preparation failed")
}
_, err = tableq.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create table")
log.Fatal(errors.Wrap(err, "SQL statement preparation failed"))
}
return nil
_, err = tableq.Exec()
if err != nil {
log.Fatal(errors.Wrap(err, "Couldn't create table"))
}
}
// New returns a new micro Store backed by sql
func New(opts ...options.Option) (store.Store, error) {
options := options.NewOptions(opts...)
driver, dataSourceName, err := validateOptions(options)
func New(opts ...store.Option) store.Store {
var options store.Options
for _, o := range opts {
o(&options)
}
nodes := options.Nodes
if len(nodes) == 0 {
nodes = []string{"localhost:26257"}
}
namespace := options.Namespace
if len(namespace) == 0 {
namespace = DefaultNamespace
}
prefix := options.Prefix
if len(prefix) == 0 {
prefix = DefaultPrefix
}
for _, r := range namespace {
if !unicode.IsLetter(r) {
log.Fatal("store.namespace must only contain letters")
}
}
// create source from first node
source := fmt.Sprintf("host=%s", nodes[0])
db, err := sql.Open("pq", source)
if err != nil {
return nil, err
}
if !strings.Contains(dataSourceName, " ") {
dataSourceName = fmt.Sprintf("host=%s", dataSourceName)
}
db, err := sql.Open(driver, dataSourceName)
if err != nil {
return nil, err
log.Fatal(err)
}
if err := db.Ping(); err != nil {
return nil, err
log.Fatal(err)
}
s := &sqlStore{
db: db,
db: db,
database: namespace,
table: prefix,
}
return s, s.initDB(options)
}
// validateOptions checks whether the provided options are valid, then returns the driver
// and data source name.
func validateOptions(options options.Options) (driver, dataSourceName string, err error) {
driverOpt, found := options.Values().Get("store.sql.driver")
if !found {
return "", "", errors.New("No store.sql.driver option specified")
}
nodesOpt, found := options.Values().Get("store.nodes")
if !found {
return "", "", errors.New("No store.nodes option specified (expected a database connection string)")
}
driver, ok := driverOpt.(string)
if !ok {
return "", "", errors.New("store.sql.driver option must be a string")
}
nodes, ok := nodesOpt.([]string)
if !ok {
return "", "", errors.New("store.nodes option must be a []string")
}
if len(nodes) != 1 {
return "", "", errors.New("expected only 1 store.nodes option")
}
namespaceOpt, found := options.Values().Get("store.namespace")
if found {
namespace, ok := namespaceOpt.(string)
if !ok {
return "", "", errors.New("store.namespace must me a string")
}
for _, r := range namespace {
if !unicode.IsLetter(r) {
return "", "", errors.New("store.namespace must only contain letters")
}
}
}
return driver, nodes[0], nil
return s
}

View File

@@ -27,13 +27,10 @@ func TestSQL(t *testing.T) {
}
db.Close()
sqlStore, err := New(
sqlStore := New(
store.Namespace("testsql"),
store.Nodes(connection),
)
if err != nil {
t.Fatal(err.Error())
}
records, err := sqlStore.List()
if err != nil {