diff --git a/Cargo.lock b/Cargo.lock
index 24ee94226..6eb0ed0cd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6946,6 +6946,7 @@ dependencies = [
  "iroh",
  "iroh-blobs",
  "lazy_static",
+ "nvml-wrapper",
  "postcard",
  "psyche-coordinator",
  "psyche-core",
diff --git a/architectures/centralized/client/src/app.rs b/architectures/centralized/client/src/app.rs
index 3560e0a7a..4d2d7e873 100644
--- a/architectures/centralized/client/src/app.rs
+++ b/architectures/centralized/client/src/app.rs
@@ -131,6 +131,7 @@ pub async fn build_app(
         .await?;
 
     let state_options: RunInitConfig = RunInitConfig {
+        parallelism_auto: p.parallelism_auto,
         data_parallelism: p.data_parallelism,
         tensor_parallelism: p.tensor_parallelism,
         micro_batch_size: p.micro_batch_size,
diff --git a/architectures/decentralized/solana-client/src/app.rs b/architectures/decentralized/solana-client/src/app.rs
index 36a529bbb..82914752f 100644
--- a/architectures/decentralized/solana-client/src/app.rs
+++ b/architectures/decentralized/solana-client/src/app.rs
@@ -114,6 +114,7 @@ pub async fn build_app(
 
     let state_options: RunInitConfig = RunInitConfig {
+        parallelism_auto: p.parallelism_auto,
         data_parallelism: p.data_parallelism,
         tensor_parallelism: p.tensor_parallelism,
         micro_batch_size: p.micro_batch_size,
diff --git a/nix/lib.nix b/nix/lib.nix
index 961d212ad..8bfe63aef 100644
--- a/nix/lib.nix
+++ b/nix/lib.nix
@@ -20,7 +20,8 @@ let
       || (builtins.match ".*tests/fixtures/.*$" path != null)
       || (builtins.match ".*.config/.*$" path != null)
       || (builtins.match ".*local-dev-keypair.json$" path != null)
-      || (builtins.match ".*shared/client/src/state/prompt_texts/index\\.json$" path != null);
+      || (builtins.match ".*shared/client/src/state/prompt_texts/index\\.json$" path != null)
+      || (builtins.match ".*shared/client/src/parallelism_data\\.json$" path != null);
 
   src = lib.cleanSourceWith {
     src = ../.;
diff --git a/psyche-book/src/enduser/create-run.md b/psyche-book/src/enduser/create-run.md
index 7a9354ca1..6aaebb5d4 100644
--- a/psyche-book/src/enduser/create-run.md
+++ b/psyche-book/src/enduser/create-run.md
@@ -86,11 +86,37 @@ run-manager create-run \
 
 At this point, your run has been successfully created.
 
+### Adding parallelism configuration (required for --parallelism-auto)
+
+If you want clients to use `PARALLELISM_AUTO=true` for automatic configuration, you must add a `parallelism_data.json` file to your model's HuggingFace repository.
+
+```json
+{
+  "H100": {
+    "1": { "dp": 1, "tp": 1, "micro_batch_size": 4 },
+    "8": { "dp": 4, "tp": 2, "micro_batch_size": 4 }
+  },
+  "H200": {
+    "8": { "dp": 8, "tp": 1, "micro_batch_size": 8 }
+  }
+}
+```
+
+Format: `gpu_type` → `num_gpus` → config
+
+- **gpu_type**: GPU model name (e.g., "H100", "H200")
+- **num_gpus**: Number of GPUs available (e.g., "1", "8")
+- **dp**: Data parallelism
+- **tp**: Tensor parallelism
+- **micro_batch_size**: Micro batch size per GPU
+
+The config is shared via P2P when clients join a run.
+
 ### Initializing configuration
 
 Initially, the run will not have any configuration defined and will remain paused, so no clients can join yet.
 
-To set the run configuration, you’ll need to provide mostly the same parameters as when creating the run, along with the path to a `config.toml` file that follows the [run config schema](./run-config.md).
+To set the run configuration, you'll need to provide mostly the same parameters as when creating the run, along with the path to a `config.toml` file that follows the [run config schema](./run-config.md).
 
 ```bash
 run-manager update-config \
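The table above is nothing more than nested string-keyed maps, so it can be checked locally before committing it to the model repo. Below is a minimal sketch mirroring the client's lookup; the `ParallelismConfig` struct and `Table` alias restate the definitions from the new `shared/client/src/parallelism_lookup.rs` further down, and serde, serde_json, and anyhow are assumed as dependencies (the module itself uses all three). Note that `num_gpus` is a JSON object key, so it is matched as a string:

```rust
use serde::Deserialize;
use std::collections::HashMap;

// Restated for illustration; mirrors the client's ParallelismConfig.
#[derive(Debug, Clone, Copy, Deserialize)]
struct ParallelismConfig {
    dp: usize,
    tp: usize,
    micro_batch_size: usize,
}

// gpu_type -> num_gpus -> config, with num_gpus as a string key.
type Table = HashMap<String, HashMap<String, ParallelismConfig>>;

fn main() -> anyhow::Result<()> {
    let json = r#"{
        "H100": {
            "1": { "dp": 1, "tp": 1, "micro_batch_size": 4 },
            "8": { "dp": 4, "tp": 2, "micro_batch_size": 4 }
        }
    }"#;
    let table: Table = serde_json::from_str(json)?;

    // An 8x H100 machine resolves to table["H100"]["8"].
    let config = table["H100"]["8"];
    assert_eq!((config.dp, config.tp, config.micro_batch_size), (4, 2, 4));
    Ok(())
}
```

Keys must match what the client derives from the GPU name, so stick to the short family names ("H100", "H200") used in the example.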
diff --git a/psyche-book/src/enduser/join-run.md b/psyche-book/src/enduser/join-run.md
index b79678bcd..9ad7492c7 100644
--- a/psyche-book/src/enduser/join-run.md
+++ b/psyche-book/src/enduser/join-run.md
@@ -93,19 +93,29 @@ though you might need to.
 
 **`NVIDIA_DRIVER_CAPABILITIES`** - An environment variable that the NVIDIA Container Toolkit uses to determine which compute capabilities should be provided to your container. It is recommended to set it to 'all', e.g. `NVIDIA_DRIVER_CAPABILITIES=all`.
 
+**`PARALLELISM_AUTO`** - Set to `true` to automatically detect optimal parallelism settings based on the model and your GPU hardware.
+
+- When enabled, the client will look up the best `DATA_PARALLELISM`, `TENSOR_PARALLELISM`, and `MICRO_BATCH_SIZE` values from a [built-in configuration table](https://github.com/PsycheFoundation/psyche/blob/main/shared/client/src/parallelism_data.json)
+- Your model and GPU hardware combination must be present in the table
+- This is the recommended option for most users
+- If set, manual parallelism settings below will be ignored
+
 **`DATA_PARALLELISM`** - Number of GPUs to distribute training data across.
 
 - If you have multiple GPUs, you can set this to 2, 4, etc. to speed up training
 - If you have 1 GPU, set this to `1`
+- Ignored if `PARALLELISM_AUTO=true`
 
 **`TENSOR_PARALLELISM`** - Number of GPUs to distribute the model across, this lets you train a model you can't fit on one single GPU.
 
 - If you have 1 GPU, set this to `1`
 - If your have `n` GPUs you can distribute the model across all of them by setting it to `n`.
+- Ignored if `PARALLELISM_AUTO=true`
 
 **`MICRO_BATCH_SIZE`** - Number of samples processed per GPU per training step
 
 - Set as high as your GPU memory allows
+- Ignored if `PARALLELISM_AUTO=true`
 
 **`AUTHORIZER`** - The Solana address that authorized your wallet to join this run
diff --git a/shared/client/Cargo.toml b/shared/client/Cargo.toml
index 5f7159d81..faf508a4a 100644
--- a/shared/client/Cargo.toml
+++ b/shared/client/Cargo.toml
@@ -36,6 +36,7 @@ clap.workspace = true
 sysinfo = "0.32.0"
 iroh.workspace = true
 iroh-blobs.workspace = true
+nvml-wrapper = "0.11.0"
 
 [features]
 parallelism = ["psyche-modeling/parallelism"]
diff --git a/shared/client/src/cli.rs b/shared/client/src/cli.rs
index 268ea753a..139e273a6 100644
--- a/shared/client/src/cli.rs
+++ b/shared/client/src/cli.rs
@@ -112,6 +112,10 @@ pub struct TrainArgs {
     #[clap(long, env, value_parser = parse_trim_quotes)]
     pub run_id: String,
 
+    /// Auto-detect parallelism settings from lookup table based on model and GPU count
+    #[clap(long, env)]
+    pub parallelism_auto: bool,
+
     #[clap(long, default_value_t = 1, env)]
     pub data_parallelism: usize,
diff --git a/shared/client/src/lib.rs b/shared/client/src/lib.rs
index b4ea80576..c9aaaf2aa 100644
--- a/shared/client/src/lib.rs
+++ b/shared/client/src/lib.rs
@@ -1,6 +1,7 @@
 mod cli;
 mod client;
 mod fetch_data;
+pub mod parallelism_lookup;
 mod protocol;
 mod state;
 mod tui;
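A note on the flag wiring in `cli.rs` above: with clap's derive API, `#[clap(long, env)]` exposes `parallelism_auto` both as the `--parallelism-auto` switch and as the `PARALLELISM_AUTO` environment variable documented in `join-run.md`. A self-contained sketch of that behavior, assuming clap is built with its `derive` and `env` features (`Args` is a hypothetical stand-in for `TrainArgs`):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Mirrors the new TrainArgs field: `long` gives --parallelism-auto,
    /// `env` lets PARALLELISM_AUTO=true set it without a CLI flag.
    #[clap(long, env)]
    parallelism_auto: bool,

    #[clap(long, default_value_t = 1, env)]
    data_parallelism: usize,
}

fn main() {
    // `prog --parallelism-auto` and `PARALLELISM_AUTO=true prog` both
    // yield parallelism_auto == true.
    let args = Args::parse();
    println!("parallelism_auto = {}", args.parallelism_auto);
}
```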
diff --git a/shared/client/src/parallelism_lookup.rs b/shared/client/src/parallelism_lookup.rs
new file mode 100644
index 000000000..d6f18035a
--- /dev/null
+++ b/shared/client/src/parallelism_lookup.rs
@@ -0,0 +1,89 @@
+use anyhow::Result;
+use hf_hub::{Repo, RepoType};
+use nvml_wrapper::Nvml;
+use serde::Deserialize;
+use std::collections::HashMap;
+use tracing::info;
+
+const REMOTE_CONFIG_FILENAME: &str = "parallelism_data.json";
+
+#[derive(Debug, Clone, Copy, Deserialize)]
+pub struct ParallelismConfig {
+    pub dp: usize,
+    pub tp: usize,
+    pub micro_batch_size: usize,
+}
+
+// Table format: gpu_type -> num_gpus -> config
+type Table = HashMap<String, HashMap<String, ParallelismConfig>>;
+
+/// Get GPU type from NVML (reads first visible GPU)
+fn get_gpu_type_from_nvml() -> Result<String> {
+    let nvml = Nvml::init()?;
+    let device = nvml.device_by_index(0)?;
+    Ok(device.name()?)
+}
+
+fn normalize_gpu_name(raw_name: &str) -> String {
+    let upper = raw_name.to_uppercase();
+    if upper.contains("H200") {
+        "H200".to_string()
+    } else if upper.contains("H100") {
+        "H100".to_string()
+    } else {
+        raw_name.to_string()
+    }
+}
+
+/// Try to load parallelism config JSON from the model's HuggingFace repo
+fn load_json_from_model_repo(model_repo_id: &str) -> Option<String> {
+    let token = std::env::var("HF_TOKEN").ok();
+
+    let api = hf_hub::api::sync::ApiBuilder::new()
+        .with_token(token)
+        .build()
+        .ok()?
+        .repo(Repo::new(model_repo_id.to_string(), RepoType::Model));
+
+    let path = api.get(REMOTE_CONFIG_FILENAME).ok()?;
+    std::fs::read_to_string(path).ok()
+}
+
+/// Lookup config in a table
+fn lookup_in_table(table: &Table, gpu_type: &str, num_gpus: usize) -> Option<ParallelismConfig> {
+    table
+        .get(gpu_type)
+        .and_then(|n| n.get(&num_gpus.to_string()))
+        .copied()
+}
+
+/// Lookup parallelism config from the model's HuggingFace repo
+pub fn lookup(model_repo_id: &str) -> Result<ParallelismConfig> {
+    let device_count = tch::Cuda::device_count() as usize;
+    if device_count == 0 {
+        anyhow::bail!("No GPUs found!");
+    }
+
+    // Use NVML for GPU type detection
+    let gpu_type = normalize_gpu_name(&get_gpu_type_from_nvml()?);
+    info!("Detected {} x {} GPU(s)", device_count, gpu_type);
+
+    let raw_json = load_json_from_model_repo(model_repo_id).ok_or_else(|| {
+        anyhow::anyhow!(
+            "No parallelism_data.json found in model repo '{}'. \
+             Add this file to use --parallelism-auto",
+            model_repo_id
+        )
+    })?;
+
+    let table: Table = serde_json::from_str(&raw_json)
+        .map_err(|e| anyhow::anyhow!("Failed to parse parallelism_data.json: {}", e))?;
+
+    info!(
+        "Using parallelism config from model repo '{}'",
+        model_repo_id
+    );
+
+    lookup_in_table(&table, &gpu_type, device_count)
+        .ok_or_else(|| anyhow::anyhow!("No config for {} x {}", device_count, gpu_type))
+}
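One design choice worth noting in `normalize_gpu_name`: NVML reports full product strings, the table is keyed by short family names, and anything unrecognized falls through unchanged (and will therefore only match a table entry keyed by its full NVML name). A sketch of the intended behavior; the product strings here are illustrative examples, not values taken from the source:

```rust
// Same normalization as in parallelism_lookup.rs above.
fn normalize_gpu_name(raw_name: &str) -> String {
    let upper = raw_name.to_uppercase();
    if upper.contains("H200") {
        "H200".to_string()
    } else if upper.contains("H100") {
        "H100".to_string()
    } else {
        raw_name.to_string()
    }
}

fn main() {
    // Full product strings collapse to the short table keys...
    assert_eq!(normalize_gpu_name("NVIDIA H100 80GB HBM3"), "H100");
    assert_eq!(normalize_gpu_name("NVIDIA H200"), "H200");
    // ...while unknown GPUs pass through verbatim.
    assert_eq!(
        normalize_gpu_name("NVIDIA GeForce RTX 4090"),
        "NVIDIA GeForce RTX 4090"
    );
}
```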
diff --git a/shared/client/src/state/init.rs b/shared/client/src/state/init.rs
index f7326f382..dbd1d5e1b 100644
--- a/shared/client/src/state/init.rs
+++ b/shared/client/src/state/init.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "parallelism")]
+use crate::parallelism_lookup;
 use crate::{WandBInfo, fetch_data::DataFetcher};
 use psyche_coordinator::{
     Coordinator, HealthChecks,
@@ -50,6 +52,8 @@ pub struct RunInitConfig {
     pub device: Devices,
     pub hub_read_token: Option<String>,
     pub hub_max_concurrent_downloads: usize,
+    /// If true, auto-detect parallelism from lookup table (overrides dp/tp/micro_batch_size)
+    pub parallelism_auto: bool,
     pub data_parallelism: usize,
     pub tensor_parallelism: usize,
     pub micro_batch_size: usize,
@@ -115,6 +119,9 @@ pub enum InitRunError {
     #[error("Unsupported architecture: {0}")]
     UnsupportedArchitecture(String),
 
+    #[error("Parallelism auto-detection failed: {0}")]
+    ParallelismLookupFailed(#[from] anyhow::Error),
+
     #[cfg(feature = "python")]
     #[error("Python distributed error: {0}")]
     PythonDistributedError(#[from] psyche_modeling::PythonDistributedCausalLMError),
@@ -195,6 +202,59 @@ impl RunInitConfigAndIO
+        #[cfg(feature = "parallelism")]
+        let (data_parallelism, tensor_parallelism, micro_batch_size) =
+            if init_config.parallelism_auto {
+                let model_repo_id: String = match … {
+                    … => {
+                        (&hub_repo.repo_id).into()
+                    }
+                    _ => {
+                        return Err(InitRunError::ParallelismLookupFailed(anyhow::anyhow!(
+                            "--parallelism-auto requires a Hub or P2P checkpoint"
+                        )));
+                    }
+                };
+
+                let config = parallelism_lookup::lookup(&model_repo_id)?;
+                (config.dp, config.tp, config.micro_batch_size)
+            } else {
+                (
+                    init_config.data_parallelism,
+                    init_config.tensor_parallelism,
+                    init_config.micro_batch_size,
+                )
+            };
+
+        #[cfg(not(feature = "parallelism"))]
+        let (data_parallelism, tensor_parallelism, micro_batch_size) = (
+            init_config.data_parallelism,
+            init_config.tensor_parallelism,
+            init_config.micro_batch_size,
+        );
+
+        info!(
+            "Parallelism: dp={}, tp={}, micro_batch_size={}",
+            data_parallelism, tensor_parallelism, micro_batch_size
+        );
+
         let hub_read_token = init_config.hub_read_token.clone();
         let hub_max_concurrent_downloads = init_config.hub_max_concurrent_downloads;
         let data_future = async {
@@ -277,7 +337,7 @@ impl RunInitConfigAndIO
@@ … @@ impl RunInitConfigAndIO
-                        if init_config.data_parallelism > 1
+                        if data_parallelism > 1
                             && llm.architecture == model::LLMArchitecture::HfAuto
                         {
                             1
                         } else {
-                            init_config.data_parallelism
+                            data_parallelism
                         },
                     );
@@ -467,8 +527,8 @@ impl RunInitConfigAndIO
             {
                 #[cfg(feature = "python")]
                 {
-                    let dp = init_config.data_parallelism;
-                    let tp = init_config.tensor_parallelism;
+                    let dp = data_parallelism;
+                    let tp = tensor_parallelism;
 
                     tokio::task::spawn_blocking(move || {
                         if tp != 1 || dp != 1 {
@@ -520,31 +580,25 @@ impl RunInitConfigAndIO
                 {
                     let mut futures: Vec<
                         JoinHandle<Result<…, ModelLoadError>>,
-                    > = Vec::with_capacity(
-                        init_config.data_parallelism * init_config.tensor_parallelism,
-                    );
+                    > = Vec::with_capacity(data_parallelism * tensor_parallelism);
                     let devices = init_config.device.clone();
-                    for dp in 0..init_config.data_parallelism {
+                    for dp in 0..data_parallelism {
                         let communicator_id: Option<…> =
-                            match init_config.tensor_parallelism {
+                            match tensor_parallelism {
                                 0 | 1 => None,
                                 #[cfg(feature = "parallelism")]
                                 _ => Some(tch::CStore::new().into()),
                                 #[cfg(not(feature = "parallelism"))]
                                 _ => unimplemented!(),
                             };
-                        for tp in 0..init_config.tensor_parallelism {
+                        for tp in 0..tensor_parallelism {
                             let tensor_parallelism_world =
                                 communicator_id.as_ref().map(|communicator_id| {
-                                    (
-                                        communicator_id.clone(),
-                                        tp,
-                                        init_config.tensor_parallelism,
-                                    )
+                                    (communicator_id.clone(), tp, tensor_parallelism)
                                 });
                             let source = source.clone();
-                            let rank = dp * init_config.tensor_parallelism + tp;
+                            let rank = dp * tensor_parallelism + tp;
                             let devices = devices.clone();
                             let device = devices.device_for_rank(rank);
                             futures.push(tokio::task::spawn_blocking(move || {
@@ -604,9 +658,9 @@ impl RunInitConfigAndIO
-        let data_fetcher = DataFetcher::<…>::new(data_provider, init_config.data_parallelism * 2);
+        let data_fetcher = DataFetcher::<…>::new(data_provider, data_parallelism * 2);
 
         let trainers: Vec<…> = match models {
             RawLoadedModelType::ParallelNativeModels(models) => {
@@ -690,26 +743,24 @@ impl RunInitConfigAndIO
                 let barriers: Option<…> =
-                    if init_config.data_parallelism > 1 {
+                    if data_parallelism > 1 {
                         #[cfg(feature = "parallelism")]
                         {
                             Some(
-                                (0..init_config.tensor_parallelism)
+                                (0..tensor_parallelism)
                                     .map(|_| {
                                         (
                                             tch::CStore::new().into(),
-                                            Arc::new(CancellableBarrier::new(
-                                                init_config.tensor_parallelism,
-                                            ))
+                                            Arc::new(CancellableBarrier::new(tensor_parallelism))
                                                 as Arc<…>,
                                         )
                                     })
@@ -736,13 +787,12 @@ impl RunInitConfigAndIO
-                        let barrier = Arc::new(CancellableBarrier::new(
-                            init_config.tensor_parallelism,
-                        )) as Arc<…>;
+                        let barrier = Arc::new(CancellableBarrier::new(tensor_parallelism))
+                            as Arc<…>;
                         LocalTrainer::new(
                             ParallelModels {
                                 models,
@@ -751,7 +801,7 @@ impl RunInitConfigAndIO
@@ … @@ impl RunInitConfigAndIO
@@ … @@ impl RunInitConfigAndIO
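End to end, `init.rs` now resolves the effective `(dp, tp, micro_batch_size)` triple once and threads it through model loading, the data fetcher, and trainer construction. A hypothetical caller sketching the same fallback rule; only `parallelism_lookup::lookup` comes from this diff, while the `psyche_client` crate path and the `resolve` helper are assumptions for illustration:

```rust
use anyhow::Result;

/// Hypothetical helper mirroring the selection in init.rs: the lookup wins
/// when --parallelism-auto is set, otherwise the manual values pass through.
fn resolve(
    parallelism_auto: bool,
    manual: (usize, usize, usize), // (dp, tp, micro_batch_size) from the CLI
    model_repo_id: &str,
) -> Result<(usize, usize, usize)> {
    if parallelism_auto {
        let c = psyche_client::parallelism_lookup::lookup(model_repo_id)?;
        Ok((c.dp, c.tp, c.micro_batch_size))
    } else {
        Ok(manual)
    }
}
```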