diff --git a/Cargo.lock b/Cargo.lock index 24ee94226..866adf6e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8112,6 +8112,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "solana-account-decoder-client-types", "solana-client", "time", "tokio", diff --git a/architectures/decentralized/justfile b/architectures/decentralized/justfile index 77ba7d4ed..e1eddf893 100644 --- a/architectures/decentralized/justfile +++ b/architectures/decentralized/justfile @@ -25,16 +25,16 @@ setup-solana-localnet-light-test-run-treasurer run_id="test" *args='': RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml PERMISSIONLESS=true ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} setup-solana-localnet-permissioned-test-run run_id="test" *args='': - RUN_ID={{ run_id }} ./scripts/deploy-solana-test.sh {{ args }} + RUN_ID={{ run_id }} ./scripts/setup-and-deploy-solana-test.sh {{ args }} setup-solana-localnet-permissioned-light-test-run run_id="test" *args='': - RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh {{ args }} + RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/setup-and-deploy-solana-test.sh {{ args }} setup-solana-localnet-permissioned-test-run-treasurer run_id="test" *args='': - RUN_ID={{ run_id }} ./scripts/deploy-solana-test.sh --treasurer {{ args }} + RUN_ID={{ run_id }} ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} setup-solana-localnet-permissioned-light-test-run-treasurer run_id="test" *args='': - RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh --treasurer {{ args }} + RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} start-training-localnet-client run_id="test" *args='': AUTHORIZER={{ AUTHORIZER }} RUN_ID={{ run_id }} ./scripts/train-solana-test.sh {{ args }} diff --git 
a/architectures/decentralized/solana-authorizer/Cargo.lock b/architectures/decentralized/solana-authorizer/Cargo.lock index c1a36b8a6..7eb386dad 100644 --- a/architectures/decentralized/solana-authorizer/Cargo.lock +++ b/architectures/decentralized/solana-authorizer/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-coordinator/Cargo.lock b/architectures/decentralized/solana-coordinator/Cargo.lock index 22d64fadc..72a38df53 100644 --- a/architectures/decentralized/solana-coordinator/Cargo.lock +++ b/architectures/decentralized/solana-coordinator/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", diff --git a/architectures/decentralized/solana-mining-pool/Cargo.lock b/architectures/decentralized/solana-mining-pool/Cargo.lock index 06fb31df5..225d03bf9 100644 --- a/architectures/decentralized/solana-mining-pool/Cargo.lock +++ b/architectures/decentralized/solana-mining-pool/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-mining-pool" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-treasurer/Cargo.lock b/architectures/decentralized/solana-treasurer/Cargo.lock index 
5d56eb74d..9be05b852 100644 --- a/architectures/decentralized/solana-treasurer/Cargo.lock +++ b/architectures/decentralized/solana-treasurer/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", @@ -1656,7 +1656,7 @@ dependencies = [ [[package]] name = "psyche-solana-treasurer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/psyche-book/src/enduser/join-run.md b/psyche-book/src/enduser/join-run.md index b79678bcd..968b09980 100644 --- a/psyche-book/src/enduser/join-run.md +++ b/psyche-book/src/enduser/join-run.md @@ -64,7 +64,8 @@ WALLET_PATH=/path/to/your/keypair.json RPC=https://your-primary-rpc-provider.com WS_RPC=wss://your-primary-rpc-provider.com -# Required: Which run id to join +# Optional: Which run id to join +# If not set, the client will automatically discover and join an available run RUN_ID=your_run_id_here # Recommended: Fallback RPC Endpoints (for reliability) @@ -78,6 +79,19 @@ Then, you can start training through the run manager running: ./run-manager --env-file /path/to/your/.env ``` +### Automatic Run Selection + +If you don't specify a `RUN_ID` in your `.env` file, the run-manager will automatically query the Solana coordinator to find a suitable run to join. +This makes it easier to join training without needing to know the specific run ID in advance. 
The run-manager will display which run it selected in the logs: + +``` +INFO RUN_ID not set, discovering available runs... +INFO Found 2 available run(s): +INFO - run_abc123 (state: Waiting for members) +INFO - run_def456 (state: Training) +INFO Selected run: run_abc123 (state: Waiting for members) +``` + After the initial setup, you'll see the Psyche client logs streaming in real-time. These logs show training progress, network status, and other important information. To stop the client, press `Ctrl+C` in the terminal. @@ -86,9 +100,19 @@ To stop the client, press `Ctrl+C` in the terminal. We recommend using a dedicated RPC service such as [Helius](https://www.helius.dev/), [QuickNode](https://www.quicknode.com/), [Triton](https://triton.one/), or self-hosting your own Solana RPC node. +## Filtering by Authorizer + +If you want to only join runs authorized by a specific entity, pass that entity's public key with the `--authorizer` flag: + +```bash +./run-manager --env-file /path/to/your/.env --authorizer <AUTHORIZER_PUBKEY> +``` + +This is useful when you want to ensure you only join runs whose join authority is a public key you trust. + ## Additional config variables -In general it's not neccesary to change these variables to join a run since we provide sensible defaults, +In general it's not necessary to change these variables to join a run since we provide sensible defaults, though you might need to. **`NVIDIA_DRIVER_CAPABILITIES`** - An environment variable that the NVIDIA Container Toolkit uses to determine which compute capabilities should be provided to your container. It is recommended to set it to 'all', e.g. `NVIDIA_DRIVER_CAPABILITIES=all`. 
diff --git a/scripts/setup-and-deploy-solana-test.sh b/scripts/setup-and-deploy-solana-test.sh index 7886b50bf..69a627cbd 100755 --- a/scripts/setup-and-deploy-solana-test.sh +++ b/scripts/setup-and-deploy-solana-test.sh @@ -40,13 +40,19 @@ sleep 3 solana airdrop 10 --url ${RPC} --keypair ${WALLET_FILE} -# Pass treasurer flag to deploy script if set -if [[ "$DEPLOY_TREASURER" == "true" && "$PERMISSIONLESS" == "true" ]]; then + +if [[ "$DEPLOY_TREASURER" == "true" ]]; then WALLET_FILE=${WALLET_FILE} ./scripts/deploy-solana-test.sh --treasurer "${EXTRA_ARGS[@]}" - CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh --treasurer "${EXTRA_ARGS[@]}" -elif [[ "$PERMISSIONLESS" == "true" ]]; then +else WALLET_FILE=${WALLET_FILE} ./scripts/deploy-solana-test.sh "${EXTRA_ARGS[@]}" - CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh "${EXTRA_ARGS[@]}" +fi + +if [[ "$PERMISSIONLESS" == "true" ]]; then + if [[ "$DEPLOY_TREASURER" == "true" ]]; then + CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh --treasurer "${EXTRA_ARGS[@]}" + else + CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh "${EXTRA_ARGS[@]}" + fi fi echo -e "\n[+] Testing Solana setup ready, starting Solana logs...\n" diff --git a/tools/rust-tools/run-manager/Cargo.toml b/tools/rust-tools/run-manager/Cargo.toml index e47a1931f..d6b133134 100644 --- a/tools/rust-tools/run-manager/Cargo.toml +++ b/tools/rust-tools/run-manager/Cargo.toml @@ -35,6 +35,7 @@ rand.workspace = true rand_chacha.workspace = true time.workspace = true solana-client = "=2.1.4" +solana-account-decoder-client-types = "=2.1.4" [dev-dependencies] serial_test = "3.0" diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index 729a38786..7ae9f5719 100644 --- 
a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -1,16 +1,30 @@ -use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; +use anchor_client::solana_sdk::{ + commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, +}; use anchor_lang::AccountDeserialize; use anyhow::{Context, Result}; +use psyche_coordinator::RunState; +use psyche_solana_authorizer::state::Authorization; use psyche_solana_coordinator::{ CoordinatorInstance, coordinator_account_from_bytes, find_coordinator_instance, + logic::JOIN_RUN_AUTHORIZATION_SCOPE, }; +use solana_account_decoder_client_types::UiAccountEncoding; use solana_client::rpc_client::RpcClient; -use tracing::info; +use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; +use tracing::{debug, info, warn}; + +#[derive(Debug, Clone)] +pub struct RunInfo { + pub run_id: String, + pub instance_pubkey: Pubkey, + pub coordinator_account: Pubkey, + pub run_state: RunState, +} /// Coordinator client for querying Solana pub struct CoordinatorClient { rpc_client: RpcClient, - #[allow(dead_code)] program_id: Pubkey, } @@ -40,19 +54,39 @@ impl CoordinatorClient { Ok(instance) } + fn fetch_run_state(&self, coordinator_account: &Pubkey) -> Result { + // Fetch the raw Solana account data from the blockchain + let solana_account = self + .rpc_client + .get_account(coordinator_account) + .with_context(|| { + format!( + "Failed to fetch coordinator account {}", + coordinator_account + ) + })?; + + // Deserialize the account data into a CoordinatorAccount struct + let coordinator = + coordinator_account_from_bytes(&solana_account.data).with_context(|| { + format!( + "Failed to deserialize coordinator account {}", + coordinator_account + ) + })?; + + Ok(coordinator.state.coordinator.run_state) + } + pub fn get_docker_tag_for_run(&self, run_id: &str, local_docker: bool) -> Result { info!("Querying 
coordinator for Run ID: {}", run_id); let instance = self.fetch_coordinator_data(run_id)?; // Fetch the coordinator account to get the client version - let coordinator_account_data = self - .rpc_client - .get_account(&instance.coordinator_account) - .context("RPC error: failed to get coordinator account")?; - - let coordinator_account = coordinator_account_from_bytes(&coordinator_account_data.data) - .context("Failed to deserialize CoordinatorAccount")?; + let coordinator_account_data = + self.rpc_client.get_account(&instance.coordinator_account)?; + let coordinator_account = coordinator_account_from_bytes(&coordinator_account_data.data)?; let client_version = String::from(&coordinator_account.state.client_version); @@ -82,4 +116,103 @@ impl CoordinatorClient { Ok(image_name) } + + pub fn get_all_runs(&self) -> Result> { + // Fetch all CoordinatorInstance accounts that are owned by the program + let accounts = self + .rpc_client + .get_program_accounts_with_config( + &self.program_id, + RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + ..Default::default() + }, + ..Default::default() + }, + ) + .map_err(|e| { + anyhow::anyhow!( + "Failed to fetch program accounts from coordinator program {}: {}", + self.program_id, + e + ) + })?; + + let mut runs = Vec::new(); + for (pubkey, account) in accounts { + match CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) { + Ok(instance) => { + if let Ok(run_state) = self.fetch_run_state(&instance.coordinator_account) { + runs.push(RunInfo { + run_id: instance.run_id.clone(), + instance_pubkey: pubkey, + coordinator_account: instance.coordinator_account, + run_state, + }); + } else { + debug!( + "Skipping run {} (instance: {}) - could not fetch coordinator state", + instance.run_id, pubkey + ); + } + } + Err(e) => { + debug!( + "Failed to deserialize CoordinatorInstance at {}: {}", + pubkey, e + ); + 
} + } + } + + Ok(runs) + } + + /// Check if a user is authorized to join a specific run. + /// + /// This checks both permissionless authorization (grantee = system_program::ID) + /// and user-specific authorization (grantee = user_pubkey). + pub fn can_user_join_run(&self, run: &RunInfo, user_pubkey: &Pubkey) -> Result { + // Fetch the CoordinatorInstance to get join_authority + let instance = self.fetch_coordinator_data(&run.run_id)?; + let join_authority = instance.join_authority; + + // Try permissionless authorization first (grantee = system_program::ID) + if self.check_authorization_for_grantee(&join_authority, &system_program::ID, user_pubkey) { + return Ok(true); + } + + // Try user-specific authorization (grantee = user_pubkey) + Ok(self.check_authorization_for_grantee(&join_authority, user_pubkey, user_pubkey)) + } + + /// Check if an authorization exists and is valid for a specific grantee. + fn check_authorization_for_grantee( + &self, + join_authority: &Pubkey, + grantee: &Pubkey, + user_pubkey: &Pubkey, + ) -> bool { + let auth_pda = psyche_solana_authorizer::find_authorization( + join_authority, + grantee, + JOIN_RUN_AUTHORIZATION_SCOPE, + ); + + let Ok(account) = self.rpc_client.get_account(&auth_pda) else { + return false; + }; + + let Ok(authorization) = Authorization::try_deserialize(&mut account.data.as_slice()) else { + warn!( + "Failed to deserialize authorization at {}: invalid data", + auth_pda + ); + return false; + }; + + authorization.is_valid_for(join_authority, user_pubkey, JOIN_RUN_AUTHORIZATION_SCOPE) + } } diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index cf20f480a..40c884f95 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -1,15 +1,18 @@ use anchor_client::solana_sdk::pubkey::Pubkey; +use anchor_client::solana_sdk::signature::{EncodableKey, Keypair, Signer}; use anyhow::{Context, Result, 
anyhow, bail}; use std::fs; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Cursor}; use std::path::PathBuf; use std::process::{Command, Stdio}; use tokio::signal; use tracing::{error, info, warn}; +use crate::docker::RunInfo; use crate::docker::coordinator_client::CoordinatorClient; use crate::get_env_var; use crate::load_and_apply_env_file; +use psyche_coordinator::RunState; const RETRY_DELAY_SECS: u64 = 5; const VERSION_MISMATCH_EXIT_CODE: i32 = 10; @@ -34,6 +37,7 @@ impl RunManager { coordinator_program_id: String, env_file: PathBuf, local_docker: bool, + authorizer: Option, ) -> Result { // Verify docker is available Command::new("docker") @@ -64,13 +68,43 @@ impl RunManager { info!("Using coordinator program ID: {}", coordinator_program_id); - let run_id = get_env_var("RUN_ID")?; let rpc = get_env_var("RPC")?; - let scratch_dir = std::env::var("SCRATCH_DIR").ok(); let coordinator_client = CoordinatorClient::new(rpc, coordinator_program_id); + // Try to get RUN_ID from env, or discover available runs + if let Ok(run_id) = std::env::var("RUN_ID") { + if !run_id.is_empty() { + info!("Using RUN_ID from environment: {}", run_id); + return Ok(Self { + wallet_key, + run_id, + coordinator_client, + env_file, + local_docker, + scratch_dir, + }); + } + } + + info!("RUN_ID not set, discovering available runs..."); + let runs = coordinator_client.get_all_runs()?; + if runs.is_empty() { + bail!("No runs found on coordinator program"); + } + + // Parse wallet key to get user's pubkey for authorization checks + let user_pubkey = parse_wallet_pubkey(&wallet_key)?; + info!("User pubkey: {}", user_pubkey); + + let run_id = select_best_run( + &runs, + &user_pubkey, + &coordinator_client, + authorizer.as_ref(), + )?; + Ok(Self { wallet_key, run_id, @@ -159,6 +193,8 @@ impl RunManager { .arg(format!("RAW_WALLET_PRIVATE_KEY={}", &self.wallet_key)) .arg("--env") .arg(format!("CLIENT_VERSION={}", client_version)) + .arg("--env") + .arg(format!("RUN_ID={}", 
&self.run_id)) .arg("--env-file") .arg(&self.env_file); @@ -320,3 +356,93 @@ impl RunManager { } } } + +/// Parse wallet key string to extract the user's pubkey. +fn parse_wallet_pubkey(wallet_key: &str) -> Result { + let keypair = if wallet_key.starts_with('[') { + // Assume Keypair::read format (JSON array of bytes) + Keypair::read(&mut Cursor::new(wallet_key)) + .map_err(|e| anyhow!("Failed to parse wallet key: {}", e))? + } else { + // Assume base58 encoded private key + Keypair::from_base58_string(wallet_key) + }; + Ok(keypair.pubkey()) +} + +fn select_best_run( + runs: &[RunInfo], + user_pubkey: &Pubkey, + coordinator_client: &CoordinatorClient, + authorizer: Option<&Pubkey>, +) -> Result { + // Filter out unjoinable run states + let mut candidates: Vec<_> = runs + .iter() + .filter(|run| { + !matches!( + run.run_state, + RunState::Uninitialized | RunState::Finished | RunState::Paused + ) + }) + .collect(); + + if candidates.is_empty() { + bail!( + "No joinable runs found. All {} run(s) are in unjoinable states.", + runs.len() + ); + } + + // Filter by join_authority if --authorizer was specified + if let Some(auth) = authorizer { + info!("Filtering runs by join_authority: {}", auth); + candidates.retain( + |run| match coordinator_client.fetch_coordinator_data(&run.run_id) { + Ok(data) => data.join_authority == *auth, + Err(e) => { + warn!("Skipping run {} - failed to fetch data: {}", run.run_id, e); + false + } + }, + ); + if candidates.is_empty() { + bail!("No runs found matching authorizer {}", auth); + } + } + + // Filter to runs the user is authorized to join + candidates.retain( + |run| match coordinator_client.can_user_join_run(run, user_pubkey) { + Ok(authorized) => authorized, + Err(e) => { + warn!( + "Skipping run {} - authorization check failed: {}", + run.run_id, e + ); + false + } + }, + ); + + if candidates.is_empty() { + bail!("No authorized runs found for user {}", user_pubkey); + } + + // Prioritize runs waiting for members + 
candidates.sort_by_key(|run| match run.run_state { + RunState::WaitingForMembers => 0, + _ => 1, + }); + + info!("Found {} available run(s):", candidates.len()); + for run in &candidates { + info!(" - {} (state: {})", run.run_id, run.run_state); + } + + info!( + "Selected run: {} (state: {})", + candidates[0].run_id, candidates[0].run_state + ); + Ok(candidates[0].run_id.clone()) +} diff --git a/tools/rust-tools/run-manager/src/docker/mod.rs b/tools/rust-tools/run-manager/src/docker/mod.rs index b3221d7eb..41f2aae7e 100644 --- a/tools/rust-tools/run-manager/src/docker/mod.rs +++ b/tools/rust-tools/run-manager/src/docker/mod.rs @@ -2,4 +2,5 @@ pub mod coordinator_client; pub mod manager; // Re-exports +pub use coordinator_client::RunInfo; pub use manager::{Entrypoint, RunManager}; diff --git a/tools/rust-tools/run-manager/src/main.rs b/tools/rust-tools/run-manager/src/main.rs index 717234d9d..d6036585b 100644 --- a/tools/rust-tools/run-manager/src/main.rs +++ b/tools/rust-tools/run-manager/src/main.rs @@ -59,6 +59,10 @@ struct CliArgs { #[arg(long)] local: bool, + /// Only join runs where this pubkey is the join_authority (Docker mode) + #[arg(long)] + authorizer: Option, + /// Optional entrypoint (Docker mode) #[arg(long)] entrypoint: Option, @@ -287,7 +291,22 @@ async fn async_main() -> Result<()> { None => None, }; - let run_mgr = RunManager::new(args.coordinator_program_id, env_file, args.local)?; + // Parse pubkey into Pubkey type + let authorizer = args + .authorizer + .as_ref() + .map(|s| { + s.parse() + .map_err(|e| anyhow::anyhow!("Failed to parse authorizer pubkey: {}", e)) + }) + .transpose()?; + + let run_mgr = RunManager::new( + args.coordinator_program_id, + env_file, + args.local, + authorizer, + )?; let result = run_mgr.run(entrypoint).await; if let Err(e) = &result { error!("Error: {}", e);