From b8db93ac41e6e9d4ef98cf712f32552d319d03d7 Mon Sep 17 00:00:00 2001 From: IAvecilla Date: Fri, 14 Nov 2025 16:36:28 -0300 Subject: [PATCH 1/2] Add new workflow to run python decentralized tests in the ci --- .github/actions/wait-for-garnix/action.yaml | 15 ++- .../workflows/solana-integration-test-run.yml | 67 ++++++++++++++ .../decentralized/testing/src/docker_setup.rs | 59 +++++++++--- .../decentralized/testing/src/utils.rs | 16 +++- .../testing/tests/integration_tests.rs | 91 +++++++++++++++++++ config/client/.env.local | 1 + docker/test/client_test_entrypoint.sh | 3 +- garnix.yaml | 1 + justfile | 1 + .../src/python_distributed_causal_lm.rs | 8 +- 10 files changed, 241 insertions(+), 21 deletions(-) diff --git a/.github/actions/wait-for-garnix/action.yaml b/.github/actions/wait-for-garnix/action.yaml index 6b747900d..3020a3d8e 100644 --- a/.github/actions/wait-for-garnix/action.yaml +++ b/.github/actions/wait-for-garnix/action.yaml @@ -12,6 +12,17 @@ inputs: runs: using: 'composite' steps: + - name: Install GitHub CLI + shell: bash + run: | + if ! command -v gh &> /dev/null; then + echo "Installing GitHub CLI..." + sudo apt update + sudo apt install gh -y + else + echo "GitHub CLI already installed" + fi + - name: Wait for All Garnix checks shell: bash env: @@ -26,13 +37,13 @@ runs: for i in $(seq 1 $TOTAL_ATTEMPTS); do if [ -z "$GARNIX_SUITE_ID" ]; then GARNIX_SUITE_ID=$(gh api repos/${{ github.repository }}/commits/$SHA/check-suites --jq '.check_suites[] | select(.app.name == "Garnix CI") | .id') - + if [ -z "$GARNIX_SUITE_ID" ]; then echo "No Garnix CI check suite found yet, waiting... (attempt $i/$TOTAL_ATTEMPTS)" sleep 10 continue fi - + echo "Found Garnix CI check suite: $GARNIX_SUITE_ID" fi diff --git a/.github/workflows/solana-integration-test-run.yml b/.github/workflows/solana-integration-test-run.yml index 25ae27075..aa9483228 100644 --- a/.github/workflows/solana-integration-test-run.yml +++ b/.github/workflows/solana-integration-test-run.yml @@ -4,6 +4,11 @@ on: branches: [main] pull_request: branches: [main, '**'] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: # First, build the validator image and cache it build-validator: @@ -36,3 +41,65 @@ jobs: with: test-name: ${{ matrix.test-name }} secrets: inherit + + decentralized-integration-python-test: + runs-on: self-hosted + needs: build-validator + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Get Validator Image from cache + id: cache-validator + uses: actions/cache/restore@v4 + with: + path: validator-image.tar.gz + key: validator-image-${{ runner.os }}-${{ hashFiles('shared/coordinator/src/coordinator.rs', 'architectures/decentralized/solana-coordinator/**/*.rs', 'architectures/decentralized/solana-coordinator/**/*.toml', 'architectures/decentralized/solana-coordinator/Cargo.lock', 'architectures/decentralized/solana-authorizer/**/*.rs', 'architectures/decentralized/solana-authorizer/**/*.toml', 'architectures/decentralized/solana-authorizer/Cargo.lock', 'docker/test/psyche_solana_validator_entrypoint.sh', 'nix/docker.nix', 'flake.lock') }} + fail-on-cache-miss: true + + - name: Load Validator Image + run: | + echo "Loading validator image from cache" + docker load < validator-image.tar.gz + docker images | grep psyche-solana-test-validator + + echo "Disk usage after loading validator" + df -h + + - name: Clean up validator tar file + run: | + # Remove the compressed validator image to free up disk space + rm -f validator-image.tar.gz + echo "Disk usage after removing validator tar" + df -h + + - name: Download Solana Test Client Python Image + run: | + echo "Disk space before client build" + df -h + + sleep 500 + # Calculate the derivation hash + echo "Calculating derivation path" + DRV_PATH=$(nix eval --raw .#docker-psyche-solana-test-client.drvPath) + echo "Derivation path: $DRV_PATH" + + OUT_PATH=$(nix derivation show $DRV_PATH | jq -r '.[].outputs.out.path') + echo "Output path: $OUT_PATH" + + # download from Garnix cache first + echo "Attempting to fetch from Garnix cache" + nix-store --realise $OUT_PATH --option substitute true + + # Load the image into Docker + $OUT_PATH | docker load + + echo "Disk space after client build" + df -h + + - name: Run decentralized integration test + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + run: | + nix develop .#dev-python --command bash -c "cargo test --release --features python,parallelism -p psyche-decentralized-testing --test integration_tests -- --nocapture test_big_model_with_sidecars" diff --git a/architectures/decentralized/testing/src/docker_setup.rs b/architectures/decentralized/testing/src/docker_setup.rs index 4f5a80dd9..3d8e77a36 100644 --- a/architectures/decentralized/testing/src/docker_setup.rs +++ b/architectures/decentralized/testing/src/docker_setup.rs @@ -8,7 +8,9 @@ use bollard::{ secret::{ContainerSummary, HostConfig}, }; use psyche_core::IntegrationTestLogMarker; +use std::collections::HashMap; use std::process::{Command, Stdio}; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::signal; @@ -18,11 +20,12 @@ use crate::{ utils::ConfigBuilder, }; -/// Check if GPU is available by looking for nvidia-smi or USE_GPU environment variable +/// Check if GPU is available by looking for nvidia-smi or USE_GPU environment variable. +/// This logic mirrors the justfile's GPU detection to ensure consistency. fn has_gpu_support() -> bool { - // Check if USE_GPU environment variable is set - if std::env::var("USE_GPU").is_ok() { - return true; + // Check if USE_GPU environment variable is explicitly set to "0" to disable GPU + if let Ok(val) = std::env::var("USE_GPU") { + return val != "0"; } // Check if nvidia-smi command exists @@ -38,6 +41,16 @@ pub const CLIENT_CONTAINER_PREFIX: &str = "test-psyche-test-client"; pub const VALIDATOR_CONTAINER_PREFIX: &str = "test-psyche-solana-test-validator"; pub const NGINX_PROXY_PREFIX: &str = "nginx-proxy"; +pub fn get_devices_for_client(id: u16) -> String { + let devices_per_client: HashMap> = HashMap::from([ + (1, vec!["0".to_string(), "1".to_string()]), + (2, vec!["2".to_string(), "3".to_string()]), + (3, vec!["4".to_string(), "5".to_string()]), + (4, vec!["6".to_string(), "7".to_string()]), + ]); + devices_per_client.get(&id).unwrap().join(",") +} + pub struct DockerTestCleanup; impl Drop for DockerTestCleanup { fn drop(&mut self) { @@ -75,17 +88,17 @@ pub async fn e2e_testing_setup_subscription( ) -> DockerTestCleanup { remove_old_client_containers(docker_client).await; #[cfg(not(feature = "python"))] - let config_file_path = ConfigBuilder::new() + ConfigBuilder::new() .with_num_clients(init_num_clients) .build(); #[cfg(feature = "python")] - let config_file_path = ConfigBuilder::new() + ConfigBuilder::new() .with_num_clients(init_num_clients) .with_architecture("HfAuto") + .with_model("NousResearch/Meta-Llama-3.1-8B") .with_batch_size(8 * init_num_clients as u32) .build(); - println!("[+] Config file written to: {}", config_file_path.display()); let mut command = Command::new("just"); let command = command .args([ @@ -156,13 +169,35 @@ pub async fn spawn_new_client(docker_client: Arc) -> Result>(); + if splited_env.len() == 2 { + final_envs.push(format!("CUDA_VISIBLE_DEVICES={}", devices)); + } + } else { + final_envs.push(env.to_string()); + } + } + let options = Some(CreateContainerOptions { name: new_container_name.clone(), platform: None, }); let config = Config { image: Some("psyche-solana-test-client"), - env: Some(env_vars.iter().map(|s| s.as_str()).collect()), + env: Some(final_envs.iter().map(|s| s.as_str()).collect()), host_config: Some(host_config), ..Default::default() }; @@ -235,18 +270,18 @@ pub async fn spawn_new_client_with_monitoring( // Updated spawn function pub fn spawn_psyche_network(init_num_clients: usize) -> Result<(), DockerWatcherError> { #[cfg(not(feature = "python"))] - let config_file_path = ConfigBuilder::new() + ConfigBuilder::new() .with_num_clients(init_num_clients) .build(); + #[cfg(feature = "python")] - let config_file_path = ConfigBuilder::new() + ConfigBuilder::new() .with_num_clients(init_num_clients) .with_architecture("HfAuto") + .with_model("NousResearch/Meta-Llama-3.1-8B") .with_batch_size(8 * init_num_clients as u32) .build(); - println!("[+] Config file written to: {}", config_file_path.display()); - let mut command = Command::new("just"); let output = command .args(["run_test_infra", &format!("{init_num_clients}")]) diff --git a/architectures/decentralized/testing/src/utils.rs b/architectures/decentralized/testing/src/utils.rs index 631b05f52..8db0c9377 100644 --- a/architectures/decentralized/testing/src/utils.rs +++ b/architectures/decentralized/testing/src/utils.rs @@ -130,6 +130,7 @@ pub struct ConfigBuilder { num_clients: usize, batch_size: u32, architecture: String, + model: String, } impl Default for ConfigBuilder { @@ -157,6 +158,7 @@ impl ConfigBuilder { num_clients: 1, batch_size: 4, architecture: String::from("HfLlama"), + model: String::from("pefontana/Nano-Llama"), } } @@ -170,12 +172,17 @@ impl ConfigBuilder { self } + pub fn with_model(mut self, model: &str) -> Self { + self.model = model.to_string(); + self + } + pub fn with_batch_size(mut self, batch_size: u32) -> Self { self.batch_size = batch_size; self } - pub fn build(mut self) -> PathBuf { + pub fn build(mut self) { // Apply runtime overrides self.set_value("config.min_clients", self.num_clients as u32); self.set_value("config.init_min_clients", self.num_clients as u32); @@ -186,15 +193,16 @@ impl ConfigBuilder { self.set_value("model.LLM.architecture", self.architecture.clone()); self.set_value("config.global_batch_size_start", self.batch_size); self.set_value("config.global_batch_size_end", self.batch_size); + self.set_value("model.LLM.checkpoint.Hub.repo_id", self.model.clone()); #[cfg(feature = "python")] - self.set_value("config.warmup_time", 100); + self.set_value("config.warmup_time", 500); + #[cfg(feature = "python")] + self.set_value("config.max_round_train_time", 100); let config_content = toml::to_string(&self.base_config).unwrap(); let config_file_path = PathBuf::from("../../../config/solana-test/test-config.toml"); fs::write(&config_file_path, config_content).unwrap(); - - config_file_path } fn set_value(&mut self, path: &str, value: impl Into) { diff --git a/architectures/decentralized/testing/tests/integration_tests.rs b/architectures/decentralized/testing/tests/integration_tests.rs index 90357444b..10091a0dc 100644 --- a/architectures/decentralized/testing/tests/integration_tests.rs +++ b/architectures/decentralized/testing/tests/integration_tests.rs @@ -949,3 +949,94 @@ async fn test_lost_only_peer_go_back_to_hub_checkpoint() { } } } + +/// spawn 1 clients and run for 3 epochs +/// assert client and coordinator state synchronization +/// assert that the loss decreases in each epoch +#[cfg(feature = "python")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] +#[serial] +async fn test_big_model_with_sidecars() { + let run_id = "test".to_string(); + // epochs the test will run + let num_of_epochs_to_run = 3; + let n_new_clients = 3; + + // Initialize DockerWatcher + let docker = Arc::new(Docker::connect_with_socket_defaults().unwrap()); + let mut watcher = DockerWatcher::new(docker.clone()); + + // Initialize a Solana run with 1 client + let _cleanup = e2e_testing_setup(docker.clone(), 1).await; + + // Monitor the client container + let _monitor_client_1 = watcher + .monitor_container( + &format!("{CLIENT_CONTAINER_PREFIX}-1"), + vec![ + IntegrationTestLogMarker::StateChange, + IntegrationTestLogMarker::Loss, + ], + ) + .unwrap(); + + println!("Waiting for run to go on with the first client"); + tokio::time::sleep(Duration::from_secs(30)).await; + + // Initialize solana client to query the coordinator state + let solana_client = SolanaTestClient::new(run_id).await; + let mut live_interval = time::interval(Duration::from_secs(10)); + let mut clients_with_model = 0; + + println!("Adding new clients"); + for i in 1..=n_new_clients { + spawn_new_client(docker.clone()).await.unwrap(); + let _monitor_client = watcher + .monitor_container( + &format!("{CLIENT_CONTAINER_PREFIX}-{}", i + 1), + vec![ + IntegrationTestLogMarker::LoadedModel, + IntegrationTestLogMarker::Loss, + ], + ) + .unwrap(); + } + + loop { + tokio::select! { + _ = live_interval.tick() => { + if let Err(e) = watcher.monitor_clients_health(n_new_clients + 1).await { + panic!("{}", e); + } + } + response = watcher.log_rx.recv() => { + match response { + Some(Response::StateChange(timestamp, _client_1, old_state, new_state, _ , _)) => { + let _coordinator_state = solana_client.get_run_state().await; + println!( + "client: new_state: {new_state}, old_state: {old_state}, timestamp: {timestamp}" + ); + } + Some(Response::Loss(client, epoch, step, loss)) => { + println!( + "client: {client:?}, epoch: {epoch}, step: {step}, Loss: {loss:?}" + ); + if epoch == num_of_epochs_to_run { + break; + } + } + Some(Response::LoadedModel(checkpoint)) => { + // assert client and coordinator state synchronization + assert!(checkpoint.starts_with("P2P"), "The model should be obtained from P2P"); + println!("Client got the model with P2P"); + clients_with_model += 1; + if clients_with_model == n_new_clients { + println!("All clients got the model with P2P"); + } + } + _ => unreachable!(), + } + } + } + } +} diff --git a/config/client/.env.local b/config/client/.env.local index db48366d7..a415ebb58 100644 --- a/config/client/.env.local +++ b/config/client/.env.local @@ -2,6 +2,7 @@ RPC=http://psyche-solana-test-validator:8899 WS_RPC=ws://psyche-solana-test-validator:8900 RUN_ID=test +CUDA_VISIBLE_DEVICES=0,1 # This will override the default wallet path used by the run-manager if defined #WALLET_PRIVATE_KEY_PATH=/keys/somewallet.json diff --git a/docker/test/client_test_entrypoint.sh b/docker/test/client_test_entrypoint.sh index 64e739841..d9a2b4205 100644 --- a/docker/test/client_test_entrypoint.sh +++ b/docker/test/client_test_entrypoint.sh @@ -16,8 +16,9 @@ if [ "${PYTHON_ENABLED}" = "true" ]; then --rpc "${RPC}" \ --ws-rpc "${WS_RPC}" \ --run-id "${RUN_ID}" \ - --data-parallelism 8 \ + --data-parallelism 2 \ --sidecar-port "${SIDECAR_PORT}" \ + --iroh-relay "n0" \ --logs "json" else echo "Starting client without Python features" diff --git a/garnix.yaml b/garnix.yaml index 673620553..d111b72f3 100644 --- a/garnix.yaml +++ b/garnix.yaml @@ -30,6 +30,7 @@ builds: - 'packages.x86_64-linux.psyche-centralized-server' - 'packages.x86_64-linux.docker-psyche-solana-client' - 'packages.x86_64-linux.docker-psyche-solana-test-client-no-python' + - 'packages.x86_64-linux.docker-psyche-solana-test-client' - 'packages.x86_64-linux.psyche-centralized-local-testnet' - 'packages.x86_64-linux.expand-distro' - 'devShells.x86_64-linux.*' diff --git a/justfile b/justfile index 3fc4f3afe..4e9daba9f 100644 --- a/justfile +++ b/justfile @@ -193,3 +193,4 @@ run_test_infra_with_proxies_validator num_clients="1": stop_test_infra: cd docker/test && docker compose -f docker-compose.yml -f subscriptions_test/docker-compose.yml down + docker ps --filter name=test-psyche-test-client -q | xargs -r docker stop diff --git a/shared/modeling/src/python_distributed_causal_lm.rs b/shared/modeling/src/python_distributed_causal_lm.rs index 6a230ec28..360e6c89f 100644 --- a/shared/modeling/src/python_distributed_causal_lm.rs +++ b/shared/modeling/src/python_distributed_causal_lm.rs @@ -19,7 +19,7 @@ use std::{ }; use tch::{Device, Tensor}; use thiserror::Error; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; #[derive(Debug, Error)] pub enum PythonDistributedCausalLMError { @@ -235,11 +235,15 @@ impl PythonDistributedCausalLM { } let num_local_ranks = num_local_ranks.unwrap_or_else(tch::Cuda::device_count); let world_size = parallelism.dp * parallelism.tp; - if world_size < (num_local_ranks as usize) { + if world_size > (num_local_ranks as usize) { return Err(PythonDistributedCausalLMError::IncompatibleWorldSize( world_size, num_local_ranks as usize, )); + } else if world_size < num_local_ranks as usize { + warn!( + "The client will use {world_size} devices, but {num_local_ranks} are available. Make sure that underusing the available devices is okay" + ); } let rank = match device { From 5a63de1652d6021e003c13ff96c676f1d44761c3 Mon Sep 17 00:00:00 2001 From: IAvecilla Date: Sat, 24 Jan 2026 11:20:51 -0300 Subject: [PATCH 2/2] Add Torchtitan test --- .../workflows/solana-integration-test-run.yml | 2 +- .../solana-authorizer/Cargo.lock | 2 +- .../solana-coordinator/Cargo.lock | 8 +- .../decentralized/testing/src/docker_setup.rs | 150 ++++++++++-------- .../decentralized/testing/src/utils.rs | 17 +- .../testing/tests/integration_tests.rs | 61 ++++--- python/python/psyche/sidecar/__main__.py | 1 + 7 files changed, 135 insertions(+), 106 deletions(-) diff --git a/.github/workflows/solana-integration-test-run.yml b/.github/workflows/solana-integration-test-run.yml index aa9483228..515dd87ac 100644 --- a/.github/workflows/solana-integration-test-run.yml +++ b/.github/workflows/solana-integration-test-run.yml @@ -102,4 +102,4 @@ jobs: env: HF_TOKEN: ${{ secrets.HF_TOKEN }} run: | - nix develop .#dev-python --command bash -c "cargo test --release --features python,parallelism -p psyche-decentralized-testing --test integration_tests -- --nocapture test_big_model_with_sidecars" + nix develop .#dev-python --command bash -c "cargo test --release --features python,parallelism -p psyche-decentralized-testing --test integration_tests -- --nocapture test_run_with_python" diff --git a/architectures/decentralized/solana-authorizer/Cargo.lock b/architectures/decentralized/solana-authorizer/Cargo.lock index c1a36b8a6..7eb386dad 100644 --- a/architectures/decentralized/solana-authorizer/Cargo.lock +++ b/architectures/decentralized/solana-authorizer/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-coordinator/Cargo.lock b/architectures/decentralized/solana-coordinator/Cargo.lock index 22d64fadc..72a38df53 100644 --- a/architectures/decentralized/solana-coordinator/Cargo.lock +++ b/architectures/decentralized/solana-coordinator/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", diff --git a/architectures/decentralized/testing/src/docker_setup.rs b/architectures/decentralized/testing/src/docker_setup.rs index 3d8e77a36..d7d11dbd2 100644 --- a/architectures/decentralized/testing/src/docker_setup.rs +++ b/architectures/decentralized/testing/src/docker_setup.rs @@ -68,61 +68,117 @@ impl Drop for DockerTestCleanup { } } -/// FIXME: The config path must be relative to the compose file for now. -pub async fn e2e_testing_setup( - docker_client: Arc, - init_num_clients: usize, -) -> DockerTestCleanup { - remove_old_client_containers(docker_client).await; +/// Configuration for spawning a psyche network with custom settings +pub struct PsycheNetworkConfig { + pub num_clients: usize, + pub architecture: String, + pub model: String, + pub batch_size: u32, + pub use_proxies: bool, +} - spawn_psyche_network(init_num_clients).unwrap(); +impl Default for PsycheNetworkConfig { + fn default() -> Self { + #[cfg(not(feature = "python"))] + { + Self { + num_clients: 1, + architecture: "HfLlama".to_string(), + model: "pefontana/Nano-Llama".to_string(), + batch_size: 4, + use_proxies: false, + } + } + #[cfg(feature = "python")] + { + Self { + num_clients: 1, + architecture: "HfAuto".to_string(), + model: "NousResearch/Meta-Llama-3.1-8B".to_string(), + batch_size: 8, + use_proxies: false, + } + } + } +} - spawn_ctrl_c_task(); +impl PsycheNetworkConfig { + pub fn with_num_clients(mut self, num_clients: usize) -> Self { + self.num_clients = num_clients; + self + } - DockerTestCleanup {} + pub fn with_proxies(mut self) -> Self { + self.use_proxies = true; + self + } } -pub async fn e2e_testing_setup_subscription( - docker_client: Arc, - init_num_clients: usize, -) -> DockerTestCleanup { - remove_old_client_containers(docker_client).await; - #[cfg(not(feature = "python"))] - ConfigBuilder::new() - .with_num_clients(init_num_clients) - .build(); - #[cfg(feature = "python")] +/// Spawn psyche network with configuration +fn spawn_psyche_network(config: &PsycheNetworkConfig) -> Result<(), DockerWatcherError> { ConfigBuilder::new() - .with_num_clients(init_num_clients) - .with_architecture("HfAuto") - .with_model("NousResearch/Meta-Llama-3.1-8B") - .with_batch_size(8 * init_num_clients as u32) + .with_num_clients(config.num_clients) + .with_architecture(&config.architecture) + .with_model(&config.model) + .with_batch_size(config.batch_size) .build(); - let mut command = Command::new("just"); - let command = command - .args([ - "run_test_infra_with_proxies_validator", - &format!("{init_num_clients}"), - ]) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()); + let just_command = if config.use_proxies { + "run_test_infra_with_proxies_validator" + } else { + "run_test_infra" + }; + let mut command = Command::new("just"); let output = command + .args([just_command, &format!("{}", config.num_clients)]) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) .output() .expect("Failed to spawn docker compose instances"); + if !output.status.success() { panic!("Error: {}", String::from_utf8_lossy(&output.stderr)); } println!("\n[+] Docker compose network spawned successfully!"); println!(); + Ok(()) +} + +/// E2E testing setup with configuration +pub async fn e2e_testing_setup_with_config( + docker_client: Arc, + config: PsycheNetworkConfig, +) -> DockerTestCleanup { + remove_old_client_containers(docker_client).await; + + spawn_psyche_network(&config).unwrap(); spawn_ctrl_c_task(); DockerTestCleanup {} } +/// FIXME: The config path must be relative to the compose file for now. +pub async fn e2e_testing_setup( + docker_client: Arc, + init_num_clients: usize, +) -> DockerTestCleanup { + let config = PsycheNetworkConfig::default().with_num_clients(init_num_clients); + e2e_testing_setup_with_config(docker_client, config).await +} + +pub async fn e2e_testing_setup_subscription( + docker_client: Arc, + init_num_clients: usize, +) -> DockerTestCleanup { + let config = PsycheNetworkConfig::default() + .with_num_clients(init_num_clients) + .with_proxies(); + e2e_testing_setup_with_config(docker_client, config).await +} + pub async fn spawn_new_client(docker_client: Arc) -> Result { // Set the container name based on the ones that are already running. let new_container_name = get_name_of_new_client_container(docker_client.clone()).await; @@ -267,38 +323,6 @@ pub async fn spawn_new_client_with_monitoring( Ok(container_id) } -// Updated spawn function -pub fn spawn_psyche_network(init_num_clients: usize) -> Result<(), DockerWatcherError> { - #[cfg(not(feature = "python"))] - ConfigBuilder::new() - .with_num_clients(init_num_clients) - .build(); - - #[cfg(feature = "python")] - ConfigBuilder::new() - .with_num_clients(init_num_clients) - .with_architecture("HfAuto") - .with_model("NousResearch/Meta-Llama-3.1-8B") - .with_batch_size(8 * init_num_clients as u32) - .build(); - - let mut command = Command::new("just"); - let output = command - .args(["run_test_infra", &format!("{init_num_clients}")]) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()) - .output() - .expect("Failed to spawn docker compose instances"); - - if !output.status.success() { - panic!("Error: {}", String::from_utf8_lossy(&output.stderr)); - } - - println!("\n[+] Docker compose network spawned successfully!"); - println!(); - Ok(()) -} - pub fn spawn_ctrl_c_task() { tokio::spawn(async { signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); diff --git a/architectures/decentralized/testing/src/utils.rs b/architectures/decentralized/testing/src/utils.rs index 8db0c9377..d4ac1bdb2 100644 --- a/architectures/decentralized/testing/src/utils.rs +++ b/architectures/decentralized/testing/src/utils.rs @@ -5,7 +5,7 @@ use anchor_client::{ solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}, }; use psyche_coordinator::{ - NUM_STORED_ROUNDS, Round, RunState, + RunState, model::{Checkpoint, Model}, }; use psyche_core::FixedVec; @@ -73,26 +73,11 @@ impl SolanaTestClient { coordinator.state.coordinator.epoch_state.clients } - pub async fn get_clients_len(&self) -> usize { - let clients = self.get_clients().await; - clients.len() - } - pub async fn get_run_state(&self) -> RunState { let coordinator = self.get_coordinator_account().await; coordinator.state.coordinator.run_state } - pub async fn get_rounds(&self) -> [Round; NUM_STORED_ROUNDS] { - let coordinator = self.get_coordinator_account().await; - coordinator.state.coordinator.epoch_state.rounds - } - - pub async fn get_rounds_head(&self) -> u32 { - let coordinator = self.get_coordinator_account().await; - coordinator.state.coordinator.epoch_state.rounds_head - } - pub async fn get_current_epoch(&self) -> u16 { let coordinator = self.get_coordinator_account().await; coordinator.state.coordinator.progress.epoch diff --git a/architectures/decentralized/testing/tests/integration_tests.rs b/architectures/decentralized/testing/tests/integration_tests.rs index 10091a0dc..e0b22f5af 100644 --- a/architectures/decentralized/testing/tests/integration_tests.rs +++ b/architectures/decentralized/testing/tests/integration_tests.rs @@ -11,12 +11,13 @@ use bollard::container::StartContainerOptions; use bollard::{Docker, container::KillContainerOptions}; use psyche_coordinator::{RunState, model::Checkpoint}; use psyche_core::IntegrationTestLogMarker; -use psyche_decentralized_testing::docker_setup::e2e_testing_setup_subscription; use psyche_decentralized_testing::{ CLIENT_CONTAINER_PREFIX, NGINX_PROXY_PREFIX, chaos::{ChaosAction, ChaosScheduler}, docker_setup::{ - e2e_testing_setup, kill_all_clients, spawn_new_client, spawn_new_client_with_monitoring, + PsycheNetworkConfig, e2e_testing_setup, e2e_testing_setup_subscription, + e2e_testing_setup_with_config, kill_all_clients, spawn_new_client, + spawn_new_client_with_monitoring, }, docker_watcher::{DockerWatcher, Response}, utils::SolanaTestClient, @@ -950,37 +951,50 @@ async fn test_lost_only_peer_go_back_to_hub_checkpoint() { } } -/// spawn 1 clients and run for 3 epochs -/// assert client and coordinator state synchronization -/// assert that the loss decreases in each epoch +/// Test P2P model sharing with different backends. +/// This exercises the code path where: +/// 1. First client downloads model from HuggingFace Hub +/// 2. Model is loaded into the specified backend for training +/// 3. New clients join and receive the model via P2P sharing +/// 4. The P2P-received model is loaded into the backend #[cfg(feature = "python")] +#[rstest] +#[case("HfLlama")] +#[case("Torchtitan")] +#[trace] #[test_log::test(tokio::test(flavor = "multi_thread"))] #[serial] -async fn test_big_model_with_sidecars() { +async fn test_run_with_python(#[case] architecture: &str) { let run_id = "test".to_string(); - // epochs the test will run let num_of_epochs_to_run = 3; - let n_new_clients = 3; + let n_new_clients: usize = 2; + let init_num_clients = 1; - // Initialize DockerWatcher let docker = Arc::new(Docker::connect_with_socket_defaults().unwrap()); let mut watcher = DockerWatcher::new(docker.clone()); - // Initialize a Solana run with 1 client - let _cleanup = e2e_testing_setup(docker.clone(), 1).await; + let config = PsycheNetworkConfig { + num_clients: init_num_clients, + architecture: architecture.to_string(), + model: "NousResearch/Meta-Llama-3.1-8B".to_string(), + batch_size: 8 * (init_num_clients as u32 + n_new_clients as u32), + use_proxies: false, + }; + let _cleanup = e2e_testing_setup_with_config(docker.clone(), config).await; - // Monitor the client container + // Monitor the first client container let _monitor_client_1 = watcher .monitor_container( &format!("{CLIENT_CONTAINER_PREFIX}-1"), vec![ IntegrationTestLogMarker::StateChange, IntegrationTestLogMarker::Loss, + IntegrationTestLogMarker::LoadedModel, ], ) .unwrap(); - println!("Waiting for run to go on with the first client"); + println!("Waiting for first client to load model and start training"); tokio::time::sleep(Duration::from_secs(30)).await; // Initialize solana client to query the coordinator state @@ -988,7 +1002,7 @@ async fn test_big_model_with_sidecars() { let mut live_interval = time::interval(Duration::from_secs(10)); let mut clients_with_model = 0; - println!("Adding new clients"); + println!("Adding new clients to test P2P model sharing with {architecture}"); for i in 1..=n_new_clients { spawn_new_client(docker.clone()).await.unwrap(); let _monitor_client = watcher @@ -1005,7 +1019,7 @@ async fn test_big_model_with_sidecars() { loop { tokio::select! { _ = live_interval.tick() => { - if let Err(e) = watcher.monitor_clients_health(n_new_clients + 1).await { + if let Err(e) = watcher.monitor_clients_health((n_new_clients + init_num_clients).try_into().unwrap()).await { panic!("{}", e); } } @@ -1026,12 +1040,17 @@ async fn test_big_model_with_sidecars() { } } Some(Response::LoadedModel(checkpoint)) => { - // assert client and coordinator state synchronization - assert!(checkpoint.starts_with("P2P"), "The model should be obtained from P2P"); - println!("Client got the model with P2P"); - clients_with_model += 1; - if clients_with_model == n_new_clients { - println!("All clients got the model with P2P"); + println!("LoadedModel event: checkpoint = {checkpoint}"); + if checkpoint.starts_with("Hub") { + // First client loaded from Hub + println!("First client loaded model from Hub ({architecture} backend)"); + } else if checkpoint.starts_with("P2P") { + // Subsequent clients loaded via P2P + println!("Client got the model via P2P ({architecture} backend)"); + clients_with_model += 1; + if clients_with_model == n_new_clients { + println!("All {n_new_clients} new clients got the model via P2P with {architecture}!"); + } } } _ => unreachable!(), diff --git a/python/python/psyche/sidecar/__main__.py b/python/python/psyche/sidecar/__main__.py index 1b605a28b..b1a3ccb8e 100644 --- a/python/python/psyche/sidecar/__main__.py +++ b/python/python/psyche/sidecar/__main__.py @@ -6,6 +6,7 @@ import torch.distributed as dist from datetime import timedelta + from .. import ( make_causal_lm, PretrainedSourceRepoFiles,