diff --git a/go.mod b/go.mod index c77b4dd2..0dd4fc51 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c github.com/fsnotify/fsnotify v1.4.7 github.com/fsouza/go-dockerclient v1.6.0 diff --git a/go.sum b/go.sum index a96be5b1..557728a2 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 h1:jFGzikHboUMRXmMBtwD/PbxoTHPs2919Irp/3rxMbvM= +github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1/go.mod h1:HvODWzv6Y6kBf3Ah2WzN1bHjDUezGLaAhwuWVwfpEJs= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/store/cache/cache.go b/store/cache/cache.go deleted file mode 100644 index 1bb3852e..00000000 --- a/store/cache/cache.go +++ /dev/null @@ -1,39 +0,0 @@ -package cache - -import ( - "github.com/micro/go-micro/v2/store" - "github.com/pkg/errors" -) - -// Cache implements a cache in front of a micro Store -type Cache struct { - options store.Options - store.Store - - stores []store.Store -} - -// NewStore returns new cache -func NewStore(opts ...store.Option) store.Store { - s := &Cache{ - options: store.Options{}, - stores: []store.Store{}, - } - for _, o := range opts { - o(&s.options) - } - return s -} - -// Init initialises a new cache -func (c *Cache) Init(opts ...store.Option) error { - for _, o := range opts { - o(&c.options) - } - for _, s := range c.stores { - if err := s.Init(); err != nil { - return errors.Wrapf(err, "Store %s failed to Init()", s.String()) - } - } - return nil -} diff --git a/store/cache/cache_test.go b/store/cache/cache_test.go deleted file mode 100644 index bf01b9aa..00000000 --- a/store/cache/cache_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package cache - -// import "testing" - -// func TestCache(t *testing.T) { -// c := NewStore() -// if err := c.Init(); err != nil { -// //t.Fatal(err) -// } -// if results, err := c.Read("test"); err != nil { -// //t.Fatal(err) -// } else { -// println(results) -// } -// } diff --git a/sync/store/cache.go b/sync/store/cache.go new file mode 100644 index 00000000..3c13e48a --- /dev/null +++ b/sync/store/cache.go @@ -0,0 +1,110 @@ +// Package store syncs multiple go-micro stores +package store + +import ( + "fmt" + "sync" + "time" + + "github.com/ef-ds/deque" + "github.com/micro/go-micro/v2/store" + "github.com/pkg/errors" +) + +// Cache implements a cache in front of go-micro Stores +type Cache interface { + store.Store + + // Force a full sync + Sync() error +} +type cache struct { + sOptions store.Options + cOptions Options + pendingWrites []*deque.Deque + pendingWriteTickers []*time.Ticker + sync.RWMutex +} + +// NewCache returns a new Cache +func NewCache(opts ...Option) Cache { + c := &cache{} + for _, o := range opts { + o(&c.cOptions) + } + if c.cOptions.SyncInterval == 0 { + c.cOptions.SyncInterval = 1 * time.Minute + } + if c.cOptions.SyncMultiplier == 0 { + c.cOptions.SyncMultiplier = 5 + } + return c +} + +// Init initialises the storeOptions +func (c *cache) Init(opts ...store.Option) error { + for _, o := range opts { + o(&c.sOptions) + } + if len(c.cOptions.Stores) == 0 { + return errors.New("the cache has no stores") + } + if c.sOptions.Context == nil { + return errors.New("please provide a context to the cache. Cancelling the context signals that the cache is being disposed and syncs the cache") + } + for _, s := range c.cOptions.Stores { + if err := s.Init(); err != nil { + return errors.Wrapf(err, "Store %s failed to Init()", s.String()) + } + } + c.pendingWrites = make([]*deque.Deque, len(c.cOptions.Stores)-1) + c.pendingWriteTickers = make([]*time.Ticker, len(c.cOptions.Stores)-1) + for i := 0; i < len(c.pendingWrites); i++ { + c.pendingWrites[i] = deque.New() + c.pendingWrites[i].Init() + c.pendingWriteTickers[i] = time.NewTicker(c.cOptions.SyncInterval * time.Duration(intpow(c.cOptions.SyncMultiplier, int64(i)))) + } + go c.cacheManager() + return nil +} + +// Options returns the cache's store options +func (c *cache) Options() store.Options { + return c.sOptions +} + +// String returns a printable string describing the cache +func (c *cache) String() string { + backends := make([]string, len(c.cOptions.Stores)) + for i, s := range c.cOptions.Stores { + backends[i] = s.String() + } + return fmt.Sprintf("cache %v", backends) +} + +func (c *cache) List(opts ...store.ListOption) ([]string, error) { + return c.cOptions.Stores[0].List(opts...) +} + +func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + return c.cOptions.Stores[0].Read(key, opts...) +} + +func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error { + return c.cOptions.Stores[0].Write(r, opts...) +} + +// Delete removes a key from the cache +func (c *cache) Delete(key string, opts ...store.DeleteOption) error { + return c.cOptions.Stores[0].Delete(key, opts...) +} + +func (c *cache) Sync() error { + return nil +} + +type internalRecord struct { + key string + value []byte + expiresAt time.Time +} diff --git a/sync/store/cache_test.go b/sync/store/cache_test.go new file mode 100644 index 00000000..23d3aecd --- /dev/null +++ b/sync/store/cache_test.go @@ -0,0 +1,26 @@ +package store + +import ( + "context" + "testing" + "time" + + "github.com/micro/go-micro/v2/store" + "github.com/micro/go-micro/v2/store/memory" +) + +func TestCacheTicker(t *testing.T) { + l0 := memory.NewStore() + l0.Init() + l1 := memory.NewStore() + l1.Init() + l2 := memory.NewStore() + l2.Init() + c := NewCache(Stores(l0, l1, l2), SyncInterval(1*time.Second), SyncMultiplier(2)) + + if err := c.Init(store.WithContext(context.Background())); err != nil { + t.Fatal(err) + } + + time.Sleep(30 * time.Second) +} diff --git a/sync/store/manager.go b/sync/store/manager.go new file mode 100644 index 00000000..58544aa4 --- /dev/null +++ b/sync/store/manager.go @@ -0,0 +1,89 @@ +package store + +import ( + "time" + + "github.com/micro/go-micro/v2/store" + "github.com/pkg/errors" +) + +type operation struct { + operation action + record *store.Record + deadline time.Time + retries int + maxiumum int +} + +// action represents the type of a queued operation +type action int + +const ( + readOp action = iota + 1 + writeOp + deleteOp + listOp +) + +func (c *cache) cacheManager() { + tickerAggregator := make(chan struct{ index int }) + for i, ticker := range c.pendingWriteTickers { + go func(index int, c chan struct{ index int }, t *time.Ticker) { + for range t.C { + c <- struct{ index int }{index: index} + } + }(i, tickerAggregator, ticker) + } + for { + select { + case i := <-tickerAggregator: + println(i.index, "ticked") + c.processQueue(i.index) + } + } +} + +func (c *cache) processQueue(index int) { + c.Lock() + defer c.Unlock() + q := c.pendingWrites[index] + for i := 0; i < q.Len(); i++ { + r, ok := q.PopFront() + if !ok { + panic(errors.Errorf("retrieved an invalid value from the L%d cache queue", index+1)) + } + ir, ok := r.(*internalRecord) + if !ok { + panic(errors.Errorf("retrieved a non-internal record from the L%d cache queue", index+1)) + } + if !ir.expiresAt.IsZero() && time.Now().After(ir.expiresAt) { + continue + } + nr := &store.Record{ + Key: ir.key, + } + nr.Value = make([]byte, len(ir.value)) + copy(nr.Value, ir.value) + if !ir.expiresAt.IsZero() { + nr.Expiry = time.Until(ir.expiresAt) + } + // Todo = internal queue also has to hold the corresponding store.WriteOptions + if err := c.cOptions.Stores[index+1].Write(nr); err != nil { + // some error, so queue for retry and bail + q.PushBack(ir) + return + } + } +} + +func intpow(x, y int64) int64 { + result := int64(1) + for 0 != y { + if 0 != (y & 1) { + result *= x + } + y >>= 1 + x *= x + } + return result +} diff --git a/sync/store/options.go b/sync/store/options.go new file mode 100644 index 00000000..f2a75534 --- /dev/null +++ b/sync/store/options.go @@ -0,0 +1,44 @@ +package store + +import ( + "time" + + "github.com/micro/go-micro/v2/store" +) + +// Options represents Cache options +type Options struct { + // Stores represents layers in the cache in ascending order. L0, L1, L2, etc + Stores []store.Store + // SyncInterval is the duration between syncs from L0 to L1 + SyncInterval time.Duration + // SyncMultiplier is the multiplication factor between each store. + SyncMultiplier int64 +} + +// Option sets Cache Options +type Option func(o *Options) + +// Stores sets the layers that make up the cache +func Stores(stores ...store.Store) Option { + return func(o *Options) { + o.Stores = make([]store.Store, len(stores)) + for i, s := range stores { + o.Stores[i] = s + } + } +} + +// SyncInterval sets the duration between syncs from L0 to L1 +func SyncInterval(d time.Duration) Option { + return func(o *Options) { + o.SyncInterval = d + } +} + +// SyncMultiplier sets the multiplication factor for time to wait each cache layer +func SyncMultiplier(i int64) Option { + return func(o *Options) { + o.SyncMultiplier = i + } +}