Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ In CI‑node mode, DDTest also fans out across local CPUs on that node and furth
| `--min-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MIN_PARALLELISM` | vCPU count | Minimum workers to use for the split. |
| `--max-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MAX_PARALLELISM` | vCPU count | Maximum workers to use for the split. |
| | `DD_TEST_OPTIMIZATION_RUNNER_CI_NODE` | `-1` (off) | Restrict this run to the slice assigned to node **N** (0‑indexed). Also parallelizes within the node across its CPUs. |
| `--worker-env` | `DD_TEST_OPTIMIZATION_RUNNER_WORKER_ENV` | `""` | Template env vars per local worker (e.g., isolate DBs): `--worker-env "DATABASE_NAME_TEST=app_test{{nodeIndex}}"`. |
| `--ci-node-workers` | `DD_TEST_OPTIMIZATION_RUNNER_CI_NODE_WORKERS` | vCPU count | Number of parallel workers per CI node. Tests assigned to a CI node are further split among this many local workers. |
| `--worker-env` | `DD_TEST_OPTIMIZATION_RUNNER_WORKER_ENV` | `""` | Template env vars per worker: `--worker-env "DATABASE_NAME_TEST=app_test{{nodeIndex}}"`. In CI-node mode, `{{nodeIndex}}` is the global worker index (`ciNode * ciNodeWorkers + localWorkerIndex`); indices are unique across CI nodes only when every node uses the same `--ci-node-workers` value. |
| `--command` | `DD_TEST_OPTIMIZATION_RUNNER_COMMAND` | `""` | Override the default test command used by the framework. When provided, takes precedence over auto-detection (e.g., `--command "bundle exec custom-rspec"`). |
| `--tests-location` | `DD_TEST_OPTIMIZATION_RUNNER_TESTS_LOCATION` | `""` | Custom glob pattern to discover test files (e.g., `--tests-location "custom/spec/**/*_spec.rb"`). Defaults to `spec/**/*_spec.rb` for RSpec, `test/**/*_test.rb` for Minitest. |
| `--runtime-tags` | `DD_TEST_OPTIMIZATION_RUNNER_RUNTIME_TAGS` | `""` | JSON string to override runtime tags used to fetch skippable tests. Useful for local development on a different OS than CI (e.g., `--runtime-tags '{"os.platform":"linux","runtime.version":"3.2.0"}'`). |
Expand Down Expand Up @@ -479,7 +480,6 @@ The `--runtime-tags` option lets you override your local runtime tags to match y
![Runtime tags in Datadog](docs/images/runtime-tags-datadog.png)

Note the following tags:

- `os.architecture` (e.g., `x86_64`)
- `os.platform` (e.g., `linux`)
- `os.version` (e.g., `6.8.0-aws`)
Expand Down
5 changes: 5 additions & 0 deletions internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func init() {
rootCmd.PersistentFlags().Int("min-parallelism", defaultParallelism, "Minimum number of parallel test processes (default: number of CPUs)")
rootCmd.PersistentFlags().Int("max-parallelism", defaultParallelism, "Maximum number of parallel test processes (default: number of CPUs)")
rootCmd.PersistentFlags().String("worker-env", "", "Worker environment configuration")
rootCmd.PersistentFlags().Int("ci-node-workers", defaultParallelism, "Number of parallel workers per CI node (default: number of CPUs)")
rootCmd.PersistentFlags().String("command", "", "Test command that ddtest should wrap")
rootCmd.PersistentFlags().String("tests-location", "", "Glob pattern used to discover test files")
rootCmd.PersistentFlags().String("runtime-tags", "", "JSON string to override runtime tags (e.g. '{\"os.platform\":\"linux\",\"runtime.version\":\"3.2.0\"}')")
Expand All @@ -83,6 +84,10 @@ func init() {
fmt.Fprintf(os.Stderr, "Error binding worker-env flag: %v\n", err)
os.Exit(1)
}
if err := viper.BindPFlag("ci_node_workers", rootCmd.PersistentFlags().Lookup("ci-node-workers")); err != nil {
fmt.Fprintf(os.Stderr, "Error binding ci-node-workers flag: %v\n", err)
os.Exit(1)
}
if err := viper.BindPFlag("command", rootCmd.PersistentFlags().Lookup("command")); err != nil {
fmt.Fprintf(os.Stderr, "Error binding command flag: %v\n", err)
os.Exit(1)
Expand Down
79 changes: 77 additions & 2 deletions internal/runner/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,98 @@ import (

"github.com/DataDog/ddtest/internal/constants"
"github.com/DataDog/ddtest/internal/framework"
"github.com/DataDog/ddtest/internal/settings"
"golang.org/x/sync/errgroup"
)

// splitTestFilesIntoGroups deals testFiles out to n groups in round-robin
// order: the file at position i ends up in group i % n. A non-positive n is
// treated as 1. Exactly n groups are returned, and each one is a non-nil
// slice even when it received no files.
func splitTestFilesIntoGroups(testFiles []string, n int) [][]string {
	groupCount := n
	if groupCount < 1 {
		groupCount = 1
	}

	groups := make([][]string, 0, groupCount)
	for len(groups) < groupCount {
		groups = append(groups, []string{})
	}

	// next walks 0, 1, ..., groupCount-1 and wraps around, which matches
	// taking the file position modulo groupCount.
	next := 0
	for _, path := range testFiles {
		groups[next] = append(groups[next], path)
		next++
		if next == groupCount {
			next = 0
		}
	}

	return groups
}

// runCINodeTests runs only the slice of tests assigned to CI node ciNode, not
// the whole test set. The number of local workers is read from
// settings.GetCiNodeWorkers() and the actual work is delegated to
// runCINodeTestsWithWorkers, which splits the node's tests among those workers.
func runCINodeTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int) error {
	ciNodeWorkers := settings.GetCiNodeWorkers()
	return runCINodeTestsWithWorkers(ctx, framework, workerEnvMap, ciNode, ciNodeWorkers)
}

// runCINodeTestsWithWorkers is the internal implementation behind runCINodeTests;
// it takes ciNodeWorkers as a parameter for easier testing.
//
// It looks for the runner file "<constants.TestsSplitDir>/runner-<ciNode>" and
// returns an error if that file does not exist. The test files listed there are
// read with readTestFilesFromFile; when the list is empty the function logs and
// returns nil. With ciNodeWorkers <= 1 all files run in a single worker,
// otherwise they are split with splitTestFilesIntoGroups and each non-empty
// group runs in its own errgroup goroutine. The first error reported by
// g.Wait() is wrapped with the ci-node number.
func runCINodeTestsWithWorkers(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int, ciNodeWorkers int) error {
runnerFilePath := fmt.Sprintf("%s/runner-%d", constants.TestsSplitDir, ciNode)
if _, err := os.Stat(runnerFilePath); os.IsNotExist(err) {
return fmt.Errorf("runner file for ci-node %d does not exist: %s", ciNode, runnerFilePath)
}

slog.Info("Running tests for specific CI node", "ciNode", ciNode, "filePath", runnerFilePath)
// NOTE(review): the next line opens a brace that nothing in this view closes;
// it looks like the removed side of the diff — confirm against the real file.
if err := runTestsFromFile(ctx, framework, runnerFilePath, workerEnvMap, ciNode); err != nil {
testFiles, err := readTestFilesFromFile(runnerFilePath)
if err != nil {
return fmt.Errorf("failed to read test files for ci-node %d from %s: %w", ciNode, runnerFilePath, err)
}

if len(testFiles) == 0 {
slog.Info("No tests to run for CI node", "ciNode", ciNode)
return nil
}

// Single-worker mode (ciNodeWorkers <= 1): run every file in one worker and
// use ciNode itself as the global index.
if ciNodeWorkers <= 1 {
globalIndex := ciNode
slog.Info("Running tests for CI node in single-worker mode", "ciNode", ciNode, "globalIndex", globalIndex)
return runTestsWithGlobalIndex(ctx, framework, testFiles, workerEnvMap, globalIndex)
}

// Multi-worker mode: split the node's test files among ciNodeWorkers local workers.
slog.Info("Running tests for CI node in parallel mode",
"ciNode", ciNode, "ciNodeWorkers", ciNodeWorkers, "testFilesCount", len(testFiles))

groups := splitTestFilesIntoGroups(testFiles, ciNodeWorkers)

var g errgroup.Group
for localIndex, groupFiles := range groups {
if len(groupFiles) == 0 {
continue
}

// Global index = ciNode * ciNodeWorkers + localIndex. Two nodes can only
// produce distinct indices if they are given the same ciNodeWorkers value.
globalIndex := ciNode*ciNodeWorkers + localIndex
g.Go(func() error {
return runTestsWithGlobalIndex(ctx, framework, groupFiles, workerEnvMap, globalIndex)
Comment on lines +80 to +83

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Ensure global worker indices stay unique across CI nodes

The global index formula here (ciNode*ciNodeWorkers + localIndex) assumes every CI node uses the same ciNodeWorkers value, but runCINodeTests sources it from settings (default runtime.NumCPU) per node. In heterogeneous CI pools (different CPU counts), this yields overlapping indices (e.g., node0 with 8 workers produces 0–7, node1 with 4 workers produces 4–7), which defeats the stated uniqueness guarantee and can cause {{nodeIndex}}-based worker envs (DB names, ports) to collide across nodes unless users manually align ci-node-workers everywhere.

Useful? React with 👍 / 👎.

})
}

if err := g.Wait(); err != nil {
return fmt.Errorf("failed to run tests for ci-node %d: %w", ciNode, err)
}
return nil
}

// runTestsWithGlobalIndex hands testFiles to framework.RunTests together with
// a per-worker environment. That environment is a fresh map built from
// workerEnvMap in which every occurrence of constants.NodeIndexPlaceholder in
// a value is replaced by the decimal form of globalIndex; workerEnvMap itself
// is left untouched. The error from framework.RunTests is returned as-is.
func runTestsWithGlobalIndex(ctx context.Context, framework framework.Framework, testFiles []string, workerEnvMap map[string]string, globalIndex int) error {
	// The substituted text is the same for every entry, so format it once.
	indexText := fmt.Sprintf("%d", globalIndex)

	resolvedEnv := make(map[string]string)
	for name, template := range workerEnvMap {
		resolvedEnv[name] = strings.ReplaceAll(template, constants.NodeIndexPlaceholder, indexText)
	}

	slog.Info("Running tests in worker", "globalIndex", globalIndex, "testFilesCount", len(testFiles), "workerEnv", resolvedEnv)
	return framework.RunTests(ctx, testFiles, resolvedEnv)
}

// runParallelTests executes tests across multiple parallel runners on a single node
func runParallelTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string) error {
slog.Info("Running tests in parallel mode")
Expand Down
Loading
Loading