diff --git a/store/cache/cache.go b/store/cache/cache.go index 996d96e1..10b5c8a4 100644 --- a/store/cache/cache.go +++ b/store/cache/cache.go @@ -1,152 +1,128 @@ -// Package cache implements a faulting style read cache on top of multiple micro stores package cache import ( - "fmt" - "github.com/micro/go-micro/v2/store" "github.com/micro/go-micro/v2/store/memory" - "github.com/pkg/errors" ) +// cache store is a store with caching to reduce IO where applicable. +// A memory store is used to cache reads from the given backing store. +// Reads are read through, writes are write-through type cache struct { - stores []store.Store + m store.Store // the memory store + b store.Store // the backing store, could be file, cockroach etc + options store.Options } -// Cache is a cpu register style cache for the store. -// It syncs between N stores in a faulting manner. -type Cache interface { - // Implements the store interface - store.Store -} - -// NewCache returns a new store using the underlying stores, which must be already Init()ialised -func NewCache(stores ...store.Store) Cache { - if len(stores) == 0 { - stores = []store.Store{ - memory.NewStore(), - } +// NewStore returns a new cache store +func NewStore(store store.Store, opts ...store.Option) store.Store { + cf := &cache{ + m: memory.NewStore(opts...), + b: store, } + return cf - // TODO: build in an in memory cache - c := &cache{ - stores: stores, - } - - return c } -func (c *cache) Close() error { - return nil -} - -func (c *cache) Init(opts ...store.Option) error { - // pass to the stores - for _, store := range c.stores { - if err := store.Init(opts...); err != nil { - return err - } - } - return nil -} - -func (c *cache) Options() store.Options { - // return from first store - return c.stores[0].Options() -} - -func (c *cache) String() string { - stores := make([]string, len(c.stores)) - for i, s := range c.stores { - stores[i] = s.String() - } - return fmt.Sprintf("cache %v", stores) -} - -func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - readOpts := store.ReadOptions{} +func (c *cache) init(opts ...store.Option) error { for _, o := range opts { - o(&readOpts) + o(&c.options) } + return nil +} - if readOpts.Prefix || readOpts.Suffix { - // List, then try cached gets for each key - var lOpts []store.ListOption - if readOpts.Prefix { - lOpts = append(lOpts, store.ListPrefix(key)) - } - if readOpts.Suffix { - lOpts = append(lOpts, store.ListSuffix(key)) - } - if readOpts.Limit > 0 { - lOpts = append(lOpts, store.ListLimit(readOpts.Limit)) - } - if readOpts.Offset > 0 { - lOpts = append(lOpts, store.ListOffset(readOpts.Offset)) - } - keys, err := c.List(lOpts...) - if err != nil { - return []*store.Record{}, errors.Wrap(err, "cache.List failed") - } - recs := make([]*store.Record, len(keys)) - for i, k := range keys { - r, err := c.readOne(k, opts...) - if err != nil { - return recs, errors.Wrap(err, "cache.readOne failed") - } - recs[i] = r - } +// Init initialises the underlying stores +func (c *cache) Init(opts ...store.Option) error { + if err := c.init(opts...); err != nil { + return err + } + if err := c.m.Init(opts...); err != nil { + return err + } + return c.b.Init(opts...) +} + +// Options allows you to view the current options. +func (c *cache) Options() store.Options { + return c.options +} + +// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error. +func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + recs, err := c.m.Read(key, opts...) + if err != nil && err != store.ErrNotFound { + return nil, err + } + if len(recs) > 0 { return recs, nil } - - // Otherwise just try cached get - r, err := c.readOne(key, opts...) - if err != nil { - return []*store.Record{}, err // preserve store.ErrNotFound + recs, err = c.b.Read(key, opts...) + if err == nil { + for _, rec := range recs { + if err := c.m.Write(rec); err != nil { + return nil, err + } + } } - return []*store.Record{r}, nil + return recs, err } -func (c *cache) readOne(key string, opts ...store.ReadOption) (*store.Record, error) { - for i, s := range c.stores { - // ReadOne ignores all options - r, err := s.Read(key) - if err == nil { - if len(r) > 1 { - return nil, errors.Wrapf(err, "read from L%d cache (%s) returned multiple records", i, c.stores[i].String()) +// Write() writes a record to the store, and returns an error if the record was not written. +// If the write succeeds in writing to memory but fails to write through to file, you'll receive an error +// but the value may still reside in memory so appropriate action should be taken. +func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error { + if err := c.m.Write(r, opts...); err != nil { + return err + } + return c.b.Write(r, opts...) +} + +// Delete removes the record with the corresponding key from the store. +// If the delete succeeds in writing to memory but fails to write through to file, you'll receive an error +// but the value may still reside in memory so appropriate action should be taken. +func (c *cache) Delete(key string, opts ...store.DeleteOption) error { + if err := c.m.Delete(key, opts...); err != nil { + return err + } + return c.b.Delete(key, opts...) +} + +// List returns any keys that match, or an empty list with no error if none matched. +func (c *cache) List(opts ...store.ListOption) ([]string, error) { + keys, err := c.m.List(opts...) + if err != nil && err != store.ErrNotFound { + return nil, err + } + if len(keys) > 0 { + return keys, nil + } + keys, err = c.b.List(opts...) + if err == nil { + for _, key := range keys { + recs, err := c.b.Read(key) + if err != nil { + return nil, err } - for j := i - 1; j >= 0; j-- { - err := c.stores[j].Write(r[0]) - if err != nil { - return nil, errors.Wrapf(err, "could not write to L%d cache (%s)", j, c.stores[j].String()) + for _, r := range recs { + if err := c.m.Write(r); err != nil { + return nil, err } } - return r[0], nil + } } - return nil, store.ErrNotFound + return keys, err } -func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error { - // Write to all layers in reverse - for i := len(c.stores) - 1; i >= 0; i-- { - if err := c.stores[i].Write(r, opts...); err != nil { - return errors.Wrapf(err, "could not write to L%d cache (%s)", i, c.stores[i].String()) - } +// Close the store and the underlying store +func (c *cache) Close() error { + if err := c.m.Close(); err != nil { + return err } - return nil + return c.b.Close() } -func (c *cache) Delete(key string, opts ...store.DeleteOption) error { - for i, s := range c.stores { - if err := s.Delete(key, opts...); err != nil { - return errors.Wrapf(err, "could not delete from L%d cache (%s)", i, c.stores[i].String()) - } - } - return nil -} - -func (c *cache) List(opts ...store.ListOption) ([]string, error) { - // List only makes sense from the top level - return c.stores[len(c.stores)-1].List(opts...) +// String returns the name of the implementation. +func (c *cache) String() string { + return "cache" } diff --git a/store/cache/cache_test.go b/store/cache/cache_test.go index 132e9acf..0da6808b 100644 --- a/store/cache/cache_test.go +++ b/store/cache/cache_test.go @@ -1,99 +1,102 @@ package cache import ( - "sort" + "os" + "path/filepath" "testing" "github.com/micro/go-micro/v2/store" - "github.com/micro/go-micro/v2/store/memory" + "github.com/micro/go-micro/v2/store/file" "github.com/stretchr/testify/assert" ) -func TestCache(t *testing.T) { - l0, l1, l2 := memory.NewStore(store.Database("l0")), memory.NewStore(store.Table("l1")), memory.NewStore() - _, _, _ = l0.Init(), l1.Init(), l2.Init() +func cleanup(db string, s store.Store) { + s.Close() + dir := filepath.Join(file.DefaultDir, db+"/") + os.RemoveAll(dir) +} - assert := assert.New(t) +func TestRead(t *testing.T) { + cf := NewStore(file.NewStore()) + cf.Init() + cfInt := cf.(*cache) + defer cleanup(file.DefaultDatabase, cf) - nonCache := NewCache(nil) - assert.Equal(len(nonCache.(*cache).stores), 1, "Expected a cache initialised with just 1 store to fail") - - // Basic functionality - cachedStore := NewCache(l0, l1, l2) - assert.Equal(cachedStore.Options(), l0.Options(), "Options on store/cache are nonsensical") - expectedString := "cache [memory memory memory]" - assert.Equal(cachedStore.String(), expectedString, "Cache couldn't describe itself as expected") - - // Read/Write tests - _, err := cachedStore.Read("test") - assert.Equal(store.ErrNotFound, err, "Read non existant key") - r1 := &store.Record{ - Key: "aaa", - Value: []byte("bbb"), - Metadata: map[string]interface{}{}, - } - r2 := &store.Record{ - Key: "aaaa", - Value: []byte("bbbb"), - Metadata: map[string]interface{}{}, - } - r3 := &store.Record{ - Key: "aaaaa", - Value: []byte("bbbbb"), - Metadata: map[string]interface{}{}, - } - // Write 3 records directly to l2 - l2.Write(r1) - l2.Write(r2) - l2.Write(r3) - // Ensure it's not in l0 - assert.Equal(store.ErrNotFound, func() error { _, err := l0.Read(r1.Key); return err }()) - // Read from cache, ensure it's in all 3 stores - results, err := cachedStore.Read(r1.Key) - assert.Nil(err, "cachedStore.Read() returned error") - assert.Len(results, 1, "cachedStore.Read() should only return 1 result") - assert.Equal(r1, results[0], "Cached read didn't return the record that was put in") - results, err = l0.Read(r1.Key) - assert.Nil(err) - assert.Equal(r1, results[0], "l0 not coherent") - results, err = l1.Read(r1.Key) - assert.Nil(err) - assert.Equal(r1, results[0], "l1 not coherent") - results, err = l2.Read(r1.Key) - assert.Nil(err) - assert.Equal(r1, results[0], "l2 not coherent") - // Multiple read - results, err = cachedStore.Read("aa", store.ReadPrefix()) - assert.Nil(err, "Cachedstore multiple read errored") - assert.Len(results, 3, "ReadPrefix should have read all records") - // l1 should now have all 3 records - l1results, err := l1.Read("aa", store.ReadPrefix()) - assert.Nil(err, "l1.Read failed") - assert.Len(l1results, 3, "l1 didn't contain a full cache") - sort.Slice(results, func(i, j int) bool { return results[i].Key < results[j].Key }) - sort.Slice(l1results, func(i, j int) bool { return l1results[i].Key < l1results[j].Key }) - assert.Equal(results[0], l1results[0], "l1 cache not coherent") - assert.Equal(results[1], l1results[1], "l1 cache not coherent") - assert.Equal(results[2], l1results[2], "l1 cache not coherent") - - // Test List and Delete - keys, err := cachedStore.List(store.ListPrefix("a")) - assert.Nil(err, "List should not error") - assert.Len(keys, 3, "List should return 3 keys") - for _, k := range keys { - err := cachedStore.Delete(k) - assert.Nil(err, "Delete should not error") - _, err = cachedStore.Read(k) - // N.B. - this may not pass on stores that are eventually consistent - assert.Equal(store.ErrNotFound, err, "record should be gone") - } - - // Test Write - err = cachedStore.Write(r1) - assert.Nil(err, "Write shouldn't fail") - l2result, err := l2.Read(r1.Key) - assert.Nil(err) - assert.Len(l2result, 1) - assert.Equal(r1, l2result[0], "Write didn't make it all the way through to l2") + _, err := cf.Read("key1") + assert.Error(t, err, "Unexpected record") + cfInt.b.Write(&store.Record{ + Key: "key1", + Value: []byte("foo"), + }) + recs, err := cf.Read("key1") + assert.NoError(t, err) + assert.Len(t, recs, 1, "Expected a record to be pulled from file store") + recs, err = cfInt.m.Read("key1") + assert.NoError(t, err) + assert.Len(t, recs, 1, "Expected a memory store to be populatedfrom file store") + +} + +func TestWrite(t *testing.T) { + cf := NewStore(file.NewStore()) + cf.Init() + cfInt := cf.(*cache) + defer cleanup(file.DefaultDatabase, cf) + + cf.Write(&store.Record{ + Key: "key1", + Value: []byte("foo"), + }) + recs, _ := cfInt.m.Read("key1") + assert.Len(t, recs, 1, "Expected a record in the memory store") + recs, _ = cfInt.b.Read("key1") + assert.Len(t, recs, 1, "Expected a record in the file store") + +} + +func TestDelete(t *testing.T) { + cf := NewStore(file.NewStore()) + cf.Init() + cfInt := cf.(*cache) + defer cleanup(file.DefaultDatabase, cf) + + cf.Write(&store.Record{ + Key: "key1", + Value: []byte("foo"), + }) + recs, _ := cfInt.m.Read("key1") + assert.Len(t, recs, 1, "Expected a record in the memory store") + recs, _ = cfInt.b.Read("key1") + assert.Len(t, recs, 1, "Expected a record in the file store") + cf.Delete("key1") + + _, err := cfInt.m.Read("key1") + assert.Error(t, err, "Expected no records in memory store") + _, err = cfInt.b.Read("key1") + assert.Error(t, err, "Expected no records in file store") + +} + +func TestList(t *testing.T) { + cf := NewStore(file.NewStore()) + cf.Init() + cfInt := cf.(*cache) + defer cleanup(file.DefaultDatabase, cf) + + keys, err := cf.List() + assert.NoError(t, err) + assert.Len(t, keys, 0) + cfInt.b.Write(&store.Record{ + Key: "key1", + Value: []byte("foo"), + }) + + cfInt.b.Write(&store.Record{ + Key: "key2", + Value: []byte("foo"), + }) + keys, err = cf.List() + assert.NoError(t, err) + assert.Len(t, keys, 2) }