Remove cloudflare-go and reimplement workers KV
This commit is contained in:
parent
caca93f65b
commit
4f5db08238
@ -3,20 +3,29 @@
|
|||||||
package cloudflare
|
package cloudflare
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cloudflare/cloudflare-go"
|
|
||||||
"github.com/micro/go-micro/config/options"
|
"github.com/micro/go-micro/config/options"
|
||||||
"github.com/micro/go-micro/store"
|
"github.com/micro/go-micro/store"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
var namespaceUUID string
|
const apiBaseURL = "https://api.cloudflare.com/client/v4/"
|
||||||
|
|
||||||
type workersKV struct {
|
type workersKV struct {
|
||||||
options.Options
|
options.Options
|
||||||
api *cloudflare.API
|
httpClient *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a cloudflare Store implementation.
|
// New returns a cloudflare Store implementation.
|
||||||
@ -30,7 +39,7 @@ func New(opts ...options.Option) (store.Store, error) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Store: No CF_API_TOKEN passed as an option")
|
log.Fatal("Store: No CF_API_TOKEN passed as an option")
|
||||||
}
|
}
|
||||||
apiTokenString, ok := apiToken.(string)
|
_, ok = apiToken.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Store: Option CF_API_TOKEN contains a non-string")
|
log.Fatal("Store: Option CF_API_TOKEN contains a non-string")
|
||||||
}
|
}
|
||||||
@ -38,7 +47,7 @@ func New(opts ...options.Option) (store.Store, error) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Store: No CF_ACCOUNT_ID passed as an option")
|
log.Fatal("Store: No CF_ACCOUNT_ID passed as an option")
|
||||||
}
|
}
|
||||||
accountIDString, ok := accountID.(string)
|
_, ok = accountID.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Store: Option CF_ACCOUNT_ID contains a non-string")
|
log.Fatal("Store: Option CF_ACCOUNT_ID contains a non-string")
|
||||||
}
|
}
|
||||||
@ -46,31 +55,45 @@ func New(opts ...options.Option) (store.Store, error) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Store: No KV_NAMESPACE_ID passed as an option")
|
log.Fatal("Store: No KV_NAMESPACE_ID passed as an option")
|
||||||
}
|
}
|
||||||
namespaceUUID, ok = uuid.(string)
|
_, ok = uuid.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Store: Option KV_NAMESPACE_ID contains a non-string")
|
log.Fatal("Store: Option KV_NAMESPACE_ID contains a non-string")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create API client
|
|
||||||
api, err := cloudflare.NewWithAPIToken(apiTokenString, cloudflare.UsingAccount(accountIDString))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &workersKV{
|
return &workersKV{
|
||||||
Options: options,
|
Options: options,
|
||||||
api: api,
|
httpClient: &http.Client{},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// In the cloudflare workers KV implemention, Sync() doesn't guarantee
|
// In the cloudflare workers KV implemention, Sync() doesn't guarantee
|
||||||
// anything as the workers API is eventually consistent.
|
// anything as the workers API is eventually consistent.
|
||||||
func (w *workersKV) Sync() ([]*store.Record, error) {
|
func (w *workersKV) Sync() ([]*store.Record, error) {
|
||||||
response, err := w.api.ListWorkersKVs(context.Background(), namespaceUUID)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID")
|
||||||
|
kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID")
|
||||||
|
|
||||||
|
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/keys", accountID.(string), kvID.(string))
|
||||||
|
response, _, _, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
a := &APIResponse{}
|
||||||
|
if err := json.Unmarshal(response, a); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !a.Success {
|
||||||
|
messages := ""
|
||||||
|
for _, m := range a.Errors {
|
||||||
|
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
|
||||||
|
}
|
||||||
|
return nil, errors.New(messages)
|
||||||
|
}
|
||||||
|
|
||||||
var keys []string
|
var keys []string
|
||||||
for _, r := range response.Result {
|
for _, r := range a.Result {
|
||||||
keys = append(keys, r.Name)
|
keys = append(keys, r.Name)
|
||||||
}
|
}
|
||||||
return w.Read(keys...)
|
return w.Read(keys...)
|
||||||
@ -80,16 +103,31 @@ func (w *workersKV) Read(keys ...string) ([]*store.Record, error) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID")
|
||||||
|
kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID")
|
||||||
|
|
||||||
var records []*store.Record
|
var records []*store.Record
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
v, err := w.api.ReadWorkersKV(ctx, namespaceUUID, k)
|
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", accountID.(string), kvID.(string), url.PathEscape(k))
|
||||||
|
response, headers, status, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return records, err
|
return records, err
|
||||||
}
|
}
|
||||||
records = append(records, &store.Record{
|
if status < 200 || status >= 300 {
|
||||||
|
return records, errors.New("Received unexpected Status " + strconv.Itoa(status) + string(response))
|
||||||
|
}
|
||||||
|
record := &store.Record{
|
||||||
Key: k,
|
Key: k,
|
||||||
Value: v,
|
Value: response,
|
||||||
})
|
}
|
||||||
|
if expiry := headers.Get("Expiration"); len(expiry) != 0 {
|
||||||
|
expiryUnix, err := strconv.ParseInt(expiry, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return records, err
|
||||||
|
}
|
||||||
|
record.Expiry = time.Until(time.Unix(expiryUnix, 0))
|
||||||
|
}
|
||||||
|
records = append(records, record)
|
||||||
}
|
}
|
||||||
return records, nil
|
return records, nil
|
||||||
}
|
}
|
||||||
@ -98,10 +136,32 @@ func (w *workersKV) Write(records ...*store.Record) error {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID")
|
||||||
|
kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID")
|
||||||
|
|
||||||
for _, r := range records {
|
for _, r := range records {
|
||||||
if _, err := w.api.WriteWorkersKV(ctx, namespaceUUID, r.Key, r.Value); err != nil {
|
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", accountID.(string), kvID.(string), url.PathEscape(r.Key))
|
||||||
|
if r.Expiry != 0 {
|
||||||
|
// Minimum cloudflare TTL is 60 Seconds
|
||||||
|
exp := int(math.Max(60, math.Round(r.Expiry.Seconds())))
|
||||||
|
path = path + "?expiration_ttl=" + strconv.Itoa(exp)
|
||||||
|
}
|
||||||
|
headers := make(http.Header)
|
||||||
|
resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
a := &APIResponse{}
|
||||||
|
if err := json.Unmarshal(resp, a); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !a.Success {
|
||||||
|
messages := ""
|
||||||
|
for _, m := range a.Errors {
|
||||||
|
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
|
||||||
|
}
|
||||||
|
return errors.New(messages)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -110,10 +170,105 @@ func (w *workersKV) Delete(keys ...string) error {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID")
|
||||||
|
kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID")
|
||||||
|
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
if _, err := w.api.DeleteWorkersKV(ctx, namespaceUUID, k); err != nil {
|
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", accountID.(string), kvID.(string), url.PathEscape(k))
|
||||||
|
resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header))
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a := &APIResponse{}
|
||||||
|
if err := json.Unmarshal(resp, a); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !a.Success {
|
||||||
|
messages := ""
|
||||||
|
for _, m := range a.Errors {
|
||||||
|
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
|
||||||
|
}
|
||||||
|
return errors.New(messages)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *workersKV) request(ctx context.Context, method, path string, body interface{}, headers http.Header) ([]byte, http.Header, int, error) {
|
||||||
|
var jsonBody []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if body != nil {
|
||||||
|
if paramBytes, ok := body.([]byte); ok {
|
||||||
|
jsonBody = paramBytes
|
||||||
|
} else {
|
||||||
|
jsonBody, err = json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, errors.Wrap(err, "error marshalling params to JSON")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
jsonBody = nil
|
||||||
|
}
|
||||||
|
var reqBody io.Reader
|
||||||
|
if jsonBody != nil {
|
||||||
|
reqBody = bytes.NewReader(jsonBody)
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, method, apiBaseURL+path, reqBody)
|
||||||
|
for key, value := range headers {
|
||||||
|
req.Header[key] = value
|
||||||
|
}
|
||||||
|
if token, found := w.Options.Values().Get("CF_API_TOKEN"); found {
|
||||||
|
req.Header.Set("Authorization", "Bearer "+token.(string))
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", "micro/1.0 (https://micro.mu)")
|
||||||
|
|
||||||
|
// Official cloudflare client does exponential backoff here
|
||||||
|
resp, err := w.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
respBody, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return respBody, resp.Header, resp.StatusCode, err
|
||||||
|
}
|
||||||
|
return respBody, resp.Header, resp.StatusCode, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// APIResponse is a cloudflare v4 api response
|
||||||
|
type APIResponse struct {
|
||||||
|
Result []struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Expiration string `json:"expiration"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
Proxiable bool `json:"proxiable"`
|
||||||
|
Proxied bool `json:"proxied"`
|
||||||
|
TTL int `json:"ttl"`
|
||||||
|
Priority int `json:"priority"`
|
||||||
|
Locked bool `json:"locked"`
|
||||||
|
ZoneID string `json:"zone_id"`
|
||||||
|
ZoneName string `json:"zone_name"`
|
||||||
|
ModifiedOn time.Time `json:"modified_on"`
|
||||||
|
CreatedOn time.Time `json:"created_on"`
|
||||||
|
} `json:"result"`
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Errors []APIMessage `json:"errors"`
|
||||||
|
// not sure Messages is ever populated?
|
||||||
|
Messages []APIMessage `json:"messages"`
|
||||||
|
ResultInfo struct {
|
||||||
|
Page int `json:"page"`
|
||||||
|
PerPage int `json:"per_page"`
|
||||||
|
Count int `json:"count"`
|
||||||
|
TotalCount int `json:"total_count"`
|
||||||
|
} `json:"result_info"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// APIMessage is a Cloudflare v4 API Error
|
||||||
|
type APIMessage struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
@ -31,15 +31,24 @@ func TestCloudflare(t *testing.T) {
|
|||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = wkv.Sync()
|
records, err := wkv.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Sync: %s\n", err.Error())
|
t.Fatalf("Sync: %s\n", err.Error())
|
||||||
|
} else {
|
||||||
|
t.Log("Synced " + strconv.Itoa(len(records)) + " records")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = wkv.Write(&store.Record{
|
err = wkv.Write(
|
||||||
|
&store.Record{
|
||||||
Key: randomK,
|
Key: randomK,
|
||||||
Value: []byte(randomV),
|
Value: []byte(randomV),
|
||||||
})
|
},
|
||||||
|
&store.Record{
|
||||||
|
Key: "expirationtest",
|
||||||
|
Value: []byte("This message will self destruct"),
|
||||||
|
Expiry: 75 * time.Second,
|
||||||
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Write: %s", err.Error())
|
t.Errorf("Write: %s", err.Error())
|
||||||
}
|
}
|
||||||
@ -58,6 +67,23 @@ func TestCloudflare(t *testing.T) {
|
|||||||
t.Errorf("Read: expected %s, got %s\n", randomK, string(r[0].Value))
|
t.Errorf("Read: expected %s, got %s\n", randomK, string(r[0].Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r, err = wkv.Read("expirationtest")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Read: expirationtest should still exist")
|
||||||
|
}
|
||||||
|
if r[0].Expiry == 0 {
|
||||||
|
t.Error("Expected r to have an expiry")
|
||||||
|
} else {
|
||||||
|
t.Log(r[0].Expiry)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(20 * time.Second)
|
||||||
|
r, err = wkv.Read("expirationtest")
|
||||||
|
if err == nil && len(r) != 0 {
|
||||||
|
t.Error("Read: Managed to read expirationtest, but it should have expired")
|
||||||
|
t.Log(err, r[0].Key, string(r[0].Value), r[0].Expiry, len(r))
|
||||||
|
}
|
||||||
|
|
||||||
err = wkv.Delete(randomK)
|
err = wkv.Delete(randomK)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Delete: %s\n", err.Error())
|
t.Errorf("Delete: %s\n", err.Error())
|
||||||
|
Loading…
Reference in New Issue
Block a user