Merge branch 'master' into develop
This commit is contained in:
commit
1ccd4cd940
10
.github/PULL_REQUEST_TEMPLATE.md
vendored
Normal file
10
.github/PULL_REQUEST_TEMPLATE.md
vendored
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
## Pull Request template
|
||||||
|
Please, go through these steps before clicking submit on this PR.
|
||||||
|
|
||||||
|
1. Make sure this PR targets the `develop` branch. We follow the git-flow branching model.
|
||||||
|
2. Give a descriptive title to your PR.
|
||||||
|
3. Provide a description of your changes.
|
||||||
|
4. Make sure you have some relevant tests.
|
||||||
|
5. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
|
||||||
|
|
||||||
|
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
|
15
store/cache/cache_test.go
vendored
15
store/cache/cache_test.go
vendored
@ -28,16 +28,19 @@ func TestCache(t *testing.T) {
|
|||||||
_, err := cachedStore.Read("test")
|
_, err := cachedStore.Read("test")
|
||||||
assert.Equal(store.ErrNotFound, err, "Read non existant key")
|
assert.Equal(store.ErrNotFound, err, "Read non existant key")
|
||||||
r1 := &store.Record{
|
r1 := &store.Record{
|
||||||
Key: "aaa",
|
Key: "aaa",
|
||||||
Value: []byte("bbb"),
|
Value: []byte("bbb"),
|
||||||
|
Metadata: map[string]interface{}{},
|
||||||
}
|
}
|
||||||
r2 := &store.Record{
|
r2 := &store.Record{
|
||||||
Key: "aaaa",
|
Key: "aaaa",
|
||||||
Value: []byte("bbbb"),
|
Value: []byte("bbbb"),
|
||||||
|
Metadata: map[string]interface{}{},
|
||||||
}
|
}
|
||||||
r3 := &store.Record{
|
r3 := &store.Record{
|
||||||
Key: "aaaaa",
|
Key: "aaaaa",
|
||||||
Value: []byte("bbbbb"),
|
Value: []byte("bbbbb"),
|
||||||
|
Metadata: map[string]interface{}{},
|
||||||
}
|
}
|
||||||
// Write 3 records directly to l2
|
// Write 3 records directly to l2
|
||||||
l2.Write(r1)
|
l2.Write(r1)
|
||||||
|
@ -27,11 +27,11 @@ var (
|
|||||||
re = regexp.MustCompile("[^a-zA-Z0-9]+")
|
re = regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||||
|
|
||||||
statements = map[string]string{
|
statements = map[string]string{
|
||||||
"list": "SELECT key, value, expiry FROM %s.%s;",
|
"list": "SELECT key, value, metadata, expiry FROM %s.%s;",
|
||||||
"read": "SELECT key, value, expiry FROM %s.%s WHERE key = $1;",
|
"read": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key = $1;",
|
||||||
"readMany": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;",
|
"readMany": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1;",
|
||||||
"readOffset": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;",
|
"readOffset": "SELECT key, value, metadata, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;",
|
||||||
"write": "INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;",
|
"write": "INSERT INTO %s.%s(key, value, metadata, expiry) VALUES ($1, $2::bytea, $3, $4) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, metadata = EXCLUDED.metadata, expiry = EXCLUDED.expiry;",
|
||||||
"delete": "DELETE FROM %s.%s WHERE key = $1;",
|
"delete": "DELETE FROM %s.%s WHERE key = $1;",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -108,6 +108,7 @@ func (s *sqlStore) initDB(database, table string) error {
|
|||||||
(
|
(
|
||||||
key text NOT NULL,
|
key text NOT NULL,
|
||||||
value bytea,
|
value bytea,
|
||||||
|
metadata JSONB,
|
||||||
expiry timestamp with time zone,
|
expiry timestamp with time zone,
|
||||||
CONSTRAINT %s_pkey PRIMARY KEY (key)
|
CONSTRAINT %s_pkey PRIMARY KEY (key)
|
||||||
);`, table, table))
|
);`, table, table))
|
||||||
@ -121,6 +122,12 @@ func (s *sqlStore) initDB(database, table string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create Metadata Index
|
||||||
|
_, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING GIN ("metadata");`, "metadata_index_"+table, database, table))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -227,9 +234,15 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
|
|||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
record := &store.Record{}
|
record := &store.Record{}
|
||||||
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
metadata := make(Metadata)
|
||||||
|
|
||||||
|
if err := rows.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
|
||||||
return keys, err
|
return keys, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the metadata
|
||||||
|
record.Metadata = toMetadata(&metadata)
|
||||||
|
|
||||||
if timehelper.Valid {
|
if timehelper.Valid {
|
||||||
if timehelper.Time.Before(time.Now()) {
|
if timehelper.Time.Before(time.Now()) {
|
||||||
// record has expired
|
// record has expired
|
||||||
@ -281,12 +294,18 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
|
|||||||
|
|
||||||
row := st.QueryRow(key)
|
row := st.QueryRow(key)
|
||||||
record := &store.Record{}
|
record := &store.Record{}
|
||||||
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
metadata := make(Metadata)
|
||||||
|
|
||||||
|
if err := row.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
return records, store.ErrNotFound
|
return records, store.ErrNotFound
|
||||||
}
|
}
|
||||||
return records, err
|
return records, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the metadata
|
||||||
|
record.Metadata = toMetadata(&metadata)
|
||||||
|
|
||||||
if timehelper.Valid {
|
if timehelper.Valid {
|
||||||
if timehelper.Time.Before(time.Now()) {
|
if timehelper.Time.Before(time.Now()) {
|
||||||
// record has expired
|
// record has expired
|
||||||
@ -346,9 +365,15 @@ func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record,
|
|||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
record := &store.Record{}
|
record := &store.Record{}
|
||||||
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
|
metadata := make(Metadata)
|
||||||
|
|
||||||
|
if err := rows.Scan(&record.Key, &record.Value, &metadata, &timehelper); err != nil {
|
||||||
return records, err
|
return records, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the metadata
|
||||||
|
record.Metadata = toMetadata(&metadata)
|
||||||
|
|
||||||
if timehelper.Valid {
|
if timehelper.Valid {
|
||||||
if timehelper.Time.Before(time.Now()) {
|
if timehelper.Time.Before(time.Now()) {
|
||||||
// record has expired
|
// record has expired
|
||||||
@ -391,10 +416,15 @@ func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
|||||||
}
|
}
|
||||||
defer st.Close()
|
defer st.Close()
|
||||||
|
|
||||||
|
metadata := make(Metadata)
|
||||||
|
for k, v := range r.Metadata {
|
||||||
|
metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
if r.Expiry != 0 {
|
if r.Expiry != 0 {
|
||||||
_, err = st.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
|
_, err = st.Exec(r.Key, r.Value, metadata, time.Now().Add(r.Expiry))
|
||||||
} else {
|
} else {
|
||||||
_, err = st.Exec(r.Key, r.Value, nil)
|
_, err = st.Exec(r.Key, r.Value, metadata, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
45
store/cockroach/metadata.go
Normal file
45
store/cockroach/metadata.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package cockroach
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql/driver"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// https://github.com/upper/db/blob/master/postgresql/custom_types.go#L43
|
||||||
|
type Metadata map[string]interface{}
|
||||||
|
|
||||||
|
// Scan satisfies the sql.Scanner interface.
|
||||||
|
func (m *Metadata) Scan(src interface{}) error {
|
||||||
|
source, ok := src.([]byte)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("Type assertion .([]byte) failed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
var i interface{}
|
||||||
|
err := json.Unmarshal(source, &i)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*m, ok = i.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return errors.New("Type assertion .(map[string]interface{}) failed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Value satisfies the driver.Valuer interface.
|
||||||
|
func (m Metadata) Value() (driver.Value, error) {
|
||||||
|
j, err := json.Marshal(m)
|
||||||
|
return j, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func toMetadata(m *Metadata) map[string]interface{} {
|
||||||
|
md := make(map[string]interface{})
|
||||||
|
for k, v := range *m {
|
||||||
|
md[k] = v
|
||||||
|
}
|
||||||
|
return md
|
||||||
|
}
|
@ -54,6 +54,7 @@ type fileHandle struct {
|
|||||||
type record struct {
|
type record struct {
|
||||||
Key string
|
Key string
|
||||||
Value []byte
|
Value []byte
|
||||||
|
Metadata map[string]interface{}
|
||||||
ExpiresAt time.Time
|
ExpiresAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -221,6 +222,11 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
|
|||||||
newRecord := &store.Record{}
|
newRecord := &store.Record{}
|
||||||
newRecord.Key = storedRecord.Key
|
newRecord.Key = storedRecord.Key
|
||||||
newRecord.Value = storedRecord.Value
|
newRecord.Value = storedRecord.Value
|
||||||
|
newRecord.Metadata = make(map[string]interface{})
|
||||||
|
|
||||||
|
for k, v := range storedRecord.Metadata {
|
||||||
|
newRecord.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
if !storedRecord.ExpiresAt.IsZero() {
|
if !storedRecord.ExpiresAt.IsZero() {
|
||||||
if storedRecord.ExpiresAt.Before(time.Now()) {
|
if storedRecord.ExpiresAt.Before(time.Now()) {
|
||||||
@ -238,10 +244,16 @@ func (m *fileStore) set(fd *fileHandle, r *store.Record) error {
|
|||||||
item := &record{}
|
item := &record{}
|
||||||
item.Key = r.Key
|
item.Key = r.Key
|
||||||
item.Value = r.Value
|
item.Value = r.Value
|
||||||
|
item.Metadata = make(map[string]interface{})
|
||||||
|
|
||||||
if r.Expiry != 0 {
|
if r.Expiry != 0 {
|
||||||
item.ExpiresAt = time.Now().Add(r.Expiry)
|
item.ExpiresAt = time.Now().Add(r.Expiry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for k, v := range r.Metadata {
|
||||||
|
item.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
// marshal the data
|
// marshal the data
|
||||||
data, _ := json.Marshal(item)
|
data, _ := json.Marshal(item)
|
||||||
|
|
||||||
@ -348,6 +360,7 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
|||||||
newRecord := store.Record{}
|
newRecord := store.Record{}
|
||||||
newRecord.Key = r.Key
|
newRecord.Key = r.Key
|
||||||
newRecord.Value = r.Value
|
newRecord.Value = r.Value
|
||||||
|
newRecord.Metadata = make(map[string]interface{})
|
||||||
newRecord.Expiry = r.Expiry
|
newRecord.Expiry = r.Expiry
|
||||||
|
|
||||||
if !writeOpts.Expiry.IsZero() {
|
if !writeOpts.Expiry.IsZero() {
|
||||||
@ -357,6 +370,10 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
|||||||
newRecord.Expiry = writeOpts.TTL
|
newRecord.Expiry = writeOpts.TTL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for k, v := range r.Metadata {
|
||||||
|
newRecord.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
return m.set(fd, &newRecord)
|
return m.set(fd, &newRecord)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,9 +33,10 @@ type memoryStore struct {
|
|||||||
store *cache.Cache
|
store *cache.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
type internalRecord struct {
|
type storeRecord struct {
|
||||||
key string
|
key string
|
||||||
value []byte
|
value []byte
|
||||||
|
metadata map[string]interface{}
|
||||||
expiresAt time.Time
|
expiresAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,26 +57,36 @@ func (m *memoryStore) prefix(database, table string) string {
|
|||||||
func (m *memoryStore) get(prefix, key string) (*store.Record, error) {
|
func (m *memoryStore) get(prefix, key string) (*store.Record, error) {
|
||||||
key = m.key(prefix, key)
|
key = m.key(prefix, key)
|
||||||
|
|
||||||
var storedRecord *internalRecord
|
var storedRecord *storeRecord
|
||||||
r, found := m.store.Get(key)
|
r, found := m.store.Get(key)
|
||||||
if !found {
|
if !found {
|
||||||
return nil, store.ErrNotFound
|
return nil, store.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
storedRecord, ok := r.(*internalRecord)
|
storedRecord, ok := r.(*storeRecord)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("Retrieved a non *internalRecord from the cache")
|
return nil, errors.New("Retrieved a non *storeRecord from the cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy the record on the way out
|
// Copy the record on the way out
|
||||||
newRecord := &store.Record{}
|
newRecord := &store.Record{}
|
||||||
newRecord.Key = strings.TrimPrefix(storedRecord.key, prefix+"/")
|
newRecord.Key = strings.TrimPrefix(storedRecord.key, prefix+"/")
|
||||||
newRecord.Value = make([]byte, len(storedRecord.value))
|
newRecord.Value = make([]byte, len(storedRecord.value))
|
||||||
|
newRecord.Metadata = make(map[string]interface{})
|
||||||
|
|
||||||
|
// copy the value into the new record
|
||||||
copy(newRecord.Value, storedRecord.value)
|
copy(newRecord.Value, storedRecord.value)
|
||||||
|
|
||||||
|
// check if we need to set the expiry
|
||||||
if !storedRecord.expiresAt.IsZero() {
|
if !storedRecord.expiresAt.IsZero() {
|
||||||
newRecord.Expiry = time.Until(storedRecord.expiresAt)
|
newRecord.Expiry = time.Until(storedRecord.expiresAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copy in the metadata
|
||||||
|
for k, v := range storedRecord.metadata {
|
||||||
|
newRecord.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
return newRecord, nil
|
return newRecord, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,15 +95,24 @@ func (m *memoryStore) set(prefix string, r *store.Record) {
|
|||||||
|
|
||||||
// copy the incoming record and then
|
// copy the incoming record and then
|
||||||
// convert the expiry in to a hard timestamp
|
// convert the expiry in to a hard timestamp
|
||||||
i := &internalRecord{}
|
i := &storeRecord{}
|
||||||
i.key = r.Key
|
i.key = r.Key
|
||||||
i.value = make([]byte, len(r.Value))
|
i.value = make([]byte, len(r.Value))
|
||||||
|
i.metadata = make(map[string]interface{})
|
||||||
|
|
||||||
|
// copy the the value
|
||||||
copy(i.value, r.Value)
|
copy(i.value, r.Value)
|
||||||
|
|
||||||
|
// set the expiry
|
||||||
if r.Expiry != 0 {
|
if r.Expiry != 0 {
|
||||||
i.expiresAt = time.Now().Add(r.Expiry)
|
i.expiresAt = time.Now().Add(r.Expiry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the metadata
|
||||||
|
for k, v := range r.Metadata {
|
||||||
|
i.metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
m.store.Set(key, i, r.Expiry)
|
m.store.Set(key, i, r.Expiry)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,6 +219,7 @@ func (m *memoryStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
|||||||
newRecord := store.Record{}
|
newRecord := store.Record{}
|
||||||
newRecord.Key = r.Key
|
newRecord.Key = r.Key
|
||||||
newRecord.Value = make([]byte, len(r.Value))
|
newRecord.Value = make([]byte, len(r.Value))
|
||||||
|
newRecord.Metadata = make(map[string]interface{})
|
||||||
copy(newRecord.Value, r.Value)
|
copy(newRecord.Value, r.Value)
|
||||||
newRecord.Expiry = r.Expiry
|
newRecord.Expiry = r.Expiry
|
||||||
|
|
||||||
@ -208,6 +229,11 @@ func (m *memoryStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
|||||||
if writeOpts.TTL != 0 {
|
if writeOpts.TTL != 0 {
|
||||||
newRecord.Expiry = writeOpts.TTL
|
newRecord.Expiry = writeOpts.TTL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for k, v := range r.Metadata {
|
||||||
|
newRecord.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
m.set(prefix, &newRecord)
|
m.set(prefix, &newRecord)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,5 @@
|
|||||||
// Code generated by protoc-gen-micro. DO NOT EDIT.
|
// Code generated by protoc-gen-micro. DO NOT EDIT.
|
||||||
// source: store/service/proto/store.proto
|
// source: github.com/micro/go-micro/store/service/proto/store.proto
|
||||||
|
|
||||||
package go_micro_store
|
package go_micro_store
|
||||||
|
|
||||||
|
@ -11,6 +11,13 @@ service Store {
|
|||||||
rpc Tables(TablesRequest) returns (TablesResponse) {};
|
rpc Tables(TablesRequest) returns (TablesResponse) {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message Field {
|
||||||
|
// type of value e.g string, int, int64, bool, float64
|
||||||
|
string type = 1;
|
||||||
|
// the actual value
|
||||||
|
string value = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message Record {
|
message Record {
|
||||||
// key of the record
|
// key of the record
|
||||||
string key = 1;
|
string key = 1;
|
||||||
@ -18,6 +25,8 @@ message Record {
|
|||||||
bytes value = 2;
|
bytes value = 2;
|
||||||
// time.Duration (signed int64 nanoseconds)
|
// time.Duration (signed int64 nanoseconds)
|
||||||
int64 expiry = 3;
|
int64 expiry = 3;
|
||||||
|
// the associated metadata
|
||||||
|
map<string,Field> metadata = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ReadOptions {
|
message ReadOptions {
|
||||||
|
@ -3,7 +3,9 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/client"
|
"github.com/micro/go-micro/v2/client"
|
||||||
@ -137,10 +139,21 @@ func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Reco
|
|||||||
records := make([]*store.Record, 0, len(rsp.Records))
|
records := make([]*store.Record, 0, len(rsp.Records))
|
||||||
|
|
||||||
for _, val := range rsp.Records {
|
for _, val := range rsp.Records {
|
||||||
|
metadata := make(map[string]interface{})
|
||||||
|
|
||||||
|
for k, v := range val.Metadata {
|
||||||
|
switch v.Type {
|
||||||
|
// TODO: parse all types
|
||||||
|
default:
|
||||||
|
metadata[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
records = append(records, &store.Record{
|
records = append(records, &store.Record{
|
||||||
Key: val.Key,
|
Key: val.Key,
|
||||||
Value: val.Value,
|
Value: val.Value,
|
||||||
Expiry: time.Duration(val.Expiry) * time.Second,
|
Expiry: time.Duration(val.Expiry) * time.Second,
|
||||||
|
Metadata: metadata,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -163,11 +176,21 @@ func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) er
|
|||||||
Table: options.Table,
|
Table: options.Table,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metadata := make(map[string]*pb.Field)
|
||||||
|
|
||||||
|
for k, v := range record.Metadata {
|
||||||
|
metadata[k] = &pb.Field{
|
||||||
|
Type: reflect.TypeOf(v).String(),
|
||||||
|
Value: fmt.Sprintf("%v", v),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
|
_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
|
||||||
Record: &pb.Record{
|
Record: &pb.Record{
|
||||||
Key: record.Key,
|
Key: record.Key,
|
||||||
Value: record.Value,
|
Value: record.Value,
|
||||||
Expiry: int64(record.Expiry.Seconds()),
|
Expiry: int64(record.Expiry.Seconds()),
|
||||||
|
Metadata: metadata,
|
||||||
},
|
},
|
||||||
Options: writeOpts}, client.WithAddress(s.Nodes...))
|
Options: writeOpts}, client.WithAddress(s.Nodes...))
|
||||||
if err != nil && errors.Equal(err, errors.NotFound("", "")) {
|
if err != nil && errors.Equal(err, errors.NotFound("", "")) {
|
||||||
|
@ -36,7 +36,12 @@ type Store interface {
|
|||||||
|
|
||||||
// Record is an item stored or retrieved from a Store
|
// Record is an item stored or retrieved from a Store
|
||||||
type Record struct {
|
type Record struct {
|
||||||
Key string `json:"key"`
|
// The key to store the record
|
||||||
Value []byte `json:"value"`
|
Key string `json:"key"`
|
||||||
|
// The value within the record
|
||||||
|
Value []byte `json:"value"`
|
||||||
|
// Any associated metadata for indexing
|
||||||
|
Metadata map[string]interface{} `json:"metadata"`
|
||||||
|
// Time to expire a record: TODO: change to timestamp
|
||||||
Expiry time.Duration `json:"expiry,omitempty"`
|
Expiry time.Duration `json:"expiry,omitempty"`
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user