Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ Key and Value pairs are specified in a specific format documented in the [Tutori
| `epcc runbooks show <RUNBOOK> <ACTION>` | Show a specific runbook (script) |
| `epcc runbooks validate` | Validate all runbooks (built-in and user-supplied) and output any errors |
| `epcc runbooks run <RUNBOOK> <ACTION>` | Run a specific runbook (script) |
| `epcc runbooks exec-script <FILE>` | Execute a standalone YAML script file containing epcc commands |

#### Tuning Runbooks

Expand Down
327 changes: 187 additions & 140 deletions cmd/runbooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -36,6 +37,7 @@ func initRunbookCommands() {

runbookGlobalCmd.AddCommand(initRunbookShowCommands())
runbookGlobalCmd.AddCommand(initRunbookRunCommands())
runbookGlobalCmd.AddCommand(initRunbookRunScriptCmd())
runbookGlobalCmd.AddCommand(initRunbookDevCommands())
runbookGlobalCmd.AddCommand(initRunbookValidateCommands())
}
Expand Down Expand Up @@ -262,185 +264,230 @@ func initRunbookRunCommands() *cobra.Command {
Long: runbookAction.Description.Long,
Short: runbookAction.Description.Short,
RunE: func(cmd *cobra.Command, args []string) error {
numSteps := len(runbookAction.RawCommands)
return processRunBookCommands(runbook.Name, runbookStringArguments, runbookAction, maxConcurrency, execTimeoutInSeconds)
},
}
processRunbookVariablesOnCommand(runbookActionRunActionCommand, runbookStringArguments, runbookAction.Variables, true)

parentCtx := clictx.Ctx
runbookRunRunbookCmd.AddCommand(runbookActionRunActionCommand)
}
}

ctx, cancelFunc := context.WithCancel(parentCtx)
return runbookRunCommand
}

concurrentRunSemaphore := semaphore.NewWeighted(int64(*maxConcurrency))
factory := pool.NewPooledObjectFactorySimple(
func(ctx2 context.Context) (interface{}, error) {
return generateRunbookCmd(), nil
})
func initRunbookRunScriptCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "exec-script",
Short: "Execute a YAML script file containing epcc commands",
Args: cobra.ExactArgs(1),
SilenceUsage: true,
}

objectPool := pool.NewObjectPool(ctx, factory, &pool.ObjectPoolConfig{
MaxTotal: *maxConcurrency,
MaxIdle: *maxConcurrency,
})
execTimeoutInSeconds := cmd.Flags().Int64("execution-timeout", 900, "How long should the script take to execute before timing out")
maxConcurrency := cmd.Flags().Int("max-concurrency", 20, "Maximum number of commands that can run simultaneously")

rawCmds := runbookAction.RawCommands
for stepIdx := 0; stepIdx < len(rawCmds); stepIdx++ {
cmd.ValidArgsFunction = func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return []string{"yaml", "yml"}, cobra.ShellCompDirectiveFilterFileExt
}

origIndex := &stepIdx
// Create a copy of loop variables
stepIdx := stepIdx
rawCmd := rawCmds[stepIdx]
cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
return RootCmd.PersistentPreRunE(RootCmd, args)
}

templateName := fmt.Sprintf("Runbook: %s Action: %s Step: %d", runbook.Name, runbookAction.Name, stepIdx)
rawCmdLines, err := runbooks.RenderTemplates(templateName, rawCmd, runbookStringArguments, runbookAction.Variables)
cmd.RunE = func(cmd *cobra.Command, args []string) error {
data, err := os.ReadFile(args[0])
if err != nil {
return fmt.Errorf("could not read file %s: %v", args[0], err)
}

if err != nil {
cancelFunc()
return err
}
var commands []string
err = yaml.Unmarshal(data, &commands)
if err != nil {
return fmt.Errorf("could not parse YAML file %s: %v", args[0], err)
}

joinedString := strings.Join(rawCmdLines, "\n")
renderedCmd := []string{}
runbookAction := &runbooks.RunbookAction{
RawCommands: commands,
}

err = yaml.Unmarshal([]byte(joinedString), &renderedCmd)
return processRunBookCommands("exec-script", map[string]*string{}, runbookAction, maxConcurrency, execTimeoutInSeconds)
}

if err == nil {
log.Tracef("Line %d is a Yaml array %s, inserting into stack", stepIdx, joinedString)
newCmds := make([]string, 0, len(rawCmds)+len(renderedCmd)-1)
newCmds = append(newCmds, rawCmds[0:stepIdx]...)
newCmds = append(newCmds, renderedCmd...)
newCmds = append(newCmds, rawCmds[stepIdx+1:]...)
rawCmds = newCmds
*origIndex--
continue
}
return cmd
}

log.Infof("Executing> %s", rawCmd)
resultChan := make(chan *commandResult, *maxConcurrency*2)
funcs := make([]func(), 0, len(rawCmdLines))
func processRunBookCommands(runbookName string, runbookStringArguments map[string]*string, runbookAction *runbooks.RunbookAction, maxConcurrency *int, execTimeoutInSeconds *int64) error {
numSteps := len(runbookAction.RawCommands)

for commandIdx, rawCmdLine := range rawCmdLines {
parentCtx := clictx.Ctx

commandIdx := commandIdx
rawCmdLine := strings.Trim(rawCmdLine, " \n")
ctx, cancelFunc := context.WithCancel(parentCtx)

if rawCmdLine == "" {
// Allow blank lines
continue
}
concurrentRunSemaphore := semaphore.NewWeighted(int64(*maxConcurrency))
factory := pool.NewPooledObjectFactorySimple(
func(ctx2 context.Context) (interface{}, error) {
return generateRunbookCmd(), nil
})

if !strings.HasPrefix(rawCmdLine, "epcc ") {
// Some commands like sleep don't have prefix
// This hack allows them to run
rawCmdLine = "epcc " + rawCmdLine
}
rawCmdArguments, err := shellwords.SplitPosix(strings.Trim(rawCmdLine, " \n"))
objectPool := pool.NewObjectPool(ctx, factory, &pool.ObjectPoolConfig{
MaxTotal: *maxConcurrency,
MaxIdle: *maxConcurrency,
})

if err != nil {
cancelFunc()
return err
}
rawCmds := runbookAction.RawCommands
for stepIdx := 0; stepIdx < len(rawCmds); stepIdx++ {

funcs = append(funcs, func() {
origIndex := &stepIdx
// Create a copy of loop variables
stepIdx := stepIdx
rawCmd := rawCmds[stepIdx]

log.Tracef("(Step %d/%d Command %d/%d) Building Commmand", stepIdx+1, numSteps, commandIdx+1, len(funcs))
templateName := fmt.Sprintf("Runbook: %s Action: %s Step: %d", runbookName, runbookAction.Name, stepIdx)
rawCmdLines, err := runbooks.RenderTemplates(templateName, rawCmd, runbookStringArguments, runbookAction.Variables)

stepCmdObject, err := objectPool.BorrowObject(ctx)
defer objectPool.ReturnObject(ctx, stepCmdObject)
if err != nil {
cancelFunc()
return err
}

if err == nil {
commandAndResetFunc := stepCmdObject.(*CommandAndReset)
commandAndResetFunc.reset()
stepCmd := commandAndResetFunc.cmd
joinedString := strings.Join(rawCmdLines, "\n")
renderedCmd := []string{}

tweakedArguments := misc.AddImplicitDoubleDash(rawCmdArguments)
stepCmd.SetArgs(tweakedArguments[1:])
err = yaml.Unmarshal([]byte(joinedString), &renderedCmd)

stepCmd.SilenceErrors = true
log.Tracef("(Step %d/%d Command %d/%d) Starting Command", stepIdx+1, numSteps, commandIdx+1, len(funcs))
if err == nil {
log.Tracef("Line %d is a Yaml array %s, inserting into stack", stepIdx, joinedString)
newCmds := make([]string, 0, len(rawCmds)+len(renderedCmd)-1)
newCmds = append(newCmds, rawCmds[0:stepIdx]...)
newCmds = append(newCmds, renderedCmd...)
newCmds = append(newCmds, rawCmds[stepIdx+1:]...)
rawCmds = newCmds
*origIndex--
continue
}

stepCmd.ResetFlags()
err = stepCmd.ExecuteContext(ctx)
log.Tracef("(Step %d/%d Command %d/%d) Complete Command", stepIdx+1, numSteps, commandIdx+1, len(funcs))
}
log.Infof("Executing> %s", rawCmd)
resultChan := make(chan *commandResult, *maxConcurrency*2)
funcs := make([]func(), 0, len(rawCmdLines))

commandResult := &commandResult{
stepIdx: stepIdx,
commandIdx: commandIdx,
commandLine: rawCmdLine,
error: err,
}
for commandIdx, rawCmdLine := range rawCmdLines {

resultChan <- commandResult
commandIdx := commandIdx
rawCmdLine := strings.Trim(rawCmdLine, " \n")

})
if rawCmdLine == "" {
// Allow blank lines
continue
}

}
if !strings.HasPrefix(rawCmdLine, "epcc ") {
// Some commands like sleep don't have prefix
// This hack allows them to run
rawCmdLine = "epcc " + rawCmdLine
}
rawCmdArguments, err := shellwords.SplitPosix(strings.Trim(rawCmdLine, " \n"))

if len(funcs) > 1 {
log.Debugf("Running %d commands", len(funcs))
}
if err != nil {
cancelFunc()
return err
}

// Start processing all the functions
go func() {
for idx, fn := range funcs {
idx := idx
if shutdown.ShutdownFlag.Load() {
log.Infof("Aborting runbook execution, after %d scheduled executions", idx)
cancelFunc()
break
}
funcs = append(funcs, func() {

fn := fn
log.Tracef("Run %d is waiting on semaphore", idx)
if err := concurrentRunSemaphore.Acquire(ctx, 1); err == nil {
go func() {
log.Tracef("Run %d is starting", idx)
defer concurrentRunSemaphore.Release(1)
fn()
}()
} else {
log.Warnf("Run %d failed to get semaphore %v", idx, err)
}
}
}()

errorCount := 0
for i := 0; i < len(funcs); i++ {
select {
case result := <-resultChan:
if !shutdown.ShutdownFlag.Load() {
if result.error != nil {
log.Warnf("(Step %d/%d Command %d/%d) %v", result.stepIdx+1, numSteps, result.commandIdx+1, len(funcs), fmt.Errorf("error processing command [%s], %w", result.commandLine, result.error))
errorCount++
} else {
log.Debugf("(Step %d/%d Command %d/%d) finished successfully ", result.stepIdx+1, numSteps, result.commandIdx+1, len(funcs))
}
} else {
log.Tracef("Shutdown flag enabled, completion result %v", result)
cancelFunc()
}
case <-time.After(time.Duration(*execTimeoutInSeconds) * time.Second):
return fmt.Errorf("timeout of %d seconds reached, only %d of %d commands finished of step %d/%d", *execTimeoutInSeconds, i+1, len(funcs), stepIdx+1, numSteps)
log.Tracef("(Step %d/%d Command %d/%d) Building Commmand", stepIdx+1, numSteps, commandIdx+1, len(funcs))

}
}
stepCmdObject, err := objectPool.BorrowObject(ctx)
defer objectPool.ReturnObject(ctx, stepCmdObject)

if len(funcs) > 1 {
log.Debugf("Running %d commands complete", len(funcs))
}
if err == nil {
commandAndResetFunc := stepCmdObject.(*CommandAndReset)
commandAndResetFunc.reset()
stepCmd := commandAndResetFunc.cmd

if !runbookAction.IgnoreErrors && errorCount > 0 {
return fmt.Errorf("error occurred while processing script aborting")
}
tweakedArguments := misc.AddImplicitDoubleDash(rawCmdArguments)
stepCmd.SetArgs(tweakedArguments[1:])

stepCmd.SilenceErrors = true
log.Tracef("(Step %d/%d Command %d/%d) Starting Command", stepIdx+1, numSteps, commandIdx+1, len(funcs))

stepCmd.ResetFlags()
err = stepCmd.ExecuteContext(ctx)
log.Tracef("(Step %d/%d Command %d/%d) Complete Command", stepIdx+1, numSteps, commandIdx+1, len(funcs))
}

commandResult := &commandResult{
stepIdx: stepIdx,
commandIdx: commandIdx,
commandLine: rawCmdLine,
error: err,
}

resultChan <- commandResult

})

}

if len(funcs) > 1 {
log.Debugf("Running %d commands", len(funcs))
}

// Start processing all the functions
go func() {
for idx, fn := range funcs {
idx := idx
if shutdown.ShutdownFlag.Load() {
log.Infof("Aborting runbook execution, after %d scheduled executions", idx)
cancelFunc()
break
}

fn := fn
log.Tracef("Run %d is waiting on semaphore", idx)
if err := concurrentRunSemaphore.Acquire(ctx, 1); err == nil {
go func() {
log.Tracef("Run %d is starting", idx)
defer concurrentRunSemaphore.Release(1)
fn()
}()
} else {
log.Warnf("Run %d failed to get semaphore %v", idx, err)
}
}
}()

errorCount := 0
for i := 0; i < len(funcs); i++ {
select {
case result := <-resultChan:
if !shutdown.ShutdownFlag.Load() {
if result.error != nil {
log.Warnf("(Step %d/%d Command %d/%d) %v", result.stepIdx+1, numSteps, result.commandIdx+1, len(funcs), fmt.Errorf("error processing command [%s], %w", result.commandLine, result.error))
errorCount++
} else {
log.Debugf("(Step %d/%d Command %d/%d) finished successfully ", result.stepIdx+1, numSteps, result.commandIdx+1, len(funcs))
}
defer cancelFunc()
return nil
},
} else {
log.Tracef("Shutdown flag enabled, completion result %v", result)
cancelFunc()
}
case <-time.After(time.Duration(*execTimeoutInSeconds) * time.Second):
return fmt.Errorf("timeout of %d seconds reached, only %d of %d commands finished of step %d/%d", *execTimeoutInSeconds, i+1, len(funcs), stepIdx+1, numSteps)

}
processRunbookVariablesOnCommand(runbookActionRunActionCommand, runbookStringArguments, runbookAction.Variables, true)
}

runbookRunRunbookCmd.AddCommand(runbookActionRunActionCommand)
if len(funcs) > 1 {
log.Debugf("Running %d commands complete", len(funcs))
}
}

return runbookRunCommand
if !runbookAction.IgnoreErrors && errorCount > 0 {
return fmt.Errorf("error occurred while processing script aborting")
}
}
defer cancelFunc()
return nil
}

func processRunbookVariablesOnCommand(runbookActionRunActionCommand *cobra.Command, runbookStringArguments map[string]*string, variables map[string]runbooks.Variable, enableRequiredVars bool) {
Expand Down
Loading
Loading