Implementation of postgres store

This commit is contained in:
Jake Sanders 2019-11-01 14:13:21 +00:00
parent 74286c2939
commit dee63b2b2c
4 changed files with 350 additions and 4 deletions

View File

@ -4,7 +4,7 @@ import (
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
) )
// Set the nodes used to back the store // Nodes is a list of nodes used to back the store
func Nodes(a ...string) options.Option { func Nodes(a ...string) options.Option {
return options.WithValue("store.nodes", a) return options.WithValue("store.nodes", a)
} }
@ -13,3 +13,9 @@ func Nodes(a ...string) options.Option {
func Prefix(p string) options.Option { func Prefix(p string) options.Option {
return options.WithValue("store.prefix", p) return options.WithValue("store.prefix", p)
} }
// Namespace offers a way to have multiple isolated
// stores in the same backend, if supported.
func Namespace(n string) options.Option {
return options.WithValue("store.namespace", n)
}

View File

@ -0,0 +1,244 @@
// Package postgresql implements a micro Store backed by sql
package postgresql
import (
"database/sql"
"fmt"
"time"
"unicode"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
)
// DefaultNamespace is the namespace that the sql store
// will use if no namespace is provided.
const DefaultNamespace = "micro"
type sqlStore struct {
db *sql.DB
table string
options.Options
}
// List all the known records
func (s *sqlStore) List() ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
rows, err := q.Query()
if err != nil {
if err == sql.ErrNoRows {
return records, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
}
} else {
records = append(records, record)
}
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return records, rowErr
}
if err := rows.Err(); err != nil {
return records, err
}
return records, nil
}
// Read all records with keys
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
for _, key := range keys {
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
}
}
return records, nil
}
// Write records
func (s *sqlStore) Write(rec ...*store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
DO UPDATE
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table))
if err != nil {
return err
}
for _, r := range rec {
var err error
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}
}
return nil
}
// Delete records with keys
func (s *sqlStore) Delete(keys ...string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table))
if err != nil {
return err
}
for _, key := range keys {
result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}
}
return nil
}
func (s *sqlStore) initDB(options options.Options) error {
// Get the store.namespace option, or use sql.DefaultNamespace
namespaceOpt, found := options.Values().Get("store.namespace")
if !found {
s.table = DefaultNamespace
} else {
if namespace, ok := namespaceOpt.(string); ok {
s.table = namespace
} else {
return errors.New("store.namespace option must be a string")
}
}
// Create "micro" schema
schema, err := s.db.Prepare("CREATE SCHEMA IF NOT EXISTS micro ;")
if err != nil {
return err
}
_, err = schema.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create Schema")
}
// Create a table for the Store namespace
tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s
(
key text COLLATE "default" NOT NULL,
value bytea,
expiry timestamp with time zone,
CONSTRAINT %s_pkey PRIMARY KEY (key)
);`, s.table, s.table))
_, err = tableq.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create table")
}
return nil
}
// New returns a new micro Store backed by sql
func New(opts ...options.Option) (store.Store, error) {
options := options.NewOptions(opts...)
driver, dataSourceName, err := validateOptions(options)
if err != nil {
return nil, err
}
db, err := sql.Open(driver, dataSourceName)
if err != nil {
return nil, err
}
if err := db.Ping(); err != nil {
return nil, err
}
s := &sqlStore{
db: db,
}
return s, s.initDB(options)
}
// validateOptions checks whether the provided options are valid, then returns the driver
// and data source name.
func validateOptions(options options.Options) (driver, dataSourceName string, err error) {
driverOpt, found := options.Values().Get("store.sql.driver")
if !found {
return "", "", errors.New("No store.sql.driver option specified")
}
nodesOpt, found := options.Values().Get("store.nodes")
if !found {
return "", "", errors.New("No store.nodes option specified (expected a database connection string)")
}
driver, ok := driverOpt.(string)
if !ok {
return "", "", errors.New("store.sql.driver option must be a string")
}
nodes, ok := nodesOpt.([]string)
if !ok {
return "", "", errors.New("store.nodes option must be a []string")
}
if len(nodes) != 1 {
return "", "", errors.New("expected only 1 store.nodes option")
}
namespaceOpt, found := options.Values().Get("store.namespace")
if found {
namespace, ok := namespaceOpt.(string)
if !ok {
return "", "", errors.New("store.namespace must me a string")
}
for _, r := range namespace {
if !unicode.IsLetter(r) {
return "", "", errors.New("store.namespace must only contain letters")
}
}
}
return driver, nodes[0], nil
}

View File

@ -0,0 +1,95 @@
package postgresql
import (
"database/sql"
"fmt"
"testing"
"time"
"github.com/kr/pretty"
"github.com/micro/go-micro/store"
)
func TestSQL(t *testing.T) {
connection := fmt.Sprintf(
"host=%s port=%d user=%s sslmode=disable dbname=%s",
"localhost",
5432,
"jake",
"test",
)
db, err := sql.Open("postgres", connection)
if err != nil {
t.Fatal(err)
}
if err := db.Ping(); err != nil {
t.Skip(err)
}
db.Close()
sqlStore, err := New(
store.Namespace("testsql"),
store.Nodes(connection),
)
if err != nil {
t.Fatal(err.Error())
}
records, err := sqlStore.List()
if err != nil {
t.Error(err)
} else {
t.Logf("%# v\n", pretty.Formatter(records))
}
err = sqlStore.Write(
&store.Record{
Key: "test",
Value: []byte("foo"),
},
&store.Record{
Key: "bar",
Value: []byte("baz"),
},
&store.Record{
Key: "qux",
Value: []byte("aasad"),
},
)
if err != nil {
t.Error(err)
}
err = sqlStore.Delete("qux")
if err != nil {
t.Error(err)
}
err = sqlStore.Write(&store.Record{
Key: "test",
Value: []byte("bar"),
Expiry: time.Minute,
})
if err != nil {
t.Error(err)
}
records, err = sqlStore.Read("test")
if err != nil {
t.Error(err)
} else {
t.Logf("%# v\n", pretty.Formatter(records))
if string(records[0].Value) != "bar" {
t.Error("Expected bar, got ", string(records[0].Value))
}
}
time.Sleep(61 * time.Second)
records, err = sqlStore.Read("test")
if err == nil {
t.Error("Key test should have expired")
} else {
if err != store.ErrNotFound {
t.Error(err)
}
}
}

View File

@ -7,6 +7,7 @@ import (
) )
var ( var (
// ErrNotFound is returned when a Read key doesn't exist
ErrNotFound = errors.New("not found") ErrNotFound = errors.New("not found")
) )
@ -14,11 +15,11 @@ var (
type Store interface { type Store interface {
// List all the known records // List all the known records
List() ([]*Record, error) List() ([]*Record, error)
// Read a record with key // Read records with keys
Read(key ...string) ([]*Record, error) Read(key ...string) ([]*Record, error)
// Write a record // Write records
Write(rec ...*Record) error Write(rec ...*Record) error
// Delete a record with key // Delete records with keys
Delete(key ...string) error Delete(key ...string) error
} }