Add opts to service proto (#1517)
* Add opts to service proto * Support database/table opts
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lib/pq"
|
||||
@@ -21,214 +22,51 @@ var (
|
||||
DefaultTable = "micro"
|
||||
)
|
||||
|
||||
var (
|
||||
statements = map[string]string{
|
||||
"list": "SELECT key, value, expiry FROM %s.%s;",
|
||||
"read": "SELECT key, value, expiry FROM %s.%s WHERE key = $1;",
|
||||
"readMany": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;",
|
||||
"readOffset": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;",
|
||||
"write": "INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;",
|
||||
"delete": "DELETE FROM %s.%s WHERE key = $1;",
|
||||
}
|
||||
)
|
||||
|
||||
type sqlStore struct {
|
||||
db *sql.DB
|
||||
|
||||
database string
|
||||
table string
|
||||
|
||||
list *sql.Stmt
|
||||
readOne *sql.Stmt
|
||||
readMany *sql.Stmt
|
||||
readOffset *sql.Stmt
|
||||
write *sql.Stmt
|
||||
delete *sql.Stmt
|
||||
|
||||
options store.Options
|
||||
db *sql.DB
|
||||
|
||||
sync.RWMutex
|
||||
// known databases
|
||||
databases map[string]bool
|
||||
}
|
||||
|
||||
func (s *sqlStore) Close() error {
|
||||
closeStmt(s.delete)
|
||||
closeStmt(s.list)
|
||||
closeStmt(s.readMany)
|
||||
closeStmt(s.readOffset)
|
||||
closeStmt(s.readOne)
|
||||
closeStmt(s.write)
|
||||
if s.db != nil {
|
||||
return s.db.Close()
|
||||
func (s *sqlStore) createDB(database, table string) {
|
||||
if len(database) == 0 {
|
||||
database = s.options.Database
|
||||
}
|
||||
return nil
|
||||
if len(table) == 0 {
|
||||
table = s.options.Table
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
_, ok := s.databases[database+":"+table]
|
||||
if !ok {
|
||||
s.initDB(database, table)
|
||||
s.databases[database+":"+table] = true
|
||||
}
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *sqlStore) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&s.options)
|
||||
}
|
||||
// reconfigure
|
||||
return s.configure()
|
||||
}
|
||||
|
||||
// List all the known records
|
||||
func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
|
||||
rows, err := s.list.Query()
|
||||
var keys []string
|
||||
var timehelper pq.NullTime
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return keys, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
record := &store.Record{}
|
||||
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
||||
return keys, err
|
||||
}
|
||||
if timehelper.Valid {
|
||||
if timehelper.Time.Before(time.Now()) {
|
||||
// record has expired
|
||||
go s.Delete(record.Key)
|
||||
} else {
|
||||
record.Expiry = time.Until(timehelper.Time)
|
||||
keys = append(keys, record.Key)
|
||||
}
|
||||
} else {
|
||||
keys = append(keys, record.Key)
|
||||
}
|
||||
|
||||
}
|
||||
rowErr := rows.Close()
|
||||
if rowErr != nil {
|
||||
// transaction rollback or something
|
||||
return keys, rowErr
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return keys, err
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// Read a single key
|
||||
func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||
var options store.ReadOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
if options.Prefix || options.Suffix {
|
||||
return s.read(key, options)
|
||||
}
|
||||
|
||||
var records []*store.Record
|
||||
var timehelper pq.NullTime
|
||||
|
||||
row := s.readOne.QueryRow(key)
|
||||
record := &store.Record{}
|
||||
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return records, store.ErrNotFound
|
||||
}
|
||||
return records, err
|
||||
}
|
||||
if timehelper.Valid {
|
||||
if timehelper.Time.Before(time.Now()) {
|
||||
// record has expired
|
||||
go s.Delete(key)
|
||||
return records, store.ErrNotFound
|
||||
}
|
||||
record.Expiry = time.Until(timehelper.Time)
|
||||
records = append(records, record)
|
||||
} else {
|
||||
records = append(records, record)
|
||||
}
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
// Read Many records
|
||||
func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) {
|
||||
pattern := "%"
|
||||
if options.Prefix {
|
||||
pattern = key + pattern
|
||||
}
|
||||
if options.Suffix {
|
||||
pattern = pattern + key
|
||||
}
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if options.Limit != 0 {
|
||||
rows, err = s.readOffset.Query(pattern, options.Limit, options.Offset)
|
||||
} else {
|
||||
rows, err = s.readMany.Query(pattern)
|
||||
}
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return []*store.Record{}, nil
|
||||
}
|
||||
return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed")
|
||||
}
|
||||
defer rows.Close()
|
||||
var records []*store.Record
|
||||
var timehelper pq.NullTime
|
||||
|
||||
for rows.Next() {
|
||||
record := &store.Record{}
|
||||
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
||||
return records, err
|
||||
}
|
||||
if timehelper.Valid {
|
||||
if timehelper.Time.Before(time.Now()) {
|
||||
// record has expired
|
||||
go s.Delete(record.Key)
|
||||
} else {
|
||||
record.Expiry = time.Until(timehelper.Time)
|
||||
records = append(records, record)
|
||||
}
|
||||
} else {
|
||||
records = append(records, record)
|
||||
}
|
||||
}
|
||||
rowErr := rows.Close()
|
||||
if rowErr != nil {
|
||||
// transaction rollback or something
|
||||
return records, rowErr
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return records, err
|
||||
}
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
// Write records
|
||||
func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
||||
var err error
|
||||
if r.Expiry != 0 {
|
||||
_, err = s.write.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
|
||||
} else {
|
||||
_, err = s.write.Exec(r.Key, r.Value, nil)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Couldn't insert record "+r.Key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete records with keys
|
||||
func (s *sqlStore) Delete(key string, opts ...store.DeleteOption) error {
|
||||
result, err := s.delete.Exec(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = result.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sqlStore) initDB() error {
|
||||
func (s *sqlStore) initDB(database, table string) error {
|
||||
// Create the namespace's database
|
||||
_, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database))
|
||||
_, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", database))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database))
|
||||
_, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", database))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Couldn't set database")
|
||||
}
|
||||
@@ -240,58 +78,17 @@ func (s *sqlStore) initDB() error {
|
||||
value bytea,
|
||||
expiry timestamp with time zone,
|
||||
CONSTRAINT %s_pkey PRIMARY KEY (key)
|
||||
);`, s.table, s.table))
|
||||
);`, table, table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Couldn't create table")
|
||||
}
|
||||
|
||||
// Create Index
|
||||
_, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key");`, "key_index_"+s.table, s.database, s.table))
|
||||
_, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key");`, "key_index_"+table, database, table))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
list, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "List statement couldn't be prepared")
|
||||
}
|
||||
closeStmt(s.list)
|
||||
s.list = list
|
||||
readOne, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ReadOne statement couldn't be prepared")
|
||||
}
|
||||
closeStmt(s.readOne)
|
||||
s.readOne = readOne
|
||||
readMany, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", s.database, s.table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ReadMany statement couldn't be prepared")
|
||||
}
|
||||
closeStmt(s.readMany)
|
||||
s.readMany = readMany
|
||||
readOffset, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", s.database, s.table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ReadOffset statement couldn't be prepared")
|
||||
}
|
||||
closeStmt(s.readOffset)
|
||||
s.readOffset = readOffset
|
||||
write, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
|
||||
VALUES ($1, $2::bytea, $3)
|
||||
ON CONFLICT (key)
|
||||
DO UPDATE
|
||||
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Write statement couldn't be prepared")
|
||||
}
|
||||
closeStmt(s.write)
|
||||
s.write = write
|
||||
delete, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Delete statement couldn't be prepared")
|
||||
}
|
||||
closeStmt(s.delete)
|
||||
s.delete = delete
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -344,21 +141,286 @@ func (s *sqlStore) configure() error {
|
||||
|
||||
// save the values
|
||||
s.db = db
|
||||
s.database = database
|
||||
s.table = table
|
||||
|
||||
// initialise the database
|
||||
return s.initDB()
|
||||
return s.initDB(s.options.Database, s.options.Table)
|
||||
}
|
||||
|
||||
func (s *sqlStore) String() string {
|
||||
return "cockroach"
|
||||
func (s *sqlStore) prepare(database, table, query string) (*sql.Stmt, error) {
|
||||
st, ok := statements[query]
|
||||
if !ok {
|
||||
return nil, errors.New("unsupported statement")
|
||||
}
|
||||
if len(database) == 0 {
|
||||
database = s.options.Database
|
||||
}
|
||||
if len(table) == 0 {
|
||||
table = s.options.Table
|
||||
}
|
||||
|
||||
q := fmt.Sprintf(st, database, table)
|
||||
stmt, err := s.db.Prepare(q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
func (s *sqlStore) Close() error {
|
||||
if s.db != nil {
|
||||
return s.db.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sqlStore) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&s.options)
|
||||
}
|
||||
// reconfigure
|
||||
return s.configure()
|
||||
}
|
||||
|
||||
// List all the known records
|
||||
func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
|
||||
var options store.ListOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// create the db if not exists
|
||||
s.createDB(options.Database, options.Table)
|
||||
|
||||
st, err := s.prepare(options.Database, options.Table, "list")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
rows, err := st.Query()
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var keys []string
|
||||
var timehelper pq.NullTime
|
||||
|
||||
for rows.Next() {
|
||||
record := &store.Record{}
|
||||
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
||||
return keys, err
|
||||
}
|
||||
if timehelper.Valid {
|
||||
if timehelper.Time.Before(time.Now()) {
|
||||
// record has expired
|
||||
go s.Delete(record.Key)
|
||||
} else {
|
||||
record.Expiry = time.Until(timehelper.Time)
|
||||
keys = append(keys, record.Key)
|
||||
}
|
||||
} else {
|
||||
keys = append(keys, record.Key)
|
||||
}
|
||||
|
||||
}
|
||||
rowErr := rows.Close()
|
||||
if rowErr != nil {
|
||||
// transaction rollback or something
|
||||
return keys, rowErr
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return keys, err
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// Read a single key
|
||||
func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||
var options store.ReadOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// create the db if not exists
|
||||
s.createDB(options.Database, options.Table)
|
||||
|
||||
if options.Prefix || options.Suffix {
|
||||
return s.read(key, options)
|
||||
}
|
||||
|
||||
var records []*store.Record
|
||||
var timehelper pq.NullTime
|
||||
|
||||
st, err := s.prepare(options.Database, options.Table, "read")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
row := st.QueryRow(key)
|
||||
record := &store.Record{}
|
||||
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return records, store.ErrNotFound
|
||||
}
|
||||
return records, err
|
||||
}
|
||||
if timehelper.Valid {
|
||||
if timehelper.Time.Before(time.Now()) {
|
||||
// record has expired
|
||||
go s.Delete(key)
|
||||
return records, store.ErrNotFound
|
||||
}
|
||||
record.Expiry = time.Until(timehelper.Time)
|
||||
records = append(records, record)
|
||||
} else {
|
||||
records = append(records, record)
|
||||
}
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
// Read Many records
|
||||
func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) {
|
||||
pattern := "%"
|
||||
if options.Prefix {
|
||||
pattern = key + pattern
|
||||
}
|
||||
if options.Suffix {
|
||||
pattern = pattern + key
|
||||
}
|
||||
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
|
||||
if options.Limit != 0 {
|
||||
st, err := s.prepare(options.Database, options.Table, "readOffset")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
rows, err = st.Query(pattern, options.Limit, options.Offset)
|
||||
} else {
|
||||
st, err := s.prepare(options.Database, options.Table, "readMany")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
rows, err = st.Query(pattern)
|
||||
}
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return []*store.Record{}, nil
|
||||
}
|
||||
return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed")
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
var records []*store.Record
|
||||
var timehelper pq.NullTime
|
||||
|
||||
for rows.Next() {
|
||||
record := &store.Record{}
|
||||
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
||||
return records, err
|
||||
}
|
||||
if timehelper.Valid {
|
||||
if timehelper.Time.Before(time.Now()) {
|
||||
// record has expired
|
||||
go s.Delete(record.Key)
|
||||
} else {
|
||||
record.Expiry = time.Until(timehelper.Time)
|
||||
records = append(records, record)
|
||||
}
|
||||
} else {
|
||||
records = append(records, record)
|
||||
}
|
||||
}
|
||||
rowErr := rows.Close()
|
||||
if rowErr != nil {
|
||||
// transaction rollback or something
|
||||
return records, rowErr
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return records, err
|
||||
}
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
// Write records
|
||||
func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
||||
var options store.WriteOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// create the db if not exists
|
||||
s.createDB(options.Database, options.Table)
|
||||
|
||||
st, err := s.prepare(options.Database, options.Table, "write")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
if r.Expiry != 0 {
|
||||
_, err = st.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
|
||||
} else {
|
||||
_, err = st.Exec(r.Key, r.Value, nil)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Couldn't insert record "+r.Key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete records with keys
|
||||
func (s *sqlStore) Delete(key string, opts ...store.DeleteOption) error {
|
||||
var options store.DeleteOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// create the db if not exists
|
||||
s.createDB(options.Database, options.Table)
|
||||
|
||||
st, err := s.prepare(options.Database, options.Table, "delete")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
result, err := st.Exec(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = result.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sqlStore) Options() store.Options {
|
||||
return s.options
|
||||
}
|
||||
|
||||
func (s *sqlStore) String() string {
|
||||
return "cockroach"
|
||||
}
|
||||
|
||||
// NewStore returns a new micro Store backed by sql
|
||||
func NewStore(opts ...store.Option) store.Store {
|
||||
var options store.Options
|
||||
@@ -370,16 +432,11 @@ func NewStore(opts ...store.Option) store.Store {
|
||||
s := new(sqlStore)
|
||||
// set the options
|
||||
s.options = options
|
||||
|
||||
// mark known databases
|
||||
s.databases = make(map[string]bool)
|
||||
// best-effort configure the store
|
||||
s.configure()
|
||||
|
||||
// return store
|
||||
return s
|
||||
}
|
||||
|
||||
func closeStmt(s *sql.Stmt) {
|
||||
if s != nil {
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user