Merge pull request #901 from micro/sqlstore

Implementation of PostgreSQL for micro store
This commit is contained in:
Asim Aslam 2019-11-01 15:48:47 +00:00 committed by GitHub
commit bd1918900e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 358 additions and 4 deletions

2
go.mod
View File

@ -34,7 +34,9 @@ require (
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/joncalhoun/qson v0.0.0-20170526102502-8a9cab3a62b1
github.com/json-iterator/go v1.1.7
github.com/kr/pretty v0.1.0
github.com/leodido/go-urn v1.1.0 // indirect
github.com/lib/pq v1.2.0
github.com/lucas-clemente/quic-go v0.12.1
github.com/mholt/certmagic v0.7.5
github.com/micro/cli v0.2.0

2
go.sum
View File

@ -228,6 +228,8 @@ github.com/labbsr0x/bindman-dns-webhook v1.0.2/go.mod h1:p6b+VCXIR8NYKpDr8/dg1HK
github.com/labbsr0x/goh v1.0.1/go.mod h1:8K2UhVoaWXcCU7Lxoa2omWnC8gyW8px7/lmO61c027w=
github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8=
github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/linode/linodego v0.10.0/go.mod h1:cziNP7pbvE3mXIPneHj0oRY8L1WtGEIKlZ8LANE4eXA=
github.com/liquidweb/liquidweb-go v1.6.0/go.mod h1:UDcVnAMDkZxpw4Y7NOHkqoeiGacVLEIG/i5J9cyixzQ=
github.com/lucas-clemente/quic-go v0.12.1 h1:BPITli+6KnKogtTxBk2aS4okr5dUHz2LtIDAP1b8UL4=

View File

@ -4,7 +4,7 @@ import (
"github.com/micro/go-micro/config/options"
)
// Set the nodes used to back the store
// Nodes is a list of nodes used to back the store
func Nodes(a ...string) options.Option {
return options.WithValue("store.nodes", a)
}
@ -13,3 +13,9 @@ func Nodes(a ...string) options.Option {
func Prefix(p string) options.Option {
return options.WithValue("store.prefix", p)
}
// Namespace offers a way to have multiple isolated
// stores in the same backend, if supported.
func Namespace(n string) options.Option {
return options.WithValue("store.namespace", n)
}

View File

@ -0,0 +1,248 @@
// Package postgresql implements a micro Store backed by sql
package postgresql
import (
"database/sql"
"fmt"
"strings"
"time"
"unicode"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
)
// DefaultNamespace is the namespace that the sql store
// will use if no namespace is provided.
const DefaultNamespace = "micro"
type sqlStore struct {
db *sql.DB
table string
options.Options
}
// List all the known records
func (s *sqlStore) List() ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
rows, err := q.Query()
if err != nil {
if err == sql.ErrNoRows {
return records, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
}
} else {
records = append(records, record)
}
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return records, rowErr
}
if err := rows.Err(); err != nil {
return records, err
}
return records, nil
}
// Read all records with keys
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
for _, key := range keys {
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
}
}
return records, nil
}
// Write records
func (s *sqlStore) Write(rec ...*store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
DO UPDATE
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table))
if err != nil {
return err
}
for _, r := range rec {
var err error
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}
}
return nil
}
// Delete records with keys
func (s *sqlStore) Delete(keys ...string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table))
if err != nil {
return err
}
for _, key := range keys {
result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}
}
return nil
}
func (s *sqlStore) initDB(options options.Options) error {
// Get the store.namespace option, or use sql.DefaultNamespace
namespaceOpt, found := options.Values().Get("store.namespace")
if !found {
s.table = DefaultNamespace
} else {
if namespace, ok := namespaceOpt.(string); ok {
s.table = namespace
} else {
return errors.New("store.namespace option must be a string")
}
}
// Create "micro" schema
schema, err := s.db.Prepare("CREATE SCHEMA IF NOT EXISTS micro ;")
if err != nil {
return err
}
_, err = schema.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create Schema")
}
// Create a table for the Store namespace
tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s
(
key text COLLATE "default" NOT NULL,
value bytea,
expiry timestamp with time zone,
CONSTRAINT %s_pkey PRIMARY KEY (key)
);`, s.table, s.table))
_, err = tableq.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create table")
}
return nil
}
// New returns a new micro Store backed by sql
func New(opts ...options.Option) (store.Store, error) {
options := options.NewOptions(opts...)
driver, dataSourceName, err := validateOptions(options)
if err != nil {
return nil, err
}
if !strings.Contains(dataSourceName, " ") {
dataSourceName = fmt.Sprintf("host=%s", dataSourceName)
}
db, err := sql.Open(driver, dataSourceName)
if err != nil {
return nil, err
}
if err := db.Ping(); err != nil {
return nil, err
}
s := &sqlStore{
db: db,
}
return s, s.initDB(options)
}
// validateOptions checks whether the provided options are valid, then returns the driver
// and data source name.
func validateOptions(options options.Options) (driver, dataSourceName string, err error) {
driverOpt, found := options.Values().Get("store.sql.driver")
if !found {
return "", "", errors.New("No store.sql.driver option specified")
}
nodesOpt, found := options.Values().Get("store.nodes")
if !found {
return "", "", errors.New("No store.nodes option specified (expected a database connection string)")
}
driver, ok := driverOpt.(string)
if !ok {
return "", "", errors.New("store.sql.driver option must be a string")
}
nodes, ok := nodesOpt.([]string)
if !ok {
return "", "", errors.New("store.nodes option must be a []string")
}
if len(nodes) != 1 {
return "", "", errors.New("expected only 1 store.nodes option")
}
namespaceOpt, found := options.Values().Get("store.namespace")
if found {
namespace, ok := namespaceOpt.(string)
if !ok {
return "", "", errors.New("store.namespace must me a string")
}
for _, r := range namespace {
if !unicode.IsLetter(r) {
return "", "", errors.New("store.namespace must only contain letters")
}
}
}
return driver, nodes[0], nil
}

View File

@ -0,0 +1,95 @@
package postgresql
import (
"database/sql"
"fmt"
"testing"
"time"
"github.com/kr/pretty"
"github.com/micro/go-micro/store"
)
func TestSQL(t *testing.T) {
connection := fmt.Sprintf(
"host=%s port=%d user=%s sslmode=disable dbname=%s",
"localhost",
5432,
"jake",
"test",
)
db, err := sql.Open("postgres", connection)
if err != nil {
t.Fatal(err)
}
if err := db.Ping(); err != nil {
t.Skip(err)
}
db.Close()
sqlStore, err := New(
store.Namespace("testsql"),
store.Nodes(connection),
)
if err != nil {
t.Fatal(err.Error())
}
records, err := sqlStore.List()
if err != nil {
t.Error(err)
} else {
t.Logf("%# v\n", pretty.Formatter(records))
}
err = sqlStore.Write(
&store.Record{
Key: "test",
Value: []byte("foo"),
},
&store.Record{
Key: "bar",
Value: []byte("baz"),
},
&store.Record{
Key: "qux",
Value: []byte("aasad"),
},
)
if err != nil {
t.Error(err)
}
err = sqlStore.Delete("qux")
if err != nil {
t.Error(err)
}
err = sqlStore.Write(&store.Record{
Key: "test",
Value: []byte("bar"),
Expiry: time.Minute,
})
if err != nil {
t.Error(err)
}
records, err = sqlStore.Read("test")
if err != nil {
t.Error(err)
} else {
t.Logf("%# v\n", pretty.Formatter(records))
if string(records[0].Value) != "bar" {
t.Error("Expected bar, got ", string(records[0].Value))
}
}
time.Sleep(61 * time.Second)
records, err = sqlStore.Read("test")
if err == nil {
t.Error("Key test should have expired")
} else {
if err != store.ErrNotFound {
t.Error(err)
}
}
}

View File

@ -7,6 +7,7 @@ import (
)
var (
// ErrNotFound is returned when a Read key doesn't exist
ErrNotFound = errors.New("not found")
)
@ -14,11 +15,11 @@ var (
type Store interface {
// List all the known records
List() ([]*Record, error)
// Read a record with key
// Read records with keys
Read(key ...string) ([]*Record, error)
// Write a record
// Write records
Write(rec ...*Record) error
// Delete a record with key
// Delete records with keys
Delete(key ...string) error
}