[WIP] Store Sync (#1365)
* Initial cache implementation * Write queue implementation * Accidentally started writing the storage sync service
This commit is contained in:
		
							
								
								
									
										1
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
									
									
									
									
								
							| @@ -15,6 +15,7 @@ require ( | |||||||
| 	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect | 	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect | ||||||
| 	github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect | 	github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect | ||||||
| 	github.com/dgrijalva/jwt-go v3.2.0+incompatible | 	github.com/dgrijalva/jwt-go v3.2.0+incompatible | ||||||
|  | 	github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 | ||||||
| 	github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c | 	github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c | ||||||
| 	github.com/fsnotify/fsnotify v1.4.7 | 	github.com/fsnotify/fsnotify v1.4.7 | ||||||
| 	github.com/fsouza/go-dockerclient v1.6.0 | 	github.com/fsouza/go-dockerclient v1.6.0 | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							| @@ -123,6 +123,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD | |||||||
| github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= | github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= | ||||||
| github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= | github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= | ||||||
| github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= | github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= | ||||||
|  | github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 h1:jFGzikHboUMRXmMBtwD/PbxoTHPs2919Irp/3rxMbvM= | ||||||
|  | github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1/go.mod h1:HvODWzv6Y6kBf3Ah2WzN1bHjDUezGLaAhwuWVwfpEJs= | ||||||
| github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= | github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= | ||||||
| github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= | github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= | ||||||
| github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= | github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= | ||||||
|   | |||||||
							
								
								
									
										39
									
								
								store/cache/cache.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										39
									
								
								store/cache/cache.go
									
									
									
									
										vendored
									
									
								
							| @@ -1,39 +0,0 @@ | |||||||
| package cache |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"github.com/micro/go-micro/v2/store" |  | ||||||
| 	"github.com/pkg/errors" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| // Cache implements a cache in front of a micro Store |  | ||||||
| type Cache struct { |  | ||||||
| 	options store.Options |  | ||||||
| 	store.Store |  | ||||||
|  |  | ||||||
| 	stores []store.Store |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // NewStore returns new cache |  | ||||||
| func NewStore(opts ...store.Option) store.Store { |  | ||||||
| 	s := &Cache{ |  | ||||||
| 		options: store.Options{}, |  | ||||||
| 		stores:  []store.Store{}, |  | ||||||
| 	} |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&s.options) |  | ||||||
| 	} |  | ||||||
| 	return s |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Init initialises a new cache |  | ||||||
| func (c *Cache) Init(opts ...store.Option) error { |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&c.options) |  | ||||||
| 	} |  | ||||||
| 	for _, s := range c.stores { |  | ||||||
| 		if err := s.Init(); err != nil { |  | ||||||
| 			return errors.Wrapf(err, "Store %s failed to Init()", s.String()) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
							
								
								
									
										15
									
								
								store/cache/cache_test.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										15
									
								
								store/cache/cache_test.go
									
									
									
									
										vendored
									
									
								
							| @@ -1,15 +0,0 @@ | |||||||
| package cache |  | ||||||
|  |  | ||||||
| // import "testing" |  | ||||||
|  |  | ||||||
| // func TestCache(t *testing.T) { |  | ||||||
| // 	c := NewStore() |  | ||||||
| // 	if err := c.Init(); err != nil { |  | ||||||
| // 		//t.Fatal(err) |  | ||||||
| // 	} |  | ||||||
| // 	if results, err := c.Read("test"); err != nil { |  | ||||||
| // 		//t.Fatal(err) |  | ||||||
| // 	} else { |  | ||||||
| // 		println(results) |  | ||||||
| // 	} |  | ||||||
| // } |  | ||||||
							
								
								
									
										110
									
								
								sync/store/cache.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										110
									
								
								sync/store/cache.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,110 @@ | |||||||
|  | // Package store syncs multiple go-micro stores | ||||||
|  | package store | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/ef-ds/deque" | ||||||
|  | 	"github.com/micro/go-micro/v2/store" | ||||||
|  | 	"github.com/pkg/errors" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Cache implements a cache in front of go-micro Stores | ||||||
|  | type Cache interface { | ||||||
|  | 	store.Store | ||||||
|  |  | ||||||
|  | 	// Force a full sync | ||||||
|  | 	Sync() error | ||||||
|  | } | ||||||
|  | type cache struct { | ||||||
|  | 	sOptions            store.Options | ||||||
|  | 	cOptions            Options | ||||||
|  | 	pendingWrites       []*deque.Deque | ||||||
|  | 	pendingWriteTickers []*time.Ticker | ||||||
|  | 	sync.RWMutex | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewCache returns a new Cache | ||||||
|  | func NewCache(opts ...Option) Cache { | ||||||
|  | 	c := &cache{} | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&c.cOptions) | ||||||
|  | 	} | ||||||
|  | 	if c.cOptions.SyncInterval == 0 { | ||||||
|  | 		c.cOptions.SyncInterval = 1 * time.Minute | ||||||
|  | 	} | ||||||
|  | 	if c.cOptions.SyncMultiplier == 0 { | ||||||
|  | 		c.cOptions.SyncMultiplier = 5 | ||||||
|  | 	} | ||||||
|  | 	return c | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Init initialises the storeOptions | ||||||
|  | func (c *cache) Init(opts ...store.Option) error { | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&c.sOptions) | ||||||
|  | 	} | ||||||
|  | 	if len(c.cOptions.Stores) == 0 { | ||||||
|  | 		return errors.New("the cache has no stores") | ||||||
|  | 	} | ||||||
|  | 	if c.sOptions.Context == nil { | ||||||
|  | 		return errors.New("please provide a context to the cache. Cancelling the context signals that the cache is being disposed and syncs the cache") | ||||||
|  | 	} | ||||||
|  | 	for _, s := range c.cOptions.Stores { | ||||||
|  | 		if err := s.Init(); err != nil { | ||||||
|  | 			return errors.Wrapf(err, "Store %s failed to Init()", s.String()) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	c.pendingWrites = make([]*deque.Deque, len(c.cOptions.Stores)-1) | ||||||
|  | 	c.pendingWriteTickers = make([]*time.Ticker, len(c.cOptions.Stores)-1) | ||||||
|  | 	for i := 0; i < len(c.pendingWrites); i++ { | ||||||
|  | 		c.pendingWrites[i] = deque.New() | ||||||
|  | 		c.pendingWrites[i].Init() | ||||||
|  | 		c.pendingWriteTickers[i] = time.NewTicker(c.cOptions.SyncInterval * time.Duration(intpow(c.cOptions.SyncMultiplier, int64(i)))) | ||||||
|  | 	} | ||||||
|  | 	go c.cacheManager() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Options returns the cache's store options | ||||||
|  | func (c *cache) Options() store.Options { | ||||||
|  | 	return c.sOptions | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // String returns a printable string describing the cache | ||||||
|  | func (c *cache) String() string { | ||||||
|  | 	backends := make([]string, len(c.cOptions.Stores)) | ||||||
|  | 	for i, s := range c.cOptions.Stores { | ||||||
|  | 		backends[i] = s.String() | ||||||
|  | 	} | ||||||
|  | 	return fmt.Sprintf("cache %v", backends) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *cache) List(opts ...store.ListOption) ([]string, error) { | ||||||
|  | 	return c.cOptions.Stores[0].List(opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *cache) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { | ||||||
|  | 	return c.cOptions.Stores[0].Read(key, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *cache) Write(r *store.Record, opts ...store.WriteOption) error { | ||||||
|  | 	return c.cOptions.Stores[0].Write(r, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Delete removes a key from the cache | ||||||
|  | func (c *cache) Delete(key string, opts ...store.DeleteOption) error { | ||||||
|  | 	return c.cOptions.Stores[0].Delete(key, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *cache) Sync() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type internalRecord struct { | ||||||
|  | 	key       string | ||||||
|  | 	value     []byte | ||||||
|  | 	expiresAt time.Time | ||||||
|  | } | ||||||
							
								
								
									
										26
									
								
								sync/store/cache_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										26
									
								
								sync/store/cache_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,26 @@ | |||||||
|  | package store | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/v2/store" | ||||||
|  | 	"github.com/micro/go-micro/v2/store/memory" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestCacheTicker(t *testing.T) { | ||||||
|  | 	l0 := memory.NewStore() | ||||||
|  | 	l0.Init() | ||||||
|  | 	l1 := memory.NewStore() | ||||||
|  | 	l1.Init() | ||||||
|  | 	l2 := memory.NewStore() | ||||||
|  | 	l2.Init() | ||||||
|  | 	c := NewCache(Stores(l0, l1, l2), SyncInterval(1*time.Second), SyncMultiplier(2)) | ||||||
|  |  | ||||||
|  | 	if err := c.Init(store.WithContext(context.Background())); err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	time.Sleep(30 * time.Second) | ||||||
|  | } | ||||||
							
								
								
									
										89
									
								
								sync/store/manager.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								sync/store/manager.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,89 @@ | |||||||
|  | package store | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/v2/store" | ||||||
|  | 	"github.com/pkg/errors" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type operation struct { | ||||||
|  | 	operation action | ||||||
|  | 	record    *store.Record | ||||||
|  | 	deadline  time.Time | ||||||
|  | 	retries   int | ||||||
|  | 	maxiumum  int | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // action represents the type of a queued operation | ||||||
|  | type action int | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	readOp action = iota + 1 | ||||||
|  | 	writeOp | ||||||
|  | 	deleteOp | ||||||
|  | 	listOp | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func (c *cache) cacheManager() { | ||||||
|  | 	tickerAggregator := make(chan struct{ index int }) | ||||||
|  | 	for i, ticker := range c.pendingWriteTickers { | ||||||
|  | 		go func(index int, c chan struct{ index int }, t *time.Ticker) { | ||||||
|  | 			for range t.C { | ||||||
|  | 				c <- struct{ index int }{index: index} | ||||||
|  | 			} | ||||||
|  | 		}(i, tickerAggregator, ticker) | ||||||
|  | 	} | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case i := <-tickerAggregator: | ||||||
|  | 			println(i.index, "ticked") | ||||||
|  | 			c.processQueue(i.index) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *cache) processQueue(index int) { | ||||||
|  | 	c.Lock() | ||||||
|  | 	defer c.Unlock() | ||||||
|  | 	q := c.pendingWrites[index] | ||||||
|  | 	for i := 0; i < q.Len(); i++ { | ||||||
|  | 		r, ok := q.PopFront() | ||||||
|  | 		if !ok { | ||||||
|  | 			panic(errors.Errorf("retrieved an invalid value from the L%d cache queue", index+1)) | ||||||
|  | 		} | ||||||
|  | 		ir, ok := r.(*internalRecord) | ||||||
|  | 		if !ok { | ||||||
|  | 			panic(errors.Errorf("retrieved a non-internal record from the L%d cache queue", index+1)) | ||||||
|  | 		} | ||||||
|  | 		if !ir.expiresAt.IsZero() && time.Now().After(ir.expiresAt) { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		nr := &store.Record{ | ||||||
|  | 			Key: ir.key, | ||||||
|  | 		} | ||||||
|  | 		nr.Value = make([]byte, len(ir.value)) | ||||||
|  | 		copy(nr.Value, ir.value) | ||||||
|  | 		if !ir.expiresAt.IsZero() { | ||||||
|  | 			nr.Expiry = time.Until(ir.expiresAt) | ||||||
|  | 		} | ||||||
|  | 		// Todo = internal queue also has to hold the corresponding store.WriteOptions | ||||||
|  | 		if err := c.cOptions.Stores[index+1].Write(nr); err != nil { | ||||||
|  | 			// some error, so queue for retry and bail | ||||||
|  | 			q.PushBack(ir) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func intpow(x, y int64) int64 { | ||||||
|  | 	result := int64(1) | ||||||
|  | 	for 0 != y { | ||||||
|  | 		if 0 != (y & 1) { | ||||||
|  | 			result *= x | ||||||
|  | 		} | ||||||
|  | 		y >>= 1 | ||||||
|  | 		x *= x | ||||||
|  | 	} | ||||||
|  | 	return result | ||||||
|  | } | ||||||
							
								
								
									
										44
									
								
								sync/store/options.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								sync/store/options.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,44 @@ | |||||||
|  | package store | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/v2/store" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Options represents Cache options | ||||||
|  | type Options struct { | ||||||
|  | 	// Stores represents layers in the cache in ascending order. L0, L1, L2, etc | ||||||
|  | 	Stores []store.Store | ||||||
|  | 	// SyncInterval is the duration between syncs from L0 to L1 | ||||||
|  | 	SyncInterval time.Duration | ||||||
|  | 	// SyncMultiplier is the multiplication factor between each store. | ||||||
|  | 	SyncMultiplier int64 | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Option sets Cache Options | ||||||
|  | type Option func(o *Options) | ||||||
|  |  | ||||||
|  | // Stores sets the layers that make up the cache | ||||||
|  | func Stores(stores ...store.Store) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Stores = make([]store.Store, len(stores)) | ||||||
|  | 		for i, s := range stores { | ||||||
|  | 			o.Stores[i] = s | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // SyncInterval sets the duration between syncs from L0 to L1 | ||||||
|  | func SyncInterval(d time.Duration) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.SyncInterval = d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // SyncMultiplier sets the multiplication factor for time to wait each cache layer | ||||||
|  | func SyncMultiplier(i int64) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.SyncMultiplier = i | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user