From 78a79ca9e12926364804e45df905db5f65109eed Mon Sep 17 00:00:00 2001 From: Dominic Wong Date: Thu, 20 Aug 2020 15:08:35 +0100 Subject: [PATCH] Memory and file store list fixes (#1959) * Refactor file and memory stores --- store/file/file.go | 118 ++++++++++++------------------- store/memory/memory.go | 146 +++++++++++++++++---------------------- store/test/store_test.go | 11 ++- 3 files changed, 121 insertions(+), 154 deletions(-) diff --git a/store/file/file.go b/store/file/file.go index 7e2b63e6..d6633295 100644 --- a/store/file/file.go +++ b/store/file/file.go @@ -2,11 +2,10 @@ package file import ( + "bytes" "encoding/json" "os" "path/filepath" - "sort" - "strings" "time" "github.com/micro/go-micro/v3/store" @@ -111,8 +110,9 @@ func (f *fileStore) getDB(database, table string) (*bolt.DB, error) { return bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) } -func (m *fileStore) list(db *bolt.DB, limit, offset uint) []string { - var allItems []string +func (m *fileStore) list(db *bolt.DB, limit, offset uint, prefix, suffix string) []string { + + var keys []string db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) @@ -120,54 +120,53 @@ func (m *fileStore) list(db *bolt.DB, limit, offset uint) []string { if b == nil { return nil } + c := b.Cursor() + var k, v []byte + var cont func(k []byte) bool - // @todo very inefficient - if err := b.ForEach(func(k, v []byte) error { + if prefix != "" { + // for prefix we can speed up the search, not for suffix though :( + k, v = c.Seek([]byte(prefix)) + cont = func(k []byte) bool { + return bytes.HasPrefix(k, []byte(prefix)) + } + } else { + k, v = c.First() + cont = func(k []byte) bool { + return true + } + } + + for ; k != nil && cont(k); k, v = c.Next() { storedRecord := &record{} if err := json.Unmarshal(v, storedRecord); err != nil { return err } - if !storedRecord.ExpiresAt.IsZero() { if storedRecord.ExpiresAt.Before(time.Now()) { - return nil + continue } } + if suffix != "" && !bytes.HasSuffix(k, []byte(suffix)) { + continue + } + if offset > 0 { + offset-- + continue + } + keys = append(keys, string(k)) + // this check still works if no limit was passed to begin with, you'll just end up with large -ve value + if limit == 1 { + break + } + limit-- - allItems = append(allItems, string(k)) - - return nil - }); err != nil { - return err } - return nil }) - allKeys := make([]string, len(allItems)) - - for i, k := range allItems { - allKeys[i] = k - } - - if limit != 0 || offset != 0 { - sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) - end := len(allKeys) - if limit > 0 { - calcLimit := int(offset + limit) - if calcLimit < end { - end = calcLimit - } - } - - if int(offset) >= end { - return nil - } - return allKeys[offset:end] - } - - return allKeys + return keys } func (m *fileStore) get(db *bolt.DB, k string) (*store.Record, error) { @@ -283,21 +282,17 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, var keys []string // Handle Prefix / suffix - // TODO: do range scan here rather than listing all keys if readOpts.Prefix || readOpts.Suffix { - // list the keys - k := m.list(db, readOpts.Limit, readOpts.Offset) - - // check for prefix and suffix - for _, v := range k { - if readOpts.Prefix && !strings.HasPrefix(v, key) { - continue - } - if readOpts.Suffix && !strings.HasSuffix(v, key) { - continue - } - keys = append(keys, v) + prefix := "" + if readOpts.Prefix { + prefix = key } + suffix := "" + if readOpts.Suffix { + suffix = key + } + // list the keys + keys = m.list(db, readOpts.Limit, readOpts.Offset, prefix, suffix) } else { keys = []string{key} } @@ -369,28 +364,7 @@ func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { } defer db.Close() - // TODO apply prefix/suffix in range query - allKeys := m.list(db, listOptions.Limit, listOptions.Offset) - - if len(listOptions.Prefix) > 0 { - var prefixKeys []string - for _, k := range allKeys { - if strings.HasPrefix(k, listOptions.Prefix) { - prefixKeys = append(prefixKeys, k) - } - } - allKeys = prefixKeys - } - - if len(listOptions.Suffix) > 0 { - var suffixKeys []string - for _, k := range allKeys { - if strings.HasSuffix(k, listOptions.Suffix) { - suffixKeys = append(suffixKeys, k) - } - } - allKeys = suffixKeys - } + allKeys := m.list(db, listOptions.Limit, listOptions.Offset, listOptions.Prefix, listOptions.Suffix) return allKeys, nil } diff --git a/store/memory/memory.go b/store/memory/memory.go index 5ef77b60..bc20030b 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -5,6 +5,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/micro/go-micro/v3/store" @@ -19,7 +20,7 @@ func NewStore(opts ...store.Option) store.Store { Database: "micro", Table: "micro", }, - store: cache.New(cache.NoExpiration, 5*time.Minute), + stores: map[string]*cache.Cache{}, // cache.New(cache.NoExpiration, 5*time.Minute), } for _, o := range opts { o(&s.options) @@ -28,9 +29,10 @@ func NewStore(opts ...store.Option) store.Store { } type memoryStore struct { + sync.RWMutex options store.Options - store *cache.Cache + stores map[string]*cache.Cache } type storeRecord struct { @@ -40,10 +42,6 @@ type storeRecord struct { expiresAt time.Time } -func (m *memoryStore) key(prefix, key string) string { - return filepath.Join(prefix, key) -} - func (m *memoryStore) prefix(database, table string) string { if len(database) == 0 { database = m.options.Database @@ -54,11 +52,24 @@ func (m *memoryStore) prefix(database, table string) string { return filepath.Join(database, table) } -func (m *memoryStore) get(prefix, key string) (*store.Record, error) { - key = m.key(prefix, key) +func (m *memoryStore) getStore(prefix string) *cache.Cache { + m.RLock() + store := m.stores[prefix] + m.RUnlock() + if store == nil { + m.Lock() + if m.stores[prefix] == nil { + m.stores[prefix] = cache.New(cache.NoExpiration, 5*time.Minute) + } + store = m.stores[prefix] + m.Unlock() + } + return store +} +func (m *memoryStore) get(prefix, key string) (*store.Record, error) { var storedRecord *storeRecord - r, found := m.store.Get(key) + r, found := m.getStore(prefix).Get(key) if !found { return nil, store.ErrNotFound } @@ -91,8 +102,6 @@ func (m *memoryStore) get(prefix, key string) (*store.Record, error) { } func (m *memoryStore) set(prefix string, r *store.Record) { - key := m.key(prefix, r.Key) - // copy the incoming record and then // convert the expiry in to a hard timestamp i := &storeRecord{} @@ -113,33 +122,54 @@ func (m *memoryStore) set(prefix string, r *store.Record) { i.metadata[k] = v } - m.store.Set(key, i, r.Expiry) + m.getStore(prefix).Set(r.Key, i, r.Expiry) } func (m *memoryStore) delete(prefix, key string) { - key = m.key(prefix, key) - m.store.Delete(key) + m.getStore(prefix).Delete(key) } -func (m *memoryStore) list(prefix string, limit, offset uint) []string { - allItems := m.store.Items() - keys := make([]string, len(allItems)) - i := 0 +func (m *memoryStore) list(prefix string, limit, offset uint, prefixFilter, suffixFilter string) []string { + allItems := m.getStore(prefix).Items() + + allKeys := make([]string, len(allItems)) + + // construct list of keys for this prefix + i := 0 for k := range allItems { - if !strings.HasPrefix(k, prefix+"/") { - continue - } - keys[i] = strings.TrimPrefix(k, prefix+"/") + allKeys[i] = k i++ } - - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) - return applyLimitAndOffset(keys, limit, offset) + keys := make([]string, 0, len(allKeys)) + sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) + for _, k := range allKeys { + if prefixFilter != "" && !strings.HasPrefix(k, prefixFilter) { + continue + } + if suffixFilter != "" && !strings.HasSuffix(k, suffixFilter) { + continue + } + if offset > 0 { + offset-- + continue + } + keys = append(keys, k) + // this check still works if no limit was passed to begin with, you'll just end up with large -ve value + if limit == 1 { + break + } + limit-- + } + return keys } func (m *memoryStore) Close() error { - m.store.Flush() + m.Lock() + defer m.Unlock() + for _, s := range m.stores { + s.Flush() + } return nil } @@ -165,20 +195,15 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor var keys []string // Handle Prefix / suffix if readOpts.Prefix || readOpts.Suffix { - // apply limit / offset once filtering is complete - for _, kk := range m.list(prefix, 0, 0) { - if readOpts.Prefix && !strings.HasPrefix(kk, key) { - continue - } - - if readOpts.Suffix && !strings.HasSuffix(kk, key) { - continue - } - - keys = append(keys, kk) + prefixFilter := "" + if readOpts.Prefix { + prefixFilter = key } - - keys = applyLimitAndOffset(keys, readOpts.Limit, readOpts.Offset) + suffixFilter := "" + if readOpts.Suffix { + suffixFilter = key + } + keys = m.list(prefix, readOpts.Limit, readOpts.Offset, prefixFilter, suffixFilter) } else { keys = []string{key} } @@ -257,47 +282,6 @@ func (m *memoryStore) List(opts ...store.ListOption) ([]string, error) { } prefix := m.prefix(listOptions.Database, listOptions.Table) - keys := m.list(prefix, listOptions.Limit, listOptions.Offset) - - if len(listOptions.Prefix) > 0 { - var prefixKeys []string - for _, k := range keys { - if strings.HasPrefix(k, listOptions.Prefix) { - prefixKeys = append(prefixKeys, k) - } - } - keys = prefixKeys - } - - if len(listOptions.Suffix) > 0 { - var suffixKeys []string - for _, k := range keys { - if strings.HasSuffix(k, listOptions.Suffix) { - suffixKeys = append(suffixKeys, k) - } - } - keys = suffixKeys - } - + keys := m.list(prefix, listOptions.Limit, listOptions.Offset, listOptions.Prefix, listOptions.Suffix) return keys, nil } - -func applyLimitAndOffset(keys []string, limit, offset uint) []string { - if limit == 0 && offset == 0 { - return keys - } - - end := len(keys) - if limit > 0 { - calcLimit := int(offset + limit) - if calcLimit < end { - end = calcLimit - } - } - - if int(offset) >= end { - return nil - } - - return keys[offset:end] -} diff --git a/store/test/store_test.go b/store/test/store_test.go index b440c978..7cea5084 100644 --- a/store/test/store_test.go +++ b/store/test/store_test.go @@ -264,6 +264,15 @@ func listTests(s store.Store, t *testing.T) { t.Fatalf("Expected 2 records, received %d %+v", len(recs), recs) } + for i := 0; i < 10; i++ { + s.Write(&store.Record{Key: fmt.Sprintf("ListOffset%d", i), Value: []byte("bar")}) + } + + recs, err = s.List(store.ListPrefix("ListOffset"), store.ListOffset(6)) + if len(recs) != 4 { + t.Fatalf("Expected 4 records, received %d %+v", len(recs), recs) + } + } func expiryTests(s store.Store, t *testing.T) { @@ -306,7 +315,7 @@ func expiryTests(s store.Store, t *testing.T) { t.Error(err) } if len(results) != 3 { - t.Fatal("Results should have returned 3 records") + t.Fatalf("Results should have returned 3 records, returned %d", len(results)) } time.Sleep(1 * time.Second) results, err = s.Read("a", store.ReadPrefix())