datasources: Add support for specifying multiple datasources
If multiple sources are specified, the first available source is used.
This commit is contained in:
parent
68dc902ed1
commit
c089216cb5
@ -4,13 +4,21 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/coreos-cloudinit/datasource"
|
||||
"github.com/coreos/coreos-cloudinit/initialize"
|
||||
"github.com/coreos/coreos-cloudinit/pkg"
|
||||
"github.com/coreos/coreos-cloudinit/system"
|
||||
)
|
||||
|
||||
const version = "0.7.7+git"
|
||||
const (
|
||||
version = "0.7.7+git"
|
||||
datasourceInterval = 100 * time.Millisecond
|
||||
datasourceMaxInterval = 30 * time.Second
|
||||
datasourceTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
printVersion bool
|
||||
@ -68,12 +76,18 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ds := getDatasource()
|
||||
if ds == nil {
|
||||
fmt.Println("Provide exactly one of --from-file, --from-configdrive, --from-metadata-service, --from-url or --from-proc-cmdline")
|
||||
dss := getDatasources()
|
||||
if len(dss) == 0 {
|
||||
fmt.Println("Provide at least one of --from-file, --from-configdrive, --from-metadata-service, --from-url or --from-proc-cmdline")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ds := selectDatasource(dss)
|
||||
if ds == nil {
|
||||
fmt.Println("No datasources available in time")
|
||||
die()
|
||||
}
|
||||
|
||||
fmt.Printf("Fetching user-data from datasource of type %q\n", ds.Type())
|
||||
userdataBytes, err := ds.FetchUserdata()
|
||||
if err != nil {
|
||||
@ -119,33 +133,70 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func getDatasource() datasource.Datasource {
|
||||
var ds datasource.Datasource
|
||||
var n int
|
||||
func getDatasources() []datasource.Datasource {
|
||||
dss := make([]datasource.Datasource, 0, 5)
|
||||
if sources.file != "" {
|
||||
ds = datasource.NewLocalFile(sources.file)
|
||||
n++
|
||||
dss = append(dss, datasource.NewLocalFile(sources.file))
|
||||
}
|
||||
if sources.url != "" {
|
||||
ds = datasource.NewRemoteFile(sources.url)
|
||||
n++
|
||||
dss = append(dss, datasource.NewRemoteFile(sources.url))
|
||||
}
|
||||
if sources.configDrive != "" {
|
||||
ds = datasource.NewConfigDrive(sources.configDrive)
|
||||
n++
|
||||
dss = append(dss, datasource.NewConfigDrive(sources.configDrive))
|
||||
}
|
||||
if sources.metadataService {
|
||||
ds = datasource.NewMetadataService()
|
||||
n++
|
||||
dss = append(dss, datasource.NewMetadataService())
|
||||
}
|
||||
if sources.procCmdLine {
|
||||
ds = datasource.NewProcCmdline()
|
||||
n++
|
||||
dss = append(dss, datasource.NewProcCmdline())
|
||||
}
|
||||
if n != 1 {
|
||||
return nil
|
||||
return dss
|
||||
}
|
||||
return ds
|
||||
|
||||
func selectDatasource(sources []datasource.Datasource) datasource.Datasource {
|
||||
ds := make(chan datasource.Datasource)
|
||||
stop := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, s := range sources {
|
||||
wg.Add(1)
|
||||
go func(s datasource.Datasource) {
|
||||
defer wg.Done()
|
||||
|
||||
duration := datasourceInterval
|
||||
for {
|
||||
fmt.Printf("Checking availability of %q\n", s.Type())
|
||||
if s.IsAvailable() {
|
||||
ds <- s
|
||||
return
|
||||
} else if !s.AvailabilityChanges() {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-time.Tick(duration):
|
||||
duration = pkg.ExpBackoff(duration, datasourceMaxInterval)
|
||||
}
|
||||
}
|
||||
}(s)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
var s datasource.Datasource
|
||||
select {
|
||||
case s = <-ds:
|
||||
case <-done:
|
||||
case <-time.Tick(datasourceTimeout):
|
||||
}
|
||||
|
||||
close(stop)
|
||||
return s
|
||||
}
|
||||
|
||||
func processUserdata(userdata string, env *initialize.Environment) error {
|
||||
|
@ -14,6 +14,15 @@ func NewConfigDrive(root string) *configDrive {
|
||||
return &configDrive{path.Join(root, "openstack")}
|
||||
}
|
||||
|
||||
func (cd *configDrive) IsAvailable() bool {
|
||||
_, err := os.Stat(cd.root)
|
||||
return !os.IsNotExist(err)
|
||||
}
|
||||
|
||||
func (cd *configDrive) AvailabilityChanges() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (cd *configDrive) ConfigRoot() string {
|
||||
return cd.root
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package datasource
|
||||
|
||||
type Datasource interface {
|
||||
IsAvailable() bool
|
||||
AvailabilityChanges() bool
|
||||
ConfigRoot() string
|
||||
FetchMetadata() ([]byte, error)
|
||||
FetchUserdata() ([]byte, error)
|
||||
|
@ -2,6 +2,7 @@ package datasource
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
)
|
||||
|
||||
type localFile struct {
|
||||
@ -12,6 +13,15 @@ func NewLocalFile(path string) *localFile {
|
||||
return &localFile{path}
|
||||
}
|
||||
|
||||
func (f *localFile) IsAvailable() bool {
|
||||
_, err := os.Stat(f.path)
|
||||
return !os.IsNotExist(err)
|
||||
}
|
||||
|
||||
func (f *localFile) AvailabilityChanges() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (f *localFile) ConfigRoot() string {
|
||||
return ""
|
||||
}
|
||||
|
@ -39,6 +39,16 @@ func NewMetadataService() *metadataService {
|
||||
return &metadataService{}
|
||||
}
|
||||
|
||||
func (ms *metadataService) IsAvailable() bool {
|
||||
client := pkg.NewHttpClient()
|
||||
_, err := client.Get(BaseUrl)
|
||||
return (err == nil)
|
||||
}
|
||||
|
||||
func (ms *metadataService) AvailabilityChanges() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (ms *metadataService) ConfigRoot() string {
|
||||
return ""
|
||||
}
|
||||
|
@ -22,6 +22,21 @@ func NewProcCmdline() *procCmdline {
|
||||
return &procCmdline{Location: ProcCmdlineLocation}
|
||||
}
|
||||
|
||||
func (c *procCmdline) IsAvailable() bool {
|
||||
contents, err := ioutil.ReadFile(c.Location)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
cmdline := strings.TrimSpace(string(contents))
|
||||
_, err = findCloudConfigURL(cmdline)
|
||||
return (err == nil)
|
||||
}
|
||||
|
||||
func (c *procCmdline) AvailabilityChanges() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *procCmdline) ConfigRoot() string {
|
||||
return ""
|
||||
}
|
||||
|
@ -10,6 +10,16 @@ func NewRemoteFile(url string) *remoteFile {
|
||||
return &remoteFile{url}
|
||||
}
|
||||
|
||||
func (f *remoteFile) IsAvailable() bool {
|
||||
client := pkg.NewHttpClient()
|
||||
_, err := client.Get(f.url)
|
||||
return (err == nil)
|
||||
}
|
||||
|
||||
func (f *remoteFile) AvailabilityChanges() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (f *remoteFile) ConfigRoot() string {
|
||||
return ""
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ func NewHttpClient() *HttpClient {
|
||||
return hc
|
||||
}
|
||||
|
||||
func expBackoff(interval, max time.Duration) time.Duration {
|
||||
func ExpBackoff(interval, max time.Duration) time.Duration {
|
||||
interval = interval * 2
|
||||
if interval > max {
|
||||
interval = max
|
||||
@ -131,7 +131,7 @@ func (h *HttpClient) GetRetry(rawurl string) ([]byte, error) {
|
||||
return data, err
|
||||
}
|
||||
|
||||
duration = expBackoff(duration, h.MaxBackoff)
|
||||
duration = ExpBackoff(duration, h.MaxBackoff)
|
||||
log.Printf("Sleeping for %v...", duration)
|
||||
time.Sleep(duration)
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ func TestExpBackoff(t *testing.T) {
|
||||
duration := time.Millisecond
|
||||
max := time.Hour
|
||||
for i := 0; i < math.MaxUint16; i++ {
|
||||
duration = expBackoff(duration, max)
|
||||
duration = ExpBackoff(duration, max)
|
||||
if duration < 0 {
|
||||
t.Fatalf("duration too small: %v %v", duration, i)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user