diff --git a/store/cache/cache.go b/store/cache/cache.go new file mode 100644 index 00000000..a52c2bb3 --- /dev/null +++ b/store/cache/cache.go @@ -0,0 +1,131 @@ +// Package cache implements a faulting style read cache on top of multiple micro stores +package cache + +import ( + "fmt" + + "github.com/micro/go-micro/v2/store" + "github.com/pkg/errors" +) + +type cache struct { + stores []store.Store + options store.Options +} + +// NewCache returns a new store using the underlying stores, which must be already Init()ialised +func NewCache(stores ...store.Store) store.Store { + c := &cache{} + c.stores = make([]store.Store, len(stores)) + for i, s := range stores { + c.stores[i] = s + } + return c +} + +func (c *cache) Init(...store.Option) error { + if len(c.stores) < 2 { + return errors.New("cache requires at least 2 stores") + } + return nil +} + +func (c *cache) Options() store.Options { + return c.options +} + +func (c *cache) String() string { + stores := make([]string, len(c.stores)) + for i, s := range c.stores { + stores[i] = s.String() + } + return fmt.Sprintf("cache %v", stores) +} + +func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + readOpts := store.ReadOptions{} + for _, o := range opts { + o(&readOpts) + } + + if readOpts.Prefix || readOpts.Suffix { + // List, then try cached gets for each key + var lOpts []store.ListOption + if readOpts.Prefix { + lOpts = append(lOpts, store.ListPrefix(key)) + } + if readOpts.Suffix { + lOpts = append(lOpts, store.ListSuffix(key)) + } + if readOpts.Limit > 0 { + lOpts = append(lOpts, store.ListLimit(readOpts.Limit)) + } + if readOpts.Offset > 0 { + lOpts = append(lOpts, store.ListOffset(readOpts.Offset)) + } + keys, err := c.List(lOpts...) + if err != nil { + return []*store.Record{}, errors.Wrap(err, "cache.List failed") + } + recs := make([]*store.Record, len(keys)) + for i, k := range keys { + r, err := c.readOne(k, opts...) + if err != nil { + return recs, errors.Wrap(err, "cache.readOne failed") + } + recs[i] = r + } + return recs, nil + } + + // Otherwise just try cached get + r, err := c.readOne(key, opts...) + if err != nil { + return []*store.Record{}, err // preserve store.ErrNotFound + } + return []*store.Record{r}, nil +} + +func (c *cache) readOne(key string, opts ...store.ReadOption) (*store.Record, error) { + for i, s := range c.stores { + // ReadOne ignores all options + r, err := s.Read(key) + if err == nil { + if len(r) > 1 { + return nil, errors.Wrapf(err, "read from L%d cache (%s) returned multiple records", i, c.stores[i].String()) + } + for j := i - 1; j >= 0; j-- { + err := c.stores[j].Write(r[0]) + if err != nil { + return nil, errors.Wrapf(err, "could not write to L%d cache (%s)", j, c.stores[j].String()) + } + } + return r[0], nil + } + } + return nil, store.ErrNotFound +} + +func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error { + // Write to all layers in reverse + for i := len(c.stores) - 1; i >= 0; i-- { + if err := c.stores[i].Write(r, opts...); err != nil { + return errors.Wrapf(err, "could not write to L%d cache (%s)", i, c.stores[i].String()) + } + } + return nil +} + +func (c *cache) Delete(key string, opts ...store.DeleteOption) error { + for i, s := range c.stores { + if err := s.Delete(key, opts...); err != nil { + return errors.Wrapf(err, "could not delete from L%d cache (%s)", i, c.stores[i].String()) + } + } + return nil +} + +func (c *cache) List(opts ...store.ListOption) ([]string, error) { + // List only makes sense from the top level + return c.stores[len(c.stores)-1].List(opts...) +} diff --git a/store/cache/cache_test.go b/store/cache/cache_test.go new file mode 100644 index 00000000..2af10698 --- /dev/null +++ b/store/cache/cache_test.go @@ -0,0 +1,97 @@ +package cache + +import ( + "sort" + "testing" + + "github.com/micro/go-micro/v2/store" + "github.com/micro/go-micro/v2/store/memory" + "github.com/stretchr/testify/assert" +) + +func TestCache(t *testing.T) { + l0, l1, l2 := memory.NewStore(store.Namespace("l0")), memory.NewStore(store.Prefix("l1")), memory.NewStore(store.Suffix("l2")) + _, _, _ = l0.Init(), l1.Init(), l2.Init() + + assert := assert.New(t) + + nonCache := NewCache(l0) + assert.NotNil(nonCache.Init(), "Expected a cache initialised with just 1 store to fail") + + // Basic functionality + cachedStore := NewCache(l0, l1, l2) + assert.Nil(cachedStore.Init(), "Init should not error") + assert.Equal(cachedStore.Options(), store.Options{}, "Options on store/cache are nonsensical") + expectedString := "cache [memory memory memory]" + assert.Equal(cachedStore.String(), expectedString, "Cache couldn't describe itself as expected") + + // Read/Write tests + _, err := cachedStore.Read("test") + assert.Equal(store.ErrNotFound, err, "Read non existant key") + r1 := &store.Record{ + Key: "aaa", + Value: []byte("bbb"), + } + r2 := &store.Record{ + Key: "aaaa", + Value: []byte("bbbb"), + } + r3 := &store.Record{ + Key: "aaaaa", + Value: []byte("bbbbb"), + } + // Write 3 records directly to l2 + l2.Write(r1) + l2.Write(r2) + l2.Write(r3) + // Ensure it's not in l0 + assert.Equal(store.ErrNotFound, func() error { _, err := l0.Read(r1.Key); return err }()) + // Read from cache, ensure it's in all 3 stores + results, err := cachedStore.Read(r1.Key) + assert.Nil(err, "cachedStore.Read() returned error") + assert.Len(results, 1, "cachedStore.Read() should only return 1 result") + assert.Equal(r1, results[0], "Cached read didn't return the record that was put in") + results, err = l0.Read(r1.Key) + assert.Nil(err) + assert.Equal(r1, results[0], "l0 not coherent") + results, err = l1.Read(r1.Key) + assert.Nil(err) + assert.Equal(r1, results[0], "l1 not coherent") + results, err = l2.Read(r1.Key) + assert.Nil(err) + assert.Equal(r1, results[0], "l2 not coherent") + // Multiple read + results, err = cachedStore.Read("aa", store.ReadPrefix()) + assert.Nil(err, "Cachedstore multiple read errored") + assert.Len(results, 3, "ReadPrefix should have read all records") + // l1 should now have all 3 records + l1results, err := l1.Read("aa", store.ReadPrefix()) + assert.Nil(err, "l1.Read failed") + assert.Len(l1results, 3, "l1 didn't contain a full cache") + sort.Slice(results, func(i, j int) bool { return results[i].Key < results[j].Key }) + sort.Slice(l1results, func(i, j int) bool { return l1results[i].Key < l1results[j].Key }) + assert.Equal(results[0], l1results[0], "l1 cache not coherent") + assert.Equal(results[1], l1results[1], "l1 cache not coherent") + assert.Equal(results[2], l1results[2], "l1 cache not coherent") + + // Test List and Delete + keys, err := cachedStore.List(store.ListPrefix("a")) + assert.Nil(err, "List should not error") + assert.Len(keys, 3, "List should return 3 keys") + for _, k := range keys { + err := cachedStore.Delete(k) + assert.Nil(err, "Delete should not error") + _, err = cachedStore.Read(k) + // N.B. - this may not pass on stores that are eventually consistent + assert.Equal(store.ErrNotFound, err, "record should be gone") + } + + // Test Write + err = cachedStore.Write(r1) + assert.Nil(err, "Write shouldn't fail") + l2result, err := l2.Read(r1.Key) + assert.Nil(err) + assert.Len(l2result, 1) + assert.Equal(r1, l2result[0], "Write didn't make it all the way through to l2") + +} diff --git a/store/memory/memory.go b/store/memory/memory.go index d9bdf1b3..07e73ddc 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -106,7 +106,9 @@ func (m *memoryStore) get(k string) (*store.Record, error) { newRecord.Key = storedRecord.key newRecord.Value = make([]byte, len(storedRecord.value)) copy(newRecord.Value, storedRecord.value) - newRecord.Expiry = time.Until(storedRecord.expiresAt) + if !storedRecord.expiresAt.IsZero() { + newRecord.Expiry = time.Until(storedRecord.expiresAt) + } return newRecord, nil }