From a90a74c9e28d5aabd0933a9e975837e219a5b945 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 8 Jan 2020 22:23:14 +0000 Subject: [PATCH] Change the store interface to remove variadic args (#1095) --- store/cloudflare/cloudflare.go | 130 +++++++++++++++--------- store/cloudflare/cloudflare_test.go | 23 ++--- store/cockroach/cockroach.go | 89 +++++++++-------- store/cockroach/cockroach_test.go | 10 ++ store/etcd/etcd.go | 66 ++++++------- store/memory/memory.go | 47 ++++++--- store/mock/store.go | 107 -------------------- store/options.go | 7 ++ store/service/proto/store.pb.go | 147 ++++++++++++++++++---------- store/service/proto/store.proto | 16 +-- store/service/service.go | 33 ++++--- store/store.go | 19 ++-- sync/map_test.go | 40 -------- 13 files changed, 356 insertions(+), 378 deletions(-) delete mode 100644 store/mock/store.go delete mode 100644 sync/map_test.go diff --git a/store/cloudflare/cloudflare.go b/store/cloudflare/cloudflare.go index 03e32298..9bc4b69f 100644 --- a/store/cloudflare/cloudflare.go +++ b/store/cloudflare/cloudflare.go @@ -106,15 +106,19 @@ func (w *workersKV) Init(opts ...store.Option) error { return nil } -// In the cloudflare workers KV implemention, List() doesn't guarantee -// anything as the workers API is eventually consistent. -func (w *workersKV) List() ([]*store.Record, error) { +func (w *workersKV) list(prefix string) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/keys", w.account, w.namespace) - response, _, _, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header)) + body := make(map[string]string) + + if len(prefix) > 0 { + body["prefix"] = prefix + } + + response, _, _, err := w.request(ctx, http.MethodGet, path, body, make(http.Header)) if err != nil { return nil, err } @@ -138,13 +142,51 @@ func (w *workersKV) List() ([]*store.Record, error) { keys = append(keys, r.Name) } - return w.Read(keys...) + return keys, nil } -func (w *workersKV) Read(keys ...string) ([]*store.Record, error) { +// In the cloudflare workers KV implemention, List() doesn't guarantee +// anything as the workers API is eventually consistent. +func (w *workersKV) List() ([]*store.Record, error) { + keys, err := w.list("") + if err != nil { + return nil, err + } + + var gerr error + var records []*store.Record + + for _, key := range keys { + r, err := w.Read(key) + if err != nil { + gerr = err + continue + } + records = append(records, r...) + } + + return records, gerr +} + +func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + var options store.ReadOptions + for _, o := range opts { + o(&options) + } + + keys := []string{key} + + if options.Prefix { + k, err := w.list(key) + if err != nil { + return nil, err + } + keys = k + } + //nolint:prealloc var records []*store.Record @@ -174,65 +216,61 @@ func (w *workersKV) Read(keys ...string) ([]*store.Record, error) { return records, nil } -func (w *workersKV) Write(records ...*store.Record) error { +func (w *workersKV) Write(r *store.Record) error { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - for _, r := range records { - path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key)) - if r.Expiry != 0 { - // Minimum cloudflare TTL is 60 Seconds - exp := int(math.Max(60, math.Round(r.Expiry.Seconds()))) - path = path + "?expiration_ttl=" + strconv.Itoa(exp) - } + path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key)) + if r.Expiry != 0 { + // Minimum cloudflare TTL is 60 Seconds + exp := int(math.Max(60, math.Round(r.Expiry.Seconds()))) + path = path + "?expiration_ttl=" + strconv.Itoa(exp) + } - headers := make(http.Header) + headers := make(http.Header) - resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers) - if err != nil { - return err - } + resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers) + if err != nil { + return err + } - a := &apiResponse{} - if err := json.Unmarshal(resp, a); err != nil { - return err - } + a := &apiResponse{} + if err := json.Unmarshal(resp, a); err != nil { + return err + } - if !a.Success { - messages := "" - for _, m := range a.Errors { - messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" - } - return errors.New(messages) + if !a.Success { + messages := "" + for _, m := range a.Errors { + messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" } + return errors.New(messages) } return nil } -func (w *workersKV) Delete(keys ...string) error { +func (w *workersKV) Delete(key string) error { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - for _, k := range keys { - path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(k)) - resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header)) - if err != nil { - return err - } + path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(key)) + resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header)) + if err != nil { + return err + } - a := &apiResponse{} - if err := json.Unmarshal(resp, a); err != nil { - return err - } + a := &apiResponse{} + if err := json.Unmarshal(resp, a); err != nil { + return err + } - if !a.Success { - messages := "" - for _, m := range a.Errors { - messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" - } - return errors.New(messages) + if !a.Success { + messages := "" + for _, m := range a.Errors { + messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" } + return errors.New(messages) } return nil diff --git a/store/cloudflare/cloudflare_test.go b/store/cloudflare/cloudflare_test.go index 525eeecd..b1586a36 100644 --- a/store/cloudflare/cloudflare_test.go +++ b/store/cloudflare/cloudflare_test.go @@ -33,17 +33,18 @@ func TestCloudflare(t *testing.T) { t.Log("Listed " + strconv.Itoa(len(records)) + " records") } - err = wkv.Write( - &store.Record{ - Key: randomK, - Value: []byte(randomV), - }, - &store.Record{ - Key: "expirationtest", - Value: []byte("This message will self destruct"), - Expiry: 75 * time.Second, - }, - ) + err = wkv.Write(&store.Record{ + Key: randomK, + Value: []byte(randomV), + }) + if err != nil { + t.Errorf("Write: %s", err.Error()) + } + err = wkv.Write(&store.Record{ + Key: "expirationtest", + Value: []byte("This message will self destruct"), + Expiry: 75 * time.Second, + }) if err != nil { t.Errorf("Write: %s", err.Error()) } diff --git a/store/cockroach/cockroach.go b/store/cockroach/cockroach.go index d321dc06..95fe34f4 100644 --- a/store/cockroach/cockroach.go +++ b/store/cockroach/cockroach.go @@ -80,39 +80,47 @@ func (s *sqlStore) List() ([]*store.Record, error) { } // Read all records with keys -func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) { +func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + var options store.ReadOptions + for _, o := range opts { + o(&options) + } + + // TODO: make use of options.Prefix using WHERE key LIKE = ? + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table)) if err != nil { return nil, err } + var records []*store.Record var timehelper pq.NullTime - for _, key := range keys { - row := q.QueryRow(key) - record := &store.Record{} - if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { - if err == sql.ErrNoRows { - return records, store.ErrNotFound - } - return records, err - } - if timehelper.Valid { - if timehelper.Time.Before(time.Now()) { - // record has expired - go s.Delete(key) - return records, store.ErrNotFound - } - record.Expiry = time.Until(timehelper.Time) - records = append(records, record) - } else { - records = append(records, record) + + row := q.QueryRow(key) + record := &store.Record{} + if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { + if err == sql.ErrNoRows { + return records, store.ErrNotFound } + return records, err } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(key) + return records, store.ErrNotFound + } + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } else { + records = append(records, record) + } + return records, nil } // Write records -func (s *sqlStore) Write(rec ...*store.Record) error { +func (s *sqlStore) Write(r *store.Record) error { q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) @@ -121,37 +129,36 @@ func (s *sqlStore) Write(rec ...*store.Record) error { if err != nil { return err } - for _, r := range rec { - var err error - if r.Expiry != 0 { - _, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) - } else { - _, err = q.Exec(r.Key, r.Value, nil) - } - if err != nil { - return errors.Wrap(err, "Couldn't insert record "+r.Key) - } + + if r.Expiry != 0 { + _, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) + } else { + _, err = q.Exec(r.Key, r.Value, nil) + } + + if err != nil { + return errors.Wrap(err, "Couldn't insert record "+r.Key) } return nil } // Delete records with keys -func (s *sqlStore) Delete(keys ...string) error { +func (s *sqlStore) Delete(key string) error { q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table)) if err != nil { return err } - for _, key := range keys { - result, err := q.Exec(key) - if err != nil { - return err - } - _, err = result.RowsAffected() - if err != nil { - return err - } + + result, err := q.Exec(key) + if err != nil { + return err } + _, err = result.RowsAffected() + if err != nil { + return err + } + return nil } diff --git a/store/cockroach/cockroach_test.go b/store/cockroach/cockroach_test.go index 08f20135..cb0b53a8 100644 --- a/store/cockroach/cockroach_test.go +++ b/store/cockroach/cockroach_test.go @@ -44,10 +44,20 @@ func TestSQL(t *testing.T) { Key: "test", Value: []byte("foo"), }, + ) + if err != nil { + t.Error(err) + } + err = sqlStore.Write( &store.Record{ Key: "bar", Value: []byte("baz"), }, + ) + if err != nil { + t.Error(err) + } + err = sqlStore.Write( &store.Record{ Key: "qux", Value: []byte("aasad"), diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go index 5eac1e50..c1d8850e 100644 --- a/store/etcd/etcd.go +++ b/store/etcd/etcd.go @@ -6,7 +6,6 @@ import ( "log" client "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/micro/go-micro/store" ) @@ -22,26 +21,31 @@ func (e *ekv) Init(opts ...store.Option) error { return nil } -func (e *ekv) Read(keys ...string) ([]*store.Record, error) { - //nolint:prealloc - var values []*mvccpb.KeyValue - - for _, key := range keys { - keyval, err := e.kv.Get(context.Background(), key) - if err != nil { - return nil, err - } - - if keyval == nil || len(keyval.Kvs) == 0 { - return nil, store.ErrNotFound - } - - values = append(values, keyval.Kvs...) +func (e *ekv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + var options store.ReadOptions + for _, o := range opts { + o(&options) } - records := make([]*store.Record, 0, len(values)) + var etcdOpts []client.OpOption - for _, kv := range values { + // set options prefix + if options.Prefix { + etcdOpts = append(etcdOpts, client.WithPrefix()) + } + + keyval, err := e.kv.Get(context.Background(), key, etcdOpts...) + if err != nil { + return nil, err + } + + if keyval == nil || len(keyval.Kvs) == 0 { + return nil, store.ErrNotFound + } + + records := make([]*store.Record, 0, len(keyval.Kvs)) + + for _, kv := range keyval.Kvs { records = append(records, &store.Record{ Key: string(kv.Key), Value: kv.Value, @@ -52,27 +56,15 @@ func (e *ekv) Read(keys ...string) ([]*store.Record, error) { return records, nil } -func (e *ekv) Delete(keys ...string) error { - var gerr error - for _, key := range keys { - _, err := e.kv.Delete(context.Background(), key) - if err != nil { - gerr = err - } - } - return gerr +func (e *ekv) Delete(key string) error { + _, err := e.kv.Delete(context.Background(), key) + return err } -func (e *ekv) Write(records ...*store.Record) error { - var gerr error - for _, record := range records { - // TODO create lease to expire keys - _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) - if err != nil { - gerr = err - } - } - return gerr +func (e *ekv) Write(record *store.Record) error { + // TODO create lease to expire keys + _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) + return err } func (e *ekv) List() ([]*store.Record, error) { diff --git a/store/memory/memory.go b/store/memory/memory.go index 5715f41e..814a7459 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -2,6 +2,7 @@ package memory import ( + "strings" "sync" "time" @@ -55,19 +56,37 @@ func (m *memoryStore) List() ([]*store.Record, error) { return values, nil } -func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) { +func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { m.RLock() defer m.RUnlock() - //nolint:prealloc - var records []*store.Record + var options store.ReadOptions - for _, key := range keys { + for _, o := range opts { + o(&options) + } + + var vals []*memoryRecord + + if !options.Prefix { v, ok := m.values[key] if !ok { return nil, store.ErrNotFound } + vals = []*memoryRecord{v} + } else { + for _, v := range m.values { + if !strings.HasPrefix(v.r.Key, key) { + continue + } + vals = append(vals, v) + } + } + //nolint:prealloc + var records []*store.Record + + for _, v := range vals { // get expiry d := v.r.Expiry t := time.Since(v.c) @@ -88,29 +107,25 @@ func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) { return records, nil } -func (m *memoryStore) Write(records ...*store.Record) error { +func (m *memoryStore) Write(r *store.Record) error { m.Lock() defer m.Unlock() - for _, r := range records { - // set the record - m.values[r.Key] = &memoryRecord{ - r: r, - c: time.Now(), - } + // set the record + m.values[r.Key] = &memoryRecord{ + r: r, + c: time.Now(), } return nil } -func (m *memoryStore) Delete(keys ...string) error { +func (m *memoryStore) Delete(key string) error { m.Lock() defer m.Unlock() - for _, key := range keys { - // delete the value - delete(m.values, key) - } + // delete the value + delete(m.values, key) return nil } diff --git a/store/mock/store.go b/store/mock/store.go deleted file mode 100644 index c9908400..00000000 --- a/store/mock/store.go +++ /dev/null @@ -1,107 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mock - -import mock "github.com/stretchr/testify/mock" -import store "github.com/micro/go-micro/store" - -// Store is an autogenerated mock type for the Store type -type Store struct { - mock.Mock -} - -func (_m *Store) Init(...store.Option) error { - return nil -} - -// Delete provides a mock function with given fields: key -func (_m *Store) Delete(key ...string) error { - _va := make([]interface{}, len(key)) - for _i := range key { - _va[_i] = key[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 error - if rf, ok := ret.Get(0).(func(...string) error); ok { - r0 = rf(key...) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// List provides a mock function with given fields: -func (_m *Store) List() ([]*store.Record, error) { - ret := _m.Called() - - var r0 []*store.Record - if rf, ok := ret.Get(0).(func() []*store.Record); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*store.Record) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Read provides a mock function with given fields: key -func (_m *Store) Read(key ...string) ([]*store.Record, error) { - _va := make([]interface{}, len(key)) - for _i := range key { - _va[_i] = key[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 []*store.Record - if rf, ok := ret.Get(0).(func(...string) []*store.Record); ok { - r0 = rf(key...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*store.Record) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(...string) error); ok { - r1 = rf(key...) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Write provides a mock function with given fields: rec -func (_m *Store) Write(rec ...*store.Record) error { - _va := make([]interface{}, len(rec)) - for _i := range rec { - _va[_i] = rec[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 error - if rf, ok := ret.Get(0).(func(...*store.Record) error); ok { - r0 = rf(rec...) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/store/options.go b/store/options.go index 0161ed20..a319cf58 100644 --- a/store/options.go +++ b/store/options.go @@ -38,3 +38,10 @@ func Namespace(ns string) Option { o.Namespace = ns } } + +// ReadPrefix uses the key as a prefix +func ReadPrefix() ReadOption { + return func(o *ReadOptions) { + o.Prefix = true + } +} diff --git a/store/service/proto/store.pb.go b/store/service/proto/store.pb.go index 8aaa8a3f..6a05463a 100644 --- a/store/service/proto/store.pb.go +++ b/store/service/proto/store.pb.go @@ -78,18 +78,58 @@ func (m *Record) GetExpiry() int64 { return 0 } -type ReadRequest struct { - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` +type ReadOptions struct { + Prefix bool `protobuf:"varint,1,opt,name=prefix,proto3" json:"prefix,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } +func (m *ReadOptions) Reset() { *m = ReadOptions{} } +func (m *ReadOptions) String() string { return proto.CompactTextString(m) } +func (*ReadOptions) ProtoMessage() {} +func (*ReadOptions) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{1} +} + +func (m *ReadOptions) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReadOptions.Unmarshal(m, b) +} +func (m *ReadOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReadOptions.Marshal(b, m, deterministic) +} +func (m *ReadOptions) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadOptions.Merge(m, src) +} +func (m *ReadOptions) XXX_Size() int { + return xxx_messageInfo_ReadOptions.Size(m) +} +func (m *ReadOptions) XXX_DiscardUnknown() { + xxx_messageInfo_ReadOptions.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadOptions proto.InternalMessageInfo + +func (m *ReadOptions) GetPrefix() bool { + if m != nil { + return m.Prefix + } + return false +} + +type ReadRequest struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Options *ReadOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{1} + return fileDescriptor_f84ccc98e143ed3e, []int{2} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { @@ -110,9 +150,16 @@ func (m *ReadRequest) XXX_DiscardUnknown() { var xxx_messageInfo_ReadRequest proto.InternalMessageInfo -func (m *ReadRequest) GetKeys() []string { +func (m *ReadRequest) GetKey() string { if m != nil { - return m.Keys + return m.Key + } + return "" +} + +func (m *ReadRequest) GetOptions() *ReadOptions { + if m != nil { + return m.Options } return nil } @@ -128,7 +175,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{2} + return fileDescriptor_f84ccc98e143ed3e, []int{3} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { @@ -157,17 +204,17 @@ func (m *ReadResponse) GetRecords() []*Record { } type WriteRequest struct { - Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *WriteRequest) Reset() { *m = WriteRequest{} } func (m *WriteRequest) String() string { return proto.CompactTextString(m) } func (*WriteRequest) ProtoMessage() {} func (*WriteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{3} + return fileDescriptor_f84ccc98e143ed3e, []int{4} } func (m *WriteRequest) XXX_Unmarshal(b []byte) error { @@ -188,9 +235,9 @@ func (m *WriteRequest) XXX_DiscardUnknown() { var xxx_messageInfo_WriteRequest proto.InternalMessageInfo -func (m *WriteRequest) GetRecords() []*Record { +func (m *WriteRequest) GetRecord() *Record { if m != nil { - return m.Records + return m.Record } return nil } @@ -205,7 +252,7 @@ func (m *WriteResponse) Reset() { *m = WriteResponse{} } func (m *WriteResponse) String() string { return proto.CompactTextString(m) } func (*WriteResponse) ProtoMessage() {} func (*WriteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{4} + return fileDescriptor_f84ccc98e143ed3e, []int{5} } func (m *WriteResponse) XXX_Unmarshal(b []byte) error { @@ -227,7 +274,7 @@ func (m *WriteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_WriteResponse proto.InternalMessageInfo type DeleteRequest struct { - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -237,7 +284,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{5} + return fileDescriptor_f84ccc98e143ed3e, []int{6} } func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { @@ -258,11 +305,11 @@ func (m *DeleteRequest) XXX_DiscardUnknown() { var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo -func (m *DeleteRequest) GetKeys() []string { +func (m *DeleteRequest) GetKey() string { if m != nil { - return m.Keys + return m.Key } - return nil + return "" } type DeleteResponse struct { @@ -275,7 +322,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{6} + return fileDescriptor_f84ccc98e143ed3e, []int{7} } func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { @@ -297,8 +344,6 @@ func (m *DeleteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo type ListRequest struct { - // optional key - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -308,7 +353,7 @@ func (m *ListRequest) Reset() { *m = ListRequest{} } func (m *ListRequest) String() string { return proto.CompactTextString(m) } func (*ListRequest) ProtoMessage() {} func (*ListRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{7} + return fileDescriptor_f84ccc98e143ed3e, []int{8} } func (m *ListRequest) XXX_Unmarshal(b []byte) error { @@ -329,13 +374,6 @@ func (m *ListRequest) XXX_DiscardUnknown() { var xxx_messageInfo_ListRequest proto.InternalMessageInfo -func (m *ListRequest) GetKey() string { - if m != nil { - return m.Key - } - return "" -} - type ListResponse struct { Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -347,7 +385,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (*ListResponse) ProtoMessage() {} func (*ListResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f84ccc98e143ed3e, []int{8} + return fileDescriptor_f84ccc98e143ed3e, []int{9} } func (m *ListResponse) XXX_Unmarshal(b []byte) error { @@ -377,6 +415,7 @@ func (m *ListResponse) GetRecords() []*Record { func init() { proto.RegisterType((*Record)(nil), "go.micro.store.Record") + proto.RegisterType((*ReadOptions)(nil), "go.micro.store.ReadOptions") proto.RegisterType((*ReadRequest)(nil), "go.micro.store.ReadRequest") proto.RegisterType((*ReadResponse)(nil), "go.micro.store.ReadResponse") proto.RegisterType((*WriteRequest)(nil), "go.micro.store.WriteRequest") @@ -392,26 +431,28 @@ func init() { } var fileDescriptor_f84ccc98e143ed3e = []byte{ - // 333 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x4d, 0x4f, 0xc2, 0x40, - 0x10, 0x86, 0x59, 0x0a, 0x35, 0x0c, 0x1f, 0x92, 0x89, 0x21, 0x0d, 0x7e, 0xd5, 0x7a, 0xe9, 0xc5, - 0x42, 0xf0, 0x0f, 0x98, 0xf8, 0x11, 0x4d, 0x3c, 0xad, 0x07, 0xcf, 0x08, 0x13, 0xd2, 0x80, 0x2e, - 0xee, 0x16, 0x62, 0xff, 0x90, 0xbf, 0xd3, 0xec, 0x6e, 0xab, 0xc5, 0x42, 0x62, 0xbc, 0xcd, 0xee, - 0xbc, 0xf3, 0xec, 0xdb, 0x79, 0x0b, 0xd1, 0x6b, 0x3c, 0x91, 0x62, 0x30, 0x13, 0x17, 0xb6, 0x50, - 0x89, 0x90, 0x34, 0x50, 0x24, 0xd7, 0xf1, 0x84, 0x06, 0x4b, 0x29, 0x92, 0xec, 0x2e, 0x32, 0x35, - 0x76, 0x66, 0xc2, 0x8e, 0x44, 0xe6, 0x36, 0xb8, 0x07, 0x97, 0xd3, 0x44, 0xc8, 0x29, 0x76, 0xc1, - 0x99, 0x53, 0xea, 0x31, 0x9f, 0x85, 0x0d, 0xae, 0x4b, 0x3c, 0x80, 0xfa, 0x7a, 0xbc, 0x58, 0x91, - 0x57, 0xf5, 0x59, 0xd8, 0xe2, 0xf6, 0x80, 0x3d, 0x70, 0xe9, 0x63, 0x19, 0xcb, 0xd4, 0x73, 0x7c, - 0x16, 0x3a, 0x3c, 0x3b, 0x05, 0x67, 0xd0, 0xe4, 0x34, 0x9e, 0x72, 0x7a, 0x5f, 0x91, 0x4a, 0x10, - 0xa1, 0x36, 0xa7, 0x54, 0x79, 0xcc, 0x77, 0xc2, 0x06, 0x37, 0x75, 0x70, 0x05, 0x2d, 0x2b, 0x51, - 0x4b, 0xf1, 0xa6, 0x08, 0x87, 0xb0, 0x27, 0xcd, 0xe3, 0x56, 0xd6, 0x1c, 0xf5, 0xa2, 0x4d, 0x7b, - 0x91, 0xf5, 0xc6, 0x73, 0x99, 0x26, 0x3c, 0xcb, 0x38, 0xa1, 0xfc, 0x95, 0x02, 0xa1, 0xfa, 0x37, - 0xc2, 0x3e, 0xb4, 0x33, 0x82, 0x35, 0x11, 0x9c, 0x43, 0xfb, 0x86, 0x16, 0xf4, 0xc3, 0xdc, 0xe6, - 0xbc, 0x0b, 0x9d, 0x5c, 0x94, 0x8d, 0x9d, 0x42, 0xf3, 0x31, 0x56, 0x49, 0x3e, 0x54, 0xda, 0x9e, - 0xb6, 0x6a, 0x05, 0xff, 0xfd, 0xd8, 0xd1, 0x67, 0x15, 0xea, 0x4f, 0xba, 0x83, 0xb7, 0x50, 0xd3, - 0x2c, 0x3c, 0xfc, 0x3d, 0x52, 0xb0, 0xd0, 0x3f, 0xda, 0xde, 0xcc, 0xfc, 0x56, 0x86, 0x0c, 0xaf, - 0xa1, 0xa6, 0xf7, 0x5f, 0xc6, 0x14, 0x82, 0x2b, 0x63, 0x8a, 0x91, 0x05, 0x15, 0xbc, 0x83, 0xba, - 0x59, 0x20, 0x96, 0x84, 0xc5, 0x64, 0xfa, 0xc7, 0x3b, 0xba, 0xdf, 0x9c, 0x07, 0x70, 0xed, 0x4a, - 0xb1, 0x24, 0xdd, 0xc8, 0xa3, 0x7f, 0xb2, 0xab, 0x9d, 0xa3, 0x5e, 0x5c, 0xf3, 0x6f, 0x5f, 0x7e, - 0x05, 0x00, 0x00, 0xff, 0xff, 0x30, 0x48, 0x25, 0x2d, 0x0d, 0x03, 0x00, 0x00, + // 364 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x4d, 0x4b, 0xc3, 0x40, + 0x10, 0x6d, 0x9a, 0x36, 0xd5, 0x49, 0x5b, 0xcb, 0x22, 0x25, 0xd4, 0x0f, 0xe2, 0x82, 0x90, 0x8b, + 0x69, 0xa9, 0x78, 0x15, 0xc1, 0x0f, 0x14, 0x04, 0x61, 0x05, 0x3d, 0xd7, 0x76, 0x2c, 0xc1, 0xda, + 0x8d, 0xbb, 0x69, 0x69, 0xff, 0x90, 0xbf, 0x53, 0xb2, 0xbb, 0xd1, 0x94, 0x34, 0x17, 0x6f, 0x33, + 0xfb, 0xde, 0xbc, 0x99, 0x79, 0xc3, 0x42, 0xf8, 0x19, 0x8d, 0x05, 0xef, 0x4f, 0xf9, 0x99, 0x0e, + 0x64, 0xc2, 0x05, 0xf6, 0x25, 0x8a, 0x65, 0x34, 0xc6, 0x7e, 0x2c, 0x78, 0x62, 0xde, 0x42, 0x15, + 0x93, 0xf6, 0x94, 0xeb, 0x92, 0x50, 0xbd, 0xd2, 0x7b, 0x70, 0x18, 0x8e, 0xb9, 0x98, 0x90, 0x0e, + 0xd8, 0x1f, 0xb8, 0xf6, 0x2c, 0xdf, 0x0a, 0x76, 0x59, 0x1a, 0x92, 0x7d, 0xa8, 0x2f, 0x47, 0xb3, + 0x05, 0x7a, 0x55, 0xdf, 0x0a, 0x9a, 0x4c, 0x27, 0xa4, 0x0b, 0x0e, 0xae, 0xe2, 0x48, 0xac, 0x3d, + 0xdb, 0xb7, 0x02, 0x9b, 0x99, 0x8c, 0x9e, 0x82, 0xcb, 0x70, 0x34, 0x79, 0x8a, 0x93, 0x88, 0xcf, + 0x65, 0x4a, 0x8b, 0x05, 0xbe, 0x47, 0x2b, 0xa5, 0xb8, 0xc3, 0x4c, 0x46, 0x5f, 0x34, 0x8d, 0xe1, + 0xd7, 0x02, 0x65, 0xb2, 0xa5, 0xeb, 0x05, 0x34, 0xb8, 0xd6, 0x50, 0x7d, 0xdd, 0xe1, 0x41, 0xb8, + 0x39, 0x73, 0x98, 0x6b, 0xc3, 0x32, 0x2e, 0xbd, 0x82, 0xa6, 0xd6, 0x95, 0x31, 0x9f, 0x4b, 0x24, + 0x03, 0x68, 0x08, 0xb5, 0x98, 0xf4, 0x2c, 0xdf, 0x0e, 0xdc, 0x61, 0xb7, 0x28, 0x93, 0xc2, 0x2c, + 0xa3, 0xd1, 0x4b, 0x68, 0xbe, 0x8a, 0x28, 0xc1, 0x6c, 0xb4, 0x10, 0x1c, 0x0d, 0xa9, 0xe9, 0xca, + 0x05, 0x0c, 0x8b, 0xee, 0x41, 0xcb, 0xd4, 0xeb, 0x11, 0xe8, 0x09, 0xb4, 0x6e, 0x70, 0x86, 0x7f, + 0x8a, 0x85, 0x65, 0x69, 0x07, 0xda, 0x19, 0xc5, 0x14, 0xb5, 0xc0, 0x7d, 0x8c, 0x64, 0x62, 0x4a, + 0xd2, 0xb5, 0x74, 0xfa, 0xdf, 0xb5, 0x86, 0xdf, 0x55, 0xa8, 0x3f, 0xa7, 0x08, 0xb9, 0x85, 0x5a, + 0xaa, 0x45, 0x0a, 0x86, 0xe6, 0x1a, 0xf6, 0x0e, 0xb7, 0x83, 0x66, 0xba, 0xca, 0xc0, 0x22, 0xd7, + 0x50, 0x4b, 0x9d, 0x26, 0x5b, 0xef, 0x52, 0x2a, 0x93, 0x3f, 0x0e, 0xad, 0x90, 0x3b, 0xa8, 0x2b, + 0xb3, 0x48, 0x81, 0x98, 0xbf, 0x41, 0xef, 0xa8, 0x04, 0xfd, 0xd5, 0x79, 0x00, 0x47, 0x1b, 0x48, + 0x0a, 0xd4, 0x0d, 0xef, 0x7b, 0xc7, 0x65, 0x70, 0x26, 0xf5, 0xe6, 0xa8, 0x1f, 0x72, 0xfe, 0x13, + 0x00, 0x00, 0xff, 0xff, 0xf9, 0x20, 0x54, 0x71, 0x53, 0x03, 0x00, 0x00, } diff --git a/store/service/proto/store.proto b/store/service/proto/store.proto index 5f9ea233..39bd8afc 100644 --- a/store/service/proto/store.proto +++ b/store/service/proto/store.proto @@ -18,8 +18,13 @@ message Record { int64 expiry = 3; } +message ReadOptions { + bool prefix = 1; +} + message ReadRequest { - repeated string keys = 1; + string key = 1; + ReadOptions options = 2; } message ReadResponse { @@ -27,21 +32,18 @@ message ReadResponse { } message WriteRequest { - repeated Record records = 2; + Record record = 1; } message WriteResponse {} message DeleteRequest { - repeated string keys = 1; + string key = 1; } message DeleteResponse {} -message ListRequest { - // optional key - string key = 1; -} +message ListRequest {} message ListResponse { repeated Record records = 1; diff --git a/store/service/service.go b/store/service/service.go index e364f794..8610541b 100644 --- a/store/service/service.go +++ b/store/service/service.go @@ -73,6 +73,7 @@ func (s *serviceStore) List() ([]*store.Record, error) { if err != nil { return records, err } + for _, record := range rsp.Records { records = append(records, &store.Record{ Key: record.Key, @@ -86,15 +87,24 @@ func (s *serviceStore) List() ([]*store.Record, error) { } // Read a record with key -func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) { +func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + var options store.ReadOptions + for _, o := range opts { + o(&options) + } + rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{ - Keys: keys, + Key: key, + Options: &pb.ReadOptions{ + Prefix: options.Prefix, + }, }, client.WithAddress(s.Nodes...)) if err != nil { return nil, err } records := make([]*store.Record, 0, len(rsp.Records)) + for _, val := range rsp.Records { records = append(records, &store.Record{ Key: val.Key, @@ -102,32 +112,27 @@ func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) { Expiry: time.Duration(val.Expiry) * time.Second, }) } + return records, nil } // Write a record -func (s *serviceStore) Write(recs ...*store.Record) error { - records := make([]*pb.Record, 0, len(recs)) - - for _, record := range recs { - records = append(records, &pb.Record{ +func (s *serviceStore) Write(record *store.Record) error { + _, err := s.Client.Write(s.Context(), &pb.WriteRequest{ + Record: &pb.Record{ Key: record.Key, Value: record.Value, Expiry: int64(record.Expiry.Seconds()), - }) - } - - _, err := s.Client.Write(s.Context(), &pb.WriteRequest{ - Records: records, + }, }, client.WithAddress(s.Nodes...)) return err } // Delete a record with key -func (s *serviceStore) Delete(keys ...string) error { +func (s *serviceStore) Delete(key string) error { _, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{ - Keys: keys, + Key: key, }, client.WithAddress(s.Nodes...)) return err } diff --git a/store/store.go b/store/store.go index d8c44c62..c84e76cb 100644 --- a/store/store.go +++ b/store/store.go @@ -20,11 +20,11 @@ type Store interface { // List all the known records List() ([]*Record, error) // Read records with keys - Read(key ...string) ([]*Record, error) + Read(key string, opts ...ReadOption) ([]*Record, error) // Write records - Write(rec ...*Record) error + Write(*Record) error // Delete records with keys - Delete(key ...string) error + Delete(key string) error } // Record represents a data record @@ -34,6 +34,13 @@ type Record struct { Expiry time.Duration } +type ReadOptions struct { + // Read key as a prefix + Prefix bool +} + +type ReadOption func(o *ReadOptions) + type noop struct{} func (n *noop) Init(...Option) error { @@ -44,14 +51,14 @@ func (n *noop) List() ([]*Record, error) { return nil, nil } -func (n *noop) Read(key ...string) ([]*Record, error) { +func (n *noop) Read(key string, opts ...ReadOption) ([]*Record, error) { return nil, nil } -func (n *noop) Write(rec ...*Record) error { +func (n *noop) Write(rec *Record) error { return nil } -func (n *noop) Delete(key ...string) error { +func (n *noop) Delete(key string) error { return nil } diff --git a/sync/map_test.go b/sync/map_test.go deleted file mode 100644 index fa54cd2c..00000000 --- a/sync/map_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package sync - -import ( - "testing" - "time" - - "github.com/micro/go-micro/store" - store_mock "github.com/micro/go-micro/store/mock" - mem_lock "github.com/micro/go-micro/sync/lock/memory" - "github.com/stretchr/testify/mock" -) - -func TestIterate(t *testing.T) { - recA := &store.Record{ - Key: "A", - Value: nil, - } - recB := &store.Record{ - Key: "B", - Value: nil, - } - s1 := &store_mock.Store{} - s2 := &store_mock.Store{} - s1.On("List").Return([]*store.Record{recA, recB}, nil) - s2.On("List").Return([]*store.Record{recB, recA}, nil) - s1.On("Write", mock.Anything).Return(nil) - s2.On("Write", mock.Anything).Return(nil) - - f := func(key, val interface{}) error { - time.Sleep(1 * time.Millisecond) - return nil - } - l := mem_lock.NewLock() - m1 := NewMap(WithStore(s1), WithLock(l)) - m2 := NewMap(WithStore(s2), WithLock(l)) - go func() { - m2.Iterate(f) - }() - m1.Iterate(f) -}