Change the store interface to remove variadic args (#1095)

This commit is contained in:
Asim Aslam 2020-01-08 22:23:14 +00:00 committed by GitHub
parent 78aed5beed
commit a90a74c9e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 356 additions and 378 deletions

View File

@ -106,15 +106,19 @@ func (w *workersKV) Init(opts ...store.Option) error {
return nil
}
// In the cloudflare workers KV implemention, List() doesn't guarantee
// anything as the workers API is eventually consistent.
func (w *workersKV) List() ([]*store.Record, error) {
func (w *workersKV) list(prefix string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/keys", w.account, w.namespace)
response, _, _, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header))
body := make(map[string]string)
if len(prefix) > 0 {
body["prefix"] = prefix
}
response, _, _, err := w.request(ctx, http.MethodGet, path, body, make(http.Header))
if err != nil {
return nil, err
}
@ -138,13 +142,51 @@ func (w *workersKV) List() ([]*store.Record, error) {
keys = append(keys, r.Name)
}
return w.Read(keys...)
return keys, nil
}
func (w *workersKV) Read(keys ...string) ([]*store.Record, error) {
// In the cloudflare workers KV implemention, List() doesn't guarantee
// anything as the workers API is eventually consistent.
func (w *workersKV) List() ([]*store.Record, error) {
keys, err := w.list("")
if err != nil {
return nil, err
}
var gerr error
var records []*store.Record
for _, key := range keys {
r, err := w.Read(key)
if err != nil {
gerr = err
continue
}
records = append(records, r...)
}
return records, gerr
}
func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var options store.ReadOptions
for _, o := range opts {
o(&options)
}
keys := []string{key}
if options.Prefix {
k, err := w.list(key)
if err != nil {
return nil, err
}
keys = k
}
//nolint:prealloc
var records []*store.Record
@ -174,65 +216,61 @@ func (w *workersKV) Read(keys ...string) ([]*store.Record, error) {
return records, nil
}
func (w *workersKV) Write(records ...*store.Record) error {
func (w *workersKV) Write(r *store.Record) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for _, r := range records {
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key))
if r.Expiry != 0 {
// Minimum cloudflare TTL is 60 Seconds
exp := int(math.Max(60, math.Round(r.Expiry.Seconds())))
path = path + "?expiration_ttl=" + strconv.Itoa(exp)
}
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key))
if r.Expiry != 0 {
// Minimum cloudflare TTL is 60 Seconds
exp := int(math.Max(60, math.Round(r.Expiry.Seconds())))
path = path + "?expiration_ttl=" + strconv.Itoa(exp)
}
headers := make(http.Header)
headers := make(http.Header)
resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers)
if err != nil {
return err
}
resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers)
if err != nil {
return err
}
a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}
a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}
if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
}
return nil
}
func (w *workersKV) Delete(keys ...string) error {
func (w *workersKV) Delete(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for _, k := range keys {
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(k))
resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header))
if err != nil {
return err
}
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(key))
resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header))
if err != nil {
return err
}
a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}
a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}
if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
}
return nil

View File

@ -33,17 +33,18 @@ func TestCloudflare(t *testing.T) {
t.Log("Listed " + strconv.Itoa(len(records)) + " records")
}
err = wkv.Write(
&store.Record{
Key: randomK,
Value: []byte(randomV),
},
&store.Record{
Key: "expirationtest",
Value: []byte("This message will self destruct"),
Expiry: 75 * time.Second,
},
)
err = wkv.Write(&store.Record{
Key: randomK,
Value: []byte(randomV),
})
if err != nil {
t.Errorf("Write: %s", err.Error())
}
err = wkv.Write(&store.Record{
Key: "expirationtest",
Value: []byte("This message will self destruct"),
Expiry: 75 * time.Second,
})
if err != nil {
t.Errorf("Write: %s", err.Error())
}

View File

@ -80,39 +80,47 @@ func (s *sqlStore) List() ([]*store.Record, error) {
}
// Read all records with keys
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
var options store.ReadOptions
for _, o := range opts {
o(&options)
}
// TODO: make use of options.Prefix using WHERE key LIKE = ?
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
for _, key := range keys {
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
}
return records, nil
}
// Write records
func (s *sqlStore) Write(rec ...*store.Record) error {
func (s *sqlStore) Write(r *store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
@ -121,37 +129,36 @@ func (s *sqlStore) Write(rec ...*store.Record) error {
if err != nil {
return err
}
for _, r := range rec {
var err error
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}
return nil
}
// Delete records with keys
func (s *sqlStore) Delete(keys ...string) error {
func (s *sqlStore) Delete(key string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil {
return err
}
for _, key := range keys {
result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}
result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}
return nil
}

View File

@ -44,10 +44,20 @@ func TestSQL(t *testing.T) {
Key: "test",
Value: []byte("foo"),
},
)
if err != nil {
t.Error(err)
}
err = sqlStore.Write(
&store.Record{
Key: "bar",
Value: []byte("baz"),
},
)
if err != nil {
t.Error(err)
}
err = sqlStore.Write(
&store.Record{
Key: "qux",
Value: []byte("aasad"),

View File

@ -6,7 +6,6 @@ import (
"log"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/micro/go-micro/store"
)
@ -22,26 +21,31 @@ func (e *ekv) Init(opts ...store.Option) error {
return nil
}
func (e *ekv) Read(keys ...string) ([]*store.Record, error) {
//nolint:prealloc
var values []*mvccpb.KeyValue
for _, key := range keys {
keyval, err := e.kv.Get(context.Background(), key)
if err != nil {
return nil, err
}
if keyval == nil || len(keyval.Kvs) == 0 {
return nil, store.ErrNotFound
}
values = append(values, keyval.Kvs...)
func (e *ekv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
var options store.ReadOptions
for _, o := range opts {
o(&options)
}
records := make([]*store.Record, 0, len(values))
var etcdOpts []client.OpOption
for _, kv := range values {
// set options prefix
if options.Prefix {
etcdOpts = append(etcdOpts, client.WithPrefix())
}
keyval, err := e.kv.Get(context.Background(), key, etcdOpts...)
if err != nil {
return nil, err
}
if keyval == nil || len(keyval.Kvs) == 0 {
return nil, store.ErrNotFound
}
records := make([]*store.Record, 0, len(keyval.Kvs))
for _, kv := range keyval.Kvs {
records = append(records, &store.Record{
Key: string(kv.Key),
Value: kv.Value,
@ -52,27 +56,15 @@ func (e *ekv) Read(keys ...string) ([]*store.Record, error) {
return records, nil
}
func (e *ekv) Delete(keys ...string) error {
var gerr error
for _, key := range keys {
_, err := e.kv.Delete(context.Background(), key)
if err != nil {
gerr = err
}
}
return gerr
func (e *ekv) Delete(key string) error {
_, err := e.kv.Delete(context.Background(), key)
return err
}
func (e *ekv) Write(records ...*store.Record) error {
var gerr error
for _, record := range records {
// TODO create lease to expire keys
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
if err != nil {
gerr = err
}
}
return gerr
func (e *ekv) Write(record *store.Record) error {
// TODO create lease to expire keys
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
return err
}
func (e *ekv) List() ([]*store.Record, error) {

View File

@ -2,6 +2,7 @@
package memory
import (
"strings"
"sync"
"time"
@ -55,19 +56,37 @@ func (m *memoryStore) List() ([]*store.Record, error) {
return values, nil
}
func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) {
func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
m.RLock()
defer m.RUnlock()
//nolint:prealloc
var records []*store.Record
var options store.ReadOptions
for _, key := range keys {
for _, o := range opts {
o(&options)
}
var vals []*memoryRecord
if !options.Prefix {
v, ok := m.values[key]
if !ok {
return nil, store.ErrNotFound
}
vals = []*memoryRecord{v}
} else {
for _, v := range m.values {
if !strings.HasPrefix(v.r.Key, key) {
continue
}
vals = append(vals, v)
}
}
//nolint:prealloc
var records []*store.Record
for _, v := range vals {
// get expiry
d := v.r.Expiry
t := time.Since(v.c)
@ -88,29 +107,25 @@ func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) {
return records, nil
}
func (m *memoryStore) Write(records ...*store.Record) error {
func (m *memoryStore) Write(r *store.Record) error {
m.Lock()
defer m.Unlock()
for _, r := range records {
// set the record
m.values[r.Key] = &memoryRecord{
r: r,
c: time.Now(),
}
// set the record
m.values[r.Key] = &memoryRecord{
r: r,
c: time.Now(),
}
return nil
}
func (m *memoryStore) Delete(keys ...string) error {
func (m *memoryStore) Delete(key string) error {
m.Lock()
defer m.Unlock()
for _, key := range keys {
// delete the value
delete(m.values, key)
}
// delete the value
delete(m.values, key)
return nil
}

View File

@ -1,107 +0,0 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mock
import mock "github.com/stretchr/testify/mock"
import store "github.com/micro/go-micro/store"
// Store is an autogenerated mock type for the Store type
type Store struct {
mock.Mock
}
func (_m *Store) Init(...store.Option) error {
return nil
}
// Delete provides a mock function with given fields: key
func (_m *Store) Delete(key ...string) error {
_va := make([]interface{}, len(key))
for _i := range key {
_va[_i] = key[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...string) error); ok {
r0 = rf(key...)
} else {
r0 = ret.Error(0)
}
return r0
}
// List provides a mock function with given fields:
func (_m *Store) List() ([]*store.Record, error) {
ret := _m.Called()
var r0 []*store.Record
if rf, ok := ret.Get(0).(func() []*store.Record); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*store.Record)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Read provides a mock function with given fields: key
func (_m *Store) Read(key ...string) ([]*store.Record, error) {
_va := make([]interface{}, len(key))
for _i := range key {
_va[_i] = key[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 []*store.Record
if rf, ok := ret.Get(0).(func(...string) []*store.Record); ok {
r0 = rf(key...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*store.Record)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(...string) error); ok {
r1 = rf(key...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Write provides a mock function with given fields: rec
func (_m *Store) Write(rec ...*store.Record) error {
_va := make([]interface{}, len(rec))
for _i := range rec {
_va[_i] = rec[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...*store.Record) error); ok {
r0 = rf(rec...)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@ -38,3 +38,10 @@ func Namespace(ns string) Option {
o.Namespace = ns
}
}
// ReadPrefix uses the key as a prefix
func ReadPrefix() ReadOption {
return func(o *ReadOptions) {
o.Prefix = true
}
}

View File

@ -78,18 +78,58 @@ func (m *Record) GetExpiry() int64 {
return 0
}
type ReadRequest struct {
Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"`
type ReadOptions struct {
Prefix bool `protobuf:"varint,1,opt,name=prefix,proto3" json:"prefix,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadOptions) Reset() { *m = ReadOptions{} }
func (m *ReadOptions) String() string { return proto.CompactTextString(m) }
func (*ReadOptions) ProtoMessage() {}
func (*ReadOptions) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{1}
}
func (m *ReadOptions) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReadOptions.Unmarshal(m, b)
}
func (m *ReadOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReadOptions.Marshal(b, m, deterministic)
}
func (m *ReadOptions) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadOptions.Merge(m, src)
}
func (m *ReadOptions) XXX_Size() int {
return xxx_messageInfo_ReadOptions.Size(m)
}
func (m *ReadOptions) XXX_DiscardUnknown() {
xxx_messageInfo_ReadOptions.DiscardUnknown(m)
}
var xxx_messageInfo_ReadOptions proto.InternalMessageInfo
func (m *ReadOptions) GetPrefix() bool {
if m != nil {
return m.Prefix
}
return false
}
type ReadRequest struct {
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Options *ReadOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{1}
return fileDescriptor_f84ccc98e143ed3e, []int{2}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
@ -110,9 +150,16 @@ func (m *ReadRequest) XXX_DiscardUnknown() {
var xxx_messageInfo_ReadRequest proto.InternalMessageInfo
func (m *ReadRequest) GetKeys() []string {
func (m *ReadRequest) GetKey() string {
if m != nil {
return m.Keys
return m.Key
}
return ""
}
func (m *ReadRequest) GetOptions() *ReadOptions {
if m != nil {
return m.Options
}
return nil
}
@ -128,7 +175,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{2}
return fileDescriptor_f84ccc98e143ed3e, []int{3}
}
func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
@ -157,17 +204,17 @@ func (m *ReadResponse) GetRecords() []*Record {
}
type WriteRequest struct {
Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
func (m *WriteRequest) String() string { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage() {}
func (*WriteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{3}
return fileDescriptor_f84ccc98e143ed3e, []int{4}
}
func (m *WriteRequest) XXX_Unmarshal(b []byte) error {
@ -188,9 +235,9 @@ func (m *WriteRequest) XXX_DiscardUnknown() {
var xxx_messageInfo_WriteRequest proto.InternalMessageInfo
func (m *WriteRequest) GetRecords() []*Record {
func (m *WriteRequest) GetRecord() *Record {
if m != nil {
return m.Records
return m.Record
}
return nil
}
@ -205,7 +252,7 @@ func (m *WriteResponse) Reset() { *m = WriteResponse{} }
func (m *WriteResponse) String() string { return proto.CompactTextString(m) }
func (*WriteResponse) ProtoMessage() {}
func (*WriteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{4}
return fileDescriptor_f84ccc98e143ed3e, []int{5}
}
func (m *WriteResponse) XXX_Unmarshal(b []byte) error {
@ -227,7 +274,7 @@ func (m *WriteResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_WriteResponse proto.InternalMessageInfo
type DeleteRequest struct {
Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"`
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -237,7 +284,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} }
func (m *DeleteRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteRequest) ProtoMessage() {}
func (*DeleteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{5}
return fileDescriptor_f84ccc98e143ed3e, []int{6}
}
func (m *DeleteRequest) XXX_Unmarshal(b []byte) error {
@ -258,11 +305,11 @@ func (m *DeleteRequest) XXX_DiscardUnknown() {
var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo
func (m *DeleteRequest) GetKeys() []string {
func (m *DeleteRequest) GetKey() string {
if m != nil {
return m.Keys
return m.Key
}
return nil
return ""
}
type DeleteResponse struct {
@ -275,7 +322,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{6}
return fileDescriptor_f84ccc98e143ed3e, []int{7}
}
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
@ -297,8 +344,6 @@ func (m *DeleteResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo
type ListRequest struct {
// optional key
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -308,7 +353,7 @@ func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{7}
return fileDescriptor_f84ccc98e143ed3e, []int{8}
}
func (m *ListRequest) XXX_Unmarshal(b []byte) error {
@ -329,13 +374,6 @@ func (m *ListRequest) XXX_DiscardUnknown() {
var xxx_messageInfo_ListRequest proto.InternalMessageInfo
func (m *ListRequest) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
type ListResponse struct {
Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -347,7 +385,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{8}
return fileDescriptor_f84ccc98e143ed3e, []int{9}
}
func (m *ListResponse) XXX_Unmarshal(b []byte) error {
@ -377,6 +415,7 @@ func (m *ListResponse) GetRecords() []*Record {
func init() {
proto.RegisterType((*Record)(nil), "go.micro.store.Record")
proto.RegisterType((*ReadOptions)(nil), "go.micro.store.ReadOptions")
proto.RegisterType((*ReadRequest)(nil), "go.micro.store.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "go.micro.store.ReadResponse")
proto.RegisterType((*WriteRequest)(nil), "go.micro.store.WriteRequest")
@ -392,26 +431,28 @@ func init() {
}
var fileDescriptor_f84ccc98e143ed3e = []byte{
// 333 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x4d, 0x4f, 0xc2, 0x40,
0x10, 0x86, 0x59, 0x0a, 0x35, 0x0c, 0x1f, 0x92, 0x89, 0x21, 0x0d, 0x7e, 0xd5, 0x7a, 0xe9, 0xc5,
0x42, 0xf0, 0x0f, 0x98, 0xf8, 0x11, 0x4d, 0x3c, 0xad, 0x07, 0xcf, 0x08, 0x13, 0xd2, 0x80, 0x2e,
0xee, 0x16, 0x62, 0xff, 0x90, 0xbf, 0xd3, 0xec, 0x6e, 0xab, 0xc5, 0x42, 0x62, 0xbc, 0xcd, 0xee,
0xbc, 0xf3, 0xec, 0xdb, 0x79, 0x0b, 0xd1, 0x6b, 0x3c, 0x91, 0x62, 0x30, 0x13, 0x17, 0xb6, 0x50,
0x89, 0x90, 0x34, 0x50, 0x24, 0xd7, 0xf1, 0x84, 0x06, 0x4b, 0x29, 0x92, 0xec, 0x2e, 0x32, 0x35,
0x76, 0x66, 0xc2, 0x8e, 0x44, 0xe6, 0x36, 0xb8, 0x07, 0x97, 0xd3, 0x44, 0xc8, 0x29, 0x76, 0xc1,
0x99, 0x53, 0xea, 0x31, 0x9f, 0x85, 0x0d, 0xae, 0x4b, 0x3c, 0x80, 0xfa, 0x7a, 0xbc, 0x58, 0x91,
0x57, 0xf5, 0x59, 0xd8, 0xe2, 0xf6, 0x80, 0x3d, 0x70, 0xe9, 0x63, 0x19, 0xcb, 0xd4, 0x73, 0x7c,
0x16, 0x3a, 0x3c, 0x3b, 0x05, 0x67, 0xd0, 0xe4, 0x34, 0x9e, 0x72, 0x7a, 0x5f, 0x91, 0x4a, 0x10,
0xa1, 0x36, 0xa7, 0x54, 0x79, 0xcc, 0x77, 0xc2, 0x06, 0x37, 0x75, 0x70, 0x05, 0x2d, 0x2b, 0x51,
0x4b, 0xf1, 0xa6, 0x08, 0x87, 0xb0, 0x27, 0xcd, 0xe3, 0x56, 0xd6, 0x1c, 0xf5, 0xa2, 0x4d, 0x7b,
0x91, 0xf5, 0xc6, 0x73, 0x99, 0x26, 0x3c, 0xcb, 0x38, 0xa1, 0xfc, 0x95, 0x02, 0xa1, 0xfa, 0x37,
0xc2, 0x3e, 0xb4, 0x33, 0x82, 0x35, 0x11, 0x9c, 0x43, 0xfb, 0x86, 0x16, 0xf4, 0xc3, 0xdc, 0xe6,
0xbc, 0x0b, 0x9d, 0x5c, 0x94, 0x8d, 0x9d, 0x42, 0xf3, 0x31, 0x56, 0x49, 0x3e, 0x54, 0xda, 0x9e,
0xb6, 0x6a, 0x05, 0xff, 0xfd, 0xd8, 0xd1, 0x67, 0x15, 0xea, 0x4f, 0xba, 0x83, 0xb7, 0x50, 0xd3,
0x2c, 0x3c, 0xfc, 0x3d, 0x52, 0xb0, 0xd0, 0x3f, 0xda, 0xde, 0xcc, 0xfc, 0x56, 0x86, 0x0c, 0xaf,
0xa1, 0xa6, 0xf7, 0x5f, 0xc6, 0x14, 0x82, 0x2b, 0x63, 0x8a, 0x91, 0x05, 0x15, 0xbc, 0x83, 0xba,
0x59, 0x20, 0x96, 0x84, 0xc5, 0x64, 0xfa, 0xc7, 0x3b, 0xba, 0xdf, 0x9c, 0x07, 0x70, 0xed, 0x4a,
0xb1, 0x24, 0xdd, 0xc8, 0xa3, 0x7f, 0xb2, 0xab, 0x9d, 0xa3, 0x5e, 0x5c, 0xf3, 0x6f, 0x5f, 0x7e,
0x05, 0x00, 0x00, 0xff, 0xff, 0x30, 0x48, 0x25, 0x2d, 0x0d, 0x03, 0x00, 0x00,
// 364 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x4d, 0x4b, 0xc3, 0x40,
0x10, 0x6d, 0x9a, 0x36, 0xd5, 0x49, 0x5b, 0xcb, 0x22, 0x25, 0xd4, 0x0f, 0xe2, 0x82, 0x90, 0x8b,
0x69, 0xa9, 0x78, 0x15, 0xc1, 0x0f, 0x14, 0x04, 0x61, 0x05, 0x3d, 0xd7, 0x76, 0x2c, 0xc1, 0xda,
0x8d, 0xbb, 0x69, 0x69, 0xff, 0x90, 0xbf, 0x53, 0xb2, 0xbb, 0xd1, 0x94, 0x34, 0x17, 0x6f, 0x33,
0xfb, 0xde, 0xbc, 0x99, 0x79, 0xc3, 0x42, 0xf8, 0x19, 0x8d, 0x05, 0xef, 0x4f, 0xf9, 0x99, 0x0e,
0x64, 0xc2, 0x05, 0xf6, 0x25, 0x8a, 0x65, 0x34, 0xc6, 0x7e, 0x2c, 0x78, 0x62, 0xde, 0x42, 0x15,
0x93, 0xf6, 0x94, 0xeb, 0x92, 0x50, 0xbd, 0xd2, 0x7b, 0x70, 0x18, 0x8e, 0xb9, 0x98, 0x90, 0x0e,
0xd8, 0x1f, 0xb8, 0xf6, 0x2c, 0xdf, 0x0a, 0x76, 0x59, 0x1a, 0x92, 0x7d, 0xa8, 0x2f, 0x47, 0xb3,
0x05, 0x7a, 0x55, 0xdf, 0x0a, 0x9a, 0x4c, 0x27, 0xa4, 0x0b, 0x0e, 0xae, 0xe2, 0x48, 0xac, 0x3d,
0xdb, 0xb7, 0x02, 0x9b, 0x99, 0x8c, 0x9e, 0x82, 0xcb, 0x70, 0x34, 0x79, 0x8a, 0x93, 0x88, 0xcf,
0x65, 0x4a, 0x8b, 0x05, 0xbe, 0x47, 0x2b, 0xa5, 0xb8, 0xc3, 0x4c, 0x46, 0x5f, 0x34, 0x8d, 0xe1,
0xd7, 0x02, 0x65, 0xb2, 0xa5, 0xeb, 0x05, 0x34, 0xb8, 0xd6, 0x50, 0x7d, 0xdd, 0xe1, 0x41, 0xb8,
0x39, 0x73, 0x98, 0x6b, 0xc3, 0x32, 0x2e, 0xbd, 0x82, 0xa6, 0xd6, 0x95, 0x31, 0x9f, 0x4b, 0x24,
0x03, 0x68, 0x08, 0xb5, 0x98, 0xf4, 0x2c, 0xdf, 0x0e, 0xdc, 0x61, 0xb7, 0x28, 0x93, 0xc2, 0x2c,
0xa3, 0xd1, 0x4b, 0x68, 0xbe, 0x8a, 0x28, 0xc1, 0x6c, 0xb4, 0x10, 0x1c, 0x0d, 0xa9, 0xe9, 0xca,
0x05, 0x0c, 0x8b, 0xee, 0x41, 0xcb, 0xd4, 0xeb, 0x11, 0xe8, 0x09, 0xb4, 0x6e, 0x70, 0x86, 0x7f,
0x8a, 0x85, 0x65, 0x69, 0x07, 0xda, 0x19, 0xc5, 0x14, 0xb5, 0xc0, 0x7d, 0x8c, 0x64, 0x62, 0x4a,
0xd2, 0xb5, 0x74, 0xfa, 0xdf, 0xb5, 0x86, 0xdf, 0x55, 0xa8, 0x3f, 0xa7, 0x08, 0xb9, 0x85, 0x5a,
0xaa, 0x45, 0x0a, 0x86, 0xe6, 0x1a, 0xf6, 0x0e, 0xb7, 0x83, 0x66, 0xba, 0xca, 0xc0, 0x22, 0xd7,
0x50, 0x4b, 0x9d, 0x26, 0x5b, 0xef, 0x52, 0x2a, 0x93, 0x3f, 0x0e, 0xad, 0x90, 0x3b, 0xa8, 0x2b,
0xb3, 0x48, 0x81, 0x98, 0xbf, 0x41, 0xef, 0xa8, 0x04, 0xfd, 0xd5, 0x79, 0x00, 0x47, 0x1b, 0x48,
0x0a, 0xd4, 0x0d, 0xef, 0x7b, 0xc7, 0x65, 0x70, 0x26, 0xf5, 0xe6, 0xa8, 0x1f, 0x72, 0xfe, 0x13,
0x00, 0x00, 0xff, 0xff, 0xf9, 0x20, 0x54, 0x71, 0x53, 0x03, 0x00, 0x00,
}

View File

@ -18,8 +18,13 @@ message Record {
int64 expiry = 3;
}
message ReadOptions {
bool prefix = 1;
}
message ReadRequest {
repeated string keys = 1;
string key = 1;
ReadOptions options = 2;
}
message ReadResponse {
@ -27,21 +32,18 @@ message ReadResponse {
}
message WriteRequest {
repeated Record records = 2;
Record record = 1;
}
message WriteResponse {}
message DeleteRequest {
repeated string keys = 1;
string key = 1;
}
message DeleteResponse {}
message ListRequest {
// optional key
string key = 1;
}
message ListRequest {}
message ListResponse {
repeated Record records = 1;

View File

@ -73,6 +73,7 @@ func (s *serviceStore) List() ([]*store.Record, error) {
if err != nil {
return records, err
}
for _, record := range rsp.Records {
records = append(records, &store.Record{
Key: record.Key,
@ -86,15 +87,24 @@ func (s *serviceStore) List() ([]*store.Record, error) {
}
// Read a record with key
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
var options store.ReadOptions
for _, o := range opts {
o(&options)
}
rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{
Keys: keys,
Key: key,
Options: &pb.ReadOptions{
Prefix: options.Prefix,
},
}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
records := make([]*store.Record, 0, len(rsp.Records))
for _, val := range rsp.Records {
records = append(records, &store.Record{
Key: val.Key,
@ -102,32 +112,27 @@ func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
Expiry: time.Duration(val.Expiry) * time.Second,
})
}
return records, nil
}
// Write a record
func (s *serviceStore) Write(recs ...*store.Record) error {
records := make([]*pb.Record, 0, len(recs))
for _, record := range recs {
records = append(records, &pb.Record{
func (s *serviceStore) Write(record *store.Record) error {
_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
Record: &pb.Record{
Key: record.Key,
Value: record.Value,
Expiry: int64(record.Expiry.Seconds()),
})
}
_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
Records: records,
},
}, client.WithAddress(s.Nodes...))
return err
}
// Delete a record with key
func (s *serviceStore) Delete(keys ...string) error {
func (s *serviceStore) Delete(key string) error {
_, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
Keys: keys,
Key: key,
}, client.WithAddress(s.Nodes...))
return err
}

View File

@ -20,11 +20,11 @@ type Store interface {
// List all the known records
List() ([]*Record, error)
// Read records with keys
Read(key ...string) ([]*Record, error)
Read(key string, opts ...ReadOption) ([]*Record, error)
// Write records
Write(rec ...*Record) error
Write(*Record) error
// Delete records with keys
Delete(key ...string) error
Delete(key string) error
}
// Record represents a data record
@ -34,6 +34,13 @@ type Record struct {
Expiry time.Duration
}
type ReadOptions struct {
// Read key as a prefix
Prefix bool
}
type ReadOption func(o *ReadOptions)
type noop struct{}
func (n *noop) Init(...Option) error {
@ -44,14 +51,14 @@ func (n *noop) List() ([]*Record, error) {
return nil, nil
}
func (n *noop) Read(key ...string) ([]*Record, error) {
func (n *noop) Read(key string, opts ...ReadOption) ([]*Record, error) {
return nil, nil
}
func (n *noop) Write(rec ...*Record) error {
func (n *noop) Write(rec *Record) error {
return nil
}
func (n *noop) Delete(key ...string) error {
func (n *noop) Delete(key string) error {
return nil
}

View File

@ -1,40 +0,0 @@
package sync
import (
"testing"
"time"
"github.com/micro/go-micro/store"
store_mock "github.com/micro/go-micro/store/mock"
mem_lock "github.com/micro/go-micro/sync/lock/memory"
"github.com/stretchr/testify/mock"
)
func TestIterate(t *testing.T) {
recA := &store.Record{
Key: "A",
Value: nil,
}
recB := &store.Record{
Key: "B",
Value: nil,
}
s1 := &store_mock.Store{}
s2 := &store_mock.Store{}
s1.On("List").Return([]*store.Record{recA, recB}, nil)
s2.On("List").Return([]*store.Record{recB, recA}, nil)
s1.On("Write", mock.Anything).Return(nil)
s2.On("Write", mock.Anything).Return(nil)
f := func(key, val interface{}) error {
time.Sleep(1 * time.Millisecond)
return nil
}
l := mem_lock.NewLock()
m1 := NewMap(WithStore(s1), WithLock(l))
m2 := NewMap(WithStore(s2), WithLock(l))
go func() {
m2.Iterate(f)
}()
m1.Iterate(f)
}