From c612d86480ad8f42dcc6a65b9945b9828c397b54 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 11 Apr 2020 09:33:10 +0100 Subject: [PATCH] Move sync store --- {sync/store => store/sync}/manager.go | 12 +-- {sync/store => store/sync}/options.go | 12 +-- store/sync/sync.go | 114 ++++++++++++++++++++++++++ sync/store/cache.go | 114 -------------------------- 4 files changed, 126 insertions(+), 126 deletions(-) rename {sync/store => store/sync}/manager.go (88%) rename {sync/store => store/sync}/options.go (78%) create mode 100644 store/sync/sync.go delete mode 100644 sync/store/cache.go diff --git a/sync/store/manager.go b/store/sync/manager.go similarity index 88% rename from sync/store/manager.go rename to store/sync/manager.go index 58544aa4..6f7c5d6e 100644 --- a/sync/store/manager.go +++ b/store/sync/manager.go @@ -1,4 +1,4 @@ -package store +package sync import ( "time" @@ -25,7 +25,7 @@ const ( listOp ) -func (c *cache) cacheManager() { +func (c *syncStore) syncManager() { tickerAggregator := make(chan struct{ index int }) for i, ticker := range c.pendingWriteTickers { go func(index int, c chan struct{ index int }, t *time.Ticker) { @@ -43,18 +43,18 @@ func (c *cache) cacheManager() { } } -func (c *cache) processQueue(index int) { +func (c *syncStore) processQueue(index int) { c.Lock() defer c.Unlock() q := c.pendingWrites[index] for i := 0; i < q.Len(); i++ { r, ok := q.PopFront() if !ok { - panic(errors.Errorf("retrieved an invalid value from the L%d cache queue", index+1)) + panic(errors.Errorf("retrieved an invalid value from the L%d sync queue", index+1)) } ir, ok := r.(*internalRecord) if !ok { - panic(errors.Errorf("retrieved a non-internal record from the L%d cache queue", index+1)) + panic(errors.Errorf("retrieved a non-internal record from the L%d sync queue", index+1)) } if !ir.expiresAt.IsZero() && time.Now().After(ir.expiresAt) { continue @@ -68,7 +68,7 @@ func (c *cache) processQueue(index int) { nr.Expiry = time.Until(ir.expiresAt) } // Todo = internal queue also has to hold the corresponding store.WriteOptions - if err := c.cOptions.Stores[index+1].Write(nr); err != nil { + if err := c.syncOpts.Stores[index+1].Write(nr); err != nil { // some error, so queue for retry and bail q.PushBack(ir) return diff --git a/sync/store/options.go b/store/sync/options.go similarity index 78% rename from sync/store/options.go rename to store/sync/options.go index f2a75534..30e493f2 100644 --- a/sync/store/options.go +++ b/store/sync/options.go @@ -1,4 +1,4 @@ -package store +package sync import ( "time" @@ -6,9 +6,9 @@ import ( "github.com/micro/go-micro/v2/store" ) -// Options represents Cache options +// Options represents Sync options type Options struct { - // Stores represents layers in the cache in ascending order. L0, L1, L2, etc + // Stores represents layers in the sync in ascending order. L0, L1, L2, etc Stores []store.Store // SyncInterval is the duration between syncs from L0 to L1 SyncInterval time.Duration @@ -16,10 +16,10 @@ type Options struct { SyncMultiplier int64 } -// Option sets Cache Options +// Option sets Sync Options type Option func(o *Options) -// Stores sets the layers that make up the cache +// Stores sets the layers that make up the sync func Stores(stores ...store.Store) Option { return func(o *Options) { o.Stores = make([]store.Store, len(stores)) @@ -36,7 +36,7 @@ func SyncInterval(d time.Duration) Option { } } -// SyncMultiplier sets the multiplication factor for time to wait each cache layer +// SyncMultiplier sets the multiplication factor for time to wait each sync layer func SyncMultiplier(i int64) Option { return func(o *Options) { o.SyncMultiplier = i diff --git a/store/sync/sync.go b/store/sync/sync.go new file mode 100644 index 00000000..79a7e36e --- /dev/null +++ b/store/sync/sync.go @@ -0,0 +1,114 @@ +// Package syncs will sync multiple stores +package sync + +import ( + "fmt" + "sync" + "time" + + "github.com/ef-ds/deque" + "github.com/micro/go-micro/v2/store" + "github.com/pkg/errors" +) + +// Sync implements a sync in for stores +type Sync interface { + // Implements the store interface + store.Store + // Force a full sync + Sync() error +} +type syncStore struct { + storeOpts store.Options + syncOpts Options + pendingWrites []*deque.Deque + pendingWriteTickers []*time.Ticker + sync.RWMutex +} + +// NewSync returns a new Sync +func NewSync(opts ...Option) Sync { + c := &syncStore{} + for _, o := range opts { + o(&c.syncOpts) + } + if c.syncOpts.SyncInterval == 0 { + c.syncOpts.SyncInterval = 1 * time.Minute + } + if c.syncOpts.SyncMultiplier == 0 { + c.syncOpts.SyncMultiplier = 5 + } + return c +} + +func (c *syncStore) Close() error { + return nil +} + +// Init initialises the storeOptions +func (c *syncStore) Init(opts ...store.Option) error { + for _, o := range opts { + o(&c.storeOpts) + } + if len(c.syncOpts.Stores) == 0 { + return errors.New("the sync has no stores") + } + if c.storeOpts.Context == nil { + return errors.New("please provide a context to the sync. Cancelling the context signals that the sync is being disposed and syncs the sync") + } + for _, s := range c.syncOpts.Stores { + if err := s.Init(); err != nil { + return errors.Wrapf(err, "Store %s failed to Init()", s.String()) + } + } + c.pendingWrites = make([]*deque.Deque, len(c.syncOpts.Stores)-1) + c.pendingWriteTickers = make([]*time.Ticker, len(c.syncOpts.Stores)-1) + for i := 0; i < len(c.pendingWrites); i++ { + c.pendingWrites[i] = deque.New() + c.pendingWrites[i].Init() + c.pendingWriteTickers[i] = time.NewTicker(c.syncOpts.SyncInterval * time.Duration(intpow(c.syncOpts.SyncMultiplier, int64(i)))) + } + go c.syncManager() + return nil +} + +// Options returns the sync's store options +func (c *syncStore) Options() store.Options { + return c.storeOpts +} + +// String returns a printable string describing the sync +func (c *syncStore) String() string { + backends := make([]string, len(c.syncOpts.Stores)) + for i, s := range c.syncOpts.Stores { + backends[i] = s.String() + } + return fmt.Sprintf("sync %v", backends) +} + +func (c *syncStore) List(opts ...store.ListOption) ([]string, error) { + return c.syncOpts.Stores[0].List(opts...) +} + +func (c *syncStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + return c.syncOpts.Stores[0].Read(key, opts...) +} + +func (c *syncStore) Write(r *store.Record, opts ...store.WriteOption) error { + return c.syncOpts.Stores[0].Write(r, opts...) +} + +// Delete removes a key from the sync +func (c *syncStore) Delete(key string, opts ...store.DeleteOption) error { + return c.syncOpts.Stores[0].Delete(key, opts...) +} + +func (c *syncStore) Sync() error { + return nil +} + +type internalRecord struct { + key string + value []byte + expiresAt time.Time +} diff --git a/sync/store/cache.go b/sync/store/cache.go deleted file mode 100644 index 2385e961..00000000 --- a/sync/store/cache.go +++ /dev/null @@ -1,114 +0,0 @@ -// Package store syncs multiple go-micro stores -package store - -import ( - "fmt" - "sync" - "time" - - "github.com/ef-ds/deque" - "github.com/micro/go-micro/v2/store" - "github.com/pkg/errors" -) - -// Cache implements a cache in front of go-micro Stores -type Cache interface { - store.Store - - // Force a full sync - Sync() error -} -type cache struct { - sOptions store.Options - cOptions Options - pendingWrites []*deque.Deque - pendingWriteTickers []*time.Ticker - sync.RWMutex -} - -// NewCache returns a new Cache -func NewCache(opts ...Option) Cache { - c := &cache{} - for _, o := range opts { - o(&c.cOptions) - } - if c.cOptions.SyncInterval == 0 { - c.cOptions.SyncInterval = 1 * time.Minute - } - if c.cOptions.SyncMultiplier == 0 { - c.cOptions.SyncMultiplier = 5 - } - return c -} - -func (c *cache) Close() error { - return nil -} - -// Init initialises the storeOptions -func (c *cache) Init(opts ...store.Option) error { - for _, o := range opts { - o(&c.sOptions) - } - if len(c.cOptions.Stores) == 0 { - return errors.New("the cache has no stores") - } - if c.sOptions.Context == nil { - return errors.New("please provide a context to the cache. Cancelling the context signals that the cache is being disposed and syncs the cache") - } - for _, s := range c.cOptions.Stores { - if err := s.Init(); err != nil { - return errors.Wrapf(err, "Store %s failed to Init()", s.String()) - } - } - c.pendingWrites = make([]*deque.Deque, len(c.cOptions.Stores)-1) - c.pendingWriteTickers = make([]*time.Ticker, len(c.cOptions.Stores)-1) - for i := 0; i < len(c.pendingWrites); i++ { - c.pendingWrites[i] = deque.New() - c.pendingWrites[i].Init() - c.pendingWriteTickers[i] = time.NewTicker(c.cOptions.SyncInterval * time.Duration(intpow(c.cOptions.SyncMultiplier, int64(i)))) - } - go c.cacheManager() - return nil -} - -// Options returns the cache's store options -func (c *cache) Options() store.Options { - return c.sOptions -} - -// String returns a printable string describing the cache -func (c *cache) String() string { - backends := make([]string, len(c.cOptions.Stores)) - for i, s := range c.cOptions.Stores { - backends[i] = s.String() - } - return fmt.Sprintf("cache %v", backends) -} - -func (c *cache) List(opts ...store.ListOption) ([]string, error) { - return c.cOptions.Stores[0].List(opts...) -} - -func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - return c.cOptions.Stores[0].Read(key, opts...) -} - -func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error { - return c.cOptions.Stores[0].Write(r, opts...) -} - -// Delete removes a key from the cache -func (c *cache) Delete(key string, opts ...store.DeleteOption) error { - return c.cOptions.Stores[0].Delete(key, opts...) -} - -func (c *cache) Sync() error { - return nil -} - -type internalRecord struct { - key string - value []byte - expiresAt time.Time -}