Add a cache to workers KV storage implementation (#1284)
* cloudflare-cache * go mod tidy
This commit is contained in:
@@ -19,6 +19,8 @@ import (
|
||||
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/ReneKroon/ttlcache"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -35,6 +37,8 @@ type workersKV struct {
|
||||
namespace string
|
||||
// http client to use
|
||||
httpClient *http.Client
|
||||
// cache
|
||||
cache *ttlcache.Cache
|
||||
}
|
||||
|
||||
// apiResponse is a cloudflare v4 api response
|
||||
@@ -103,6 +107,16 @@ func (w *workersKV) Init(opts ...store.Option) error {
|
||||
if len(w.options.Namespace) > 0 {
|
||||
w.namespace = w.options.Namespace
|
||||
}
|
||||
ttl := w.options.Context.Value("STORE_CACHE_TTL")
|
||||
if ttl != nil {
|
||||
ttlint64, ok := ttl.(int64)
|
||||
if !ok {
|
||||
log.Fatal("STORE_CACHE_TTL from context must be type int64")
|
||||
}
|
||||
w.cache = ttlcache.NewCache()
|
||||
w.cache.SetTTL(time.Duration(ttlint64))
|
||||
w.cache.SkipTtlExtensionOnHit(true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -191,6 +205,15 @@ func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record,
|
||||
var records []*store.Record
|
||||
|
||||
for _, k := range keys {
|
||||
if w.cache != nil {
|
||||
if resp, hit := w.cache.Get(k); hit {
|
||||
if record, ok := resp.(*store.Record); ok {
|
||||
records = append(records, record)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(k))
|
||||
response, headers, status, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header))
|
||||
if err != nil {
|
||||
@@ -210,6 +233,7 @@ func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record,
|
||||
}
|
||||
record.Expiry = time.Until(time.Unix(expiryUnix, 0))
|
||||
}
|
||||
w.cache.Set(record.Key, record)
|
||||
records = append(records, record)
|
||||
}
|
||||
|
||||
@@ -217,6 +241,10 @@ func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record,
|
||||
}
|
||||
|
||||
func (w *workersKV) Write(r *store.Record) error {
|
||||
// Set it in local cache, with the global TTL from options
|
||||
if w.cache != nil {
|
||||
w.cache.Set(r.Key, r)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -336,7 +364,7 @@ func (w *workersKV) String() string {
|
||||
return "cloudflare"
|
||||
}
|
||||
|
||||
// New returns a cloudflare Store implementation.
|
||||
// NewStore returns a cloudflare Store implementation.
|
||||
// Account ID, Token and Namespace must either be passed as options or
|
||||
// environment variables. If set as env vars we expect the following;
|
||||
// CF_API_TOKEN to a cloudflare API token scoped to Workers KV.
|
||||
|
@@ -24,6 +24,7 @@ func TestCloudflare(t *testing.T) {
|
||||
Token(apiToken),
|
||||
Account(accountID),
|
||||
Namespace(kvID),
|
||||
CacheTTL(60000000000),
|
||||
)
|
||||
|
||||
records, err := wkv.List()
|
||||
|
@@ -51,3 +51,13 @@ func Namespace(ns string) store.Option {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// CacheTTL sets the timeout in nanoseconds of the read/write cache
|
||||
func CacheTTL(ttl int64) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "STORE_CACHE_TTL", ttl)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user