WIP: Add metadata to store record (#1604)

* Add metadata to store record

* Add metadata to cockroach store

* add metadata to store service implementation

* fix breaking cache test

* Test/fix cockroach metadata usage

* fix store memory metadata bug
This commit is contained in:
Asim Aslam 2020-06-03 09:45:08 +01:00 committed by GitHub
parent e4e56b0f3f
commit 7b379bf1f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1378 additions and 937 deletions

View File

@ -28,16 +28,19 @@ func TestCache(t *testing.T) {
_, err := cachedStore.Read("test") _, err := cachedStore.Read("test")
assert.Equal(store.ErrNotFound, err, "Read non existant key") assert.Equal(store.ErrNotFound, err, "Read non existant key")
r1 := &store.Record{ r1 := &store.Record{
Key: "aaa", Key: "aaa",
Value: []byte("bbb"), Value: []byte("bbb"),
Metadata: map[string]interface{}{},
} }
r2 := &store.Record{ r2 := &store.Record{
Key: "aaaa", Key: "aaaa",
Value: []byte("bbbb"), Value: []byte("bbbb"),
Metadata: map[string]interface{}{},
} }
r3 := &store.Record{ r3 := &store.Record{
Key: "aaaaa", Key: "aaaaa",
Value: []byte("bbbbb"), Value: []byte("bbbbb"),
Metadata: map[string]interface{}{},
} }
// Write 3 records directly to l2 // Write 3 records directly to l2
l2.Write(r1) l2.Write(r1)

View File

@ -27,11 +27,11 @@ var (
re = regexp.MustCompile("[^a-zA-Z0-9]+") re = regexp.MustCompile("[^a-zA-Z0-9]+")
statements = map[string]string{ statements = map[string]string{
"list": "SELECT key, value, expiry FROM %s.%s;", "list": "SELECT key, value, metadata, expiry FROM %s.%s;",
"read": "SELECT key, value, expiry FROM %s.%s WHERE key = $1;", "read": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key = $1;",
"readMany": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", "readMany": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1;",
"readOffset": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", "readOffset": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;",
"write": "INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;", "write": "INSERT INTO %s.%s(key, value, metadata, expiry) VALUES ($1, $2::bytea, $3, $4) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, metadata = EXCLUDED.metadata, expiry = EXCLUDED.expiry;",
"delete": "DELETE FROM %s.%s WHERE key = $1;", "delete": "DELETE FROM %s.%s WHERE key = $1;",
} }
) )
@ -108,6 +108,7 @@ func (s *sqlStore) initDB(database, table string) error {
( (
key text NOT NULL, key text NOT NULL,
value bytea, value bytea,
metadata JSONB,
expiry timestamp with time zone, expiry timestamp with time zone,
CONSTRAINT %s_pkey PRIMARY KEY (key) CONSTRAINT %s_pkey PRIMARY KEY (key)
);`, table, table)) );`, table, table))
@ -121,6 +122,12 @@ func (s *sqlStore) initDB(database, table string) error {
return err return err
} }
// Create Metadata Index
_, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING GIN ("metadata");`, "metadata_index_"+table, database, table))
if err != nil {
return err
}
return nil return nil
} }
@ -227,9 +234,15 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
for rows.Next() { for rows.Next() {
record := &store.Record{} record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { metadata := make(Metadata)
if err := rows.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
return keys, err return keys, err
} }
// set the metadata
record.Metadata = toMetadata(&metadata)
if timehelper.Valid { if timehelper.Valid {
if timehelper.Time.Before(time.Now()) { if timehelper.Time.Before(time.Now()) {
// record has expired // record has expired
@ -281,12 +294,18 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
row := st.QueryRow(key) row := st.QueryRow(key)
record := &store.Record{} record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { metadata := make(Metadata)
if err := row.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return records, store.ErrNotFound return records, store.ErrNotFound
} }
return records, err return records, err
} }
// set the metadata
record.Metadata = toMetadata(&metadata)
if timehelper.Valid { if timehelper.Valid {
if timehelper.Time.Before(time.Now()) { if timehelper.Time.Before(time.Now()) {
// record has expired // record has expired
@ -346,9 +365,15 @@ func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record,
for rows.Next() { for rows.Next() {
record := &store.Record{} record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { metadata := make(Metadata)
if err := rows.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
return records, err return records, err
} }
// set the metadata
record.Metadata = toMetadata(&metadata)
if timehelper.Valid { if timehelper.Valid {
if timehelper.Time.Before(time.Now()) { if timehelper.Time.Before(time.Now()) {
// record has expired // record has expired
@ -391,10 +416,15 @@ func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
} }
defer st.Close() defer st.Close()
metadata := make(Metadata)
for k, v := range r.Metadata {
metadata[k] = v
}
if r.Expiry != 0 { if r.Expiry != 0 {
_, err = st.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) _, err = st.Exec(r.Key, r.Value, metadata, time.Now().Add(r.Expiry))
} else { } else {
_, err = st.Exec(r.Key, r.Value, nil) _, err = st.Exec(r.Key, r.Value, metadata, nil)
} }
if err != nil { if err != nil {

View File

@ -0,0 +1,45 @@
package cockroach
import (
"database/sql/driver"
"encoding/json"
"errors"
)
// https://github.com/upper/db/blob/master/postgresql/custom_types.go#L43
type Metadata map[string]interface{}
// Scan satisfies the sql.Scanner interface.
func (m *Metadata) Scan(src interface{}) error {
source, ok := src.([]byte)
if !ok {
return errors.New("Type assertion .([]byte) failed.")
}
var i interface{}
err := json.Unmarshal(source, &i)
if err != nil {
return err
}
*m, ok = i.(map[string]interface{})
if !ok {
return errors.New("Type assertion .(map[string]interface{}) failed.")
}
return nil
}
// Value satisfies the driver.Valuer interface.
func (m Metadata) Value() (driver.Value, error) {
j, err := json.Marshal(m)
return j, err
}
func toMetadata(m *Metadata) map[string]interface{} {
md := make(map[string]interface{})
for k, v := range *m {
md[k] = v
}
return md
}

View File

@ -54,6 +54,7 @@ type fileHandle struct {
type record struct { type record struct {
Key string Key string
Value []byte Value []byte
Metadata map[string]interface{}
ExpiresAt time.Time ExpiresAt time.Time
} }
@ -221,6 +222,11 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
newRecord := &store.Record{} newRecord := &store.Record{}
newRecord.Key = storedRecord.Key newRecord.Key = storedRecord.Key
newRecord.Value = storedRecord.Value newRecord.Value = storedRecord.Value
newRecord.Metadata = make(map[string]interface{})
for k, v := range storedRecord.Metadata {
newRecord.Metadata[k] = v
}
if !storedRecord.ExpiresAt.IsZero() { if !storedRecord.ExpiresAt.IsZero() {
if storedRecord.ExpiresAt.Before(time.Now()) { if storedRecord.ExpiresAt.Before(time.Now()) {
@ -238,10 +244,16 @@ func (m *fileStore) set(fd *fileHandle, r *store.Record) error {
item := &record{} item := &record{}
item.Key = r.Key item.Key = r.Key
item.Value = r.Value item.Value = r.Value
item.Metadata = make(map[string]interface{})
if r.Expiry != 0 { if r.Expiry != 0 {
item.ExpiresAt = time.Now().Add(r.Expiry) item.ExpiresAt = time.Now().Add(r.Expiry)
} }
for k, v := range r.Metadata {
item.Metadata[k] = v
}
// marshal the data // marshal the data
data, _ := json.Marshal(item) data, _ := json.Marshal(item)
@ -348,6 +360,7 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
newRecord := store.Record{} newRecord := store.Record{}
newRecord.Key = r.Key newRecord.Key = r.Key
newRecord.Value = r.Value newRecord.Value = r.Value
newRecord.Metadata = make(map[string]interface{})
newRecord.Expiry = r.Expiry newRecord.Expiry = r.Expiry
if !writeOpts.Expiry.IsZero() { if !writeOpts.Expiry.IsZero() {
@ -357,6 +370,10 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
newRecord.Expiry = writeOpts.TTL newRecord.Expiry = writeOpts.TTL
} }
for k, v := range r.Metadata {
newRecord.Metadata[k] = v
}
return m.set(fd, &newRecord) return m.set(fd, &newRecord)
} }

View File

@ -33,9 +33,10 @@ type memoryStore struct {
store *cache.Cache store *cache.Cache
} }
type internalRecord struct { type storeRecord struct {
key string key string
value []byte value []byte
metadata map[string]interface{}
expiresAt time.Time expiresAt time.Time
} }
@ -56,26 +57,36 @@ func (m *memoryStore) prefix(database, table string) string {
func (m *memoryStore) get(prefix, key string) (*store.Record, error) { func (m *memoryStore) get(prefix, key string) (*store.Record, error) {
key = m.key(prefix, key) key = m.key(prefix, key)
var storedRecord *internalRecord var storedRecord *storeRecord
r, found := m.store.Get(key) r, found := m.store.Get(key)
if !found { if !found {
return nil, store.ErrNotFound return nil, store.ErrNotFound
} }
storedRecord, ok := r.(*internalRecord) storedRecord, ok := r.(*storeRecord)
if !ok { if !ok {
return nil, errors.New("Retrieved a non *internalRecord from the cache") return nil, errors.New("Retrieved a non *storeRecord from the cache")
} }
// Copy the record on the way out // Copy the record on the way out
newRecord := &store.Record{} newRecord := &store.Record{}
newRecord.Key = strings.TrimPrefix(storedRecord.key, prefix+"/") newRecord.Key = strings.TrimPrefix(storedRecord.key, prefix+"/")
newRecord.Value = make([]byte, len(storedRecord.value)) newRecord.Value = make([]byte, len(storedRecord.value))
newRecord.Metadata = make(map[string]interface{})
// copy the value into the new record
copy(newRecord.Value, storedRecord.value) copy(newRecord.Value, storedRecord.value)
// check if we need to set the expiry
if !storedRecord.expiresAt.IsZero() { if !storedRecord.expiresAt.IsZero() {
newRecord.Expiry = time.Until(storedRecord.expiresAt) newRecord.Expiry = time.Until(storedRecord.expiresAt)
} }
// copy in the metadata
for k, v := range storedRecord.metadata {
newRecord.Metadata[k] = v
}
return newRecord, nil return newRecord, nil
} }
@ -84,15 +95,24 @@ func (m *memoryStore) set(prefix string, r *store.Record) {
// copy the incoming record and then // copy the incoming record and then
// convert the expiry in to a hard timestamp // convert the expiry in to a hard timestamp
i := &internalRecord{} i := &storeRecord{}
i.key = r.Key i.key = r.Key
i.value = make([]byte, len(r.Value)) i.value = make([]byte, len(r.Value))
i.metadata = make(map[string]interface{})
// copy the the value
copy(i.value, r.Value) copy(i.value, r.Value)
// set the expiry
if r.Expiry != 0 { if r.Expiry != 0 {
i.expiresAt = time.Now().Add(r.Expiry) i.expiresAt = time.Now().Add(r.Expiry)
} }
// set the metadata
for k, v := range r.Metadata {
i.metadata[k] = v
}
m.store.Set(key, i, r.Expiry) m.store.Set(key, i, r.Expiry)
} }
@ -199,6 +219,7 @@ func (m *memoryStore) Write(r *store.Record, opts ...store.WriteOption) error {
newRecord := store.Record{} newRecord := store.Record{}
newRecord.Key = r.Key newRecord.Key = r.Key
newRecord.Value = make([]byte, len(r.Value)) newRecord.Value = make([]byte, len(r.Value))
newRecord.Metadata = make(map[string]interface{})
copy(newRecord.Value, r.Value) copy(newRecord.Value, r.Value)
newRecord.Expiry = r.Expiry newRecord.Expiry = r.Expiry
@ -208,6 +229,11 @@ func (m *memoryStore) Write(r *store.Record, opts ...store.WriteOption) error {
if writeOpts.TTL != 0 { if writeOpts.TTL != 0 {
newRecord.Expiry = writeOpts.TTL newRecord.Expiry = writeOpts.TTL
} }
for k, v := range r.Metadata {
newRecord.Metadata[k] = v
}
m.set(prefix, &newRecord) m.set(prefix, &newRecord)
return nil return nil
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT. // Code generated by protoc-gen-micro. DO NOT EDIT.
// source: store/service/proto/store.proto // source: github.com/micro/go-micro/store/service/proto/store.proto
package go_micro_store package go_micro_store

View File

@ -11,6 +11,13 @@ service Store {
rpc Tables(TablesRequest) returns (TablesResponse) {}; rpc Tables(TablesRequest) returns (TablesResponse) {};
} }
message Field {
// type of value e.g string, int, int64, bool, float64
string type = 1;
// the actual value
string value = 2;
}
message Record { message Record {
// key of the record // key of the record
string key = 1; string key = 1;
@ -18,6 +25,8 @@ message Record {
bytes value = 2; bytes value = 2;
// time.Duration (signed int64 nanoseconds) // time.Duration (signed int64 nanoseconds)
int64 expiry = 3; int64 expiry = 3;
// the associated metadata
map<string,Field> metadata = 4;
} }
message ReadOptions { message ReadOptions {

View File

@ -3,7 +3,9 @@ package service
import ( import (
"context" "context"
"fmt"
"io" "io"
"reflect"
"time" "time"
"github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/client"
@ -137,10 +139,21 @@ func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Reco
records := make([]*store.Record, 0, len(rsp.Records)) records := make([]*store.Record, 0, len(rsp.Records))
for _, val := range rsp.Records { for _, val := range rsp.Records {
metadata := make(map[string]interface{})
for k, v := range val.Metadata {
switch v.Type {
// TODO: parse all types
default:
metadata[k] = v
}
}
records = append(records, &store.Record{ records = append(records, &store.Record{
Key: val.Key, Key: val.Key,
Value: val.Value, Value: val.Value,
Expiry: time.Duration(val.Expiry) * time.Second, Expiry: time.Duration(val.Expiry) * time.Second,
Metadata: metadata,
}) })
} }
@ -163,11 +176,21 @@ func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) er
Table: options.Table, Table: options.Table,
} }
metadata := make(map[string]*pb.Field)
for k, v := range record.Metadata {
metadata[k] = &pb.Field{
Type: reflect.TypeOf(v).String(),
Value: fmt.Sprintf("%v", v),
}
}
_, err := s.Client.Write(s.Context(), &pb.WriteRequest{ _, err := s.Client.Write(s.Context(), &pb.WriteRequest{
Record: &pb.Record{ Record: &pb.Record{
Key: record.Key, Key: record.Key,
Value: record.Value, Value: record.Value,
Expiry: int64(record.Expiry.Seconds()), Expiry: int64(record.Expiry.Seconds()),
Metadata: metadata,
}, },
Options: writeOpts}, client.WithAddress(s.Nodes...)) Options: writeOpts}, client.WithAddress(s.Nodes...))
if err != nil && errors.Equal(err, errors.NotFound("", "")) { if err != nil && errors.Equal(err, errors.NotFound("", "")) {

View File

@ -36,7 +36,12 @@ type Store interface {
// Record is an item stored or retrieved from a Store // Record is an item stored or retrieved from a Store
type Record struct { type Record struct {
Key string `json:"key"` // The key to store the record
Value []byte `json:"value"` Key string `json:"key"`
// The value within the record
Value []byte `json:"value"`
// Any associated metadata for indexing
Metadata map[string]interface{} `json:"metadata"`
// Time to expire a record: TODO: change to timestamp
Expiry time.Duration `json:"expiry,omitempty"` Expiry time.Duration `json:"expiry,omitempty"`
} }