parent
104b7d8f8d
commit
5967a68e78
190
store/cache/cache.go
vendored
190
store/cache/cache.go
vendored
@ -1,152 +1,128 @@
|
|||||||
// Package cache implements a faulting style read cache on top of multiple micro stores
|
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/store"
|
"github.com/micro/go-micro/v2/store"
|
||||||
"github.com/micro/go-micro/v2/store/memory"
|
"github.com/micro/go-micro/v2/store/memory"
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// cache store is a store with caching to reduce IO where applicable.
|
||||||
|
// A memory store is used to cache reads from the given backing store.
|
||||||
|
// Reads are read through, writes are write-through
|
||||||
type cache struct {
|
type cache struct {
|
||||||
stores []store.Store
|
m store.Store // the memory store
|
||||||
|
b store.Store // the backing store, could be file, cockroach etc
|
||||||
|
options store.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache is a cpu register style cache for the store.
|
// NewStore returns a new cache store
|
||||||
// It syncs between N stores in a faulting manner.
|
func NewStore(store store.Store, opts ...store.Option) store.Store {
|
||||||
type Cache interface {
|
cf := &cache{
|
||||||
// Implements the store interface
|
m: memory.NewStore(opts...),
|
||||||
store.Store
|
b: store,
|
||||||
|
}
|
||||||
|
return cf
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCache returns a new store using the underlying stores, which must be already Init()ialised
|
func (c *cache) init(opts ...store.Option) error {
|
||||||
func NewCache(stores ...store.Store) Cache {
|
for _, o := range opts {
|
||||||
if len(stores) == 0 {
|
o(&c.options)
|
||||||
stores = []store.Store{
|
|
||||||
memory.NewStore(),
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: build in an in memory cache
|
|
||||||
c := &cache{
|
|
||||||
stores: stores,
|
|
||||||
}
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) Close() error {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Init initialises the underlying stores
|
||||||
func (c *cache) Init(opts ...store.Option) error {
|
func (c *cache) Init(opts ...store.Option) error {
|
||||||
// pass to the stores
|
if err := c.init(opts...); err != nil {
|
||||||
for _, store := range c.stores {
|
|
||||||
if err := store.Init(opts...); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := c.m.Init(opts...); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return c.b.Init(opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Options allows you to view the current options.
|
||||||
func (c *cache) Options() store.Options {
|
func (c *cache) Options() store.Options {
|
||||||
// return from first store
|
return c.options
|
||||||
return c.stores[0].Options()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) String() string {
|
|
||||||
stores := make([]string, len(c.stores))
|
|
||||||
for i, s := range c.stores {
|
|
||||||
stores[i] = s.String()
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("cache %v", stores)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error.
|
||||||
func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||||
readOpts := store.ReadOptions{}
|
recs, err := c.m.Read(key, opts...)
|
||||||
for _, o := range opts {
|
if err != nil && err != store.ErrNotFound {
|
||||||
o(&readOpts)
|
return nil, err
|
||||||
}
|
|
||||||
|
|
||||||
if readOpts.Prefix || readOpts.Suffix {
|
|
||||||
// List, then try cached gets for each key
|
|
||||||
var lOpts []store.ListOption
|
|
||||||
if readOpts.Prefix {
|
|
||||||
lOpts = append(lOpts, store.ListPrefix(key))
|
|
||||||
}
|
|
||||||
if readOpts.Suffix {
|
|
||||||
lOpts = append(lOpts, store.ListSuffix(key))
|
|
||||||
}
|
|
||||||
if readOpts.Limit > 0 {
|
|
||||||
lOpts = append(lOpts, store.ListLimit(readOpts.Limit))
|
|
||||||
}
|
|
||||||
if readOpts.Offset > 0 {
|
|
||||||
lOpts = append(lOpts, store.ListOffset(readOpts.Offset))
|
|
||||||
}
|
|
||||||
keys, err := c.List(lOpts...)
|
|
||||||
if err != nil {
|
|
||||||
return []*store.Record{}, errors.Wrap(err, "cache.List failed")
|
|
||||||
}
|
|
||||||
recs := make([]*store.Record, len(keys))
|
|
||||||
for i, k := range keys {
|
|
||||||
r, err := c.readOne(k, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return recs, errors.Wrap(err, "cache.readOne failed")
|
|
||||||
}
|
|
||||||
recs[i] = r
|
|
||||||
}
|
}
|
||||||
|
if len(recs) > 0 {
|
||||||
return recs, nil
|
return recs, nil
|
||||||
}
|
}
|
||||||
|
recs, err = c.b.Read(key, opts...)
|
||||||
// Otherwise just try cached get
|
|
||||||
r, err := c.readOne(key, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return []*store.Record{}, err // preserve store.ErrNotFound
|
|
||||||
}
|
|
||||||
return []*store.Record{r}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) readOne(key string, opts ...store.ReadOption) (*store.Record, error) {
|
|
||||||
for i, s := range c.stores {
|
|
||||||
// ReadOne ignores all options
|
|
||||||
r, err := s.Read(key)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if len(r) > 1 {
|
for _, rec := range recs {
|
||||||
return nil, errors.Wrapf(err, "read from L%d cache (%s) returned multiple records", i, c.stores[i].String())
|
if err := c.m.Write(rec); err != nil {
|
||||||
}
|
return nil, err
|
||||||
for j := i - 1; j >= 0; j-- {
|
|
||||||
err := c.stores[j].Write(r[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "could not write to L%d cache (%s)", j, c.stores[j].String())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return r[0], nil
|
|
||||||
}
|
}
|
||||||
}
|
return recs, err
|
||||||
return nil, store.ErrNotFound
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write() writes a record to the store, and returns an error if the record was not written.
|
||||||
|
// If the write succeeds in writing to memory but fails to write through to file, you'll receive an error
|
||||||
|
// but the value may still reside in memory so appropriate action should be taken.
|
||||||
func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error {
|
func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error {
|
||||||
// Write to all layers in reverse
|
if err := c.m.Write(r, opts...); err != nil {
|
||||||
for i := len(c.stores) - 1; i >= 0; i-- {
|
return err
|
||||||
if err := c.stores[i].Write(r, opts...); err != nil {
|
|
||||||
return errors.Wrapf(err, "could not write to L%d cache (%s)", i, c.stores[i].String())
|
|
||||||
}
|
}
|
||||||
}
|
return c.b.Write(r, opts...)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete removes the record with the corresponding key from the store.
|
||||||
|
// If the delete succeeds in writing to memory but fails to write through to file, you'll receive an error
|
||||||
|
// but the value may still reside in memory so appropriate action should be taken.
|
||||||
func (c *cache) Delete(key string, opts ...store.DeleteOption) error {
|
func (c *cache) Delete(key string, opts ...store.DeleteOption) error {
|
||||||
for i, s := range c.stores {
|
if err := c.m.Delete(key, opts...); err != nil {
|
||||||
if err := s.Delete(key, opts...); err != nil {
|
return err
|
||||||
return errors.Wrapf(err, "could not delete from L%d cache (%s)", i, c.stores[i].String())
|
|
||||||
}
|
}
|
||||||
}
|
return c.b.Delete(key, opts...)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List returns any keys that match, or an empty list with no error if none matched.
|
||||||
func (c *cache) List(opts ...store.ListOption) ([]string, error) {
|
func (c *cache) List(opts ...store.ListOption) ([]string, error) {
|
||||||
// List only makes sense from the top level
|
keys, err := c.m.List(opts...)
|
||||||
return c.stores[len(c.stores)-1].List(opts...)
|
if err != nil && err != store.ErrNotFound {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(keys) > 0 {
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
keys, err = c.b.List(opts...)
|
||||||
|
if err == nil {
|
||||||
|
for _, key := range keys {
|
||||||
|
recs, err := c.b.Read(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, r := range recs {
|
||||||
|
if err := c.m.Write(r); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keys, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the store and the underlying store
|
||||||
|
func (c *cache) Close() error {
|
||||||
|
if err := c.m.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.b.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns the name of the implementation.
|
||||||
|
func (c *cache) String() string {
|
||||||
|
return "cache"
|
||||||
}
|
}
|
||||||
|
175
store/cache/cache_test.go
vendored
175
store/cache/cache_test.go
vendored
@ -1,99 +1,102 @@
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/store"
|
"github.com/micro/go-micro/v2/store"
|
||||||
"github.com/micro/go-micro/v2/store/memory"
|
"github.com/micro/go-micro/v2/store/file"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCache(t *testing.T) {
|
func cleanup(db string, s store.Store) {
|
||||||
l0, l1, l2 := memory.NewStore(store.Database("l0")), memory.NewStore(store.Table("l1")), memory.NewStore()
|
s.Close()
|
||||||
_, _, _ = l0.Init(), l1.Init(), l2.Init()
|
dir := filepath.Join(file.DefaultDir, db+"/")
|
||||||
|
os.RemoveAll(dir)
|
||||||
|
}
|
||||||
|
|
||||||
assert := assert.New(t)
|
func TestRead(t *testing.T) {
|
||||||
|
cf := NewStore(file.NewStore())
|
||||||
|
cf.Init()
|
||||||
|
cfInt := cf.(*cache)
|
||||||
|
defer cleanup(file.DefaultDatabase, cf)
|
||||||
|
|
||||||
nonCache := NewCache(nil)
|
_, err := cf.Read("key1")
|
||||||
assert.Equal(len(nonCache.(*cache).stores), 1, "Expected a cache initialised with just 1 store to fail")
|
assert.Error(t, err, "Unexpected record")
|
||||||
|
cfInt.b.Write(&store.Record{
|
||||||
// Basic functionality
|
Key: "key1",
|
||||||
cachedStore := NewCache(l0, l1, l2)
|
Value: []byte("foo"),
|
||||||
assert.Equal(cachedStore.Options(), l0.Options(), "Options on store/cache are nonsensical")
|
})
|
||||||
expectedString := "cache [memory memory memory]"
|
recs, err := cf.Read("key1")
|
||||||
assert.Equal(cachedStore.String(), expectedString, "Cache couldn't describe itself as expected")
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, recs, 1, "Expected a record to be pulled from file store")
|
||||||
// Read/Write tests
|
recs, err = cfInt.m.Read("key1")
|
||||||
_, err := cachedStore.Read("test")
|
assert.NoError(t, err)
|
||||||
assert.Equal(store.ErrNotFound, err, "Read non existant key")
|
assert.Len(t, recs, 1, "Expected a memory store to be populatedfrom file store")
|
||||||
r1 := &store.Record{
|
|
||||||
Key: "aaa",
|
}
|
||||||
Value: []byte("bbb"),
|
|
||||||
Metadata: map[string]interface{}{},
|
func TestWrite(t *testing.T) {
|
||||||
}
|
cf := NewStore(file.NewStore())
|
||||||
r2 := &store.Record{
|
cf.Init()
|
||||||
Key: "aaaa",
|
cfInt := cf.(*cache)
|
||||||
Value: []byte("bbbb"),
|
defer cleanup(file.DefaultDatabase, cf)
|
||||||
Metadata: map[string]interface{}{},
|
|
||||||
}
|
cf.Write(&store.Record{
|
||||||
r3 := &store.Record{
|
Key: "key1",
|
||||||
Key: "aaaaa",
|
Value: []byte("foo"),
|
||||||
Value: []byte("bbbbb"),
|
})
|
||||||
Metadata: map[string]interface{}{},
|
recs, _ := cfInt.m.Read("key1")
|
||||||
}
|
assert.Len(t, recs, 1, "Expected a record in the memory store")
|
||||||
// Write 3 records directly to l2
|
recs, _ = cfInt.b.Read("key1")
|
||||||
l2.Write(r1)
|
assert.Len(t, recs, 1, "Expected a record in the file store")
|
||||||
l2.Write(r2)
|
|
||||||
l2.Write(r3)
|
}
|
||||||
// Ensure it's not in l0
|
|
||||||
assert.Equal(store.ErrNotFound, func() error { _, err := l0.Read(r1.Key); return err }())
|
func TestDelete(t *testing.T) {
|
||||||
// Read from cache, ensure it's in all 3 stores
|
cf := NewStore(file.NewStore())
|
||||||
results, err := cachedStore.Read(r1.Key)
|
cf.Init()
|
||||||
assert.Nil(err, "cachedStore.Read() returned error")
|
cfInt := cf.(*cache)
|
||||||
assert.Len(results, 1, "cachedStore.Read() should only return 1 result")
|
defer cleanup(file.DefaultDatabase, cf)
|
||||||
assert.Equal(r1, results[0], "Cached read didn't return the record that was put in")
|
|
||||||
results, err = l0.Read(r1.Key)
|
cf.Write(&store.Record{
|
||||||
assert.Nil(err)
|
Key: "key1",
|
||||||
assert.Equal(r1, results[0], "l0 not coherent")
|
Value: []byte("foo"),
|
||||||
results, err = l1.Read(r1.Key)
|
})
|
||||||
assert.Nil(err)
|
recs, _ := cfInt.m.Read("key1")
|
||||||
assert.Equal(r1, results[0], "l1 not coherent")
|
assert.Len(t, recs, 1, "Expected a record in the memory store")
|
||||||
results, err = l2.Read(r1.Key)
|
recs, _ = cfInt.b.Read("key1")
|
||||||
assert.Nil(err)
|
assert.Len(t, recs, 1, "Expected a record in the file store")
|
||||||
assert.Equal(r1, results[0], "l2 not coherent")
|
cf.Delete("key1")
|
||||||
// Multiple read
|
|
||||||
results, err = cachedStore.Read("aa", store.ReadPrefix())
|
_, err := cfInt.m.Read("key1")
|
||||||
assert.Nil(err, "Cachedstore multiple read errored")
|
assert.Error(t, err, "Expected no records in memory store")
|
||||||
assert.Len(results, 3, "ReadPrefix should have read all records")
|
_, err = cfInt.b.Read("key1")
|
||||||
// l1 should now have all 3 records
|
assert.Error(t, err, "Expected no records in file store")
|
||||||
l1results, err := l1.Read("aa", store.ReadPrefix())
|
|
||||||
assert.Nil(err, "l1.Read failed")
|
}
|
||||||
assert.Len(l1results, 3, "l1 didn't contain a full cache")
|
|
||||||
sort.Slice(results, func(i, j int) bool { return results[i].Key < results[j].Key })
|
func TestList(t *testing.T) {
|
||||||
sort.Slice(l1results, func(i, j int) bool { return l1results[i].Key < l1results[j].Key })
|
cf := NewStore(file.NewStore())
|
||||||
assert.Equal(results[0], l1results[0], "l1 cache not coherent")
|
cf.Init()
|
||||||
assert.Equal(results[1], l1results[1], "l1 cache not coherent")
|
cfInt := cf.(*cache)
|
||||||
assert.Equal(results[2], l1results[2], "l1 cache not coherent")
|
defer cleanup(file.DefaultDatabase, cf)
|
||||||
|
|
||||||
// Test List and Delete
|
keys, err := cf.List()
|
||||||
keys, err := cachedStore.List(store.ListPrefix("a"))
|
assert.NoError(t, err)
|
||||||
assert.Nil(err, "List should not error")
|
assert.Len(t, keys, 0)
|
||||||
assert.Len(keys, 3, "List should return 3 keys")
|
cfInt.b.Write(&store.Record{
|
||||||
for _, k := range keys {
|
Key: "key1",
|
||||||
err := cachedStore.Delete(k)
|
Value: []byte("foo"),
|
||||||
assert.Nil(err, "Delete should not error")
|
})
|
||||||
_, err = cachedStore.Read(k)
|
|
||||||
// N.B. - this may not pass on stores that are eventually consistent
|
cfInt.b.Write(&store.Record{
|
||||||
assert.Equal(store.ErrNotFound, err, "record should be gone")
|
Key: "key2",
|
||||||
}
|
Value: []byte("foo"),
|
||||||
|
})
|
||||||
// Test Write
|
keys, err = cf.List()
|
||||||
err = cachedStore.Write(r1)
|
assert.NoError(t, err)
|
||||||
assert.Nil(err, "Write shouldn't fail")
|
assert.Len(t, keys, 2)
|
||||||
l2result, err := l2.Read(r1.Key)
|
|
||||||
assert.Nil(err)
|
|
||||||
assert.Len(l2result, 1)
|
|
||||||
assert.Equal(r1, l2result[0], "Write didn't make it all the way through to l2")
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user