From dee63b2b2cc7c9299070ae8dd58969cbc1a31454 Mon Sep 17 00:00:00 2001 From: Jake Sanders Date: Fri, 1 Nov 2019 14:13:21 +0000 Subject: [PATCH 1/3] Implementation of postgres store --- store/options.go | 8 +- store/postgresql/postgresql.go | 244 ++++++++++++++++++++++++++++ store/postgresql/postgresql_test.go | 95 +++++++++++ store/store.go | 7 +- 4 files changed, 350 insertions(+), 4 deletions(-) create mode 100644 store/postgresql/postgresql.go create mode 100644 store/postgresql/postgresql_test.go diff --git a/store/options.go b/store/options.go index cf780a58..99014362 100644 --- a/store/options.go +++ b/store/options.go @@ -4,7 +4,7 @@ import ( "github.com/micro/go-micro/config/options" ) -// Set the nodes used to back the store +// Nodes is a list of nodes used to back the store func Nodes(a ...string) options.Option { return options.WithValue("store.nodes", a) } @@ -13,3 +13,9 @@ func Nodes(a ...string) options.Option { func Prefix(p string) options.Option { return options.WithValue("store.prefix", p) } + +// Namespace offers a way to have multiple isolated +// stores in the same backend, if supported. +func Namespace(n string) options.Option { + return options.WithValue("store.namespace", n) +} diff --git a/store/postgresql/postgresql.go b/store/postgresql/postgresql.go new file mode 100644 index 00000000..7abca6df --- /dev/null +++ b/store/postgresql/postgresql.go @@ -0,0 +1,244 @@ +// Package postgresql implements a micro Store backed by sql +package postgresql + +import ( + "database/sql" + "fmt" + "time" + "unicode" + + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/store" +) + +// DefaultNamespace is the namespace that the sql store +// will use if no namespace is provided. +const DefaultNamespace = "micro" + +type sqlStore struct { + db *sql.DB + + table string + options.Options +} + +// List all the known records +func (s *sqlStore) List() ([]*store.Record, error) { + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table)) + if err != nil { + return nil, err + } + var records []*store.Record + var timehelper pq.NullTime + rows, err := q.Query() + if err != nil { + if err == sql.ErrNoRows { + return records, nil + } + return nil, err + } + defer rows.Close() + for rows.Next() { + record := &store.Record{} + if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(record.Key) + } else { + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } + } else { + records = append(records, record) + } + + } + rowErr := rows.Close() + if rowErr != nil { + // transaction rollback or something + return records, rowErr + } + if err := rows.Err(); err != nil { + return records, err + } + return records, nil +} + +// Read all records with keys +func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) { + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table)) + if err != nil { + return nil, err + } + var records []*store.Record + var timehelper pq.NullTime + for _, key := range keys { + row := q.QueryRow(key) + record := &store.Record{} + if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { + if err == sql.ErrNoRows { + return records, store.ErrNotFound + } + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(key) + return records, store.ErrNotFound + } + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } else { + records = append(records, record) + } + } + return records, nil +} + +// Write records +func (s *sqlStore) Write(rec ...*store.Record) error { + q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry) + VALUES ($1, $2::bytea, $3) + ON CONFLICT (key) + DO UPDATE + SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table)) + if err != nil { + return err + } + for _, r := range rec { + var err error + if r.Expiry != 0 { + _, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) + } else { + _, err = q.Exec(r.Key, r.Value, nil) + } + if err != nil { + return errors.Wrap(err, "Couldn't insert record "+r.Key) + } + } + + return nil +} + +// Delete records with keys +func (s *sqlStore) Delete(keys ...string) error { + q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table)) + if err != nil { + return err + } + for _, key := range keys { + result, err := q.Exec(key) + if err != nil { + return err + } + _, err = result.RowsAffected() + if err != nil { + return err + } + } + return nil +} + +func (s *sqlStore) initDB(options options.Options) error { + // Get the store.namespace option, or use sql.DefaultNamespace + namespaceOpt, found := options.Values().Get("store.namespace") + if !found { + s.table = DefaultNamespace + } else { + if namespace, ok := namespaceOpt.(string); ok { + s.table = namespace + } else { + return errors.New("store.namespace option must be a string") + } + } + + // Create "micro" schema + schema, err := s.db.Prepare("CREATE SCHEMA IF NOT EXISTS micro ;") + if err != nil { + return err + } + _, err = schema.Exec() + if err != nil { + return errors.Wrap(err, "Couldn't create Schema") + } + + // Create a table for the Store namespace + tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s + ( + key text COLLATE "default" NOT NULL, + value bytea, + expiry timestamp with time zone, + CONSTRAINT %s_pkey PRIMARY KEY (key) + );`, s.table, s.table)) + _, err = tableq.Exec() + if err != nil { + return errors.Wrap(err, "Couldn't create table") + } + + return nil +} + +// New returns a new micro Store backed by sql +func New(opts ...options.Option) (store.Store, error) { + options := options.NewOptions(opts...) + driver, dataSourceName, err := validateOptions(options) + if err != nil { + return nil, err + } + db, err := sql.Open(driver, dataSourceName) + if err != nil { + return nil, err + } + if err := db.Ping(); err != nil { + return nil, err + } + s := &sqlStore{ + db: db, + } + + return s, s.initDB(options) +} + +// validateOptions checks whether the provided options are valid, then returns the driver +// and data source name. +func validateOptions(options options.Options) (driver, dataSourceName string, err error) { + driverOpt, found := options.Values().Get("store.sql.driver") + if !found { + return "", "", errors.New("No store.sql.driver option specified") + } + nodesOpt, found := options.Values().Get("store.nodes") + if !found { + return "", "", errors.New("No store.nodes option specified (expected a database connection string)") + } + driver, ok := driverOpt.(string) + if !ok { + return "", "", errors.New("store.sql.driver option must be a string") + } + nodes, ok := nodesOpt.([]string) + if !ok { + return "", "", errors.New("store.nodes option must be a []string") + } + if len(nodes) != 1 { + return "", "", errors.New("expected only 1 store.nodes option") + } + namespaceOpt, found := options.Values().Get("store.namespace") + if found { + namespace, ok := namespaceOpt.(string) + if !ok { + return "", "", errors.New("store.namespace must me a string") + } + for _, r := range namespace { + if !unicode.IsLetter(r) { + return "", "", errors.New("store.namespace must only contain letters") + } + } + } + return driver, nodes[0], nil +} diff --git a/store/postgresql/postgresql_test.go b/store/postgresql/postgresql_test.go new file mode 100644 index 00000000..65d97e78 --- /dev/null +++ b/store/postgresql/postgresql_test.go @@ -0,0 +1,95 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "testing" + "time" + + "github.com/kr/pretty" + "github.com/micro/go-micro/store" +) + +func TestSQL(t *testing.T) { + connection := fmt.Sprintf( + "host=%s port=%d user=%s sslmode=disable dbname=%s", + "localhost", + 5432, + "jake", + "test", + ) + db, err := sql.Open("postgres", connection) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Skip(err) + } + db.Close() + + sqlStore, err := New( + store.Namespace("testsql"), + store.Nodes(connection), + ) + if err != nil { + t.Fatal(err.Error()) + } + + records, err := sqlStore.List() + if err != nil { + t.Error(err) + } else { + t.Logf("%# v\n", pretty.Formatter(records)) + } + + err = sqlStore.Write( + &store.Record{ + Key: "test", + Value: []byte("foo"), + }, + &store.Record{ + Key: "bar", + Value: []byte("baz"), + }, + &store.Record{ + Key: "qux", + Value: []byte("aasad"), + }, + ) + if err != nil { + t.Error(err) + } + err = sqlStore.Delete("qux") + if err != nil { + t.Error(err) + } + + err = sqlStore.Write(&store.Record{ + Key: "test", + Value: []byte("bar"), + Expiry: time.Minute, + }) + if err != nil { + t.Error(err) + } + + records, err = sqlStore.Read("test") + if err != nil { + t.Error(err) + } else { + t.Logf("%# v\n", pretty.Formatter(records)) + if string(records[0].Value) != "bar" { + t.Error("Expected bar, got ", string(records[0].Value)) + } + } + + time.Sleep(61 * time.Second) + records, err = sqlStore.Read("test") + if err == nil { + t.Error("Key test should have expired") + } else { + if err != store.ErrNotFound { + t.Error(err) + } + } +} diff --git a/store/store.go b/store/store.go index c45712eb..9d265500 100644 --- a/store/store.go +++ b/store/store.go @@ -7,6 +7,7 @@ import ( ) var ( + // ErrNotFound is returned when a Read key doesn't exist ErrNotFound = errors.New("not found") ) @@ -14,11 +15,11 @@ var ( type Store interface { // List all the known records List() ([]*Record, error) - // Read a record with key + // Read records with keys Read(key ...string) ([]*Record, error) - // Write a record + // Write records Write(rec ...*Record) error - // Delete a record with key + // Delete records with keys Delete(key ...string) error } From ee35fe61af23b51fab3e19a0cfbec0ef9481ad5c Mon Sep 17 00:00:00 2001 From: Jake Sanders Date: Fri, 1 Nov 2019 14:13:47 +0000 Subject: [PATCH 2/3] update go.mod for postgres --- go.mod | 2 ++ go.sum | 2 ++ 2 files changed, 4 insertions(+) diff --git a/go.mod b/go.mod index a08ccfe0..12808717 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,9 @@ require ( github.com/jonboulle/clockwork v0.1.0 // indirect github.com/joncalhoun/qson v0.0.0-20170526102502-8a9cab3a62b1 github.com/json-iterator/go v1.1.7 + github.com/kr/pretty v0.1.0 github.com/leodido/go-urn v1.1.0 // indirect + github.com/lib/pq v1.2.0 github.com/lucas-clemente/quic-go v0.12.1 github.com/mholt/certmagic v0.7.5 github.com/micro/cli v0.2.0 diff --git a/go.sum b/go.sum index 6970348f..fd44a28e 100644 --- a/go.sum +++ b/go.sum @@ -228,6 +228,8 @@ github.com/labbsr0x/bindman-dns-webhook v1.0.2/go.mod h1:p6b+VCXIR8NYKpDr8/dg1HK github.com/labbsr0x/goh v1.0.1/go.mod h1:8K2UhVoaWXcCU7Lxoa2omWnC8gyW8px7/lmO61c027w= github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= +github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linode/linodego v0.10.0/go.mod h1:cziNP7pbvE3mXIPneHj0oRY8L1WtGEIKlZ8LANE4eXA= github.com/liquidweb/liquidweb-go v1.6.0/go.mod h1:UDcVnAMDkZxpw4Y7NOHkqoeiGacVLEIG/i5J9cyixzQ= github.com/lucas-clemente/quic-go v0.12.1 h1:BPITli+6KnKogtTxBk2aS4okr5dUHz2LtIDAP1b8UL4= From 15e3b9b4c0cd41678ea8d1fe7e2d6cc553f35f6c Mon Sep 17 00:00:00 2001 From: Jake Sanders Date: Fri, 1 Nov 2019 15:16:05 +0000 Subject: [PATCH 3/3] Let people connect with just a hostname --- store/postgresql/postgresql.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/postgresql/postgresql.go b/store/postgresql/postgresql.go index 7abca6df..a574c5ed 100644 --- a/store/postgresql/postgresql.go +++ b/store/postgresql/postgresql.go @@ -4,6 +4,7 @@ package postgresql import ( "database/sql" "fmt" + "strings" "time" "unicode" @@ -192,6 +193,9 @@ func New(opts ...options.Option) (store.Store, error) { if err != nil { return nil, err } + if !strings.Contains(dataSourceName, " ") { + dataSourceName = fmt.Sprintf("host=%s", dataSourceName) + } db, err := sql.Open(driver, dataSourceName) if err != nil { return nil, err