diff --git a/docs/how-to-run-multi-node.md b/docs/how-to-run-multi-node.md
index 5c84e6cf..140f18d6 100644
--- a/docs/how-to-run-multi-node.md
+++ b/docs/how-to-run-multi-node.md
@@ -75,17 +75,144 @@ You can see at the end of these commands, we are pointing DLM/MAD to the shared-
 **NOTE: The above commands assumes the shared-file system is mounted at `/nfs` in the commands above. If this is not the case and a user simply copies/pastes the above commands on two nodes, DLM/MAD will create a folder called `nfs` on each node and copy the data there, which is not desired behavior.**
 
+## SLURM Cluster Integration
+
+madengine now supports running workloads on SLURM clusters, allowing you to leverage job scheduling and resource management for multi-node training and inference.
+
+### Overview
+
+When `slurm_args` is provided via `--additional-context`, madengine will:
+1. Parse the SLURM configuration parameters
+2. Submit the job directly to the SLURM cluster using `sbatch`
+3. Skip the standard Docker container build and run workflow
+4. Execute the model-specific script (e.g., `scripts/sglang_disagg/run.sh`) which handles SLURM job submission
+
+### SLURM Arguments
+
+The following arguments can be specified in the `slurm_args` dictionary:
+
+| Argument | Description | Required | Default | Example |
+|----------|-------------|----------|---------|---------|
+| `FRAMEWORK` | Framework to use for the job | Yes | - | `'sglang_disagg'` |
+| `PREFILL_NODES` | Number of nodes for prefill phase | Yes | - | `'2'` |
+| `DECODE_NODES` | Number of nodes for decode phase | Yes | - | `'2'` |
+| `PARTITION` | SLURM partition/queue name | Yes | - | `'amd-rccl'` |
+| `TIME` | Maximum job runtime (HH:MM:SS) | Yes | - | `'12:00:00'` |
+| `DOCKER_IMAGE` | Docker image to use | No | `''` | `'myregistry/image:tag'` |
+| `EXCLUSIVE_MODE` | Request exclusive node access | No | `True` | `True` or `False` |
+
+### Usage Examples
+
+#### Basic SLURM Job Submission
+
+To run a model on SLURM with default settings:
+
+```bash
+madengine run --tags sglang_disagg_pd_qwen3-32B \
+    --additional-context "{'slurm_args': {
+        'FRAMEWORK': 'sglang_disagg',
+        'PREFILL_NODES': '2',
+        'DECODE_NODES': '2',
+        'PARTITION': 'amd-rccl',
+        'TIME': '12:00:00',
+        'DOCKER_IMAGE': ''
+    }}"
+```
+
+#### Custom Docker Image
+
+To specify a custom Docker image for the SLURM job:
+
+```bash
+madengine run --tags sglang_disagg_pd_qwen3-32B \
+    --additional-context "{'slurm_args': {
+        'FRAMEWORK': 'sglang_disagg',
+        'PREFILL_NODES': '4',
+        'DECODE_NODES': '4',
+        'PARTITION': 'gpu-high-priority',
+        'TIME': '24:00:00',
+        'DOCKER_IMAGE': 'myregistry/custom-image:latest'
+    }}"
+```
+
+#### Running Different Model Configurations
+
+For the DeepSeek-V2 model:
+
+```bash
+madengine run --tags sglang_disagg_pd_deepseek_v2 \
+    --additional-context "{'slurm_args': {
+        'FRAMEWORK': 'sglang_disagg',
+        'PREFILL_NODES': '8',
+        'DECODE_NODES': '8',
+        'PARTITION': 'amd-mi300x',
+        'TIME': '48:00:00',
+        'DOCKER_IMAGE': ''
+    }}"
+```
+
+#### Using Exclusive Mode
+
+By default, `EXCLUSIVE_MODE` is `True`, which requests exclusive access to nodes (recommended for distributed inference).
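+
+Under the hood, `EXCLUSIVE_MODE=True` maps to SLURM's standard `--exclusive` allocation option. As an illustration only (the actual batch script is generated by the framework's `run.sh`, so the exact directives may differ), the basic 2+2-node example above roughly corresponds to an `sbatch` header like:
+
+```bash
+#!/bin/bash
+#SBATCH --partition=amd-rccl   # PARTITION
+#SBATCH --nodes=4              # PREFILL_NODES + DECODE_NODES
+#SBATCH --time=12:00:00        # TIME
+#SBATCH --exclusive            # requested because EXCLUSIVE_MODE is True
+```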
+To share nodes with other jobs:
+
+```bash
+madengine run --tags sglang_disagg_pd_qwen3-32B \
+    --additional-context "{'slurm_args': {
+        'FRAMEWORK': 'sglang_disagg',
+        'PREFILL_NODES': '2',
+        'DECODE_NODES': '2',
+        'PARTITION': 'amd-rccl',
+        'TIME': '12:00:00',
+        'DOCKER_IMAGE': '',
+        'EXCLUSIVE_MODE': False
+    }}"
+```
+
+**Note:** Exclusive mode (`--exclusive` in SLURM) is typically recommended for distributed multi-node workloads to ensure consistent performance and avoid interference from other jobs running on the same nodes.
+
+### Model Configuration
+
+Models configured for SLURM should include the model name in the `args` attribute of `models.json`. For example:
+
+```json
+{
+    "name": "sglang_disagg_pd_qwen3-32B",
+    "args": "--model Qwen3-32B",
+    "tags": ["sglang_disagg"]
+}
+```
+
+The value that follows `--model` (in the example above, `Qwen3-32B`) is extracted and set as the `MODEL_NAME` environment variable for the SLURM job.
+
+### Requirements
+
+To use SLURM integration, ensure the following are available:
+
+1. **SLURM Cluster Access**: Access to a SLURM cluster with proper credentials
+2. **Model Scripts**: Framework-specific scripts (e.g., `scripts/sglang_disagg/run.sh`) that handle SLURM job submission
+
+### How It Works
+
+1. **Context Parsing**: madengine detects `slurm_args` in the additional context (passed inline or via a context file; see the example below)
+2. **Model Selection**: Extracts model information from `models.json` based on the provided tags
+3. **Environment Setup**: Prepares environment variables including `MODEL_NAME`, node counts, partition, etc.
+4. **Job Submission**: Executes the framework-specific run script which submits the SLURM job using `sbatch`
+5. **Job Monitoring**: The SLURM cluster manages job execution, resource allocation, and scheduling
+
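+### Passing `slurm_args` from a File
+
+`Context._is_slurm_mode` also recognizes `slurm_args` supplied through an additional-context *file*: a JSON document with a top-level `slurm_args` key, read via `json.load`. A minimal sketch follows; the file name is arbitrary and the exact CLI option for passing a context file is an assumption here, so check `madengine run --help` for your version:
+
+```bash
+# Hypothetical file name; any path works.
+cat > slurm_context.json <<'EOF'
+{
+    "slurm_args": {
+        "FRAMEWORK": "sglang_disagg",
+        "PREFILL_NODES": "2",
+        "DECODE_NODES": "2",
+        "PARTITION": "amd-rccl",
+        "TIME": "12:00:00",
+        "DOCKER_IMAGE": ""
+    }
+}
+EOF
+
+# Option name assumed; madengine's Context accepts an additional_context_file argument.
+madengine run --tags sglang_disagg_pd_qwen3-32B --additional-context-file slurm_context.json
+```
+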
 ## TODO
 
 ### RUNNER
 
+- [x] torchrun
 - [ ] mpirun (requires ansible integration)
 
-### Job Schedulare
+### Job Scheduler
 
-- [ ] SLURM
+- [x] SLURM (via slurm_args integration)
 - [ ] Kubernetes
 
 ### Design Consideration
 
-- [ ] Having the python model script launched by individual bash scripts can be limiting for multi-node. Perhaps we can explore a full python workflow for multi-node and only the job scheduler uses a bash script like SLURM using sbatch script.
+- [x] SLURM integration using sbatch scripts for job submission
+- [ ] Full Python workflow for multi-node (without bash script intermediaries)
+- [ ] Kubernetes-native job scheduling integration
diff --git a/src/madengine/core/context.py b/src/madengine/core/context.py
index cb628a69..4153af4f 100644
--- a/src/madengine/core/context.py
+++ b/src/madengine/core/context.py
@@ -92,22 +92,43 @@ def __init__(
         else:
             print("Warning: unknown numa balancing setup ...")
 
-        # Keeping gpu_vendor for filterning purposes, if we filter using file names we can get rid of this attribute.
-        self.ctx["gpu_vendor"] = self.get_gpu_vendor()
-
-        # Initialize the docker context
-        self.ctx["docker_env_vars"] = {}
-        self.ctx["docker_env_vars"]["MAD_GPU_VENDOR"] = self.ctx["gpu_vendor"]
-        self.ctx["docker_env_vars"]["MAD_SYSTEM_NGPUS"] = self.get_system_ngpus()
-        self.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_ARCHITECTURE"] = self.get_system_gpu_architecture()
-        self.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_PRODUCT_NAME"] = self.get_system_gpu_product_name()
-        self.ctx['docker_env_vars']['MAD_SYSTEM_HIP_VERSION'] = self.get_system_hip_version()
-        self.ctx["docker_build_arg"] = {
-            "MAD_SYSTEM_GPU_ARCHITECTURE": self.get_system_gpu_architecture(),
-            "MAD_SYSTEM_GPU_PRODUCT_NAME": self.get_system_gpu_product_name()
-        }
-        self.ctx["docker_gpus"] = self.get_docker_gpus()
-        self.ctx["gpu_renderDs"] = self.get_gpu_renderD_nodes()
+        # Check if SLURM mode is requested before GPU detection
+        is_slurm_mode = self._is_slurm_mode(additional_context, additional_context_file)
+
+        if is_slurm_mode:
+            # For SLURM mode, set minimal GPU context to avoid detection on control node
+            print("SLURM mode detected - skipping GPU detection on control node")
+            self.ctx["gpu_vendor"] = "AMD"  # Default to AMD for SLURM environments
+            self.ctx["docker_env_vars"] = {}
+            self.ctx["docker_env_vars"]["MAD_GPU_VENDOR"] = self.ctx["gpu_vendor"]
+            self.ctx["docker_env_vars"]["MAD_SYSTEM_NGPUS"] = "8"  # Default value for SLURM
+            self.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_ARCHITECTURE"] = "gfx90a"  # Default for SLURM
+            self.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_PRODUCT_NAME"] = "AMD_GPU"  # Default value
+            self.ctx['docker_env_vars']['MAD_SYSTEM_HIP_VERSION'] = "5.0.0"  # Default value
+            self.ctx["docker_build_arg"] = {
+                "MAD_SYSTEM_GPU_ARCHITECTURE": "gfx90a",
+                "MAD_SYSTEM_GPU_PRODUCT_NAME": "AMD_GPU"
+            }
+            self.ctx["docker_gpus"] = "0,1,2,3,4,5,6,7"  # Default GPU list
+            self.ctx["gpu_renderDs"] = [128, 129, 130, 131, 132, 133, 134, 135]  # Default renderD nodes
+        else:
+            # Normal mode - detect GPUs
+            # Keeping gpu_vendor for filtering purposes, if we filter using file names we can get rid of this attribute.
+            self.ctx["gpu_vendor"] = self.get_gpu_vendor()
+
+            # Initialize the docker context
+            self.ctx["docker_env_vars"] = {}
+            self.ctx["docker_env_vars"]["MAD_GPU_VENDOR"] = self.ctx["gpu_vendor"]
+            self.ctx["docker_env_vars"]["MAD_SYSTEM_NGPUS"] = self.get_system_ngpus()
+            self.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_ARCHITECTURE"] = self.get_system_gpu_architecture()
+            self.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_PRODUCT_NAME"] = self.get_system_gpu_product_name()
+            self.ctx['docker_env_vars']['MAD_SYSTEM_HIP_VERSION'] = self.get_system_hip_version()
+            self.ctx["docker_build_arg"] = {
+                "MAD_SYSTEM_GPU_ARCHITECTURE": self.get_system_gpu_architecture(),
+                "MAD_SYSTEM_GPU_PRODUCT_NAME": self.get_system_gpu_product_name()
+            }
+            self.ctx["docker_gpus"] = self.get_docker_gpus()
+            self.ctx["gpu_renderDs"] = self.get_gpu_renderD_nodes()
 
         # Default multi-node configuration
         self.ctx['multi_node_args'] = {
@@ -148,6 +169,40 @@ def __init__(
         # Set multi-node runner after context update
         self.ctx['docker_env_vars']['MAD_MULTI_NODE_RUNNER'] = self.set_multi_node_runner()
 
+    def _is_slurm_mode(self, additional_context: str = None, additional_context_file: str = None) -> bool:
+        """Check if SLURM mode is requested.
+
+        Args:
+            additional_context: The additional context string.
+            additional_context_file: The additional context file.
+
+        Returns:
+            bool: True if SLURM mode is detected, False otherwise.
+        """
+        import ast
+        import json
+
+        # Check additional_context_file first
+        if additional_context_file:
+            try:
+                with open(additional_context_file) as f:
+                    context_data = json.load(f)
+                if 'slurm_args' in context_data:
+                    return True
+            except Exception:
+                pass
+
+        # Check additional_context string
+        if additional_context:
+            try:
+                dict_additional_context = ast.literal_eval(additional_context)
+                if 'slurm_args' in dict_additional_context:
+                    return True
+            except Exception:
+                pass
+
+        return False
+
     def get_ctx_test(self) -> str:
         """Get context test.
diff --git a/src/madengine/tools/run_models.py b/src/madengine/tools/run_models.py
index 4f26450d..e1ab7f96 100644
--- a/src/madengine/tools/run_models.py
+++ b/src/madengine/tools/run_models.py
@@ -199,6 +199,11 @@ def clean_up_docker_container(self, is_cleaned: bool = False) -> None:
         self.console.sh("docker ps -a || true")
         self.console.sh("docker kill $(docker ps -q) || true")
 
+        # Skip GPU info display in SLURM mode as control node may not have GPUs
+        if "slurm_args" in self.context.ctx:
+            print("SLURM mode detected - skipping GPU info display on control node")
+            return
+
         # get gpu vendor
         gpu_vendor = self.context.ctx["docker_env_vars"]["MAD_GPU_VENDOR"]
         # show gpu info
@@ -884,6 +889,246 @@ def run_model_impl(
         # explicitly delete model docker to stop the container, without waiting for the in-built garbage collector
         del model_docker
 
+    def run_model_slurm(self, model_info: typing.Dict) -> bool:
+        """Run model on SLURM cluster.
+
+        Args:
+            model_info: The model information.
+
+        Returns:
+            bool: The status of running model on SLURM cluster.
+
+        Raises:
+            Exception: An error occurred while running model on SLURM cluster.
+        """
+        print(f"Running model {model_info['name']} on SLURM cluster")
+
+        # Extract SLURM arguments from context
+        slurm_args = self.context.ctx["slurm_args"]
+
+        # Validate required SLURM arguments
+        required_args = ["FRAMEWORK", "PREFILL_NODES", "DECODE_NODES", "PARTITION", "TIME"]
+        for arg in required_args:
+            if arg not in slurm_args:
+                raise Exception(f"Missing required SLURM argument: {arg}")
+
+        # Extract model name from model_info args (remove --model prefix)
+        model_name = ""
+        if "args" in model_info and model_info["args"]:
+            args_parts = model_info["args"].split()
+            if "--model" in args_parts:
+                model_index = args_parts.index("--model")
+                if model_index + 1 < len(args_parts):
+                    model_name = args_parts[model_index + 1]
+
+        if not model_name:
+            raise Exception(f"Could not extract model name from args: {model_info.get('args', '')}")
+
+        print(f"Extracted model name: {model_name}")
+
+        # Set up environment variables for the SLURM script
+        env_vars = {
+            "FRAMEWORK": slurm_args["FRAMEWORK"],
+            "PREFILL_NODES": str(slurm_args["PREFILL_NODES"]),
+            "DECODE_NODES": str(slurm_args["DECODE_NODES"]),
+            "PARTITION": slurm_args["PARTITION"],
+            "TIME": slurm_args["TIME"],
+            "MODEL_NAME": model_name
+        }
+
+        # Add DOCKER_IMAGE if provided and not empty
+        if "DOCKER_IMAGE" in slurm_args and slurm_args["DOCKER_IMAGE"]:
+            env_vars["DOCKER_IMAGE"] = slurm_args["DOCKER_IMAGE"]
+
+        # Add EXCLUSIVE_MODE if provided (default to true for multi-node jobs)
+        exclusive_mode = slurm_args.get("EXCLUSIVE_MODE", True)
+        if isinstance(exclusive_mode, bool):
+            env_vars["EXCLUSIVE_MODE"] = "true" if exclusive_mode else "false"
+        elif isinstance(exclusive_mode, str):
+            # Handle string values like "true", "false", "True", "False"
+            env_vars["EXCLUSIVE_MODE"] = exclusive_mode.lower()
+        else:
+            # Default to true for safety in multi-node distributed jobs
+            env_vars["EXCLUSIVE_MODE"] = "true"
+
+        print(f"SLURM exclusive mode: {env_vars['EXCLUSIVE_MODE']}")
+
+        # Set environment variables
+        for key, value in env_vars.items():
+            os.environ[key] = value
+            print(f"Setting {key}={value}")
+
+        # Prepare run details for result tracking
+        run_details = RunDetails()
+        run_details.model = model_info["name"]
+        run_details.n_gpus = model_info["n_gpus"]
+        run_details.training_precision = model_info["training_precision"]
+        run_details.args = model_info["args"]
+        run_details.tags = model_info["tags"]
+        run_details.pipeline = os.environ.get("pipeline")
+        run_details.machine_name = self.console.sh("hostname")
+
+        try:
+            # Execute the SLURM script
+            script_path = model_info.get("scripts", "scripts/sglang_disagg/run.sh")
+            print(f"Executing SLURM script: {script_path}")
+
+            # Make script executable
+            self.console.sh(f"chmod +x {script_path}")
+
+            # Run the script without arguments (all parameters loaded from environment)
+            start_time = time.time()
+            log_file_path = f"{model_info['name'].replace('/', '_')}_slurm.live.log"
+
+            with open(log_file_path, mode="w", buffering=1) as outlog:
+                with redirect_stdout(PythonicTee(outlog, self.args.live_output)), redirect_stderr(PythonicTee(outlog, self.args.live_output)):
+                    result = self.console.sh(f"bash {script_path}", timeout=None)
+
+            run_details.test_duration = time.time() - start_time
+            print(f"SLURM execution duration: {run_details.test_duration} seconds")
+
+            # Extract performance metrics from log
+            multiple_results = model_info.get("multiple_results")
+
+            if multiple_results:
+                run_details.performance = multiple_results
+                # Check if the results file exists and is valid
+                if os.path.exists(multiple_results):
+                    with open(multiple_results, 'r') as f:
+                        header = f.readline().strip().split(',')
+                        for line in f:
+                            row = line.strip().split(',')
+                            for col in row:
+                                if col == '':
+                                    run_details.performance = None
+                                    print("Error: Performance metric is empty in multiple results file.")
+                                    break
+                else:
+                    print(f"Warning: Multiple results file {multiple_results} not found")
+                    run_details.performance = None
+            else:
+                # Extract performance from log using regex
+                perf_regex = r".*performance:\s*\([+|-]?[0-9]*[.]?[0-9]*\(e[+|-]?[0-9]+\)?\)\s*.*\s*"
+                run_details.performance = self.console.sh("cat " + log_file_path +
+                                                          " | sed -n 's/" + perf_regex + "/\\1/p'")
+
+                metric_regex = r".*performance:\s*[+|-]?[0-9]*[.]?[0-9]*\(e[+|-]?[0-9]+\)?\s*\(\w*\)\s*"
+                run_details.metric = self.console.sh("cat " + log_file_path +
+                                                     " | sed -n 's/" + metric_regex + "/\\2/p'")
+
+            # Determine success/failure
+            run_details.status = 'SUCCESS' if run_details.performance else 'FAILURE'
+
+            # Print performance results
+            run_details.print_perf()
+
+            # Update CSV results
+            if multiple_results:
+                run_details.generate_json("common_info.json", multiple_results=True)
+                update_perf_csv(
+                    multiple_results=multiple_results,
+                    perf_csv=self.args.output,
+                    model_name=run_details.model,
+                    common_info="common_info.json",
+                )
+            else:
+                run_details.generate_json("perf_entry.json")
+                update_perf_csv(
+                    single_result="perf_entry.json",
+                    perf_csv=self.args.output,
+                )
+
+            return run_details.status == 'SUCCESS'
+
+        except Exception as e:
+            print("===== SLURM EXECUTION EXCEPTION =====")
+            print("Exception: ", e)
+            traceback.print_exc()
+            print("=======================================")
+
+            run_details.status = "FAILURE"
+            run_details.generate_json("perf_entry.json")
+            update_perf_csv(
+                exception_result="perf_entry.json",
+                perf_csv=self.args.output,
+            )
+            return False
+
+    def _is_slurm_required_model(self, model_info: typing.Dict) -> bool:
+        """Check if model requires SLURM cluster execution.
+
+        A model requires SLURM if it has the 'slurm' tag in its tags list.
+        This indicates the model is designed for distributed multi-node
+        inference and cannot run on single-node environments.
+
+        Args:
+            model_info: The model information dictionary containing tags.
+
+        Returns:
+            bool: True if model has 'slurm' tag, False otherwise.
+        """
+        tags = model_info.get("tags", [])
+        return "slurm" in tags
+
+    def _is_slurm_environment(self) -> bool:
+        """Check if current environment is a SLURM cluster.
+
+        This method detects if the code is running on a SLURM cluster by attempting
+        to execute the 'scontrol show config' command, which is a standard SLURM
+        command available on all SLURM nodes.
+
+        Returns:
+            bool: True if running on a SLURM cluster, False otherwise.
+        """
+        # Try to execute scontrol command to check if SLURM is available
+        try:
+            result = self.console.sh("scontrol show config 2>/dev/null | head -n 1", canFail=True)
+            # If the command succeeds and returns output, we're on a SLURM cluster
+            if result and len(result.strip()) > 0:
+                print(f"SLURM environment detected via 'scontrol show config'")
+                print(f"SLURM config: {result.strip()}")
+                return True
+        except Exception:
+            # Command failed or not found - not a SLURM environment
+            pass
+
+        print("Not running on a SLURM cluster - single-node environment detected")
+        return False
+
+    def _write_skipped_model_result(self, model_info: typing.Dict, status: str) -> None:
+        """Write a skipped model entry to the performance CSV.
+
+        This method creates a CSV entry for models that were skipped during
+        execution, allowing for complete tracking and reporting of all models
+        in the run, including those that couldn't be executed.
+
+        Args:
+            model_info: The model information dictionary.
+            status: The skip status (e.g., "SKIPPED_SLURM_REQUIRED").
+        """
+        run_details = RunDetails()
+        run_details.model = model_info["name"]
+        run_details.n_gpus = model_info.get("n_gpus", "-1")
+        run_details.training_precision = model_info.get("training_precision", "")
+        run_details.args = model_info.get("args", "")
+        run_details.tags = model_info.get("tags", [])
+        run_details.status = status
+        run_details.pipeline = os.environ.get("pipeline", "")
+        run_details.machine_name = self.console.sh("hostname")
+
+        # Get GPU architecture from context if available
+        if "docker_env_vars" in self.context.ctx and "MAD_SYSTEM_GPU_ARCHITECTURE" in self.context.ctx["docker_env_vars"]:
+            run_details.gpu_architecture = self.context.ctx["docker_env_vars"]["MAD_SYSTEM_GPU_ARCHITECTURE"]
+        else:
+            run_details.gpu_architecture = "N/A"
+
+        # Generate JSON and update CSV
+        run_details.generate_json("perf_entry.json")
+        update_perf_csv(
+            single_result="perf_entry.json",
+            perf_csv=self.args.output,
+        )
+
     def run_model(self, model_info: typing.Dict) -> bool:
         """Run model on container.
 
@@ -898,6 +1143,50 @@ def run_model(self, model_info: typing.Dict) -> bool:
         """
         print(f"Running model {model_info['name']} with {model_info}")
 
+        # Check if model requires SLURM but SLURM is not configured
+        if self._is_slurm_required_model(model_info):
+            if not self._is_slurm_environment():
+                print(f"")
+                print(f"=" * 80)
+                print(f"⚠️  WARNING: Model '{model_info['name']}' requires SLURM cluster execution")
+                print(f"=" * 80)
+                print(f"")
+                print(f"This model is tagged with 'slurm' and is designed for distributed")
+                print(f"multi-node inference. It cannot run on typical single-node environments.")
+                print(f"")
+                print(f"Current environment: Single-node execution (no SLURM cluster detected)")
+                print(f"Required environment: SLURM cluster with multi-node configuration")
+                print(f"")
+                print(f"⚠️  SKIPPING model execution")
+                print(f"")
+                print(f"To run this model on a SLURM cluster, use:")
+                print(f"")
+                print(f"    madengine run --tags {model_info['name']} \\")
+                print(f"        --additional-context \"{{")
+                print(f"            'slurm_args': {{")
+                print(f"                'FRAMEWORK': 'sglang_disagg',")
+                print(f"                'PREFILL_NODES': '2',")
+                print(f"                'DECODE_NODES': '2',")
+                print(f"                'PARTITION': 'gpu-partition',")
+                print(f"                'TIME': '12:00:00',")
+                print(f"                'DOCKER_IMAGE': ''")
+                print(f"            }}")
+                print(f"        }}\"")
+                print(f"")
+                print(f"For more information, see: docs/how-to-run-multi-node.md")
+                print(f"=" * 80)
+                print(f"")
+
+                # Write skip status to CSV for reporting
+                self._write_skipped_model_result(model_info, "SKIPPED_SLURM_REQUIRED")
+
+                # Return True to not fail the entire run (this is a skip, not a failure)
+                return True
+
+        # Check if SLURM execution is requested
+        if "slurm_args" in self.context.ctx:
+            return self.run_model_slurm(model_info)
+
         # set default values if model run fails
         run_details = RunDetails()
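A quick way to check which path `run_model` will take on a given machine: the SLURM-environment probe above is simply `scontrol show config`, so the same test can be reproduced by hand (non-empty output means a SLURM controller is reachable):

```bash
# Mirrors the check in _is_slurm_environment(); prints the first config line
# on a SLURM node and nothing on a plain single-node machine.
scontrol show config 2>/dev/null | head -n 1
```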