Merge pull request #114 from c4milo/fetch-url-refactor
refactor(datastore/fetch): Makes fetching user-data files more failure-proof.
This commit is contained in:
		| @@ -1,53 +1,104 @@ | ||||
| package datasource | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"log" | ||||
| 	"math" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	neturl "net/url" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const maxTimeout = time.Second * 5 | ||||
| // Status classes, expressed as the result of dividing an HTTP status code | ||||
| // by 100. | ||||
| const ( | ||||
| 	HTTP_2xx = 2 | ||||
| 	HTTP_4xx = 4 | ||||
| ) | ||||
|  | ||||
| // Retry policy used when fetching user-data. | ||||
| const ( | ||||
| 	// maxRetries is the number of fetch attempts made before giving up. | ||||
| 	maxRetries = 15 | ||||
| 	// maxTimeout caps the delay slept between two attempts. | ||||
| 	maxTimeout = 5 * time.Second | ||||
| ) | ||||
|  | ||||
| type Datasource interface { | ||||
| 	Fetch() ([]byte, error) | ||||
| 	Type() string | ||||
| } | ||||
|  | ||||
| func fetchURL(url string) ([]byte, error) { | ||||
| 	resp, err := getWithExponentialBackoff(url) | ||||
| 	if err != nil { | ||||
| 		return []byte{}, err | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| // timeout is the HTTP client timeout applied by dialTimeout. | ||||
| // It is deliberately short because exponential backoff retries run on top | ||||
| // of it. | ||||
| var timeout = 2 * time.Second | ||||
|  | ||||
| 	respBytes, err := ioutil.ReadAll(resp.Body) | ||||
| // dialTimeout connects to addr on the given network, giving up after the | ||||
| // package-level timeout. The deadline is computed before dialing and then | ||||
| // applied to the established connection, so the dial plus all later reads | ||||
| // and writes share a single time budget. | ||||
| // | ||||
| // It returns the connection, or the dial error, or the error from setting | ||||
| // the deadline (in which case the connection is closed first). | ||||
| func dialTimeout(network, addr string) (net.Conn, error) { | ||||
| 	deadline := time.Now().Add(timeout) | ||||
| 	c, err := net.DialTimeout(network, addr, timeout) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	// A connection without a deadline could block a fetch attempt forever, | ||||
| 	// so a failure here is reported instead of being ignored. | ||||
| 	if err := c.SetDeadline(deadline); err != nil { | ||||
| 		c.Close() | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return c, nil | ||||
| } | ||||
|  | ||||
| // Fetches user-data url with support for exponential backoff and maximum retries | ||||
| func fetchURL(rawurl string) ([]byte, error) { | ||||
| 	if rawurl == "" { | ||||
| 		return nil, errors.New("user-data URL is empty. Skipping.") | ||||
| 	} | ||||
|  | ||||
| 	url, err := neturl.Parse(rawurl) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return respBytes, nil | ||||
| } | ||||
| 	// Unfortunately, url.Parse is too generic to throw errors if a URL does not | ||||
| 	// have a valid HTTP scheme. So, we have to do this extra validation | ||||
| 	if !strings.HasPrefix(url.Scheme, "http") { | ||||
| 		return nil, fmt.Errorf("user-data URL %s does not have a valid HTTP scheme. Skipping.", rawurl) | ||||
| 	} | ||||
|  | ||||
| // getWithExponentialBackoff issues a GET to the specified URL. If the | ||||
| // response is a non-2xx or produces an error, retry the GET forever using | ||||
| // an exponential backoff. | ||||
| func getWithExponentialBackoff(url string) (*http.Response, error) { | ||||
| 	var err error | ||||
| 	var resp *http.Response | ||||
| 	for i := 0; ; i++ { | ||||
| 		resp, err = http.Get(url) | ||||
| 		if err == nil && resp.StatusCode/100 == 2 { | ||||
| 			return resp, nil | ||||
| 	userdataURL := url.String() | ||||
|  | ||||
| 	// We need to create our own client in order to add timeout support. | ||||
| 	// TODO(c4milo) Replace it once Go 1.3 is officially used by CoreOS | ||||
| 	// More info: https://code.google.com/p/go/source/detail?r=ada6f2d5f99f | ||||
| 	transport := &http.Transport{ | ||||
| 		Dial: dialTimeout, | ||||
| 	} | ||||
|  | ||||
| 	client := &http.Client{ | ||||
| 		Transport: transport, | ||||
| 	} | ||||
|  | ||||
| 	for retry := 1; retry <= maxRetries; retry++ { | ||||
| 		log.Printf("Fetching user-data from %s. Attempt #%d", userdataURL, retry) | ||||
|  | ||||
| 		resp, err := client.Get(userdataURL) | ||||
|  | ||||
| 		if err == nil { | ||||
| 			defer resp.Body.Close() | ||||
| 			status := resp.StatusCode / 100 | ||||
|  | ||||
| 			if status == HTTP_2xx { | ||||
| 				return ioutil.ReadAll(resp.Body) | ||||
| 			} | ||||
|  | ||||
| 			if status == HTTP_4xx { | ||||
| 				return nil, fmt.Errorf("user-data not found. HTTP status code: %d", resp.StatusCode) | ||||
| 			} | ||||
|  | ||||
| 			log.Printf("user-data not found. HTTP status code: %d", resp.StatusCode) | ||||
| 		} else { | ||||
| 			log.Printf("unable to fetch user-data: %s", err.Error()) | ||||
| 		} | ||||
| 		duration := time.Millisecond * time.Duration((math.Pow(float64(2), float64(i)) * 100)) | ||||
|  | ||||
| 		duration := time.Millisecond * time.Duration((math.Pow(float64(2), float64(retry)) * 100)) | ||||
| 		if duration > maxTimeout { | ||||
| 			duration = maxTimeout | ||||
| 		} | ||||
|  | ||||
| 		log.Printf("unable to fetch user-data from %s, try again in %s", url, duration) | ||||
| 		time.Sleep(duration) | ||||
| 	} | ||||
| 	return resp, err | ||||
|  | ||||
| 	return nil, fmt.Errorf("unable to fetch user-data. Maximum retries reached: %d", maxRetries) | ||||
| } | ||||
|   | ||||
| @@ -17,7 +17,9 @@ var expBackoffTests = []struct { | ||||
| 	{2, "number of attempts: 2"}, | ||||
| } | ||||
|  | ||||
| func TestGetWithExponentialBackoff(t *testing.T) { | ||||
| // Test exponential backoff and that it continues retrying if a 5xx response is | ||||
| // received | ||||
| func TestFetchURLExpBackOff(t *testing.T) { | ||||
| 	for i, tt := range expBackoffTests { | ||||
| 		mux := http.NewServeMux() | ||||
| 		count := 0 | ||||
| @@ -31,15 +33,87 @@ func TestGetWithExponentialBackoff(t *testing.T) { | ||||
| 		}) | ||||
| 		ts := httptest.NewServer(mux) | ||||
| 		defer ts.Close() | ||||
|  | ||||
| 		data, err := fetchURL(ts.URL) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("Test case %d produced error: %v", i, err) | ||||
| 		} | ||||
|  | ||||
| 		if count != tt.count { | ||||
| 			t.Errorf("Test case %d failed: %d != %d", i, count, tt.count) | ||||
| 		} | ||||
|  | ||||
| 		if string(data) != tt.body { | ||||
| 			t.Errorf("Test case %d failed: %s != %s", i, tt.body, data) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Test that it stops retrying if a 4xx response comes back | ||||
| func TestFetchURL4xx(t *testing.T) { | ||||
| 	retries := 0 | ||||
| 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		retries++ | ||||
| 		http.Error(w, "", 404) | ||||
| 	})) | ||||
| 	defer ts.Close() | ||||
|  | ||||
| 	_, err := fetchURL(ts.URL) | ||||
| 	if err == nil { | ||||
| 		t.Errorf("Incorrect result\ngot:  %s\nwant: %s", err.Error(), "user-data not found. HTTP status code: 404") | ||||
| 	} | ||||
|  | ||||
| 	if retries > 1 { | ||||
| 		t.Errorf("Number of retries:\n%d\nExpected number of retries:\n%s", retries, 1) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Test that it fetches and returns user-data just fine | ||||
| func TestFetchURL2xx(t *testing.T) { | ||||
| 	var cloudcfg = ` | ||||
| #cloud-config | ||||
| coreos:  | ||||
| 	oem: | ||||
| 	    id: test | ||||
| 	    name: CoreOS.box for Test | ||||
| 	    version-id: %VERSION_ID%+%BUILD_ID% | ||||
| 	    home-url: https://github.com/coreos/coreos-cloudinit | ||||
| 	    bug-report-url: https://github.com/coreos/coreos-cloudinit | ||||
| 	update: | ||||
| 		reboot-strategy: best-effort | ||||
| ` | ||||
|  | ||||
| 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		fmt.Fprint(w, cloudcfg) | ||||
| 	})) | ||||
| 	defer ts.Close() | ||||
|  | ||||
| 	data, err := fetchURL(ts.URL) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Incorrect result\ngot:  %v\nwant: %v", err, nil) | ||||
| 	} | ||||
|  | ||||
| 	if string(data) != cloudcfg { | ||||
| 		t.Errorf("Incorrect result\ngot:  %s\nwant: %s", string(data), cloudcfg) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TestFetchURLMalformed checks that an empty URL and URLs without an HTTP | ||||
| // scheme are rejected with the expected error message. | ||||
| func TestFetchURLMalformed(t *testing.T) { | ||||
| 	type malformedCase struct { | ||||
| 		url  string | ||||
| 		want string | ||||
| 	} | ||||
|  | ||||
| 	cases := []malformedCase{ | ||||
| 		{url: "boo", want: "user-data URL boo does not have a valid HTTP scheme. Skipping."}, | ||||
| 		{url: "mailto://boo", want: "user-data URL mailto://boo does not have a valid HTTP scheme. Skipping."}, | ||||
| 		{url: "ftp://boo", want: "user-data URL ftp://boo does not have a valid HTTP scheme. Skipping."}, | ||||
| 		{url: "", want: "user-data URL is empty. Skipping."}, | ||||
| 	} | ||||
|  | ||||
| 	for i := range cases { | ||||
| 		tc := cases[i] | ||||
| 		_, err := fetchURL(tc.url) | ||||
| 		if err != nil && err.Error() == tc.want { | ||||
| 			continue | ||||
| 		} | ||||
| 		t.Errorf("Incorrect result\ngot:  %v\nwant: %v", err, tc.want) | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user