From 4f5db082387ed96f510e5324e962c5831c1a75e5 Mon Sep 17 00:00:00 2001 From: Jake Sanders Date: Wed, 23 Oct 2019 17:26:34 +0100 Subject: [PATCH] Remove cloudflare-go and reimplement workers KV --- store/cloudflare/cloudflare.go | 197 +++++++++++++++++++++++++--- store/cloudflare/cloudflare_test.go | 36 ++++- 2 files changed, 207 insertions(+), 26 deletions(-) diff --git a/store/cloudflare/cloudflare.go b/store/cloudflare/cloudflare.go index a35302dd..61bf592a 100644 --- a/store/cloudflare/cloudflare.go +++ b/store/cloudflare/cloudflare.go @@ -3,20 +3,29 @@ package cloudflare import ( + "bytes" "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" "log" + "math" + "net/http" + "net/url" + "strconv" "time" - "github.com/cloudflare/cloudflare-go" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/store" + "github.com/pkg/errors" ) -var namespaceUUID string +const apiBaseURL = "https://api.cloudflare.com/client/v4/" type workersKV struct { options.Options - api *cloudflare.API + httpClient *http.Client } // New returns a cloudflare Store implementation. @@ -30,7 +39,7 @@ func New(opts ...options.Option) (store.Store, error) { if !ok { log.Fatal("Store: No CF_API_TOKEN passed as an option") } - apiTokenString, ok := apiToken.(string) + _, ok = apiToken.(string) if !ok { log.Fatal("Store: Option CF_API_TOKEN contains a non-string") } @@ -38,7 +47,7 @@ func New(opts ...options.Option) (store.Store, error) { if !ok { log.Fatal("Store: No CF_ACCOUNT_ID passed as an option") } - accountIDString, ok := accountID.(string) + _, ok = accountID.(string) if !ok { log.Fatal("Store: Option CF_ACCOUNT_ID contains a non-string") } @@ -46,31 +55,45 @@ func New(opts ...options.Option) (store.Store, error) { if !ok { log.Fatal("Store: No KV_NAMESPACE_ID passed as an option") } - namespaceUUID, ok = uuid.(string) + _, ok = uuid.(string) if !ok { log.Fatal("Store: Option KV_NAMESPACE_ID contains a non-string") } - // Create API client - api, err := cloudflare.NewWithAPIToken(apiTokenString, cloudflare.UsingAccount(accountIDString)) - if err != nil { - return nil, err - } return &workersKV{ - Options: options, - api: api, + Options: options, + httpClient: &http.Client{}, }, nil } // In the cloudflare workers KV implemention, Sync() doesn't guarantee // anything as the workers API is eventually consistent. func (w *workersKV) Sync() ([]*store.Record, error) { - response, err := w.api.ListWorkersKVs(context.Background(), namespaceUUID) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID") + kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID") + + path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/keys", accountID.(string), kvID.(string)) + response, _, _, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header)) if err != nil { return nil, err } + a := &APIResponse{} + if err := json.Unmarshal(response, a); err != nil { + return nil, err + } + if !a.Success { + messages := "" + for _, m := range a.Errors { + messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" + } + return nil, errors.New(messages) + } + var keys []string - for _, r := range response.Result { + for _, r := range a.Result { keys = append(keys, r.Name) } return w.Read(keys...) @@ -80,16 +103,31 @@ func (w *workersKV) Read(keys ...string) ([]*store.Record, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID") + kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID") + var records []*store.Record for _, k := range keys { - v, err := w.api.ReadWorkersKV(ctx, namespaceUUID, k) + path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", accountID.(string), kvID.(string), url.PathEscape(k)) + response, headers, status, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header)) if err != nil { return records, err } - records = append(records, &store.Record{ + if status < 200 || status >= 300 { + return records, errors.New("Received unexpected Status " + strconv.Itoa(status) + string(response)) + } + record := &store.Record{ Key: k, - Value: v, - }) + Value: response, + } + if expiry := headers.Get("Expiration"); len(expiry) != 0 { + expiryUnix, err := strconv.ParseInt(expiry, 10, 64) + if err != nil { + return records, err + } + record.Expiry = time.Until(time.Unix(expiryUnix, 0)) + } + records = append(records, record) } return records, nil } @@ -98,10 +136,32 @@ func (w *workersKV) Write(records ...*store.Record) error { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID") + kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID") + for _, r := range records { - if _, err := w.api.WriteWorkersKV(ctx, namespaceUUID, r.Key, r.Value); err != nil { + path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", accountID.(string), kvID.(string), url.PathEscape(r.Key)) + if r.Expiry != 0 { + // Minimum cloudflare TTL is 60 Seconds + exp := int(math.Max(60, math.Round(r.Expiry.Seconds()))) + path = path + "?expiration_ttl=" + strconv.Itoa(exp) + } + headers := make(http.Header) + resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers) + if err != nil { return err } + a := &APIResponse{} + if err := json.Unmarshal(resp, a); err != nil { + return err + } + if !a.Success { + messages := "" + for _, m := range a.Errors { + messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" + } + return errors.New(messages) + } } return nil } @@ -110,10 +170,105 @@ func (w *workersKV) Delete(keys ...string) error { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + accountID, _ := w.Options.Values().Get("CF_ACCOUNT_ID") + kvID, _ := w.Options.Values().Get("KV_NAMESPACE_ID") + for _, k := range keys { - if _, err := w.api.DeleteWorkersKV(ctx, namespaceUUID, k); err != nil { + path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", accountID.(string), kvID.(string), url.PathEscape(k)) + resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header)) + if err != nil { return err } + + a := &APIResponse{} + if err := json.Unmarshal(resp, a); err != nil { + return err + } + if !a.Success { + messages := "" + for _, m := range a.Errors { + messages += strconv.Itoa(m.Code) + " " + m.Message + "\n" + } + return errors.New(messages) + } } return nil } + +func (w *workersKV) request(ctx context.Context, method, path string, body interface{}, headers http.Header) ([]byte, http.Header, int, error) { + var jsonBody []byte + var err error + + if body != nil { + if paramBytes, ok := body.([]byte); ok { + jsonBody = paramBytes + } else { + jsonBody, err = json.Marshal(body) + if err != nil { + return nil, nil, 0, errors.Wrap(err, "error marshalling params to JSON") + } + } + } else { + jsonBody = nil + } + var reqBody io.Reader + if jsonBody != nil { + reqBody = bytes.NewReader(jsonBody) + } + req, err := http.NewRequestWithContext(ctx, method, apiBaseURL+path, reqBody) + for key, value := range headers { + req.Header[key] = value + } + if token, found := w.Options.Values().Get("CF_API_TOKEN"); found { + req.Header.Set("Authorization", "Bearer "+token.(string)) + } + req.Header.Set("User-Agent", "micro/1.0 (https://micro.mu)") + + // Official cloudflare client does exponential backoff here + resp, err := w.httpClient.Do(req) + if err != nil { + return nil, nil, 0, err + } + defer resp.Body.Close() + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return respBody, resp.Header, resp.StatusCode, err + } + return respBody, resp.Header, resp.StatusCode, nil +} + +// APIResponse is a cloudflare v4 api response +type APIResponse struct { + Result []struct { + ID string `json:"id"` + Type string `json:"type"` + Name string `json:"name"` + Expiration string `json:"expiration"` + Content string `json:"content"` + Proxiable bool `json:"proxiable"` + Proxied bool `json:"proxied"` + TTL int `json:"ttl"` + Priority int `json:"priority"` + Locked bool `json:"locked"` + ZoneID string `json:"zone_id"` + ZoneName string `json:"zone_name"` + ModifiedOn time.Time `json:"modified_on"` + CreatedOn time.Time `json:"created_on"` + } `json:"result"` + Success bool `json:"success"` + Errors []APIMessage `json:"errors"` + // not sure Messages is ever populated? + Messages []APIMessage `json:"messages"` + ResultInfo struct { + Page int `json:"page"` + PerPage int `json:"per_page"` + Count int `json:"count"` + TotalCount int `json:"total_count"` + } `json:"result_info"` +} + +// APIMessage is a Cloudflare v4 API Error +type APIMessage struct { + Code int `json:"code"` + Message string `json:"message"` +} diff --git a/store/cloudflare/cloudflare_test.go b/store/cloudflare/cloudflare_test.go index faf4465f..ed73e796 100644 --- a/store/cloudflare/cloudflare_test.go +++ b/store/cloudflare/cloudflare_test.go @@ -31,15 +31,24 @@ func TestCloudflare(t *testing.T) { t.Fatal(err.Error()) } - _, err = wkv.Sync() + records, err := wkv.Sync() if err != nil { t.Fatalf("Sync: %s\n", err.Error()) + } else { + t.Log("Synced " + strconv.Itoa(len(records)) + " records") } - err = wkv.Write(&store.Record{ - Key: randomK, - Value: []byte(randomV), - }) + err = wkv.Write( + &store.Record{ + Key: randomK, + Value: []byte(randomV), + }, + &store.Record{ + Key: "expirationtest", + Value: []byte("This message will self destruct"), + Expiry: 75 * time.Second, + }, + ) if err != nil { t.Errorf("Write: %s", err.Error()) } @@ -58,6 +67,23 @@ func TestCloudflare(t *testing.T) { t.Errorf("Read: expected %s, got %s\n", randomK, string(r[0].Value)) } + r, err = wkv.Read("expirationtest") + if err != nil { + t.Errorf("Read: expirationtest should still exist") + } + if r[0].Expiry == 0 { + t.Error("Expected r to have an expiry") + } else { + t.Log(r[0].Expiry) + } + + time.Sleep(20 * time.Second) + r, err = wkv.Read("expirationtest") + if err == nil && len(r) != 0 { + t.Error("Read: Managed to read expirationtest, but it should have expired") + t.Log(err, r[0].Key, string(r[0].Value), r[0].Expiry, len(r)) + } + err = wkv.Delete(randomK) if err != nil { t.Errorf("Delete: %s\n", err.Error())