Merge pull request #164 from crawford/datasources

datasources: Add support for specifying multiple datasources
This commit is contained in:
Alex Crawford 2014-06-27 00:25:09 -07:00
commit 099de62e9a
11 changed files with 207 additions and 81 deletions

View File

@ -4,13 +4,21 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"sync"
"time"
"github.com/coreos/coreos-cloudinit/datasource" "github.com/coreos/coreos-cloudinit/datasource"
"github.com/coreos/coreos-cloudinit/initialize" "github.com/coreos/coreos-cloudinit/initialize"
"github.com/coreos/coreos-cloudinit/pkg"
"github.com/coreos/coreos-cloudinit/system" "github.com/coreos/coreos-cloudinit/system"
) )
const version = "0.7.7+git" const (
version = "0.7.7+git"
datasourceInterval = 100 * time.Millisecond
datasourceMaxInterval = 30 * time.Second
datasourceTimeout = 5 * time.Minute
)
var ( var (
printVersion bool printVersion bool
@ -68,12 +76,18 @@ func main() {
os.Exit(1) os.Exit(1)
} }
ds := getDatasource() dss := getDatasources()
if ds == nil { if len(dss) == 0 {
fmt.Println("Provide exactly one of --from-file, --from-configdrive, --from-metadata-service, --from-url or --from-proc-cmdline") fmt.Println("Provide at least one of --from-file, --from-configdrive, --from-metadata-service, --from-url or --from-proc-cmdline")
os.Exit(1) os.Exit(1)
} }
ds := selectDatasource(dss)
if ds == nil {
fmt.Println("No datasources available in time")
die()
}
fmt.Printf("Fetching user-data from datasource of type %q\n", ds.Type()) fmt.Printf("Fetching user-data from datasource of type %q\n", ds.Type())
userdataBytes, err := ds.FetchUserdata() userdataBytes, err := ds.FetchUserdata()
if err != nil { if err != nil {
@ -119,33 +133,70 @@ func main() {
} }
} }
func getDatasource() datasource.Datasource { func getDatasources() []datasource.Datasource {
var ds datasource.Datasource dss := make([]datasource.Datasource, 0, 5)
var n int
if sources.file != "" { if sources.file != "" {
ds = datasource.NewLocalFile(sources.file) dss = append(dss, datasource.NewLocalFile(sources.file))
n++
} }
if sources.url != "" { if sources.url != "" {
ds = datasource.NewRemoteFile(sources.url) dss = append(dss, datasource.NewRemoteFile(sources.url))
n++
} }
if sources.configDrive != "" { if sources.configDrive != "" {
ds = datasource.NewConfigDrive(sources.configDrive) dss = append(dss, datasource.NewConfigDrive(sources.configDrive))
n++
} }
if sources.metadataService { if sources.metadataService {
ds = datasource.NewMetadataService() dss = append(dss, datasource.NewMetadataService())
n++
} }
if sources.procCmdLine { if sources.procCmdLine {
ds = datasource.NewProcCmdline() dss = append(dss, datasource.NewProcCmdline())
n++
} }
if n != 1 { return dss
return nil }
func selectDatasource(sources []datasource.Datasource) datasource.Datasource {
ds := make(chan datasource.Datasource)
stop := make(chan struct{})
var wg sync.WaitGroup
for _, s := range sources {
wg.Add(1)
go func(s datasource.Datasource) {
defer wg.Done()
duration := datasourceInterval
for {
fmt.Printf("Checking availability of %q\n", s.Type())
if s.IsAvailable() {
ds <- s
return
} else if !s.AvailabilityChanges() {
return
}
select {
case <-stop:
return
case <-time.Tick(duration):
duration = pkg.ExpBackoff(duration, datasourceMaxInterval)
}
}
}(s)
} }
return ds
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
var s datasource.Datasource
select {
case s = <-ds:
case <-done:
case <-time.Tick(datasourceTimeout):
}
close(stop)
return s
} }
func processUserdata(userdata string, env *initialize.Environment) error { func processUserdata(userdata string, env *initialize.Environment) error {

View File

@ -14,6 +14,15 @@ func NewConfigDrive(root string) *configDrive {
return &configDrive{path.Join(root, "openstack")} return &configDrive{path.Join(root, "openstack")}
} }
func (cd *configDrive) IsAvailable() bool {
_, err := os.Stat(cd.root)
return !os.IsNotExist(err)
}
func (cd *configDrive) AvailabilityChanges() bool {
return true
}
func (cd *configDrive) ConfigRoot() string { func (cd *configDrive) ConfigRoot() string {
return cd.root return cd.root
} }

View File

@ -1,6 +1,8 @@
package datasource package datasource
type Datasource interface { type Datasource interface {
IsAvailable() bool
AvailabilityChanges() bool
ConfigRoot() string ConfigRoot() string
FetchMetadata() ([]byte, error) FetchMetadata() ([]byte, error)
FetchUserdata() ([]byte, error) FetchUserdata() ([]byte, error)

View File

@ -2,6 +2,7 @@ package datasource
import ( import (
"io/ioutil" "io/ioutil"
"os"
) )
type localFile struct { type localFile struct {
@ -12,6 +13,15 @@ func NewLocalFile(path string) *localFile {
return &localFile{path} return &localFile{path}
} }
func (f *localFile) IsAvailable() bool {
_, err := os.Stat(f.path)
return !os.IsNotExist(err)
}
func (f *localFile) AvailabilityChanges() bool {
return true
}
func (f *localFile) ConfigRoot() string { func (f *localFile) ConfigRoot() string {
return "" return ""
} }

View File

@ -32,13 +32,23 @@ const (
type metadataService struct{} type metadataService struct{}
type getter interface { type getter interface {
Get(string) ([]byte, error) GetRetry(string) ([]byte, error)
} }
func NewMetadataService() *metadataService { func NewMetadataService() *metadataService {
return &metadataService{} return &metadataService{}
} }
func (ms *metadataService) IsAvailable() bool {
client := pkg.NewHttpClient()
_, err := client.Get(BaseUrl)
return (err == nil)
}
func (ms *metadataService) AvailabilityChanges() bool {
return true
}
func (ms *metadataService) ConfigRoot() string { func (ms *metadataService) ConfigRoot() string {
return "" return ""
} }
@ -49,12 +59,12 @@ func (ms *metadataService) FetchMetadata() ([]byte, error) {
func (ms *metadataService) FetchUserdata() ([]byte, error) { func (ms *metadataService) FetchUserdata() ([]byte, error) {
client := pkg.NewHttpClient() client := pkg.NewHttpClient()
if data, err := client.Get(Ec2UserdataUrl); err == nil { if data, err := client.GetRetry(Ec2UserdataUrl); err == nil {
return data, err return data, err
} else if _, ok := err.(pkg.ErrTimeout); ok { } else if _, ok := err.(pkg.ErrTimeout); ok {
return data, err return data, err
} }
return client.Get(OpenstackUserdataUrl) return client.GetRetry(OpenstackUserdataUrl)
} }
func (ms *metadataService) Type() string { func (ms *metadataService) Type() string {
@ -62,7 +72,7 @@ func (ms *metadataService) Type() string {
} }
func fetchMetadata(client getter) ([]byte, error) { func fetchMetadata(client getter) ([]byte, error) {
if metadata, err := client.Get(OpenstackMetadataUrl); err == nil { if metadata, err := client.GetRetry(OpenstackMetadataUrl); err == nil {
return metadata, nil return metadata, nil
} else if _, ok := err.(pkg.ErrTimeout); ok { } else if _, ok := err.(pkg.ErrTimeout); ok {
return nil, err return nil, err
@ -76,7 +86,7 @@ func fetchMetadata(client getter) ([]byte, error) {
} }
func fetchAttributes(client getter, url string) ([]string, error) { func fetchAttributes(client getter, url string) ([]string, error) {
resp, err := client.Get(url) resp, err := client.GetRetry(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -14,7 +14,7 @@ type TestHttpClient struct {
err error err error
} }
func (t *TestHttpClient) Get(url string) ([]byte, error) { func (t *TestHttpClient) GetRetry(url string) ([]byte, error) {
if t.err != nil { if t.err != nil {
return nil, t.err return nil, t.err
} }

View File

@ -22,6 +22,21 @@ func NewProcCmdline() *procCmdline {
return &procCmdline{Location: ProcCmdlineLocation} return &procCmdline{Location: ProcCmdlineLocation}
} }
func (c *procCmdline) IsAvailable() bool {
contents, err := ioutil.ReadFile(c.Location)
if err != nil {
return false
}
cmdline := strings.TrimSpace(string(contents))
_, err = findCloudConfigURL(cmdline)
return (err == nil)
}
func (c *procCmdline) AvailabilityChanges() bool {
return false
}
func (c *procCmdline) ConfigRoot() string { func (c *procCmdline) ConfigRoot() string {
return "" return ""
} }
@ -43,7 +58,7 @@ func (c *procCmdline) FetchUserdata() ([]byte, error) {
} }
client := pkg.NewHttpClient() client := pkg.NewHttpClient()
cfg, err := client.Get(url) cfg, err := client.GetRetry(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -10,6 +10,16 @@ func NewRemoteFile(url string) *remoteFile {
return &remoteFile{url} return &remoteFile{url}
} }
func (f *remoteFile) IsAvailable() bool {
client := pkg.NewHttpClient()
_, err := client.Get(f.url)
return (err == nil)
}
func (f *remoteFile) AvailabilityChanges() bool {
return true
}
func (f *remoteFile) ConfigRoot() string { func (f *remoteFile) ConfigRoot() string {
return "" return ""
} }
@ -20,7 +30,7 @@ func (f *remoteFile) FetchMetadata() ([]byte, error) {
func (f *remoteFile) FetchUserdata() ([]byte, error) { func (f *remoteFile) FetchUserdata() ([]byte, error) {
client := pkg.NewHttpClient() client := pkg.NewHttpClient()
return client.Get(f.url) return client.GetRetry(f.url)
} }
func (f *remoteFile) Type() string { func (f *remoteFile) Type() string {

View File

@ -25,7 +25,7 @@ func SSHImportKeysFromURL(system_user string, url string) error {
func fetchUserKeys(url string) ([]string, error) { func fetchUserKeys(url string) ([]string, error) {
client := pkg.NewHttpClient() client := pkg.NewHttpClient()
data, err := client.Get(url) data, err := client.GetRetry(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,15 +20,23 @@ const (
type Err error type Err error
type ErrTimeout struct{ type ErrTimeout struct {
Err Err
} }
type ErrNotFound struct{ type ErrNotFound struct {
Err Err
} }
type ErrInvalid struct{ type ErrInvalid struct {
Err
}
type ErrServer struct {
Err
}
type ErrNetwork struct {
Err Err
} }
@ -45,18 +53,42 @@ type HttpClient struct {
// Whether or not to skip TLS verification. Defaults to false // Whether or not to skip TLS verification. Defaults to false
SkipTLS bool SkipTLS bool
client *http.Client
} }
func NewHttpClient() *HttpClient { func NewHttpClient() *HttpClient {
return &HttpClient{ hc := &HttpClient{
MaxBackoff: time.Second * 5, MaxBackoff: time.Second * 5,
MaxRetries: 15, MaxRetries: 15,
Timeout: time.Duration(2) * time.Second, Timeout: time.Duration(2) * time.Second,
SkipTLS: false, SkipTLS: false,
} }
// We need to create our own client in order to add timeout support.
// TODO(c4milo) Replace it once Go 1.3 is officially used by CoreOS
// More info: https://code.google.com/p/go/source/detail?r=ada6f2d5f99f
hc.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: hc.SkipTLS,
},
Dial: func(network, addr string) (net.Conn, error) {
deadline := time.Now().Add(hc.Timeout)
c, err := net.DialTimeout(network, addr, hc.Timeout)
if err != nil {
return nil, err
}
c.SetDeadline(deadline)
return c, nil
},
},
}
return hc
} }
func expBackoff(interval, max time.Duration) time.Duration { func ExpBackoff(interval, max time.Duration) time.Duration {
interval = interval * 2 interval = interval * 2
if interval > max { if interval > max {
interval = max interval = max
@ -64,8 +96,8 @@ func expBackoff(interval, max time.Duration) time.Duration {
return interval return interval
} }
// Fetches a given URL with support for exponential backoff and maximum retries // GetRetry fetches a given URL with support for exponential backoff and maximum retries
func (h *HttpClient) Get(rawurl string) ([]byte, error) { func (h *HttpClient) GetRetry(rawurl string) ([]byte, error) {
if rawurl == "" { if rawurl == "" {
return nil, ErrInvalid{errors.New("URL is empty. Skipping.")} return nil, ErrInvalid{errors.New("URL is empty. Skipping.")}
} }
@ -83,55 +115,42 @@ func (h *HttpClient) Get(rawurl string) ([]byte, error) {
dataURL := url.String() dataURL := url.String()
// We need to create our own client in order to add timeout support.
// TODO(c4milo) Replace it once Go 1.3 is officially used by CoreOS
// More info: https://code.google.com/p/go/source/detail?r=ada6f2d5f99f
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: h.SkipTLS,
},
Dial: func(network, addr string) (net.Conn, error) {
deadline := time.Now().Add(h.Timeout)
c, err := net.DialTimeout(network, addr, h.Timeout)
if err != nil {
return nil, err
}
c.SetDeadline(deadline)
return c, nil
},
}
client := &http.Client{
Transport: transport,
}
duration := 50 * time.Millisecond duration := 50 * time.Millisecond
for retry := 1; retry <= h.MaxRetries; retry++ { for retry := 1; retry <= h.MaxRetries; retry++ {
log.Printf("Fetching data from %s. Attempt #%d", dataURL, retry) log.Printf("Fetching data from %s. Attempt #%d", dataURL, retry)
resp, err := client.Get(dataURL) data, err := h.Get(dataURL)
switch err.(type) {
if err == nil { case ErrNetwork:
defer resp.Body.Close() log.Printf(err.Error())
status := resp.StatusCode / 100 case ErrServer:
log.Printf(err.Error())
if status == HTTP_2xx { case ErrNotFound:
return ioutil.ReadAll(resp.Body) return data, err
} default:
return data, err
if status == HTTP_4xx {
return nil, ErrNotFound{fmt.Errorf("Not found. HTTP status code: %d", resp.StatusCode)}
}
log.Printf("Server error. HTTP status code: %d", resp.StatusCode)
} else {
log.Printf("Unable to fetch data: %s", err.Error())
} }
duration = expBackoff(duration, h.MaxBackoff) duration = ExpBackoff(duration, h.MaxBackoff)
log.Printf("Sleeping for %v...", duration) log.Printf("Sleeping for %v...", duration)
time.Sleep(duration) time.Sleep(duration)
} }
return nil, ErrTimeout{fmt.Errorf("Unable to fetch data. Maximum retries reached: %d", h.MaxRetries)} return nil, ErrTimeout{fmt.Errorf("Unable to fetch data. Maximum retries reached: %d", h.MaxRetries)}
} }
func (h *HttpClient) Get(dataURL string) ([]byte, error) {
if resp, err := h.client.Get(dataURL); err == nil {
defer resp.Body.Close()
switch resp.StatusCode / 100 {
case HTTP_2xx:
return ioutil.ReadAll(resp.Body)
case HTTP_4xx:
return nil, ErrNotFound{fmt.Errorf("Not found. HTTP status code: %d", resp.StatusCode)}
default:
return nil, ErrServer{fmt.Errorf("Server error. HTTP status code: %d", resp.StatusCode)}
}
} else {
return nil, ErrNetwork{fmt.Errorf("Unable to fetch data: %s", err.Error())}
}
}

View File

@ -14,7 +14,7 @@ func TestExpBackoff(t *testing.T) {
duration := time.Millisecond duration := time.Millisecond
max := time.Hour max := time.Hour
for i := 0; i < math.MaxUint16; i++ { for i := 0; i < math.MaxUint16; i++ {
duration = expBackoff(duration, max) duration = ExpBackoff(duration, max)
if duration < 0 { if duration < 0 {
t.Fatalf("duration too small: %v %v", duration, i) t.Fatalf("duration too small: %v %v", duration, i)
} }
@ -51,7 +51,7 @@ func TestGetURLExpBackOff(t *testing.T) {
ts := httptest.NewServer(mux) ts := httptest.NewServer(mux)
defer ts.Close() defer ts.Close()
data, err := client.Get(ts.URL) data, err := client.GetRetry(ts.URL)
if err != nil { if err != nil {
t.Errorf("Test case %d produced error: %v", i, err) t.Errorf("Test case %d produced error: %v", i, err)
} }
@ -76,7 +76,7 @@ func TestGetURL4xx(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
_, err := client.Get(ts.URL) _, err := client.GetRetry(ts.URL)
if err == nil { if err == nil {
t.Errorf("Incorrect result\ngot: %s\nwant: %s", err.Error(), "Not found. HTTP status code: 404") t.Errorf("Incorrect result\ngot: %s\nwant: %s", err.Error(), "Not found. HTTP status code: 404")
} }
@ -107,7 +107,7 @@ coreos:
})) }))
defer ts.Close() defer ts.Close()
data, err := client.Get(ts.URL) data, err := client.GetRetry(ts.URL)
if err != nil { if err != nil {
t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, nil) t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, nil)
} }
@ -132,7 +132,7 @@ func TestGetMalformedURL(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
_, err := client.Get(test.url) _, err := client.GetRetry(test.url)
if err == nil || err.Error() != test.want { if err == nil || err.Error() != test.want {
t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, test.want) t.Errorf("Incorrect result\ngot: %v\nwant: %v", err, test.want)
} }