diff --git a/store/cockroach/cockroach.go b/store/cockroach/cockroach.go index 4bd92576..d70b0c1c 100644 --- a/store/cockroach/cockroach.go +++ b/store/cockroach/cockroach.go @@ -28,10 +28,12 @@ type sqlStore struct { database string table string - list *sql.Stmt - readOne *sql.Stmt - write *sql.Stmt - delete *sql.Stmt + list *sql.Stmt + readOne *sql.Stmt + readMany *sql.Stmt + readOffset *sql.Stmt + write *sql.Stmt + delete *sql.Stmt options store.Options } @@ -92,7 +94,9 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, o(&options) } - // TODO: make use of options.Prefix using WHERE key LIKE = ? + if options.Prefix || options.Suffix { + return s.read(key, options) + } var records []*store.Record var timehelper pq.NullTime @@ -120,6 +124,61 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, return records, nil } +// Read Many records +func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) { + pattern := "%" + if options.Prefix { + pattern = key + pattern + } + if options.Suffix { + pattern = pattern + key + } + var rows *sql.Rows + var err error + if options.Limit != 0 { + rows, err = s.readOffset.Query(pattern, options.Limit, options.Offset) + } else { + rows, err = s.readMany.Query(pattern) + } + if err != nil { + if err == sql.ErrNoRows { + return []*store.Record{}, nil + } + return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed") + } + defer rows.Close() + var records []*store.Record + var timehelper pq.NullTime + + for rows.Next() { + record := &store.Record{} + if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(record.Key) + } else { + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } + } else { + records = append(records, record) + } + } + rowErr := rows.Close() + if rowErr != nil { + // transaction rollback or something + return records, rowErr + } + if err := rows.Err(); err != nil { + return records, err + } + + return records, nil +} + // Write records func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error { var err error @@ -174,16 +233,44 @@ func (s *sqlStore) initDB() error { return errors.Wrap(err, "Couldn't create table") } + // Create Index + _, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key")`, "key_index_"+s.table, s.database, s.table)) + if err != nil { + return err + } + list, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table)) if err != nil { return errors.Wrap(err, "List statement couldn't be prepared") } + if s.list != nil { + s.list.Close() + } s.list = list readOne, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table)) if err != nil { return errors.Wrap(err, "ReadOne statement couldn't be prepared") } + if s.readOne != nil { + s.readOne.Close() + } s.readOne = readOne + readMany, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", s.database, s.table)) + if err != nil { + return errors.Wrap(err, "ReadMany statement couldn't be prepared") + } + if s.readMany != nil { + s.readMany.Close() + } + s.readMany = readMany + readOffset, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", s.database, s.table)) + if err != nil { + return errors.Wrap(err, "ReadOffset statement couldn't be prepared") + } + if s.readOffset != nil { + s.readOffset.Close() + } + s.readOffset = readOffset write, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) @@ -192,11 +279,17 @@ func (s *sqlStore) initDB() error { if err != nil { return errors.Wrap(err, "Write statement couldn't be prepared") } + if s.write != nil { + s.write.Close() + } s.write = write delete, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table)) if err != nil { return errors.Wrap(err, "Delete statement couldn't be prepared") } + if s.delete != nil { + s.delete.Close() + } s.delete = delete return nil diff --git a/store/cockroach/cockroach_test.go b/store/cockroach/cockroach_test.go index 38e36f5f..d9a95c1b 100644 --- a/store/cockroach/cockroach_test.go +++ b/store/cockroach/cockroach_test.go @@ -14,8 +14,8 @@ func TestSQL(t *testing.T) { connection := fmt.Sprintf( "host=%s port=%d user=%s sslmode=disable dbname=%s", "localhost", - 5432, - "jake", + 26257, + "root", "test", ) db, err := sql.Open("postgres", connection) @@ -32,6 +32,10 @@ func TestSQL(t *testing.T) { store.Nodes(connection), ) + if err := sqlStore.Init(); err != nil { + t.Fatal(err) + } + keys, err := sqlStore.List() if err != nil { t.Error(err) @@ -74,7 +78,7 @@ func TestSQL(t *testing.T) { err = sqlStore.Write(&store.Record{ Key: "test", Value: []byte("bar"), - Expiry: time.Minute, + Expiry: time.Second * 10, }) if err != nil { t.Error(err) @@ -89,7 +93,7 @@ func TestSQL(t *testing.T) { t.Error("Expected bar, got ", string(records[0].Value)) } - time.Sleep(61 * time.Second) + time.Sleep(11 * time.Second) _, err = sqlStore.Read("test") switch err { case nil: @@ -99,4 +103,17 @@ func TestSQL(t *testing.T) { case store.ErrNotFound: break } + sqlStore.Delete("bar") + sqlStore.Write(&store.Record{Key: "aaa", Value: []byte("bbb"), Expiry: 5 * time.Second}) + sqlStore.Write(&store.Record{Key: "aaaa", Value: []byte("bbb"), Expiry: 5 * time.Second}) + sqlStore.Write(&store.Record{Key: "aaaaa", Value: []byte("bbb"), Expiry: 5 * time.Second}) + results, err := sqlStore.Read("a", store.ReadPrefix()) + if len(results) != 3 { + t.Fatal("Results should have returned 3 records") + } + time.Sleep(6 * time.Second) + results, err = sqlStore.Read("a", store.ReadPrefix()) + if len(results) != 0 { + t.Fatal("Results should have returned 0 records") + } } diff --git a/store/memory/memory_test.go b/store/memory/memory_test.go index 26e04004..21f6d597 100644 --- a/store/memory/memory_test.go +++ b/store/memory/memory_test.go @@ -234,6 +234,16 @@ func basictest(s store.Store, t *testing.T) { t.Error("Expiry options were not effective") } } + s.Write(&store.Record{Key: "a", Value: []byte("a")}) + s.Write(&store.Record{Key: "aa", Value: []byte("aa")}) + s.Write(&store.Record{Key: "aaa", Value: []byte("aaa")}) + if results, err := s.Read("b", store.ReadPrefix()); err != nil { + t.Error(err) + } else { + if len(results) != 0 { + t.Errorf("Expected 0 results, got %d", len(results)) + } + } s.Init() for i := 0; i < 10; i++ {