Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion fleetctl/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func runDestroyUnits(args []string) (exit int) {
for _, v := range units {
err := cAPI.DestroyUnit(v.Name)
if err != nil {
// If unit does not exist do not error out
u, _ := cAPI.Unit(v.Name)
if u == nil {
continue
}
stderr("Error destroying units: %v", err)
exit = 1
continue
Expand Down Expand Up @@ -71,7 +76,7 @@ func runDestroyUnits(args []string) (exit int) {
if u == nil {
break
}
time.Sleep(500 * time.Millisecond)
time.Sleep(defaultSleepTime)
}
}

Expand Down
143 changes: 143 additions & 0 deletions fleetctl/destroy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2014 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"sync"
"testing"
"time"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/job"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
"github.com/coreos/fleet/unit"
)

type DestroyTestResults struct {
Description string
DestroyUnits []string
ExpectedExit int
}

func newFakeRegistryForDestroy(prefix string, unitCnt int) client.API {
// clear machineStates for every invocation
machineStates = nil
machines := []machine.MachineState{
newMachineState("c31e44e1-f858-436e-933e-59c642517860", "1.2.3.4", map[string]string{"ping": "pong"}),
newMachineState("595989bb-cbb7-49ce-8726-722d6e157b4e", "5.6.7.8", map[string]string{"foo": "bar"}),
}

jobs := make([]job.Job, 0)
appendJobsForTests(&jobs, machines[0], prefix, unitCnt)
appendJobsForTests(&jobs, machines[1], prefix, unitCnt)

states := make([]unit.UnitState, 0)
for i := 1; i <= unitCnt; i++ {
state := unit.UnitState{
UnitName: fmt.Sprintf("%s%d.service", prefix, i),
LoadState: "loaded",
ActiveState: "active",
SubState: "listening",
MachineID: machines[0].ID,
}
states = append(states, state)
}

for i := 1; i <= unitCnt; i++ {
state := unit.UnitState{
UnitName: fmt.Sprintf("%s%d.service", prefix, i),
LoadState: "loaded",
ActiveState: "inactive",
SubState: "dead",
MachineID: machines[1].ID,
}
states = append(states, state)
}

reg := registry.NewFakeRegistry()
reg.SetMachines(machines)
reg.SetUnitStates(states)
reg.SetJobs(jobs)

return &client.RegistryClient{Registry: reg}
}

func doDestroyUnits(r DestroyTestResults, errchan chan error) {
exit := runDestroyUnits(r.DestroyUnits)
if exit != r.ExpectedExit {
errchan <- fmt.Errorf("%s: expected exit code %d but received %d", r.Description, r.ExpectedExit, exit)
}
for _, destroyedUnit := range r.DestroyUnits {
u, _ := cAPI.Unit(destroyedUnit)
if u != nil {
errchan <- fmt.Errorf("%s: unit %s was not destroyed as requested", r.Description, destroyedUnit)
}
}
}

// TestRunDestroyUnits checks for correct unit destruction
func TestRunDestroyUnits(t *testing.T) {
unitPrefix := "j"
results := []DestroyTestResults{
{
"destroy available units",
[]string{"j1", "j2", "j3", "j4", "j5"},
0,
},
{
"destroy non-existent units",
[]string{"y1", "y2"},
0,
},
{
"attempt to destroy available and non-available units",
[]string{"y1", "y2", "y3", "y4", "j1", "j2", "j3", "j4", "j5", "y0"},
0,
},
}

// Check with two goroutines we don't care we should just get
// the right result. If you happen to inspect this code for
// errors then you probably got hit by a race condition in
// Destroy command that should not happen
for _, r := range results {
var wg sync.WaitGroup
errchan := make(chan error)

cAPI = newFakeRegistryForDestroy(unitPrefix, len(r.DestroyUnits))

wg.Add(2)
go func() {
defer wg.Done()
time.Sleep(2 * time.Microsecond)
doDestroyUnits(r, errchan)
}()
go func() {
defer wg.Done()
doDestroyUnits(r, errchan)
}()

go func() {
wg.Wait()
close(errchan)
}()

for err := range errchan {
t.Errorf("%v", err)
}
}
}
174 changes: 131 additions & 43 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ recommended to upgrade fleetctl to prevent incompatibility issues.
clientDriverAPI = "API"
clientDriverEtcd = "etcd"

defaultEndpoint = "unix:///var/run/fleet.sock"
defaultEndpoint = "unix:///var/run/fleet.sock"
defaultSleepTime = 500 * time.Millisecond
)

var (
Expand Down Expand Up @@ -482,6 +483,36 @@ func getChecker() *ssh.HostKeyChecker {
return ssh.NewHostKeyChecker(keyFile)
}

func getUnitFile(file string) (*unit.UnitFile, error) {
var uf *unit.UnitFile
name := unitNameMangle(file)

log.Debugf("Looking up for Unit(%s) or its corresponding template", name)

// Assume that the file references a local unit file on disk and
// attempt to load it, if it exists
if _, err := os.Stat(file); !os.IsNotExist(err) {
uf, err = getUnitFromFile(file)
if err != nil {
return nil, fmt.Errorf("failed getting Unit(%s) from file: %v", file, err)
}
} else {
// Otherwise (if the unit file does not exist), check if the
// name appears to be an instance unit, and if so, check for
// a corresponding template unit in the Registry or disk
uf, err = getUnitFileFromTemplate(file)
if err != nil {
return nil, err
}

// If we found a template unit, create a near-identical instance unit in
// the Registry - same unit file as the template, but different name
}

log.Debugf("Found Unit(%s)", name)
return uf, nil
}

// getUnitFromFile attempts to load a Unit from a given filename
// It returns the Unit or nil, and any error encountered
func getUnitFromFile(file string) (*unit.UnitFile, error) {
Expand All @@ -496,6 +527,48 @@ func getUnitFromFile(file string) (*unit.UnitFile, error) {
return unit.NewUnitFile(string(out))
}

// getUnitFileFromTemplate checks if the name appears to be an instance unit
// and gets its corresponding template unit from the registry or local disk
// It returns the Unit or nil; and any error encountered
func getUnitFileFromTemplate(arg string) (*unit.UnitFile, error) {
var uf *unit.UnitFile
name := unitNameMangle(arg)

// Check if the name appears to be an instance unit, and if so,
// check for a corresponding template unit in the Registry
uni := unit.NewUnitNameInfo(name)
if uni == nil {
return nil, fmt.Errorf("error extracting information from unit name %s", name)
} else if !uni.IsInstance() {
return nil, fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", name)
}

tmpl, err := cAPI.Unit(uni.Template)
if err != nil {
return nil, fmt.Errorf("error retrieving template Unit(%s) from Registry: %v", uni.Template, err)
}

if tmpl != nil {
warnOnDifferentLocalUnit(arg, tmpl)
uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options)
log.Debugf("Template Unit(%s) found in registry", uni.Template)
} else {
// Finally, if we could not find a template unit in the Registry,
// check the local disk for one instead
file := path.Join(path.Dir(arg), uni.Template)
if _, err := os.Stat(file); os.IsNotExist(err) {
return nil, fmt.Errorf("unable to find Unit(%s) or template Unit(%s) in Registry or on filesystem", name, uni.Template)
}

uf, err = getUnitFromFile(file)
if err != nil {
return nil, fmt.Errorf("failed getting template Unit(%s) from file: %v", uni.Template, err)
}
}

return uf, nil
}

func getTunnelFlag() string {
tun := globalFlags.Tunnel
if tun != "" && !strings.Contains(tun, ":") {
Expand Down Expand Up @@ -598,8 +671,6 @@ func lazyCreateUnits(args []string) error {
errchan := make(chan error)
var wg sync.WaitGroup
for _, arg := range args {
// TODO(jonboulle): this loop is getting too unwieldy; factor it out

arg = maybeAppendDefaultUnitType(arg)
name := unitNameMangle(arg)

Expand All @@ -614,45 +685,12 @@ func lazyCreateUnits(args []string) error {
continue
}

var uf *unit.UnitFile
// Failing that, assume the name references a local unit file on disk, and attempt to load that, if it exists
// TODO(mischief): consolidate these two near-identical codepaths
if _, err := os.Stat(arg); !os.IsNotExist(err) {
uf, err = getUnitFromFile(arg)
if err != nil {
return fmt.Errorf("failed getting Unit(%s) from file: %v", arg, err)
}
} else {
// Otherwise (if the unit file does not exist), check if the name appears to be an instance unit,
// and if so, check for a corresponding template unit in the Registry
uni := unit.NewUnitNameInfo(name)
if uni == nil {
return fmt.Errorf("error extracting information from unit name %s", name)
} else if !uni.IsInstance() {
return fmt.Errorf("unable to find Unit(%s) in Registry or on filesystem", name)
}
tmpl, err := cAPI.Unit(uni.Template)
if err != nil {
return fmt.Errorf("error retrieving template Unit(%s) from Registry: %v", uni.Template, err)
}

// Finally, if we could not find a template unit in the Registry, check the local disk for one instead
if tmpl == nil {
file := path.Join(path.Dir(arg), uni.Template)
if _, err := os.Stat(file); os.IsNotExist(err) {
return fmt.Errorf("unable to find Unit(%s) or template Unit(%s) in Registry or on filesystem", name, uni.Template)
}
uf, err = getUnitFromFile(file)
if err != nil {
return fmt.Errorf("failed getting template Unit(%s) from file: %v", uni.Template, err)
}
} else {
warnOnDifferentLocalUnit(arg, tmpl)
uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options)
}

// If we found a template unit, create a near-identical instance unit in
// the Registry - same unit file as the template, but different name
// Assume that the name references a local unit file on
// disk or if it is an instance unit and if so get its
// corresponding unit
uf, err := getUnitFile(arg)
if err != nil {
return err
}

_, err = createUnit(name, uf)
Expand Down Expand Up @@ -745,6 +783,56 @@ func setTargetStateOfUnits(units []string, state job.JobState) ([]*schema.Unit,
return triggered, nil
}

// getBlockAttempts gets the correct value of how many attempts to try
// before giving up on an operation.
// It returns a negative value which means try forever, if zero is
// returned then do not make any attempt, and if a positive value is
// returned then try up to that value
func getBlockAttempts() int {
// By default we wait forever
var attempts int = -1

// Up to BlockAttempts
if sharedFlags.BlockAttempts > 0 {
attempts = sharedFlags.BlockAttempts
}

// NoBlock we do not wait
if sharedFlags.NoBlock {
attempts = 0
}

return attempts
}

// tryWaitForUnitStates tries to wait for units to reach the desired state.
// It takes 5 arguments, the units to wait for, the desired state, the
// desired JobState, how many attempts before timing out and a writer
// interface.
// tryWaitForUnitStates polls each of the indicated units until they
// reach the desired state. If maxAttempts is zero, then it will not
// wait, it will assume that all units reached their desired state.
// If maxAttempts is negative tryWaitForUnitStates will retry forever, and
// if it is greater than zero, it will retry up to the indicated value.
// It returns 0 on success or 1 on errors.
func tryWaitForUnitStates(units []string, state string, js job.JobState, maxAttempts int, out io.Writer) (ret int) {
// We do not wait just assume we reached the desired state
if maxAttempts == 0 {
for _, name := range units {
stdout("Triggered unit %s %s", name, state)
}
return
}

errchan := waitForUnitStates(units, js, maxAttempts, out)
for err := range errchan {
stderr("Error waiting for units: %v", err)
ret = 1
}

return
}

// waitForUnitStates polls each of the indicated units until each of their
// states is equal to that which the caller indicates, or until the
// polling operation times out. waitForUnitStates will retry forever, or
Expand All @@ -771,7 +859,7 @@ func waitForUnitStates(units []string, js job.JobState, maxAttempts int, out io.
func checkUnitState(name string, js job.JobState, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
defer wg.Done()

sleep := 500 * time.Millisecond
sleep := defaultSleepTime

if maxAttempts < 1 {
for {
Expand Down
Loading