Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c51fb1c
use several data providers and cycle between them on error
dsocolobsky Apr 30, 2025
049c542
save latest working data provider to use next time
dsocolobsky Apr 30, 2025
e4cde0c
allow defining backup data providers in config
dsocolobsky Apr 30, 2025
0ea2ef7
Add test_backup_data_provider
dsocolobsky May 5, 2025
4ccbdf7
make DataFetcher::new use anyhow errors
dsocolobsky May 6, 2025
32699de
fix memnet tests
dsocolobsky May 7, 2025
7565ce4
update configs to use new data_locations
dsocolobsky May 7, 2025
b3237ad
centralized-server: throw error if no data providers present
dsocolobsky May 7, 2025
e410055
Simplify and shorten fetch retries
dsocolobsky May 8, 2025
c26c773
nix fmt
dsocolobsky Jun 18, 2025
1878512
client: handle multiple data providers initialization in init.rs
dsocolobsky Jun 19, 2025
a6fe9b8
Fix typos and correct old docs
IAvecilla Dec 5, 2025
7dad2a8
Merge remote-tracking branch 'origin/main' into dy/many-data-providers
arilotter Dec 8, 2025
063970e
wip
IAvecilla Dec 9, 2025
206b173
Fix localnet
IAvecilla Dec 9, 2025
2214d2a
Fix decentralized section
IAvecilla Dec 10, 2025
db5341b
Fix python section
IAvecilla Dec 10, 2025
48646fd
Fix latest development docs
IAvecilla Dec 10, 2025
b739d1a
Improve create run docs
IAvecilla Dec 11, 2025
b77ba97
Update create run section
IAvecilla Dec 11, 2025
e095f85
Fix rest of the sections
IAvecilla Dec 11, 2025
3882805
Update psyche-book/src/explain/index.md
IAvecilla Dec 11, 2025
5f928d5
Improve deploy script and docs
IAvecilla Dec 14, 2025
cf047c6
Add justfile for dev commands
IAvecilla Dec 15, 2025
44b70fc
Merge branch 'main' into clearing-psyche-book
IAvecilla Dec 15, 2025
b6b7e2f
Finish testing new commands
IAvecilla Dec 15, 2025
4dbc244
Solana mem fix wip
IAvecilla Dec 16, 2025
38ddfc0
Send instructions for each data location
IAvecilla Dec 19, 2025
8c41dd2
Merge branch 'main' into dy/many-data-providers
IAvecilla Dec 19, 2025
b010e1c
Fix clippy and remove commented code
IAvecilla Dec 19, 2025
1241efa
Fix tests
IAvecilla Dec 19, 2025
a2452c1
Fix memnet tests
IAvecilla Dec 19, 2025
8f528dd
Fix backend compilation
IAvecilla Dec 19, 2025
71ff151
Add instruction in backend
IAvecilla Dec 19, 2025
56d5152
Fix decentralized tests
IAvecilla Dec 22, 2025
8cc8d56
Merge branch 'main' into dy/many-data-providers
IAvecilla Dec 22, 2025
46718d9
Remove unused scripts
IAvecilla Dec 22, 2025
2e86d33
Separate data locations updates in new transaction
IAvecilla Dec 23, 2025
0aab10b
Merge branch 'main' into dy/many-data-providers
IAvecilla Dec 23, 2025
adf3d69
Merge branch 'main' into dy/many-data-providers
IAvecilla Jan 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions architectures/centralized/server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,22 @@ impl App {

let training_data_server = match &coordinator.model {
Model::LLM(LLM {
data_location,
checkpoint,
..
}) => {
if let LLMTrainingDataLocation::Server(url) = data_location {
let data_locations = &coordinator.data_locations;
let data_location_server_urls:Vec<_> = data_locations.iter().filter_map(|l| match l {LLMTrainingDataLocation::Server(url) => Some(url.to_string()), _=> None}).collect();

if data_location_server_urls.is_empty() {
None
} else {
if data_location_server_urls.len() > 1 {
bail!("More than one LLMTrainingDataLocation::Server configured, but we only support hosting a single one.");
}

// we know there's a single url, and it's the one that includes the port we want to host on.
let url = data_location_server_urls.first().unwrap();

match checkpoint {
Checkpoint::Hub(hub_repo) => {
let repo_id = String::from(&hub_repo.repo_id);
Expand All @@ -206,7 +217,7 @@ impl App {
}
}

let server_addr: SocketAddr = String::from(url).parse().map_err(|e| {
let server_addr: SocketAddr = url.parse().map_err(|e| {
anyhow!("Failed to parse training data server URL {:?}: {}", url, e)
})?;
let data_server_port = server_addr.port();
Expand All @@ -231,8 +242,6 @@ impl App {
DataProviderTcpServer::start(local_data_provider, backend, data_server_port)
.await?;
Some((tx, data_server))
} else {
None
}
}
};
Expand Down
2 changes: 2 additions & 0 deletions architectures/centralized/testing/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{MAX_ROUND_TRAIN_TIME, ROUND_WITNESS_TIME, WARMUP_TIME};
use bytemuck::Zeroable;
use psyche_centralized_server::app::App as ServerApp;
use psyche_centralized_shared::ClientId;
use psyche_coordinator::model::LLMDataLocations;
use psyche_coordinator::{Client, Round};
use psyche_coordinator::{
Coordinator, CoordinatorConfig, CoordinatorEpochState, RunState, SOLANA_MAX_NUM_CLIENTS,
Expand Down Expand Up @@ -94,6 +95,7 @@ impl CoordinatorServer {
model: Model::LLM(LLM::dummy()),
config: coordinator_config,
epoch_state,
data_locations: LLMDataLocations::dummy(),
..Coordinator::<ClientId>::zeroed()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub async fn command_json_dump_run_execute(
"client_version": coordinator_account_state.state.client_version,
"metadata": coordinator_account_state.state.metadata,
"model": coordinator_account_state.state.coordinator.model,
"data_locations": coordinator_account_state.state.coordinator.data_locations,
"config": coordinator_account_state.state.coordinator.config,
},
"status": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub async fn command_set_future_epoch_rates_execute(
.map(|amount| ui_amount_to_native_amount(amount, mint_decimals)),
paused: None,
client_version: None,
data_location: None,
},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub async fn command_set_paused_execute(
epoch_slashing_rate_per_client: None,
paused: Some(paused),
client_version: None,
data_location: None,
},
)
} else {
Expand Down
108 changes: 97 additions & 11 deletions architectures/decentralized/solana-client/src/command/update_config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::path::PathBuf;

use anyhow::{Context, Result, bail};
use clap::Args;
use psyche_coordinator::{
CoordinatorConfig, CoordinatorProgress, get_data_index_for_step,
model::{Checkpoint, Model},
model::{Checkpoint, LLMDataLocations, LLMTrainingDataLocation, Model},
};
use psyche_core::FixedVec;
use psyche_solana_treasurer::logic::RunUpdateParams;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

use crate::{SolanaBackend, instructions};

Expand Down Expand Up @@ -68,22 +68,58 @@ pub async fn command_update_config_execute(
.get_coordinator_account(&coordinator_account)
.await?;

let (config, mut model) = match config_path {
let (config, mut model, data_locations) = match config_path {
Some(config_path) => {
#[derive(Serialize, Deserialize)]
struct ModelWrapper {
#[serde(flatten)]
pub model: Model,
}

#[derive(Serialize, Deserialize)]
struct State {
pub config: CoordinatorConfig,
pub model: Model,
pub model: ModelWrapper,
}

// First, parse without data_locations to get the Model enum
let state: State = toml::from_str(std::str::from_utf8(
&std::fs::read(&config_path)
.with_context(|| format!("failed to read config toml file {config_path:?}"))?,
)?)
.with_context(|| format!("failed to parse config toml file {config_path:?}"))?;

(Some(state.config), Some(state.model))
// Then parse just the data_locations separately
#[derive(Serialize, Deserialize)]
struct DataLocationsWrapper {
pub data_locations: Vec<LLMTrainingDataLocation>,
}

#[derive(Serialize, Deserialize)]
struct LLMSection {
#[serde(rename = "LLM")]
pub llm: DataLocationsWrapper,
}

#[derive(Serialize, Deserialize)]
struct ModelSection {
pub model: LLMSection,
}

let data_section: ModelSection = toml::from_str(std::str::from_utf8(
&std::fs::read(&config_path)
.with_context(|| format!("failed to read config toml file {config_path:?}"))?,
)?)?;

let data_locs = LLMDataLocations {
data_locations: FixedVec::from_iter(
data_section.model.llm.data_locations.into_iter(),
),
};

(Some(state.config), Some(state.model.model), Some(data_locs))
}
None => (None, None),
None => (None, None, None),
};

model = if switch_to_hub {
Expand Down Expand Up @@ -133,6 +169,10 @@ pub async fn command_update_config_execute(
coordinator_account_state.state.coordinator.model = model;
}

if let Some(data_locations) = data_locations {
coordinator_account_state.state.coordinator.data_locations = data_locations;
}

let progress = restart_from_step.map(|step| CoordinatorProgress {
epoch: coordinator_account_state.state.coordinator.progress.epoch,
step,
Expand All @@ -148,11 +188,14 @@ pub async fn command_update_config_execute(
bail!("this invocation would not update anything, bailing.")
}

let instructions = if let Some(treasurer_index) = backend
let (instructions, data_location_instr) = if let Some(treasurer_index) = backend
.resolve_treasurer_index(&run_id, treasurer_index)
.await?
{
vec![instructions::treasurer_run_update(
let mut instructions = Vec::new();
let mut data_location_instr = Vec::new();

instructions.push(instructions::treasurer_run_update(
&run_id,
treasurer_index,
&coordinator_account,
Expand All @@ -166,10 +209,35 @@ pub async fn command_update_config_execute(
epoch_slashing_rate_per_client: None,
paused: None,
client_version: client_version.clone(),
data_location: None,
},
)]
));
if let Some(data_locations) = data_locations {
for dl in data_locations.data_locations.iter() {
data_location_instr.push(instructions::treasurer_run_update(
&run_id,
treasurer_index,
&coordinator_account,
&main_authority,
RunUpdateParams {
metadata: None,
config: None,
model: None,
progress: None,
epoch_earning_rate_total_shared: None,
epoch_slashing_rate_per_client: None,
paused: None,
client_version: None,
data_location: Some(*dl),
},
));
}
}
(instructions, data_location_instr)
} else {
let mut instructions = Vec::new();
let mut data_location_instr = Vec::new();
let data_locations_iter = data_locations.unwrap().iter().cloned().collect::<Vec<_>>();

if coordinator_update {
instructions.push(instructions::coordinator_update(
Expand All @@ -181,6 +249,19 @@ pub async fn command_update_config_execute(
model,
progress,
));
data_location_instr.push(instructions::clear_data_locations(
&run_id,
&coordinator_account,
&main_authority,
));
for dl in data_locations_iter.iter() {
data_location_instr.push(instructions::coordinator_update_data_locations(
&run_id,
&coordinator_account,
&main_authority,
Some(*dl),
));
}
}

if let Some(client_version) = client_version.clone() {
Expand All @@ -192,16 +273,21 @@ pub async fn command_update_config_execute(
));
}

instructions
(instructions, data_location_instr)
};
let signature = backend
.send_and_retry("Update config", &instructions, &[])
.await?;
println!("Updated config of {run_id} with transaction {signature}");

let signature = backend
.send_and_retry("Update data locations", &data_location_instr, &[])
.await?;

println!(" - Metadata: {metadata:#?}");
println!(" - Config: {config:#?}");
println!(" - Model: {model:#?}");
println!(" - Data locations: {data_locations:#?}");
println!(" - Progress: {progress:#?}");
println!(" - Client version: {client_version:#?}");

Expand Down
35 changes: 35 additions & 0 deletions architectures/decentralized/solana-client/src/instructions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ pub fn coordinator_close_run(
)
}

pub fn clear_data_locations(
run_id: &str,
coordinator_account: &Pubkey,
main_authority: &Pubkey,
) -> Instruction {
let coordinator_instance = psyche_solana_coordinator::find_coordinator_instance(run_id);
anchor_instruction(
psyche_solana_coordinator::ID,
psyche_solana_coordinator::accounts::OwnerCoordinatorAccounts {
authority: *main_authority,
coordinator_instance,
coordinator_account: *coordinator_account,
},
psyche_solana_coordinator::instruction::ClearDataLocations {},
)
}

pub fn coordinator_update_data_locations(
run_id: &str,
coordinator_account: &Pubkey,
main_authority: &Pubkey,
data_location: Option<psyche_coordinator::model::LLMTrainingDataLocation>,
) -> Instruction {
let coordinator_instance = psyche_solana_coordinator::find_coordinator_instance(run_id);
anchor_instruction(
psyche_solana_coordinator::ID,
psyche_solana_coordinator::accounts::OwnerCoordinatorAccounts {
authority: *main_authority,
coordinator_instance,
coordinator_account: *coordinator_account,
},
psyche_solana_coordinator::instruction::UpdateDataLocations { data_location },
)
}

pub fn coordinator_update(
run_id: &str,
coordinator_account: &Pubkey,
Expand Down
Loading