Unify the store tests (#1952)

Add more tests for store
This commit is contained in:
Dominic Wong
2020-08-19 23:41:03 +01:00
committed by Vasiliy Tolstov
parent eee5b98d78
commit d5da9c0728
5 changed files with 571 additions and 97 deletions

View File

@@ -27,7 +27,7 @@ var (
re = regexp.MustCompile("[^a-zA-Z0-9]+")
statements = map[string]string{
"list": "SELECT key, value, metadata, expiry FROM %s.%s;",
"list": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;",
"read": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key = $1;",
"readMany": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1;",
"readOffset": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;",
@@ -213,6 +213,23 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
if err := s.createDB(options.Database, options.Table); err != nil {
return nil, err
}
limit := sql.NullInt32{}
offset := 0
pattern := "%"
if options.Prefix != "" || options.Suffix != "" {
if options.Prefix != "" {
pattern = options.Prefix + pattern
}
if options.Suffix != "" {
pattern = pattern + options.Suffix
}
}
if options.Offset > 0 {
offset = int(options.Offset)
}
if options.Limit > 0 {
limit = sql.NullInt32{Int32: int32(options.Limit), Valid: true}
}
st, err := s.prepare(options.Database, options.Table, "list")
if err != nil {
@@ -220,7 +237,7 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
}
defer st.Close()
rows, err := st.Query()
rows, err := st.Query(pattern, limit, offset)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
@@ -228,8 +245,55 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
return nil, err
}
defer rows.Close()
var keys []string
records, err := s.rowsToRecords(rows)
if err != nil {
return nil, err
}
for _, k := range records {
keys = append(keys, k.Key)
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return keys, rowErr
}
if err := rows.Err(); err != nil {
return keys, err
}
return keys, nil
}
// rowToRecord converts from sql.Row to a store.Record. If the record has expired it will issue a delete in a separate goroutine
func (s *sqlStore) rowToRecord(row *sql.Row) (*store.Record, error) {
var timehelper pq.NullTime
record := &store.Record{}
metadata := make(Metadata)
if err := row.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
if err == sql.ErrNoRows {
return record, store.ErrNotFound
}
return nil, err
}
// set the metadata
record.Metadata = toMetadata(&metadata)
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
return nil, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
}
return record, nil
}
// rowsToRecords converts from sql.Rows to []*store.Record. If a record has expired it will issue a delete in a separate goroutine
func (s *sqlStore) rowsToRecords(rows *sql.Rows) ([]*store.Record, error) {
var records []*store.Record
var timehelper pq.NullTime
for rows.Next() {
@@ -237,7 +301,7 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
metadata := make(Metadata)
if err := rows.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
return keys, err
return records, err
}
// set the metadata
@@ -249,22 +313,13 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
keys = append(keys, record.Key)
records = append(records, record)
}
} else {
keys = append(keys, record.Key)
records = append(records, record)
}
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return keys, rowErr
}
if err := rows.Err(); err != nil {
return keys, err
}
return keys, nil
return records, nil
}
// Read a single key
@@ -283,9 +338,6 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
return s.read(key, options)
}
var records []*store.Record
var timehelper pq.NullTime
st, err := s.prepare(options.Database, options.Table, "read")
if err != nil {
return nil, err
@@ -293,32 +345,12 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
defer st.Close()
row := st.QueryRow(key)
record := &store.Record{}
metadata := make(Metadata)
if err := row.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err
record, err := s.rowToRecord(row)
if err != nil {
return nil, err
}
// set the metadata
record.Metadata = toMetadata(&metadata)
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
}
return records, nil
var records []*store.Record
return append(records, record), nil
}
// Read Many records
@@ -361,31 +393,9 @@ func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record,
defer rows.Close()
var records []*store.Record
var timehelper pq.NullTime
for rows.Next() {
record := &store.Record{}
metadata := make(Metadata)
if err := rows.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
return records, err
}
// set the metadata
record.Metadata = toMetadata(&metadata)
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
}
} else {
records = append(records, record)
}
records, err := s.rowsToRecords(rows)
if err != nil {
return nil, err
}
rowErr := rows.Close()
if rowErr != nil {