micro/store/cockroach/cockroach.go

272 lines
5.6 KiB
Go
Raw Normal View History

2019-12-16 15:09:59 +00:00
// Package cockroach implements the cockroach store
package cockroach
2019-11-01 14:13:21 +00:00
import (
"database/sql"
"fmt"
"net/url"
2019-12-16 15:09:59 +00:00
"strings"
2019-11-01 14:13:21 +00:00
"time"
"unicode"
"github.com/lib/pq"
"github.com/micro/go-micro/v2/store"
"github.com/micro/go-micro/v2/util/log"
2019-12-16 14:38:51 +00:00
"github.com/pkg/errors"
2019-11-01 14:13:21 +00:00
)
// DefaultNamespace and DefaultPrefix are the fallbacks configure uses when
// the store options leave Namespace or Prefix empty.
var (
// DefaultNamespace is the namespace that the sql store will use if no
// namespace is provided; configure stores it as the database name.
DefaultNamespace = "micro"
// DefaultPrefix is the prefix that the sql store will use if no prefix
// is provided; configure stores it as the table name.
DefaultPrefix = "micro"
)
2019-11-01 14:13:21 +00:00
type sqlStore struct {
db *sql.DB
database string
table string
2019-12-16 14:38:51 +00:00
options store.Options
2019-11-01 14:13:21 +00:00
}
// Init applies the given options on top of the store's current options and
// then re-runs configure, returning whatever error configure reports.
func (s *sqlStore) Init(opts ...store.Option) error {
	for i := range opts {
		opts[i](&s.options)
	}

	// Reconfigure so the connection reflects the updated options.
	return s.configure()
}
2019-11-01 14:13:21 +00:00
// List all the known records
func (s *sqlStore) List() ([]*store.Record, error) {
2019-12-16 17:11:13 +00:00
rows, err := s.db.Query(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
2019-11-01 14:13:21 +00:00
var records []*store.Record
var timehelper pq.NullTime
if err != nil {
if err == sql.ErrNoRows {
return records, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
}
} else {
records = append(records, record)
}
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return records, rowErr
}
if err := rows.Err(); err != nil {
return records, err
}
return records, nil
}
// Read all records with keys
func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
var options store.ReadOptions
for _, o := range opts {
o(&options)
}
// TODO: make use of options.Prefix using WHERE key LIKE = ?
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
2019-11-01 14:13:21 +00:00
if err != nil {
return nil, err
}
2019-11-01 14:13:21 +00:00
var records []*store.Record
var timehelper pq.NullTime
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
2019-11-01 14:13:21 +00:00
}
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
2019-11-01 14:13:21 +00:00
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
2019-11-01 14:13:21 +00:00
}
2019-11-01 14:13:21 +00:00
return records, nil
}
// Write records
func (s *sqlStore) Write(r *store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
2019-11-01 14:13:21 +00:00
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
DO UPDATE
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table))
2019-11-01 14:13:21 +00:00
if err != nil {
return err
}
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
2019-11-01 14:13:21 +00:00
}
return nil
}
// Delete records with keys
func (s *sqlStore) Delete(key string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
2019-11-01 14:13:21 +00:00
if err != nil {
return err
}
result, err := q.Exec(key)
if err != nil {
return err
2019-11-01 14:13:21 +00:00
}
_, err = result.RowsAffected()
if err != nil {
return err
}
2019-11-01 14:13:21 +00:00
return nil
}
func (s *sqlStore) initDB() error {
2019-12-16 17:11:13 +00:00
// Create the namespace's database
_, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database))
2019-11-01 14:13:21 +00:00
if err != nil {
return err
2019-11-01 14:13:21 +00:00
}
2019-12-16 14:38:51 +00:00
2019-12-16 17:11:13 +00:00
_, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database))
2019-11-01 14:13:21 +00:00
if err != nil {
return errors.Wrap(err, "Couldn't set database")
2019-11-01 14:13:21 +00:00
}
2019-12-16 17:11:13 +00:00
// Create a table for the namespace's prefix
_, err = s.db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s
2019-11-01 14:13:21 +00:00
(
2019-12-16 17:11:13 +00:00
key text NOT NULL,
2019-11-01 14:13:21 +00:00
value bytea,
expiry timestamp with time zone,
CONSTRAINT %s_pkey PRIMARY KEY (key)
2019-12-16 17:11:13 +00:00
);`, s.table, s.table))
2019-11-01 14:13:21 +00:00
if err != nil {
return errors.Wrap(err, "Couldn't create table")
2019-11-01 14:13:21 +00:00
}
return nil
}
2019-11-01 14:13:21 +00:00
func (s *sqlStore) configure() error {
nodes := s.options.Nodes
2019-12-16 14:38:51 +00:00
if len(nodes) == 0 {
nodes = []string{"localhost:26257"}
}
2019-11-01 14:13:21 +00:00
namespace := s.options.Namespace
2019-12-16 14:38:51 +00:00
if len(namespace) == 0 {
namespace = DefaultNamespace
2019-11-01 14:13:21 +00:00
}
2019-12-16 14:38:51 +00:00
prefix := s.options.Prefix
2019-12-16 14:38:51 +00:00
if len(prefix) == 0 {
prefix = DefaultPrefix
2019-11-01 14:13:21 +00:00
}
2019-12-16 14:38:51 +00:00
for _, r := range namespace {
if !unicode.IsLetter(r) {
return errors.New("store.namespace must only contain letters")
2019-12-16 14:38:51 +00:00
}
2019-11-01 14:13:21 +00:00
}
2019-12-16 14:38:51 +00:00
2019-12-16 15:09:59 +00:00
source := nodes[0]
// check if it is a standard connection string eg: host=%s port=%d user=%s password=%s dbname=%s sslmode=disable
// if err is nil which means it would be a URL like postgre://xxxx?yy=zz
_, err := url.Parse(source)
if err != nil {
if !strings.Contains(source, " ") {
source = fmt.Sprintf("host=%s", source)
}
2019-12-16 15:09:59 +00:00
}
2019-12-16 14:38:51 +00:00
// create source from first node
2019-12-16 15:09:59 +00:00
db, err := sql.Open("postgres", source)
2019-12-16 14:38:51 +00:00
if err != nil {
return err
2019-11-01 14:13:21 +00:00
}
2019-12-16 14:38:51 +00:00
if err := db.Ping(); err != nil {
return err
2019-11-01 14:13:21 +00:00
}
2019-12-16 14:38:51 +00:00
if s.db != nil {
s.db.Close()
2019-11-01 14:13:21 +00:00
}
// save the values
s.db = db
s.database = namespace
s.table = prefix
// initialise the database
return s.initDB()
}
// String returns the name of this store implementation, "cockroach".
func (s *sqlStore) String() string {
	const name = "cockroach"
	return name
}
// New returns a new micro Store backed by sql
func NewStore(opts ...store.Option) store.Store {
var options store.Options
for _, o := range opts {
o(&options)
}
// new store
s := new(sqlStore)
// set the options
s.options = options
// configure the store
if err := s.configure(); err != nil {
log.Fatal(err)
}
// return store
2019-12-16 14:38:51 +00:00
return s
2019-11-01 14:13:21 +00:00
}