diff --git a/store/options.go b/store/options.go index cf780a58..99014362 100644 --- a/store/options.go +++ b/store/options.go @@ -4,7 +4,7 @@ import ( "github.com/micro/go-micro/config/options" ) -// Set the nodes used to back the store +// Nodes is a list of nodes used to back the store func Nodes(a ...string) options.Option { return options.WithValue("store.nodes", a) } @@ -13,3 +13,9 @@ func Nodes(a ...string) options.Option { func Prefix(p string) options.Option { return options.WithValue("store.prefix", p) } + +// Namespace offers a way to have multiple isolated +// stores in the same backend, if supported. +func Namespace(n string) options.Option { + return options.WithValue("store.namespace", n) +} diff --git a/store/postgresql/postgresql.go b/store/postgresql/postgresql.go new file mode 100644 index 00000000..7abca6df --- /dev/null +++ b/store/postgresql/postgresql.go @@ -0,0 +1,244 @@ +// Package postgresql implements a micro Store backed by sql +package postgresql + +import ( + "database/sql" + "fmt" + "time" + "unicode" + + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/store" +) + +// DefaultNamespace is the namespace that the sql store +// will use if no namespace is provided. +const DefaultNamespace = "micro" + +type sqlStore struct { + db *sql.DB + + table string + options.Options +} + +// List all the known records +func (s *sqlStore) List() ([]*store.Record, error) { + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table)) + if err != nil { + return nil, err + } + var records []*store.Record + var timehelper pq.NullTime + rows, err := q.Query() + if err != nil { + if err == sql.ErrNoRows { + return records, nil + } + return nil, err + } + defer rows.Close() + for rows.Next() { + record := &store.Record{} + if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(record.Key) + } else { + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } + } else { + records = append(records, record) + } + + } + rowErr := rows.Close() + if rowErr != nil { + // transaction rollback or something + return records, rowErr + } + if err := rows.Err(); err != nil { + return records, err + } + return records, nil +} + +// Read all records with keys +func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) { + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table)) + if err != nil { + return nil, err + } + var records []*store.Record + var timehelper pq.NullTime + for _, key := range keys { + row := q.QueryRow(key) + record := &store.Record{} + if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { + if err == sql.ErrNoRows { + return records, store.ErrNotFound + } + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(key) + return records, store.ErrNotFound + } + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } else { + records = append(records, record) + } + } + return records, nil +} + +// Write records +func (s *sqlStore) Write(rec ...*store.Record) error { + q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry) + VALUES ($1, $2::bytea, $3) + ON CONFLICT (key) + DO UPDATE + SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table)) + if err != nil { + return err + } + for _, r := range rec { + var err error + if r.Expiry != 0 { + _, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) + } else { + _, err = q.Exec(r.Key, r.Value, nil) + } + if err != nil { + return errors.Wrap(err, "Couldn't insert record "+r.Key) + } + } + + return nil +} + +// Delete records with keys +func (s *sqlStore) Delete(keys ...string) error { + q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table)) + if err != nil { + return err + } + for _, key := range keys { + result, err := q.Exec(key) + if err != nil { + return err + } + _, err = result.RowsAffected() + if err != nil { + return err + } + } + return nil +} + +func (s *sqlStore) initDB(options options.Options) error { + // Get the store.namespace option, or use sql.DefaultNamespace + namespaceOpt, found := options.Values().Get("store.namespace") + if !found { + s.table = DefaultNamespace + } else { + if namespace, ok := namespaceOpt.(string); ok { + s.table = namespace + } else { + return errors.New("store.namespace option must be a string") + } + } + + // Create "micro" schema + schema, err := s.db.Prepare("CREATE SCHEMA IF NOT EXISTS micro ;") + if err != nil { + return err + } + _, err = schema.Exec() + if err != nil { + return errors.Wrap(err, "Couldn't create Schema") + } + + // Create a table for the Store namespace + tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s + ( + key text COLLATE "default" NOT NULL, + value bytea, + expiry timestamp with time zone, + CONSTRAINT %s_pkey PRIMARY KEY (key) + );`, s.table, s.table)) + _, err = tableq.Exec() + if err != nil { + return errors.Wrap(err, "Couldn't create table") + } + + return nil +} + +// New returns a new micro Store backed by sql +func New(opts ...options.Option) (store.Store, error) { + options := options.NewOptions(opts...) + driver, dataSourceName, err := validateOptions(options) + if err != nil { + return nil, err + } + db, err := sql.Open(driver, dataSourceName) + if err != nil { + return nil, err + } + if err := db.Ping(); err != nil { + return nil, err + } + s := &sqlStore{ + db: db, + } + + return s, s.initDB(options) +} + +// validateOptions checks whether the provided options are valid, then returns the driver +// and data source name. +func validateOptions(options options.Options) (driver, dataSourceName string, err error) { + driverOpt, found := options.Values().Get("store.sql.driver") + if !found { + return "", "", errors.New("No store.sql.driver option specified") + } + nodesOpt, found := options.Values().Get("store.nodes") + if !found { + return "", "", errors.New("No store.nodes option specified (expected a database connection string)") + } + driver, ok := driverOpt.(string) + if !ok { + return "", "", errors.New("store.sql.driver option must be a string") + } + nodes, ok := nodesOpt.([]string) + if !ok { + return "", "", errors.New("store.nodes option must be a []string") + } + if len(nodes) != 1 { + return "", "", errors.New("expected only 1 store.nodes option") + } + namespaceOpt, found := options.Values().Get("store.namespace") + if found { + namespace, ok := namespaceOpt.(string) + if !ok { + return "", "", errors.New("store.namespace must me a string") + } + for _, r := range namespace { + if !unicode.IsLetter(r) { + return "", "", errors.New("store.namespace must only contain letters") + } + } + } + return driver, nodes[0], nil +} diff --git a/store/postgresql/postgresql_test.go b/store/postgresql/postgresql_test.go new file mode 100644 index 00000000..65d97e78 --- /dev/null +++ b/store/postgresql/postgresql_test.go @@ -0,0 +1,95 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "testing" + "time" + + "github.com/kr/pretty" + "github.com/micro/go-micro/store" +) + +func TestSQL(t *testing.T) { + connection := fmt.Sprintf( + "host=%s port=%d user=%s sslmode=disable dbname=%s", + "localhost", + 5432, + "jake", + "test", + ) + db, err := sql.Open("postgres", connection) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Skip(err) + } + db.Close() + + sqlStore, err := New( + store.Namespace("testsql"), + store.Nodes(connection), + ) + if err != nil { + t.Fatal(err.Error()) + } + + records, err := sqlStore.List() + if err != nil { + t.Error(err) + } else { + t.Logf("%# v\n", pretty.Formatter(records)) + } + + err = sqlStore.Write( + &store.Record{ + Key: "test", + Value: []byte("foo"), + }, + &store.Record{ + Key: "bar", + Value: []byte("baz"), + }, + &store.Record{ + Key: "qux", + Value: []byte("aasad"), + }, + ) + if err != nil { + t.Error(err) + } + err = sqlStore.Delete("qux") + if err != nil { + t.Error(err) + } + + err = sqlStore.Write(&store.Record{ + Key: "test", + Value: []byte("bar"), + Expiry: time.Minute, + }) + if err != nil { + t.Error(err) + } + + records, err = sqlStore.Read("test") + if err != nil { + t.Error(err) + } else { + t.Logf("%# v\n", pretty.Formatter(records)) + if string(records[0].Value) != "bar" { + t.Error("Expected bar, got ", string(records[0].Value)) + } + } + + time.Sleep(61 * time.Second) + records, err = sqlStore.Read("test") + if err == nil { + t.Error("Key test should have expired") + } else { + if err != store.ErrNotFound { + t.Error(err) + } + } +} diff --git a/store/store.go b/store/store.go index c45712eb..9d265500 100644 --- a/store/store.go +++ b/store/store.go @@ -7,6 +7,7 @@ import ( ) var ( + // ErrNotFound is returned when a Read key doesn't exist ErrNotFound = errors.New("not found") ) @@ -14,11 +15,11 @@ var ( type Store interface { // List all the known records List() ([]*Record, error) - // Read a record with key + // Read records with keys Read(key ...string) ([]*Record, error) - // Write a record + // Write records Write(rec ...*Record) error - // Delete a record with key + // Delete records with keys Delete(key ...string) error }