From 68dc902ed14d31d66ee509797ca10da131c1ea2e Mon Sep 17 00:00:00 2001 From: Alex Crawford Date: Thu, 26 Jun 2014 14:58:32 -0700 Subject: [PATCH 1/2] HttpClient: Refactor timeout into two seperate functions --- datasource/metadata_service.go | 10 +-- datasource/metadata_service_test.go | 2 +- datasource/proc_cmdline.go | 2 +- datasource/url.go | 2 +- initialize/ssh_keys.go | 2 +- pkg/http_client.go | 109 ++++++++++++++++------------ pkg/http_client_test.go | 8 +- 7 files changed, 77 insertions(+), 58 deletions(-) diff --git a/datasource/metadata_service.go b/datasource/metadata_service.go index 06d2412..956d2b9 100644 --- a/datasource/metadata_service.go +++ b/datasource/metadata_service.go @@ -32,7 +32,7 @@ const ( type metadataService struct{} type getter interface { - Get(string) ([]byte, error) + GetRetry(string) ([]byte, error) } func NewMetadataService() *metadataService { @@ -49,12 +49,12 @@ func (ms *metadataService) FetchMetadata() ([]byte, error) { func (ms *metadataService) FetchUserdata() ([]byte, error) { client := pkg.NewHttpClient() - if data, err := client.Get(Ec2UserdataUrl); err == nil { + if data, err := client.GetRetry(Ec2UserdataUrl); err == nil { return data, err } else if _, ok := err.(pkg.ErrTimeout); ok { return data, err } - return client.Get(OpenstackUserdataUrl) + return client.GetRetry(OpenstackUserdataUrl) } func (ms *metadataService) Type() string { @@ -62,7 +62,7 @@ func (ms *metadataService) Type() string { } func fetchMetadata(client getter) ([]byte, error) { - if metadata, err := client.Get(OpenstackMetadataUrl); err == nil { + if metadata, err := client.GetRetry(OpenstackMetadataUrl); err == nil { return metadata, nil } else if _, ok := err.(pkg.ErrTimeout); ok { return nil, err @@ -76,7 +76,7 @@ func fetchMetadata(client getter) ([]byte, error) { } func fetchAttributes(client getter, url string) ([]string, error) { - resp, err := client.Get(url) + resp, err := client.GetRetry(url) if err != nil { return nil, err } diff --git a/datasource/metadata_service_test.go b/datasource/metadata_service_test.go index 104f97e..58ee298 100644 --- a/datasource/metadata_service_test.go +++ b/datasource/metadata_service_test.go @@ -14,7 +14,7 @@ type TestHttpClient struct { err error } -func (t *TestHttpClient) Get(url string) ([]byte, error) { +func (t *TestHttpClient) GetRetry(url string) ([]byte, error) { if t.err != nil { return nil, t.err } diff --git a/datasource/proc_cmdline.go b/datasource/proc_cmdline.go index ce181ab..b91ea7d 100644 --- a/datasource/proc_cmdline.go +++ b/datasource/proc_cmdline.go @@ -43,7 +43,7 @@ func (c *procCmdline) FetchUserdata() ([]byte, error) { } client := pkg.NewHttpClient() - cfg, err := client.Get(url) + cfg, err := client.GetRetry(url) if err != nil { return nil, err } diff --git a/datasource/url.go b/datasource/url.go index 9fcc788..80f6fb1 100644 --- a/datasource/url.go +++ b/datasource/url.go @@ -20,7 +20,7 @@ func (f *remoteFile) FetchMetadata() ([]byte, error) { func (f *remoteFile) FetchUserdata() ([]byte, error) { client := pkg.NewHttpClient() - return client.Get(f.url) + return client.GetRetry(f.url) } func (f *remoteFile) Type() string { diff --git a/initialize/ssh_keys.go b/initialize/ssh_keys.go index 67870d8..3caf039 100644 --- a/initialize/ssh_keys.go +++ b/initialize/ssh_keys.go @@ -25,7 +25,7 @@ func SSHImportKeysFromURL(system_user string, url string) error { func fetchUserKeys(url string) ([]string, error) { client := pkg.NewHttpClient() - data, err := client.Get(url) + data, err := client.GetRetry(url) if err != nil { return nil, err } diff --git a/pkg/http_client.go b/pkg/http_client.go index 5152fef..095f64a 100644 --- a/pkg/http_client.go +++ b/pkg/http_client.go @@ -20,15 +20,23 @@ const ( type Err error -type ErrTimeout struct{ +type ErrTimeout struct { Err } -type ErrNotFound struct{ +type ErrNotFound struct { Err } -type ErrInvalid struct{ +type ErrInvalid struct { + Err +} + +type ErrServer struct { + Err +} + +type ErrNetwork struct { Err } @@ -45,15 +53,39 @@ type HttpClient struct { // Whether or not to skip TLS verification. Defaults to false SkipTLS bool + + client *http.Client } func NewHttpClient() *HttpClient { - return &HttpClient{ + hc := &HttpClient{ MaxBackoff: time.Second * 5, MaxRetries: 15, Timeout: time.Duration(2) * time.Second, SkipTLS: false, } + + // We need to create our own client in order to add timeout support. + // TODO(c4milo) Replace it once Go 1.3 is officially used by CoreOS + // More info: https://code.google.com/p/go/source/detail?r=ada6f2d5f99f + hc.client = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: hc.SkipTLS, + }, + Dial: func(network, addr string) (net.Conn, error) { + deadline := time.Now().Add(hc.Timeout) + c, err := net.DialTimeout(network, addr, hc.Timeout) + if err != nil { + return nil, err + } + c.SetDeadline(deadline) + return c, nil + }, + }, + } + + return hc } func expBackoff(interval, max time.Duration) time.Duration { @@ -64,8 +96,8 @@ func expBackoff(interval, max time.Duration) time.Duration { return interval } -// Fetches a given URL with support for exponential backoff and maximum retries -func (h *HttpClient) Get(rawurl string) ([]byte, error) { +// GetRetry fetches a given URL with support for exponential backoff and maximum retries +func (h *HttpClient) GetRetry(rawurl string) ([]byte, error) { if rawurl == "" { return nil, ErrInvalid{errors.New("URL is empty. Skipping.")} } @@ -83,49 +115,20 @@ func (h *HttpClient) Get(rawurl string) ([]byte, error) { dataURL := url.String() - // We need to create our own client in order to add timeout support. - // TODO(c4milo) Replace it once Go 1.3 is officially used by CoreOS - // More info: https://code.google.com/p/go/source/detail?r=ada6f2d5f99f - transport := &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: h.SkipTLS, - }, - Dial: func(network, addr string) (net.Conn, error) { - deadline := time.Now().Add(h.Timeout) - c, err := net.DialTimeout(network, addr, h.Timeout) - if err != nil { - return nil, err - } - c.SetDeadline(deadline) - return c, nil - }, - } - - client := &http.Client{ - Transport: transport, - } - duration := 50 * time.Millisecond for retry := 1; retry <= h.MaxRetries; retry++ { log.Printf("Fetching data from %s. Attempt #%d", dataURL, retry) - resp, err := client.Get(dataURL) - - if err == nil { - defer resp.Body.Close() - status := resp.StatusCode / 100 - - if status == HTTP_2xx { - return ioutil.ReadAll(resp.Body) - } - - if status == HTTP_4xx { - return nil, ErrNotFound{fmt.Errorf("Not found. HTTP status code: %d", resp.StatusCode)} - } - - log.Printf("Server error. HTTP status code: %d", resp.StatusCode) - } else { - log.Printf("Unable to fetch data: %s", err.Error()) + data, err := h.Get(dataURL) + switch err.(type) { + case ErrNetwork: + log.Printf(err.Error()) + case ErrServer: + log.Printf(err.Error()) + case ErrNotFound: + return data, err + default: + return data, err } duration = expBackoff(duration, h.MaxBackoff) @@ -135,3 +138,19 @@ func (h *HttpClient) Get(rawurl string) ([]byte, error) { return nil, ErrTimeout{fmt.Errorf("Unable to fetch data. Maximum retries reached: %d", h.MaxRetries)} } + +func (h *HttpClient) Get(dataURL string) ([]byte, error) { + if resp, err := h.client.Get(dataURL); err == nil { + defer resp.Body.Close() + switch resp.StatusCode / 100 { + case HTTP_2xx: + return ioutil.ReadAll(resp.Body) + case HTTP_4xx: + return nil, ErrNotFound{fmt.Errorf("Not found. HTTP status code: %d", resp.StatusCode)} + default: + return nil, ErrServer{fmt.Errorf("Server error. HTTP status code: %d", resp.StatusCode)} + } + } else { + return nil, ErrNetwork{fmt.Errorf("Unable to fetch data: %s", err.Error())} + } +} diff --git a/pkg/http_client_test.go b/pkg/http_client_test.go index 4c246a0..2af87b4 100644 --- a/pkg/http_client_test.go +++ b/pkg/http_client_test.go @@ -51,7 +51,7 @@ func TestGetURLExpBackOff(t *testing.T) { ts := httptest.NewServer(mux) defer ts.Close() - data, err := client.Get(ts.URL) + data, err := client.GetRetry(ts.URL) if err != nil { t.Errorf("Test case %d produced error: %v", i, err) } @@ -76,7 +76,7 @@ func TestGetURL4xx(t *testing.T) { })) defer ts.Close() - _, err := client.Get(ts.URL) + _, err := client.GetRetry(ts.URL) if err == nil { t.Errorf("Incorrect result\ngot: %s\nwant: %s", err.Error(), "Not found. HTTP status code: 404") } @@ -107,7 +107,7 @@ coreos: })) defer ts.Close() - data, err := client.Get(ts.URL) + data, err := client.GetRetry(ts.URL) if err != nil { t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, nil) } @@ -132,7 +132,7 @@ func TestGetMalformedURL(t *testing.T) { } for _, test := range tests { - _, err := client.Get(test.url) + _, err := client.GetRetry(test.url) if err == nil || err.Error() != test.want { t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, test.want) } From c089216cb583b37b160e076c95b6a498cabe54c4 Mon Sep 17 00:00:00 2001 From: Alex Crawford Date: Thu, 26 Jun 2014 15:17:53 -0700 Subject: [PATCH 2/2] datasources: Add support for specifying multiple datasources If multiple sources are specified, the first available source is used. --- coreos-cloudinit.go | 91 ++++++++++++++++++++++++++-------- datasource/configdrive.go | 9 ++++ datasource/datasource.go | 2 + datasource/file.go | 10 ++++ datasource/metadata_service.go | 10 ++++ datasource/proc_cmdline.go | 15 ++++++ datasource/url.go | 10 ++++ pkg/http_client.go | 4 +- pkg/http_client_test.go | 2 +- 9 files changed, 130 insertions(+), 23 deletions(-) diff --git a/coreos-cloudinit.go b/coreos-cloudinit.go index 0f5c143..c3ed368 100644 --- a/coreos-cloudinit.go +++ b/coreos-cloudinit.go @@ -4,13 +4,21 @@ import ( "flag" "fmt" "os" + "sync" + "time" "github.com/coreos/coreos-cloudinit/datasource" "github.com/coreos/coreos-cloudinit/initialize" + "github.com/coreos/coreos-cloudinit/pkg" "github.com/coreos/coreos-cloudinit/system" ) -const version = "0.7.7+git" +const ( + version = "0.7.7+git" + datasourceInterval = 100 * time.Millisecond + datasourceMaxInterval = 30 * time.Second + datasourceTimeout = 5 * time.Minute +) var ( printVersion bool @@ -68,12 +76,18 @@ func main() { os.Exit(1) } - ds := getDatasource() - if ds == nil { - fmt.Println("Provide exactly one of --from-file, --from-configdrive, --from-metadata-service, --from-url or --from-proc-cmdline") + dss := getDatasources() + if len(dss) == 0 { + fmt.Println("Provide at least one of --from-file, --from-configdrive, --from-metadata-service, --from-url or --from-proc-cmdline") os.Exit(1) } + ds := selectDatasource(dss) + if ds == nil { + fmt.Println("No datasources available in time") + die() + } + fmt.Printf("Fetching user-data from datasource of type %q\n", ds.Type()) userdataBytes, err := ds.FetchUserdata() if err != nil { @@ -119,33 +133,70 @@ func main() { } } -func getDatasource() datasource.Datasource { - var ds datasource.Datasource - var n int +func getDatasources() []datasource.Datasource { + dss := make([]datasource.Datasource, 0, 5) if sources.file != "" { - ds = datasource.NewLocalFile(sources.file) - n++ + dss = append(dss, datasource.NewLocalFile(sources.file)) } if sources.url != "" { - ds = datasource.NewRemoteFile(sources.url) - n++ + dss = append(dss, datasource.NewRemoteFile(sources.url)) } if sources.configDrive != "" { - ds = datasource.NewConfigDrive(sources.configDrive) - n++ + dss = append(dss, datasource.NewConfigDrive(sources.configDrive)) } if sources.metadataService { - ds = datasource.NewMetadataService() - n++ + dss = append(dss, datasource.NewMetadataService()) } if sources.procCmdLine { - ds = datasource.NewProcCmdline() - n++ + dss = append(dss, datasource.NewProcCmdline()) } - if n != 1 { - return nil + return dss +} + +func selectDatasource(sources []datasource.Datasource) datasource.Datasource { + ds := make(chan datasource.Datasource) + stop := make(chan struct{}) + var wg sync.WaitGroup + + for _, s := range sources { + wg.Add(1) + go func(s datasource.Datasource) { + defer wg.Done() + + duration := datasourceInterval + for { + fmt.Printf("Checking availability of %q\n", s.Type()) + if s.IsAvailable() { + ds <- s + return + } else if !s.AvailabilityChanges() { + return + } + select { + case <-stop: + return + case <-time.Tick(duration): + duration = pkg.ExpBackoff(duration, datasourceMaxInterval) + } + } + }(s) } - return ds + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + var s datasource.Datasource + select { + case s = <-ds: + case <-done: + case <-time.Tick(datasourceTimeout): + } + + close(stop) + return s } func processUserdata(userdata string, env *initialize.Environment) error { diff --git a/datasource/configdrive.go b/datasource/configdrive.go index ca618f3..3b008f4 100644 --- a/datasource/configdrive.go +++ b/datasource/configdrive.go @@ -14,6 +14,15 @@ func NewConfigDrive(root string) *configDrive { return &configDrive{path.Join(root, "openstack")} } +func (cd *configDrive) IsAvailable() bool { + _, err := os.Stat(cd.root) + return !os.IsNotExist(err) +} + +func (cd *configDrive) AvailabilityChanges() bool { + return true +} + func (cd *configDrive) ConfigRoot() string { return cd.root } diff --git a/datasource/datasource.go b/datasource/datasource.go index 6c6aec4..7146925 100644 --- a/datasource/datasource.go +++ b/datasource/datasource.go @@ -1,6 +1,8 @@ package datasource type Datasource interface { + IsAvailable() bool + AvailabilityChanges() bool ConfigRoot() string FetchMetadata() ([]byte, error) FetchUserdata() ([]byte, error) diff --git a/datasource/file.go b/datasource/file.go index 0798b7c..a542ff4 100644 --- a/datasource/file.go +++ b/datasource/file.go @@ -2,6 +2,7 @@ package datasource import ( "io/ioutil" + "os" ) type localFile struct { @@ -12,6 +13,15 @@ func NewLocalFile(path string) *localFile { return &localFile{path} } +func (f *localFile) IsAvailable() bool { + _, err := os.Stat(f.path) + return !os.IsNotExist(err) +} + +func (f *localFile) AvailabilityChanges() bool { + return true +} + func (f *localFile) ConfigRoot() string { return "" } diff --git a/datasource/metadata_service.go b/datasource/metadata_service.go index 956d2b9..8e7450e 100644 --- a/datasource/metadata_service.go +++ b/datasource/metadata_service.go @@ -39,6 +39,16 @@ func NewMetadataService() *metadataService { return &metadataService{} } +func (ms *metadataService) IsAvailable() bool { + client := pkg.NewHttpClient() + _, err := client.Get(BaseUrl) + return (err == nil) +} + +func (ms *metadataService) AvailabilityChanges() bool { + return true +} + func (ms *metadataService) ConfigRoot() string { return "" } diff --git a/datasource/proc_cmdline.go b/datasource/proc_cmdline.go index b91ea7d..16918b2 100644 --- a/datasource/proc_cmdline.go +++ b/datasource/proc_cmdline.go @@ -22,6 +22,21 @@ func NewProcCmdline() *procCmdline { return &procCmdline{Location: ProcCmdlineLocation} } +func (c *procCmdline) IsAvailable() bool { + contents, err := ioutil.ReadFile(c.Location) + if err != nil { + return false + } + + cmdline := strings.TrimSpace(string(contents)) + _, err = findCloudConfigURL(cmdline) + return (err == nil) +} + +func (c *procCmdline) AvailabilityChanges() bool { + return false +} + func (c *procCmdline) ConfigRoot() string { return "" } diff --git a/datasource/url.go b/datasource/url.go index 80f6fb1..3debca0 100644 --- a/datasource/url.go +++ b/datasource/url.go @@ -10,6 +10,16 @@ func NewRemoteFile(url string) *remoteFile { return &remoteFile{url} } +func (f *remoteFile) IsAvailable() bool { + client := pkg.NewHttpClient() + _, err := client.Get(f.url) + return (err == nil) +} + +func (f *remoteFile) AvailabilityChanges() bool { + return true +} + func (f *remoteFile) ConfigRoot() string { return "" } diff --git a/pkg/http_client.go b/pkg/http_client.go index 095f64a..d0da7ea 100644 --- a/pkg/http_client.go +++ b/pkg/http_client.go @@ -88,7 +88,7 @@ func NewHttpClient() *HttpClient { return hc } -func expBackoff(interval, max time.Duration) time.Duration { +func ExpBackoff(interval, max time.Duration) time.Duration { interval = interval * 2 if interval > max { interval = max @@ -131,7 +131,7 @@ func (h *HttpClient) GetRetry(rawurl string) ([]byte, error) { return data, err } - duration = expBackoff(duration, h.MaxBackoff) + duration = ExpBackoff(duration, h.MaxBackoff) log.Printf("Sleeping for %v...", duration) time.Sleep(duration) } diff --git a/pkg/http_client_test.go b/pkg/http_client_test.go index 2af87b4..6711a09 100644 --- a/pkg/http_client_test.go +++ b/pkg/http_client_test.go @@ -14,7 +14,7 @@ func TestExpBackoff(t *testing.T) { duration := time.Millisecond max := time.Hour for i := 0; i < math.MaxUint16; i++ { - duration = expBackoff(duration, max) + duration = ExpBackoff(duration, max) if duration < 0 { t.Fatalf("duration too small: %v %v", duration, i) }