2019-12-16 15:09:59 +00:00
// Package cockroach implements the cockroach store
package cockroach
2019-11-01 14:13:21 +00:00
import (
"database/sql"
"fmt"
2020-01-29 00:47:41 +08:00
"net/url"
2020-03-01 22:09:06 +00:00
"regexp"
2019-12-16 15:09:59 +00:00
"strings"
2020-04-30 22:51:25 +01:00
"sync"
2019-11-01 14:13:21 +00:00
"time"
"github.com/lib/pq"
2020-05-20 14:03:38 +01:00
"github.com/micro/go-micro/v2/logger"
2020-01-30 14:39:00 +03:00
"github.com/micro/go-micro/v2/store"
2019-12-16 14:38:51 +00:00
"github.com/pkg/errors"
2019-11-01 14:13:21 +00:00
)
2020-04-07 13:00:05 +01:00
// DefaultDatabase is the namespace that the sql store
2019-11-01 14:13:21 +00:00
// will use if no namespace is provided.
2019-12-16 12:13:18 +00:00
var (
2020-04-07 13:00:05 +01:00
DefaultDatabase = "micro"
2020-04-08 12:08:08 +01:00
DefaultTable = "micro"
2019-12-16 12:13:18 +00:00
)
2019-11-01 14:13:21 +00:00
2020-04-30 22:51:25 +01:00
var (
2020-05-01 15:31:55 +01:00
re = regexp . MustCompile ( "[^a-zA-Z0-9]+" )
2020-04-30 22:51:25 +01:00
statements = map [ string ] string {
"list" : "SELECT key, value, expiry FROM %s.%s;" ,
"read" : "SELECT key, value, expiry FROM %s.%s WHERE key = $1;" ,
"readMany" : "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;" ,
"readOffset" : "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;" ,
"write" : "INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;" ,
"delete" : "DELETE FROM %s.%s WHERE key = $1;" ,
}
)
2019-11-01 14:13:21 +00:00
type sqlStore struct {
2020-04-30 22:51:25 +01:00
options store . Options
db * sql . DB
2019-11-01 14:13:21 +00:00
2020-04-30 22:51:25 +01:00
sync . RWMutex
// known databases
databases map [ string ] bool
}
2019-12-16 14:38:51 +00:00
2020-05-01 15:31:55 +01:00
func ( s * sqlStore ) getDB ( database , table string ) ( string , string ) {
2020-04-30 22:51:25 +01:00
if len ( database ) == 0 {
2020-05-01 15:31:55 +01:00
if len ( s . options . Database ) > 0 {
database = s . options . Database
} else {
database = DefaultDatabase
}
2020-04-30 22:51:25 +01:00
}
2020-05-01 15:31:55 +01:00
2020-04-30 22:51:25 +01:00
if len ( table ) == 0 {
2020-05-01 15:31:55 +01:00
if len ( s . options . Table ) > 0 {
table = s . options . Table
} else {
database = DefaultTable
}
2020-04-30 22:51:25 +01:00
}
2020-03-12 13:41:30 +00:00
2020-05-01 15:31:55 +01:00
// store.namespace must only contain letters, numbers and underscores
database = re . ReplaceAllString ( database , "_" )
table = re . ReplaceAllString ( table , "_" )
return database , table
}
2020-05-06 10:58:14 +01:00
func ( s * sqlStore ) createDB ( database , table string ) error {
2020-05-01 15:31:55 +01:00
database , table = s . getDB ( database , table )
2020-04-30 22:51:25 +01:00
s . Lock ( )
2020-05-06 10:58:14 +01:00
defer s . Unlock ( )
if _ , ok := s . databases [ database + ":" + table ] ; ok {
return nil
}
if err := s . initDB ( database , table ) ; err != nil {
return err
2020-04-30 22:51:25 +01:00
}
2020-05-06 10:58:14 +01:00
s . databases [ database + ":" + table ] = true
return nil
2020-04-30 22:51:25 +01:00
}
func ( s * sqlStore ) initDB ( database , table string ) error {
2020-05-20 14:03:38 +01:00
if s . db == nil {
return errors . New ( "Database connection not initialised" )
}
2020-04-30 22:51:25 +01:00
// Create the namespace's database
2020-04-30 23:53:54 +01:00
_ , err := s . db . Exec ( fmt . Sprintf ( "CREATE DATABASE IF NOT EXISTS %s;" , database ) )
2020-04-30 22:51:25 +01:00
if err != nil {
return err
}
2020-04-30 23:53:54 +01:00
_ , err = s . db . Exec ( fmt . Sprintf ( "SET DATABASE = %s;" , database ) )
2020-04-30 22:51:25 +01:00
if err != nil {
return errors . Wrap ( err , "Couldn't set database" )
}
// Create a table for the namespace's prefix
_ , err = s . db . Exec ( fmt . Sprintf ( ` CREATE TABLE IF NOT EXISTS % s
(
key text NOT NULL ,
value bytea ,
expiry timestamp with time zone ,
CONSTRAINT % s_pkey PRIMARY KEY ( key )
) ; ` , table , table ) )
if err != nil {
return errors . Wrap ( err , "Couldn't create table" )
}
// Create Index
_ , err = s . db . Exec ( fmt . Sprintf ( ` CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key"); ` , "key_index_" + table , database , table ) )
if err != nil {
return err
}
return nil
}
func ( s * sqlStore ) configure ( ) error {
if len ( s . options . Nodes ) == 0 {
s . options . Nodes = [ ] string { "postgresql://root@localhost:26257?sslmode=disable" }
}
source := s . options . Nodes [ 0 ]
// check if it is a standard connection string eg: host=%s port=%d user=%s password=%s dbname=%s sslmode=disable
// if err is nil which means it would be a URL like postgre://xxxx?yy=zz
2020-05-01 15:31:55 +01:00
_ , err := url . Parse ( source )
2020-04-30 22:51:25 +01:00
if err != nil {
if ! strings . Contains ( source , " " ) {
source = fmt . Sprintf ( "host=%s" , source )
}
}
// create source from first node
db , err := sql . Open ( "postgres" , source )
if err != nil {
return err
}
if err := db . Ping ( ) ; err != nil {
return err
}
if s . db != nil {
s . db . Close ( )
}
// save the values
s . db = db
2020-05-01 15:31:55 +01:00
// get DB
database , table := s . getDB ( s . options . Database , s . options . Table )
2020-04-30 22:51:25 +01:00
// initialise the database
2020-05-01 15:31:55 +01:00
return s . initDB ( database , table )
2020-04-30 22:51:25 +01:00
}
func ( s * sqlStore ) prepare ( database , table , query string ) ( * sql . Stmt , error ) {
st , ok := statements [ query ]
if ! ok {
return nil , errors . New ( "unsupported statement" )
}
2020-05-01 15:31:55 +01:00
// get DB
database , table = s . getDB ( database , table )
2020-04-30 22:51:25 +01:00
q := fmt . Sprintf ( st , database , table )
stmt , err := s . db . Prepare ( q )
if err != nil {
return nil , err
}
return stmt , nil
2019-11-01 14:13:21 +00:00
}
2020-04-08 09:51:10 +01:00
func ( s * sqlStore ) Close ( ) error {
if s . db != nil {
return s . db . Close ( )
}
return nil
}
2020-01-08 12:11:31 +00:00
func ( s * sqlStore ) Init ( opts ... store . Option ) error {
for _ , o := range opts {
o ( & s . options )
}
// reconfigure
return s . configure ( )
}
2019-11-01 14:13:21 +00:00
// List all the known records
2020-03-12 13:41:30 +00:00
func ( s * sqlStore ) List ( opts ... store . ListOption ) ( [ ] string , error ) {
2020-04-30 22:51:25 +01:00
var options store . ListOptions
for _ , o := range opts {
o ( & options )
}
// create the db if not exists
2020-05-06 10:58:14 +01:00
if err := s . createDB ( options . Database , options . Table ) ; err != nil {
return nil , err
}
2020-04-30 22:51:25 +01:00
st , err := s . prepare ( options . Database , options . Table , "list" )
if err != nil {
return nil , err
}
defer st . Close ( )
rows , err := st . Query ( )
2019-11-01 14:13:21 +00:00
if err != nil {
if err == sql . ErrNoRows {
2020-04-30 22:51:25 +01:00
return nil , nil
2019-11-01 14:13:21 +00:00
}
return nil , err
}
defer rows . Close ( )
2020-04-30 22:51:25 +01:00
var keys [ ] string
var timehelper pq . NullTime
2019-11-01 14:13:21 +00:00
for rows . Next ( ) {
record := & store . Record { }
if err := rows . Scan ( & record . Key , & record . Value , & timehelper ) ; err != nil {
2020-03-12 13:41:30 +00:00
return keys , err
2019-11-01 14:13:21 +00:00
}
if timehelper . Valid {
if timehelper . Time . Before ( time . Now ( ) ) {
// record has expired
go s . Delete ( record . Key )
} else {
record . Expiry = time . Until ( timehelper . Time )
2020-03-12 13:41:30 +00:00
keys = append ( keys , record . Key )
2019-11-01 14:13:21 +00:00
}
} else {
2020-03-12 13:41:30 +00:00
keys = append ( keys , record . Key )
2019-11-01 14:13:21 +00:00
}
}
rowErr := rows . Close ( )
if rowErr != nil {
// transaction rollback or something
2020-03-12 13:41:30 +00:00
return keys , rowErr
2019-11-01 14:13:21 +00:00
}
if err := rows . Err ( ) ; err != nil {
2020-03-12 13:41:30 +00:00
return keys , err
2019-11-01 14:13:21 +00:00
}
2020-03-12 13:41:30 +00:00
return keys , nil
2019-11-01 14:13:21 +00:00
}
2020-03-12 13:41:30 +00:00
// Read a single key
2020-01-08 22:23:14 +00:00
func ( s * sqlStore ) Read ( key string , opts ... store . ReadOption ) ( [ ] * store . Record , error ) {
var options store . ReadOptions
for _ , o := range opts {
o ( & options )
}
2020-04-30 22:51:25 +01:00
// create the db if not exists
2020-05-06 10:58:14 +01:00
if err := s . createDB ( options . Database , options . Table ) ; err != nil {
return nil , err
}
2020-04-30 22:51:25 +01:00
2020-03-17 16:15:23 +00:00
if options . Prefix || options . Suffix {
return s . read ( key , options )
}
2020-01-08 22:23:14 +00:00
2019-11-01 14:13:21 +00:00
var records [ ] * store . Record
var timehelper pq . NullTime
2020-01-08 22:23:14 +00:00
2020-04-30 22:51:25 +01:00
st , err := s . prepare ( options . Database , options . Table , "read" )
if err != nil {
return nil , err
}
defer st . Close ( )
row := st . QueryRow ( key )
2020-01-08 22:23:14 +00:00
record := & store . Record { }
if err := row . Scan ( & record . Key , & record . Value , & timehelper ) ; err != nil {
if err == sql . ErrNoRows {
return records , store . ErrNotFound
2019-11-01 14:13:21 +00:00
}
2020-01-08 22:23:14 +00:00
return records , err
}
if timehelper . Valid {
if timehelper . Time . Before ( time . Now ( ) ) {
// record has expired
go s . Delete ( key )
return records , store . ErrNotFound
2019-11-01 14:13:21 +00:00
}
2020-01-08 22:23:14 +00:00
record . Expiry = time . Until ( timehelper . Time )
records = append ( records , record )
} else {
records = append ( records , record )
2019-11-01 14:13:21 +00:00
}
2020-01-08 22:23:14 +00:00
2019-11-01 14:13:21 +00:00
return records , nil
}
2020-03-17 16:15:23 +00:00
// Read Many records
func ( s * sqlStore ) read ( key string , options store . ReadOptions ) ( [ ] * store . Record , error ) {
pattern := "%"
if options . Prefix {
pattern = key + pattern
}
if options . Suffix {
pattern = pattern + key
}
2020-04-30 22:51:25 +01:00
2020-03-17 16:15:23 +00:00
var rows * sql . Rows
var err error
2020-04-30 22:51:25 +01:00
2020-03-17 16:15:23 +00:00
if options . Limit != 0 {
2020-04-30 22:51:25 +01:00
st , err := s . prepare ( options . Database , options . Table , "readOffset" )
if err != nil {
return nil , err
}
defer st . Close ( )
rows , err = st . Query ( pattern , options . Limit , options . Offset )
2020-03-17 16:15:23 +00:00
} else {
2020-04-30 22:51:25 +01:00
st , err := s . prepare ( options . Database , options . Table , "readMany" )
if err != nil {
return nil , err
}
defer st . Close ( )
rows , err = st . Query ( pattern )
2020-03-17 16:15:23 +00:00
}
if err != nil {
if err == sql . ErrNoRows {
return [ ] * store . Record { } , nil
}
return [ ] * store . Record { } , errors . Wrap ( err , "sqlStore.read failed" )
}
2020-04-30 22:51:25 +01:00
2020-03-17 16:15:23 +00:00
defer rows . Close ( )
2020-04-30 22:51:25 +01:00
2020-03-17 16:15:23 +00:00
var records [ ] * store . Record
var timehelper pq . NullTime
for rows . Next ( ) {
record := & store . Record { }
if err := rows . Scan ( & record . Key , & record . Value , & timehelper ) ; err != nil {
return records , err
}
if timehelper . Valid {
if timehelper . Time . Before ( time . Now ( ) ) {
// record has expired
go s . Delete ( record . Key )
} else {
record . Expiry = time . Until ( timehelper . Time )
records = append ( records , record )
}
} else {
records = append ( records , record )
}
}
rowErr := rows . Close ( )
if rowErr != nil {
// transaction rollback or something
return records , rowErr
}
if err := rows . Err ( ) ; err != nil {
return records , err
}
return records , nil
}
2019-11-01 14:13:21 +00:00
// Write records
2020-03-12 13:41:30 +00:00
func ( s * sqlStore ) Write ( r * store . Record , opts ... store . WriteOption ) error {
2020-04-30 22:51:25 +01:00
var options store . WriteOptions
for _ , o := range opts {
o ( & options )
}
// create the db if not exists
2020-05-06 10:58:14 +01:00
if err := s . createDB ( options . Database , options . Table ) ; err != nil {
return err
}
2020-04-30 22:51:25 +01:00
st , err := s . prepare ( options . Database , options . Table , "write" )
if err != nil {
return err
}
defer st . Close ( )
2020-01-08 22:23:14 +00:00
if r . Expiry != 0 {
2020-04-30 22:51:25 +01:00
_ , err = st . Exec ( r . Key , r . Value , time . Now ( ) . Add ( r . Expiry ) )
2020-01-08 22:23:14 +00:00
} else {
2020-04-30 22:51:25 +01:00
_ , err = st . Exec ( r . Key , r . Value , nil )
2020-01-08 22:23:14 +00:00
}
if err != nil {
return errors . Wrap ( err , "Couldn't insert record " + r . Key )
2019-11-01 14:13:21 +00:00
}
return nil
}
// Delete records with keys
2020-03-12 13:41:30 +00:00
func ( s * sqlStore ) Delete ( key string , opts ... store . DeleteOption ) error {
2020-04-30 22:51:25 +01:00
var options store . DeleteOptions
for _ , o := range opts {
o ( & options )
2020-01-08 22:23:14 +00:00
}
2020-04-30 22:51:25 +01:00
// create the db if not exists
2020-05-06 10:58:14 +01:00
if err := s . createDB ( options . Database , options . Table ) ; err != nil {
return err
}
2019-11-01 14:13:21 +00:00
2020-04-30 22:51:25 +01:00
st , err := s . prepare ( options . Database , options . Table , "delete" )
2019-11-01 14:13:21 +00:00
if err != nil {
2020-01-08 12:11:31 +00:00
return err
2019-11-01 14:13:21 +00:00
}
2020-04-30 22:51:25 +01:00
defer st . Close ( )
2019-12-16 14:38:51 +00:00
2020-04-30 22:51:25 +01:00
result , err := st . Exec ( key )
2020-03-17 16:15:23 +00:00
if err != nil {
return err
}
2020-04-30 22:51:25 +01:00
_ , err = result . RowsAffected ( )
2020-03-12 13:41:30 +00:00
if err != nil {
2020-04-30 22:51:25 +01:00
return err
2020-03-12 13:41:30 +00:00
}
2020-01-08 12:11:31 +00:00
return nil
}
2019-11-01 14:13:21 +00:00
2020-04-30 22:51:25 +01:00
func ( s * sqlStore ) Options ( ) store . Options {
return s . options
2020-01-08 12:11:31 +00:00
}
2020-01-10 19:13:55 +00:00
func ( s * sqlStore ) String ( ) string {
return "cockroach"
}
2020-03-12 13:41:30 +00:00
// NewStore returns a new micro Store backed by sql
2020-01-08 12:11:31 +00:00
func NewStore ( opts ... store . Option ) store . Store {
2020-04-30 23:53:54 +01:00
options := store . Options {
Database : DefaultDatabase ,
2020-05-01 15:31:55 +01:00
Table : DefaultTable ,
2020-04-30 23:53:54 +01:00
}
2020-01-08 12:11:31 +00:00
for _ , o := range opts {
o ( & options )
}
// new store
s := new ( sqlStore )
2020-01-10 19:13:55 +00:00
// set the options
s . options = options
2020-04-30 22:51:25 +01:00
// mark known databases
s . databases = make ( map [ string ] bool )
2020-03-24 17:16:38 +00:00
// best-effort configure the store
2020-05-20 14:03:38 +01:00
if err := s . configure ( ) ; err != nil {
if logger . V ( logger . ErrorLevel , logger . DefaultLogger ) {
logger . Error ( "Error configuring store " , err )
}
}
2020-01-08 12:11:31 +00:00
// return store
2019-12-16 14:38:51 +00:00
return s
2019-11-01 14:13:21 +00:00
}