Implement store read cache (#1366)
* Implement store read cache * Added cache tests and fixed a bug in memory store
This commit is contained in:
parent
cbb958def5
commit
4c6f68d537
131
store/cache/cache.go
vendored
Normal file
131
store/cache/cache.go
vendored
Normal file
@ -0,0 +1,131 @@
|
||||
// Package cache implements a faulting style read cache on top of multiple micro stores
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type cache struct {
|
||||
stores []store.Store
|
||||
options store.Options
|
||||
}
|
||||
|
||||
// NewCache returns a new store using the underlying stores, which must be already Init()ialised
|
||||
func NewCache(stores ...store.Store) store.Store {
|
||||
c := &cache{}
|
||||
c.stores = make([]store.Store, len(stores))
|
||||
for i, s := range stores {
|
||||
c.stores[i] = s
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *cache) Init(...store.Option) error {
|
||||
if len(c.stores) < 2 {
|
||||
return errors.New("cache requires at least 2 stores")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) Options() store.Options {
|
||||
return c.options
|
||||
}
|
||||
|
||||
func (c *cache) String() string {
|
||||
stores := make([]string, len(c.stores))
|
||||
for i, s := range c.stores {
|
||||
stores[i] = s.String()
|
||||
}
|
||||
return fmt.Sprintf("cache %v", stores)
|
||||
}
|
||||
|
||||
func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||
readOpts := store.ReadOptions{}
|
||||
for _, o := range opts {
|
||||
o(&readOpts)
|
||||
}
|
||||
|
||||
if readOpts.Prefix || readOpts.Suffix {
|
||||
// List, then try cached gets for each key
|
||||
var lOpts []store.ListOption
|
||||
if readOpts.Prefix {
|
||||
lOpts = append(lOpts, store.ListPrefix(key))
|
||||
}
|
||||
if readOpts.Suffix {
|
||||
lOpts = append(lOpts, store.ListSuffix(key))
|
||||
}
|
||||
if readOpts.Limit > 0 {
|
||||
lOpts = append(lOpts, store.ListLimit(readOpts.Limit))
|
||||
}
|
||||
if readOpts.Offset > 0 {
|
||||
lOpts = append(lOpts, store.ListOffset(readOpts.Offset))
|
||||
}
|
||||
keys, err := c.List(lOpts...)
|
||||
if err != nil {
|
||||
return []*store.Record{}, errors.Wrap(err, "cache.List failed")
|
||||
}
|
||||
recs := make([]*store.Record, len(keys))
|
||||
for i, k := range keys {
|
||||
r, err := c.readOne(k, opts...)
|
||||
if err != nil {
|
||||
return recs, errors.Wrap(err, "cache.readOne failed")
|
||||
}
|
||||
recs[i] = r
|
||||
}
|
||||
return recs, nil
|
||||
}
|
||||
|
||||
// Otherwise just try cached get
|
||||
r, err := c.readOne(key, opts...)
|
||||
if err != nil {
|
||||
return []*store.Record{}, err // preserve store.ErrNotFound
|
||||
}
|
||||
return []*store.Record{r}, nil
|
||||
}
|
||||
|
||||
func (c *cache) readOne(key string, opts ...store.ReadOption) (*store.Record, error) {
|
||||
for i, s := range c.stores {
|
||||
// ReadOne ignores all options
|
||||
r, err := s.Read(key)
|
||||
if err == nil {
|
||||
if len(r) > 1 {
|
||||
return nil, errors.Wrapf(err, "read from L%d cache (%s) returned multiple records", i, c.stores[i].String())
|
||||
}
|
||||
for j := i - 1; j >= 0; j-- {
|
||||
err := c.stores[j].Write(r[0])
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not write to L%d cache (%s)", j, c.stores[j].String())
|
||||
}
|
||||
}
|
||||
return r[0], nil
|
||||
}
|
||||
}
|
||||
return nil, store.ErrNotFound
|
||||
}
|
||||
|
||||
func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error {
|
||||
// Write to all layers in reverse
|
||||
for i := len(c.stores) - 1; i >= 0; i-- {
|
||||
if err := c.stores[i].Write(r, opts...); err != nil {
|
||||
return errors.Wrapf(err, "could not write to L%d cache (%s)", i, c.stores[i].String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) Delete(key string, opts ...store.DeleteOption) error {
|
||||
for i, s := range c.stores {
|
||||
if err := s.Delete(key, opts...); err != nil {
|
||||
return errors.Wrapf(err, "could not delete from L%d cache (%s)", i, c.stores[i].String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) List(opts ...store.ListOption) ([]string, error) {
|
||||
// List only makes sense from the top level
|
||||
return c.stores[len(c.stores)-1].List(opts...)
|
||||
}
|
97
store/cache/cache_test.go
vendored
Normal file
97
store/cache/cache_test.go
vendored
Normal file
@ -0,0 +1,97 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
"github.com/micro/go-micro/v2/store/memory"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
l0, l1, l2 := memory.NewStore(store.Namespace("l0")), memory.NewStore(store.Prefix("l1")), memory.NewStore(store.Suffix("l2"))
|
||||
_, _, _ = l0.Init(), l1.Init(), l2.Init()
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
nonCache := NewCache(l0)
|
||||
assert.NotNil(nonCache.Init(), "Expected a cache initialised with just 1 store to fail")
|
||||
|
||||
// Basic functionality
|
||||
cachedStore := NewCache(l0, l1, l2)
|
||||
assert.Nil(cachedStore.Init(), "Init should not error")
|
||||
assert.Equal(cachedStore.Options(), store.Options{}, "Options on store/cache are nonsensical")
|
||||
expectedString := "cache [memory memory memory]"
|
||||
assert.Equal(cachedStore.String(), expectedString, "Cache couldn't describe itself as expected")
|
||||
|
||||
// Read/Write tests
|
||||
_, err := cachedStore.Read("test")
|
||||
assert.Equal(store.ErrNotFound, err, "Read non existant key")
|
||||
r1 := &store.Record{
|
||||
Key: "aaa",
|
||||
Value: []byte("bbb"),
|
||||
}
|
||||
r2 := &store.Record{
|
||||
Key: "aaaa",
|
||||
Value: []byte("bbbb"),
|
||||
}
|
||||
r3 := &store.Record{
|
||||
Key: "aaaaa",
|
||||
Value: []byte("bbbbb"),
|
||||
}
|
||||
// Write 3 records directly to l2
|
||||
l2.Write(r1)
|
||||
l2.Write(r2)
|
||||
l2.Write(r3)
|
||||
// Ensure it's not in l0
|
||||
assert.Equal(store.ErrNotFound, func() error { _, err := l0.Read(r1.Key); return err }())
|
||||
// Read from cache, ensure it's in all 3 stores
|
||||
results, err := cachedStore.Read(r1.Key)
|
||||
assert.Nil(err, "cachedStore.Read() returned error")
|
||||
assert.Len(results, 1, "cachedStore.Read() should only return 1 result")
|
||||
assert.Equal(r1, results[0], "Cached read didn't return the record that was put in")
|
||||
results, err = l0.Read(r1.Key)
|
||||
assert.Nil(err)
|
||||
assert.Equal(r1, results[0], "l0 not coherent")
|
||||
results, err = l1.Read(r1.Key)
|
||||
assert.Nil(err)
|
||||
assert.Equal(r1, results[0], "l1 not coherent")
|
||||
results, err = l2.Read(r1.Key)
|
||||
assert.Nil(err)
|
||||
assert.Equal(r1, results[0], "l2 not coherent")
|
||||
// Multiple read
|
||||
results, err = cachedStore.Read("aa", store.ReadPrefix())
|
||||
assert.Nil(err, "Cachedstore multiple read errored")
|
||||
assert.Len(results, 3, "ReadPrefix should have read all records")
|
||||
// l1 should now have all 3 records
|
||||
l1results, err := l1.Read("aa", store.ReadPrefix())
|
||||
assert.Nil(err, "l1.Read failed")
|
||||
assert.Len(l1results, 3, "l1 didn't contain a full cache")
|
||||
sort.Slice(results, func(i, j int) bool { return results[i].Key < results[j].Key })
|
||||
sort.Slice(l1results, func(i, j int) bool { return l1results[i].Key < l1results[j].Key })
|
||||
assert.Equal(results[0], l1results[0], "l1 cache not coherent")
|
||||
assert.Equal(results[1], l1results[1], "l1 cache not coherent")
|
||||
assert.Equal(results[2], l1results[2], "l1 cache not coherent")
|
||||
|
||||
// Test List and Delete
|
||||
keys, err := cachedStore.List(store.ListPrefix("a"))
|
||||
assert.Nil(err, "List should not error")
|
||||
assert.Len(keys, 3, "List should return 3 keys")
|
||||
for _, k := range keys {
|
||||
err := cachedStore.Delete(k)
|
||||
assert.Nil(err, "Delete should not error")
|
||||
_, err = cachedStore.Read(k)
|
||||
// N.B. - this may not pass on stores that are eventually consistent
|
||||
assert.Equal(store.ErrNotFound, err, "record should be gone")
|
||||
}
|
||||
|
||||
// Test Write
|
||||
err = cachedStore.Write(r1)
|
||||
assert.Nil(err, "Write shouldn't fail")
|
||||
l2result, err := l2.Read(r1.Key)
|
||||
assert.Nil(err)
|
||||
assert.Len(l2result, 1)
|
||||
assert.Equal(r1, l2result[0], "Write didn't make it all the way through to l2")
|
||||
|
||||
}
|
@ -106,7 +106,9 @@ func (m *memoryStore) get(k string) (*store.Record, error) {
|
||||
newRecord.Key = storedRecord.key
|
||||
newRecord.Value = make([]byte, len(storedRecord.value))
|
||||
copy(newRecord.Value, storedRecord.value)
|
||||
newRecord.Expiry = time.Until(storedRecord.expiresAt)
|
||||
if !storedRecord.expiresAt.IsZero() {
|
||||
newRecord.Expiry = time.Until(storedRecord.expiresAt)
|
||||
}
|
||||
|
||||
return newRecord, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user