refactor(datastore/fetch): Makes fetching user-data files more failure-proof
- Adds URL validation - Adds timeout support for the HTTP client - Limits the number of retries so it does not spin forever - Fails faster if the response status code is 4xx - Does a little more logging - Adds more tests
This commit is contained in:
parent
0a16532d4b
commit
ebf134f181
@ -1,53 +1,108 @@
|
|||||||
package datasource
|
package datasource
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
neturl "net/url"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxTimeout = time.Second * 5
|
const (
|
||||||
|
HTTP_2xx = 2
|
||||||
|
HTTP_4xx = 4
|
||||||
|
|
||||||
|
maxTimeout = time.Second * 5
|
||||||
|
maxRetries = 15
|
||||||
|
)
|
||||||
|
|
||||||
type Datasource interface {
|
type Datasource interface {
|
||||||
Fetch() ([]byte, error)
|
Fetch() ([]byte, error)
|
||||||
Type() string
|
Type() string
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchURL(url string) ([]byte, error) {
|
// HTTP client timeout
|
||||||
resp, err := getWithExponentialBackoff(url)
|
// This one is low since exponential backoff will kick in too.
|
||||||
if err != nil {
|
var timeout = time.Duration(2) * time.Second
|
||||||
return []byte{}, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
respBytes, err := ioutil.ReadAll(resp.Body)
|
func dialTimeout(network, addr string) (net.Conn, error) {
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
c, err := net.DialTimeout(network, addr, timeout)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c.SetDeadline(deadline)
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetches user-data url with support for
|
||||||
|
// exponential backoff and maximum retries
|
||||||
|
func fetchURL(rawurl string) ([]byte, error) {
|
||||||
|
if rawurl == "" {
|
||||||
|
return nil, errors.New("user-data URL is empty. Skipping.")
|
||||||
|
}
|
||||||
|
|
||||||
|
url, err := neturl.Parse(rawurl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return respBytes, nil
|
// Unfortunately, url.Parse
|
||||||
}
|
// is too generic to throw errors
|
||||||
|
// if a URL does not have a valid HTTP scheme.
|
||||||
|
// So we have to do this extra validation
|
||||||
|
if !strings.HasPrefix(url.Scheme, "http") {
|
||||||
|
return nil, fmt.Errorf("user-data URL %s, does not have a valid HTTP scheme. Skipping.", rawurl)
|
||||||
|
}
|
||||||
|
|
||||||
// getWithExponentialBackoff issues a GET to the specified URL. If the
|
userdataURL := url.String()
|
||||||
// response is a non-2xx or produces an error, retry the GET forever using
|
|
||||||
// an exponential backoff.
|
// We need to create our own client in order to
|
||||||
func getWithExponentialBackoff(url string) (*http.Response, error) {
|
// add timeout support.
|
||||||
var err error
|
// TODO(c4milo) Replace it once Go 1.3 is officially used by CoreOS
|
||||||
var resp *http.Response
|
// More info: https://code.google.com/p/go/source/detail?r=ada6f2d5f99f
|
||||||
for i := 0; ; i++ {
|
transport := &http.Transport{
|
||||||
resp, err = http.Get(url)
|
Dial: dialTimeout,
|
||||||
if err == nil && resp.StatusCode/100 == 2 {
|
}
|
||||||
return resp, nil
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: transport,
|
||||||
|
}
|
||||||
|
|
||||||
|
for retry := 0; retry <= maxRetries; retry++ {
|
||||||
|
log.Printf("Fetching user-data from %s. Attempt #%d", userdataURL, retry)
|
||||||
|
|
||||||
|
resp, err := client.Get(userdataURL)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
defer resp.Body.Close()
|
||||||
|
status := resp.StatusCode / 100
|
||||||
|
|
||||||
|
if status == HTTP_2xx {
|
||||||
|
return ioutil.ReadAll(resp.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
if status == HTTP_4xx {
|
||||||
|
return nil, fmt.Errorf("user-data not found. HTTP status code: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("user-data not found. HTTP status code: %d", resp.StatusCode)
|
||||||
|
} else {
|
||||||
|
log.Printf("unable to fetch user-data: %s", err.Error())
|
||||||
}
|
}
|
||||||
duration := time.Millisecond * time.Duration((math.Pow(float64(2), float64(i)) * 100))
|
|
||||||
|
duration := time.Millisecond * time.Duration((math.Pow(float64(2), float64(retry)) * 100))
|
||||||
if duration > maxTimeout {
|
if duration > maxTimeout {
|
||||||
duration = maxTimeout
|
duration = maxTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("unable to fetch user-data from %s, try again in %s", url, duration)
|
|
||||||
time.Sleep(duration)
|
time.Sleep(duration)
|
||||||
}
|
}
|
||||||
return resp, err
|
|
||||||
|
return nil, fmt.Errorf("unable to fetch user-data. Maximum retries reached: %d", maxRetries)
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,10 @@ var expBackoffTests = []struct {
|
|||||||
{2, "number of attempts: 2"},
|
{2, "number of attempts: 2"},
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetWithExponentialBackoff(t *testing.T) {
|
// Test exponential backoff
|
||||||
|
// and that it continues retrying if a 5xx
|
||||||
|
// response is received
|
||||||
|
func TestFetchURLExpBackOff(t *testing.T) {
|
||||||
for i, tt := range expBackoffTests {
|
for i, tt := range expBackoffTests {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
count := 0
|
count := 0
|
||||||
@ -31,15 +34,87 @@ func TestGetWithExponentialBackoff(t *testing.T) {
|
|||||||
})
|
})
|
||||||
ts := httptest.NewServer(mux)
|
ts := httptest.NewServer(mux)
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
data, err := fetchURL(ts.URL)
|
data, err := fetchURL(ts.URL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Test case %d produced error: %v", i, err)
|
t.Errorf("Test case %d produced error: %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if count != tt.count {
|
if count != tt.count {
|
||||||
t.Errorf("Test case %d failed: %d != %d", i, count, tt.count)
|
t.Errorf("Test case %d failed: %d != %d", i, count, tt.count)
|
||||||
}
|
}
|
||||||
|
|
||||||
if string(data) != tt.body {
|
if string(data) != tt.body {
|
||||||
t.Errorf("Test case %d failed: %s != %s", i, tt.body, data)
|
t.Errorf("Test case %d failed: %s != %s", i, tt.body, data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that it stops retrying if a 4xx response comes back
|
||||||
|
func TestFetchURL4xx(t *testing.T) {
|
||||||
|
retries := 0
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
retries++
|
||||||
|
http.Error(w, "", 404)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
_, err := fetchURL(ts.URL)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Incorrect result\ngot: %s\nwant: %s", err.Error(), "user-data not found. HTTP status code: 404")
|
||||||
|
}
|
||||||
|
|
||||||
|
if retries > 1 {
|
||||||
|
t.Errorf("Number of retries:\n%d\nExpected number of retries:\n%s", retries, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that it fetches and returns user-data just fine
|
||||||
|
func TestFetchURL2xx(t *testing.T) {
|
||||||
|
var cloudcfg = `
|
||||||
|
#cloud-config
|
||||||
|
coreos:
|
||||||
|
oem:
|
||||||
|
id: test
|
||||||
|
name: CoreOS.box for Test
|
||||||
|
version-id: %VERSION_ID%+%BUILD_ID%
|
||||||
|
home-url: https://github.com/coreos/coreos-cloudinit
|
||||||
|
bug-report-url: https://github.com/coreos/coreos-cloudinit
|
||||||
|
update:
|
||||||
|
reboot-strategy: best-effort
|
||||||
|
`
|
||||||
|
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprint(w, cloudcfg)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
data, err := fetchURL(ts.URL)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(data) != cloudcfg {
|
||||||
|
t.Errorf("Incorrect result\ngot: %s\nwant: %s", string(data), cloudcfg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test attempt to fetching using malformed URL
|
||||||
|
func TestFetchURLMalformed(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
url string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{"boo", "user-data URL boo, does not have a valid HTTP scheme. Skipping."},
|
||||||
|
{"mailto://boo", "user-data URL mailto://boo, does not have a valid HTTP scheme. Skipping."},
|
||||||
|
{"ftp://boo", "user-data URL ftp://boo, does not have a valid HTTP scheme. Skipping."},
|
||||||
|
{"", "user-data URL is empty. Skipping."},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
_, err := fetchURL(test.url)
|
||||||
|
if err != nil && err.Error() != test.want {
|
||||||
|
t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, test.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user