diff --git a/fleetctl/destroy_test.go b/fleetctl/destroy_test.go index 626a1b7fe..601b59af8 100644 --- a/fleetctl/destroy_test.go +++ b/fleetctl/destroy_test.go @@ -63,7 +63,7 @@ func TestRunDestroyUnits(t *testing.T) { var wg sync.WaitGroup errchan := make(chan error) - cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units)) + cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units), false) wg.Add(2) go func() { diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 71785606d..cc3b7fffb 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -483,6 +483,51 @@ func getChecker() *ssh.HostKeyChecker { return ssh.NewHostKeyChecker(keyFile) } +// getUnitFile attempts to get a UnitFile configuration +// It takes a unit file name as a parameter and tries first to lookup +// the unit from the local disk. If it fails, it checks if the provided +// file name may reference an instance of a template unit, if so, it +// tries to get the template configuration either from the registry or +// the local disk. +// It returns a UnitFile configuration or nil; and any error ecountered +func getUnitFile(file string) (*unit.UnitFile, error) { + var uf *unit.UnitFile + name := unitNameMangle(file) + + log.Debugf("Looking for Unit(%s) or its corresponding template", name) + + // Assume that the file references a local unit file on disk and + // attempt to load it, if it exists + if _, err := os.Stat(file); !os.IsNotExist(err) { + uf, err = getUnitFromFile(file) + if err != nil { + return nil, fmt.Errorf("failed getting Unit(%s) from file: %v", file, err) + } + } else { + // Otherwise (if the unit file does not exist), check if the + // name appears to be an instance of a template unit + info := unit.NewUnitNameInfo(name) + if info == nil { + return nil, fmt.Errorf("error extracting information from unit name %s", name) + } else if !info.IsInstance() { + return nil, fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", name) + } + + // If it is an instance check for a corresponding template + // unit in the Registry or disk. + // If we found a template unit, later we create a + // near-identical instance unit in the Registry - same + // unit file as the template, but different name + uf, err = getUnitFileFromTemplate(info, file) + if err != nil { + return nil, fmt.Errorf("failed getting Unit(%s) from template: %v", file, err) + } + } + + log.Debugf("Found Unit(%s)", name) + return uf, nil +} + // getUnitFromFile attempts to load a Unit from a given filename // It returns the Unit or nil, and any error encountered func getUnitFromFile(file string) (*unit.UnitFile, error) { @@ -497,6 +542,39 @@ func getUnitFromFile(file string) (*unit.UnitFile, error) { return unit.NewUnitFile(string(out)) } +// getUnitFileFromTemplate attempts to get a Unit from a template unit that +// is either in the registry or on the file system +// It takes two arguments, the template information and the unit file name +// It returns the Unit or nil; and any error encountered +func getUnitFileFromTemplate(uni *unit.UnitNameInfo, fileName string) (*unit.UnitFile, error) { + var uf *unit.UnitFile + + tmpl, err := cAPI.Unit(uni.Template) + if err != nil { + return nil, fmt.Errorf("unable to retrieve Unit(%s) from Registry: %v", uni.Template, err) + } + + if tmpl != nil { + warnOnDifferentLocalUnit(fileName, tmpl) + uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options) + log.Debugf("Template Unit(%s) found in registry", uni.Template) + } else { + // Finally, if we could not find a template unit in the Registry, + // check the local disk for one instead + filePath := path.Join(path.Dir(fileName), uni.Template) + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return nil, fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", uni.Template) + } + + uf, err = getUnitFromFile(filePath) + if err != nil { + return nil, fmt.Errorf("unable to load Unit(%s) from file: %v", filePath, err) + } + } + + return uf, nil +} + func getTunnelFlag() string { tun := globalFlags.Tunnel if tun != "" && !strings.Contains(tun, ":") { @@ -599,8 +677,6 @@ func lazyCreateUnits(args []string) error { errchan := make(chan error) var wg sync.WaitGroup for _, arg := range args { - // TODO(jonboulle): this loop is getting too unwieldy; factor it out - arg = maybeAppendDefaultUnitType(arg) name := unitNameMangle(arg) @@ -615,45 +691,12 @@ func lazyCreateUnits(args []string) error { continue } - var uf *unit.UnitFile - // Failing that, assume the name references a local unit file on disk, and attempt to load that, if it exists - // TODO(mischief): consolidate these two near-identical codepaths - if _, err := os.Stat(arg); !os.IsNotExist(err) { - uf, err = getUnitFromFile(arg) - if err != nil { - return fmt.Errorf("failed getting Unit(%s) from file: %v", arg, err) - } - } else { - // Otherwise (if the unit file does not exist), check if the name appears to be an instance unit, - // and if so, check for a corresponding template unit in the Registry - uni := unit.NewUnitNameInfo(name) - if uni == nil { - return fmt.Errorf("error extracting information from unit name %s", name) - } else if !uni.IsInstance() { - return fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", name) - } - tmpl, err := cAPI.Unit(uni.Template) - if err != nil { - return fmt.Errorf("error retrieving template Unit(%s) from Registry: %v", uni.Template, err) - } - - // Finally, if we could not find a template unit in the Registry, check the local disk for one instead - if tmpl == nil { - file := path.Join(path.Dir(arg), uni.Template) - if _, err := os.Stat(file); os.IsNotExist(err) { - return fmt.Errorf("unable to find Unit(%s) or template Unit(%s) in Registry or on filesystem", name, uni.Template) - } - uf, err = getUnitFromFile(file) - if err != nil { - return fmt.Errorf("failed getting template Unit(%s) from file: %v", uni.Template, err) - } - } else { - warnOnDifferentLocalUnit(arg, tmpl) - uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options) - } - - // If we found a template unit, create a near-identical instance unit in - // the Registry - same unit file as the template, but different name + // Assume that the name references a local unit file on + // disk or if it is an instance unit and if so get its + // corresponding unit + uf, err := getUnitFile(arg) + if err != nil { + return err } _, err = createUnit(name, uf) @@ -746,6 +789,54 @@ func setTargetStateOfUnits(units []string, state job.JobState) ([]*schema.Unit, return triggered, nil } +// getBlockAttempts gets the correct value of how many attempts to try +// before giving up on an operation. +// It returns a negative value which means do not block, if zero is +// returned then it means try forever, and if a positive value is +// returned then try up to that value +func getBlockAttempts() int { + // By default we wait forever + var attempts int = 0 + + if sharedFlags.BlockAttempts > 0 { + attempts = sharedFlags.BlockAttempts + } + + if sharedFlags.NoBlock { + attempts = -1 + } + + return attempts +} + +// tryWaitForUnitStates tries to wait for units to reach the desired state. +// It takes 5 arguments, the units to wait for, the desired state, the +// desired JobState, how many attempts before timing out and a writer +// interface. +// tryWaitForUnitStates polls each of the indicated units until they +// reach the desired state. If maxAttempts is negative, then it will not +// wait, it will assume that all units reached their desired state. +// If maxAttempts is zero tryWaitForUnitStates will retry forever, and +// if it is greater than zero, it will retry up to the indicated value. +// It returns 0 on success or 1 on errors. +func tryWaitForUnitStates(units []string, state string, js job.JobState, maxAttempts int, out io.Writer) (ret int) { + // We do not wait just assume we reached the desired state + if maxAttempts <= -1 { + for _, name := range units { + stdout("Triggered unit %s %s", name, state) + } + return + } + + errchan := waitForUnitStates(units, js, maxAttempts, out) + for err := range errchan { + stderr("Error waiting for units: %v", err) + ret = 1 + } + + return +} + // waitForUnitStates polls each of the indicated units until each of their // states is equal to that which the caller indicates, or until the // polling operation times out. waitForUnitStates will retry forever, or diff --git a/fleetctl/fleetctl_test.go b/fleetctl/fleetctl_test.go index 0d50de40c..97b5cb046 100644 --- a/fleetctl/fleetctl_test.go +++ b/fleetctl/fleetctl_test.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/fleet/job" "github.com/coreos/fleet/machine" "github.com/coreos/fleet/registry" + "github.com/coreos/fleet/schema" "github.com/coreos/fleet/unit" "github.com/coreos/fleet/version" @@ -34,7 +35,7 @@ type commandTestResults struct { expectedExit int } -func newFakeRegistryForCommands(unitPrefix string, unitCount int) client.API { +func newFakeRegistryForCommands(unitPrefix string, unitCount int, template bool) client.API { // clear machineStates for every invocation machineStates = nil machines := []machine.MachineState{ @@ -43,30 +44,43 @@ func newFakeRegistryForCommands(unitPrefix string, unitCount int) client.API { } jobs := make([]job.Job, 0) - appendJobsForTests(&jobs, machines[0], unitPrefix, unitCount) - appendJobsForTests(&jobs, machines[1], unitPrefix, unitCount) + appendJobsForTests(&jobs, machines[0], unitPrefix, unitCount, template) + appendJobsForTests(&jobs, machines[1], unitPrefix, unitCount, template) states := make([]unit.UnitState, 0) - for i := 1; i <= unitCount; i++ { + if template { state := unit.UnitState{ - UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i), + UnitName: fmt.Sprintf("%s@.service", unitPrefix), LoadState: "loaded", - ActiveState: "active", - SubState: "listening", + ActiveState: "inactive", + SubState: "dead", MachineID: machines[0].ID, } states = append(states, state) - } + state.MachineID = machines[1].ID + states = append(states, state) + } else { + for i := 1; i <= unitCount; i++ { + state := unit.UnitState{ + UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i), + LoadState: "loaded", + ActiveState: "active", + SubState: "listening", + MachineID: machines[0].ID, + } + states = append(states, state) + } - for i := 1; i <= unitCount; i++ { - state := unit.UnitState{ - UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i), - LoadState: "loaded", - ActiveState: "inactive", - SubState: "dead", - MachineID: machines[1].ID, + for i := 1; i <= unitCount; i++ { + state := unit.UnitState{ + UnitName: fmt.Sprintf("%s%d.service", unitPrefix, i), + LoadState: "loaded", + ActiveState: "inactive", + SubState: "dead", + MachineID: machines[1].ID, + } + states = append(states, state) } - states = append(states, state) } reg := registry.NewFakeRegistry() @@ -77,14 +91,40 @@ func newFakeRegistryForCommands(unitPrefix string, unitCount int) client.API { return &client.RegistryClient{Registry: reg} } -func appendJobsForTests(jobs *[]job.Job, machine machine.MachineState, prefix string, unitCount int) { - for i := 1; i <= unitCount; i++ { +func appendJobsForTests(jobs *[]job.Job, machine machine.MachineState, prefix string, unitCount int, template bool) { + if template { + // for start or load operations we may need to wait + // during the creation of units, and since this is a + // faked registry just set the 'Global' flag so we don't + // block forever + Options := []*schema.UnitOption{ + &schema.UnitOption{ + Section: "Unit", + Name: "Description", + Value: fmt.Sprintf("Template %s@.service", prefix), + }, + &schema.UnitOption{ + Section: "X-Fleet", + Name: "Global", + Value: "true", + }, + } + uf := schema.MapSchemaUnitOptionsToUnitFile(Options) j := job.Job{ - Name: fmt.Sprintf("%s%d.service", prefix, i), - Unit: unit.UnitFile{}, + Name: fmt.Sprintf("%s@.service", prefix), + Unit: *uf, TargetMachineID: machine.ID, } *jobs = append(*jobs, j) + } else { + for i := 1; i <= unitCount; i++ { + j := job.Job{ + Name: fmt.Sprintf("%s%d.service", prefix, i), + Unit: unit.UnitFile{}, + TargetMachineID: machine.ID, + } + *jobs = append(*jobs, j) + } } return @@ -181,6 +221,37 @@ func TestUnitNameMangle(t *testing.T) { } } +func TestGetBlockAttempts(t *testing.T) { + oldNoBlock := sharedFlags.NoBlock + oldBlockAttempts := sharedFlags.BlockAttempts + + defer func() { + sharedFlags.NoBlock = oldNoBlock + sharedFlags.BlockAttempts = oldBlockAttempts + }() + + var blocktests = []struct { + noBlock bool + blockAttempts int + expected int + }{ + {true, 0, -1}, + {true, -1, -1}, + {true, 9999, -1}, + {false, 0, 0}, + {false, -1, 0}, + {false, 9999, 9999}, + } + + for _, tt := range blocktests { + sharedFlags.NoBlock = tt.noBlock + sharedFlags.BlockAttempts = tt.blockAttempts + if n := getBlockAttempts(); n != tt.expected { + t.Errorf("got %d, want %d", n, tt.expected) + } + } +} + func newUnitFile(t *testing.T, contents string) *unit.UnitFile { uf, err := unit.NewUnitFile(contents) if err != nil { diff --git a/fleetctl/load.go b/fleetctl/load.go index 497f86058..0f7b0923c 100644 --- a/fleetctl/load.go +++ b/fleetctl/load.go @@ -66,17 +66,7 @@ func runLoadUnits(args []string) (exit int) { } } - if !sharedFlags.NoBlock { - errchan := waitForUnitStates(loading, job.JobStateLoaded, sharedFlags.BlockAttempts, os.Stdout) - for err := range errchan { - stderr("Error waiting for units: %v", err) - exit = 1 - } - } else { - for _, name := range loading { - stdout("Triggered unit %s load", name) - } - } + exit = tryWaitForUnitStates(loading, "load", job.JobStateLoaded, getBlockAttempts(), os.Stdout) return } diff --git a/fleetctl/load_test.go b/fleetctl/load_test.go index e792468c7..01ce9e2de 100644 --- a/fleetctl/load_test.go +++ b/fleetctl/load_test.go @@ -83,7 +83,7 @@ func TestRunLoadUnits(t *testing.T) { var wg sync.WaitGroup errchan := make(chan error) - cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units)) + cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units), false) wg.Add(2) go func() { diff --git a/fleetctl/start.go b/fleetctl/start.go index 77ea0d343..2ded29539 100644 --- a/fleetctl/start.go +++ b/fleetctl/start.go @@ -74,17 +74,7 @@ func runStartUnit(args []string) (exit int) { } } - if !sharedFlags.NoBlock { - errchan := waitForUnitStates(starting, job.JobStateLaunched, sharedFlags.BlockAttempts, os.Stdout) - for err := range errchan { - stderr("Error waiting for units: %v", err) - exit = 1 - } - } else { - for _, name := range starting { - stdout("Triggered unit %s start", name) - } - } + exit = tryWaitForUnitStates(starting, "start", job.JobStateLaunched, getBlockAttempts(), os.Stdout) return } diff --git a/fleetctl/start_test.go b/fleetctl/start_test.go index 344c9603d..c97de662e 100644 --- a/fleetctl/start_test.go +++ b/fleetctl/start_test.go @@ -15,80 +15,123 @@ package main import ( - "time" + "fmt" + "sync" + "testing" - "github.com/coreos/fleet/client" "github.com/coreos/fleet/job" - "github.com/coreos/fleet/machine" - "github.com/coreos/fleet/registry" - "github.com/coreos/fleet/unit" + "github.com/coreos/fleet/schema" ) -type BlockedFakeRegistry struct { - EchoAttempts int - registry.FakeRegistry +func checkStartUnitState(unit schema.Unit, startRet int, errchan chan error) { + if startRet == 0 { + if job.JobState(unit.DesiredState) != job.JobStateLaunched { + errchan <- fmt.Errorf("Error: unit %s was not started as requested", unit.Name) + } + } else if unit.DesiredState != "" { + // if the whole start operation failed, then no unit + // should have a DesiredState set + errchan <- fmt.Errorf("Error: Unit(%s) DesiredState was set to (%s)", unit.Name, unit.DesiredState) + } } -func (b *BlockedFakeRegistry) Unit(name string) (*job.Unit, error) { - if name == "hello.service" { - time.Sleep(500 * time.Millisecond) +func doStartUnits(r commandTestResults, errchan chan error) { + exit := runStartUnit(r.units) + if exit != r.expectedExit { + errchan <- fmt.Errorf("%s: expected exit code %d but received %d", r.description, r.expectedExit, exit) + return } - if name == "echo.service" { - if b.EchoAttempts != 0 { - b.EchoAttempts-- - return nil, nil - } + real_units, err := findUnits(r.units) + if err != nil { + errchan <- err + return } - return b.FakeRegistry.Unit(name) + for _, v := range real_units { + checkStartUnitState(v, r.expectedExit, errchan) + } } -func setupRegistryForStart(echoAttempts int) { - m1 := machine.MachineState{ - ID: "c31e44e1-f858-436e-933e-59c642517860", - PublicIP: "1.2.3.4", - Metadata: map[string]string{"ping": "pong"}, - } - m2 := machine.MachineState{ - ID: "595989bb-cbb7-49ce-8726-722d6e157b4e", - PublicIP: "5.6.7.8", - Metadata: map[string]string{"foo": "bar"}, - } - m3 := machine.MachineState{ - ID: "520983A8-FB9C-4A68-B49C-CED5BB2E9D08", - Metadata: map[string]string{"foo": "bar"}, +func runStartUnits(t *testing.T, unitPrefix string, results []commandTestResults, template bool) { + unitsCount := 0 + sharedFlags.NoBlock = true + for _, r := range results { + var wg sync.WaitGroup + errchan := make(chan error) + + if !template { + unitsCount = len(r.units) + } + + cAPI = newFakeRegistryForCommands(unitPrefix, unitsCount, template) + + wg.Add(1) + go func() { + defer wg.Done() + doStartUnits(r, errchan) + }() + + go func() { + wg.Wait() + close(errchan) + }() + + for err := range errchan { + t.Errorf("%v", err) + } } +} - states := []unit.UnitState{ - unit.UnitState{ - UnitName: "pong.service", - LoadState: "loaded", - ActiveState: "active", - SubState: "listening", - MachineID: m1.ID, +func TestRunStartUnits(t *testing.T) { + unitPrefix := "start" + oldNoBlock := sharedFlags.NoBlock + defer func() { + sharedFlags.NoBlock = oldNoBlock + }() + + results := []commandTestResults{ + { + "start available units", + []string{"start1", "start2", "start3", "start4", "start5", "start6"}, + 0, + }, + { + "start non-available units", + []string{"y1", "y2"}, + 1, }, - unit.UnitState{ - UnitName: "hello.service", - LoadState: "loaded", - ActiveState: "inactive", - SubState: "dead", - MachineID: m2.ID, + { + "start available and non-available units", + []string{"y1", "y2", "y3", "y4", "start1", "start2", "start3", "start4", "start5", "start6", "y0"}, + 1, }, - unit.UnitState{ - UnitName: "echo.service", - LoadState: "loaded", - ActiveState: "inactive", - SubState: "dead", - MachineID: m2.ID, + { + "start a unit from a non-available template", + []string{"foo-template@1"}, + 1, }, } - machines := []machine.MachineState{m1, m2, m3} - - reg := registry.NewFakeRegistry() - reg.SetMachines(machines) - reg.SetUnitStates(states) + templateResults := []commandTestResults{ + { + "start a unit from a non-available template", + []string{"start-foo@1"}, + 1, + }, + { + "start units from an available template", + []string{"start@1", "start@100", "start@1000"}, + 0, + }, + { + "start same unit from an available template", + []string{"start@1", "start@1", "start@1"}, + 0, + }, + } - cAPI = &client.RegistryClient{Registry: &BlockedFakeRegistry{EchoAttempts: echoAttempts, FakeRegistry: *reg}} + sharedFlags.NoBlock = true + runStartUnits(t, unitPrefix, results, false) + runStartUnits(t, unitPrefix, templateResults, true) } diff --git a/fleetctl/stop.go b/fleetctl/stop.go index 262946bfb..941b33d7b 100644 --- a/fleetctl/stop.go +++ b/fleetctl/stop.go @@ -79,17 +79,7 @@ func runStopUnit(args []string) (exit int) { } } - if !sharedFlags.NoBlock { - errchan := waitForUnitStates(stopping, job.JobStateLoaded, sharedFlags.BlockAttempts, os.Stdout) - for err := range errchan { - stderr("Error waiting for units: %v", err) - exit = 1 - } - } else { - for _, name := range stopping { - stdout("Triggered unit %s stop", name) - } - } + exit = tryWaitForUnitStates(stopping, "stop", job.JobStateLoaded, getBlockAttempts(), os.Stdout) return } diff --git a/fleetctl/stop_test.go b/fleetctl/stop_test.go index cde6493c8..536419ec1 100644 --- a/fleetctl/stop_test.go +++ b/fleetctl/stop_test.go @@ -73,7 +73,7 @@ func TestRunStopUnits(t *testing.T) { var wg sync.WaitGroup errchan := make(chan error) - cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units)) + cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units), false) wg.Add(2) go func() { diff --git a/fleetctl/unload.go b/fleetctl/unload.go index 6758f2825..48eb833fc 100644 --- a/fleetctl/unload.go +++ b/fleetctl/unload.go @@ -60,17 +60,7 @@ func runUnloadUnit(args []string) (exit int) { } } - if !sharedFlags.NoBlock { - errchan := waitForUnitStates(wait, job.JobStateInactive, sharedFlags.BlockAttempts, os.Stdout) - for err := range errchan { - stderr("Error waiting for units: %v", err) - exit = 1 - } - } else { - for _, name := range wait { - stdout("Triggered unit %s unload", name) - } - } + exit = tryWaitForUnitStates(wait, "unload", job.JobStateInactive, getBlockAttempts(), os.Stdout) return } diff --git a/fleetctl/unload_test.go b/fleetctl/unload_test.go index a6702a803..8e61b3dd9 100644 --- a/fleetctl/unload_test.go +++ b/fleetctl/unload_test.go @@ -73,7 +73,7 @@ func TestRunUnloadUnits(t *testing.T) { var wg sync.WaitGroup errchan := make(chan error) - cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units)) + cAPI = newFakeRegistryForCommands(unitPrefix, len(r.units), false) wg.Add(2) go func() {