Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
0836df8
Make cooldown opportunistic
IAvecilla Dec 23, 2025
80550e6
Fix some things
IAvecilla Dec 23, 2025
be24bda
Fix compilation
IAvecilla Dec 23, 2025
20ac8a2
End epoch after checkpointer
IAvecilla Dec 23, 2025
514c993
Make hub_repo checkpoint mandatory
IAvecilla Dec 23, 2025
3553f7d
Use client as checkpointer and pick random
IAvecilla Jan 5, 2026
00b97f1
Reuse opportunistic logic
IAvecilla Jan 5, 2026
0566a4a
Add anyhow to cargo lock
IAvecilla Jan 5, 2026
203d0c4
Fix opportunistic cooldown message
IAvecilla Jan 6, 2026
e8b1dd8
Add random index for client
IAvecilla Jan 6, 2026
0b6e9f5
Remove rand and get checkpointers at cooldown state
IAvecilla Jan 6, 2026
c001fa5
Rework on cooldown to select pseudo random
IAvecilla Jan 7, 2026
ef3b468
Fix clippy
IAvecilla Jan 7, 2026
8d8c3f4
Add support for multiple checkpointers
IAvecilla Jan 8, 2026
410c192
Fix lint
IAvecilla Jan 8, 2026
3ee7cb6
Update nano config
IAvecilla Jan 8, 2026
2b07c7d
Remove send_checkpoint function from backend
IAvecilla Jan 8, 2026
7fb8add
Reduce total amount of checkpointers
IAvecilla Jan 8, 2026
e7421ed
Remove check for permissions on hf repo
IAvecilla Jan 8, 2026
a215881
Merge branch 'main' into cooldown-rework
IAvecilla Jan 8, 2026
a66f089
Fix compilation error after merge
IAvecilla Jan 8, 2026
5d99bf0
Use convert function only on python trainer
IAvecilla Jan 8, 2026
2f77f46
Add conditional import for python feature
IAvecilla Jan 8, 2026
284ae5d
Fix decentralized integration test config and entrypoints
IAvecilla Jan 8, 2026
ba70ece
fix bug and remove debug prints
Jan 9, 2026
f82e387
remove unnecessary code
Jan 9, 2026
10e81f3
Add GCS checkpoint variant
IAvecilla Jan 9, 2026
0cb4ba0
Merge branch 'gcs-2' into gcs-upload-model
IAvecilla Jan 12, 2026
9fcf808
polish nits on coordinator code
Jan 12, 2026
9209a86
Merge branch 'main' into cooldown-rework
Jan 12, 2026
e893c7c
Fix convert call
IAvecilla Jan 12, 2026
df99802
Refactor on model download and upload
IAvecilla Jan 12, 2026
a41c457
Fix import with python feature
IAvecilla Jan 12, 2026
b19bc90
Remove vllm from nix
IAvecilla Jan 12, 2026
f02fd59
Merge branch 'gcs-upload-model' into cooldown-rework
IAvecilla Jan 12, 2026
9daad93
Add cancellation process after one client checkpoints
IAvecilla Jan 12, 2026
4511ae4
Fix cancellation for upload task
IAvecilla Jan 13, 2026
64ace6f
General refactor and cleanup for new checkpointers logic
IAvecilla Jan 13, 2026
d19df41
Remove comments
IAvecilla Jan 13, 2026
86ba846
Remove hub-repo and gcs-bucket from train args
IAvecilla Jan 14, 2026
4482623
Fix prefix
IAvecilla Jan 14, 2026
5ea5564
Fix tcp example
IAvecilla Jan 14, 2026
8da9c66
Calculate checkpointers instead of adding new config
IAvecilla Jan 14, 2026
6f7aaf2
fixing centralized tests
IAvecilla Jan 14, 2026
860f50d
Merge branch 'gcs-2' into cooldown-rework
IAvecilla Jan 14, 2026
3471706
Fix tcp example compilation
IAvecilla Jan 14, 2026
e8a4e50
Fix centralized tests avoiding uploading checks
IAvecilla Jan 14, 2026
1f545f9
Add test mode cli arg for training
IAvecilla Jan 14, 2026
5706ab7
Fix flag
IAvecilla Jan 14, 2026
5f0edb4
Lower cooldown time for centralized tests
IAvecilla Jan 15, 2026
8b0608e
Merge branch 'cooldown-rework' into remove-checkpointers-config
IAvecilla Jan 15, 2026
d53ff7b
update gcp crate to 1.5.x version
dsocolobsky Jan 14, 2026
978b63d
Merge branch 'cooldown-rework' into dy/gcp-new-version
dsocolobsky Jan 15, 2026
15b2d5e
Add docs with new cooldown behavior
IAvecilla Jan 15, 2026
634d926
Merge branch 'cooldown-rework' into dy/gcp-new-version
entropidelic Jan 16, 2026
172293d
Fix extra docs
IAvecilla Jan 16, 2026
d2ca5cb
Merge branch 'cooldown-rework' into dy/gcp-new-version
dsocolobsky Jan 16, 2026
90339fe
Merge branch 'gcs-2' into cooldown-rework
IAvecilla Jan 16, 2026
6254788
fix google cloud storage code
dsocolobsky Jan 16, 2026
64f683b
Merge branch 'cooldown-rework' into dy/gcp-new-version
dsocolobsky Jan 16, 2026
3bf0e42
Merge branch 'cooldown-rework' into remove-checkpoint-repo-arg
IAvecilla Jan 16, 2026
5355592
Remove hub-repo flag from test
IAvecilla Jan 16, 2026
e99a049
Merge branch 'gcs-2' into cooldown-rework
IAvecilla Jan 16, 2026
cb5b265
Add check for permissions before joining the run
IAvecilla Jan 16, 2026
1b0c1a6
Remove `--hub-repo` and `--gcs-bucket` from train args (#490)
IAvecilla Jan 16, 2026
f155690
add GCS credentials to scratch dir used in run manager
Jan 16, 2026
329c604
Merge branch 'cooldown-rework' into run-manager/gcs-credentials
entropidelic Jan 16, 2026
f361b93
Remove google-cloud-storage from coordinator toml
IAvecilla Jan 16, 2026
611f322
Remove hub repo arguments in test
IAvecilla Jan 16, 2026
dd55838
Merge branch 'cooldown-rework' into run-manager/gcs-credentials
entropidelic Jan 16, 2026
2809135
Update to version 1.6
IAvecilla Jan 16, 2026
089ca65
Fix extra args in script
IAvecilla Jan 16, 2026
2402b4e
Remove sanity check for permissions
IAvecilla Jan 16, 2026
ad5ba5e
Merge branch 'cooldown-rework' into dy/gcp-new-version
IAvecilla Jan 16, 2026
6983523
Check bucket and repo permissions before joining run
IAvecilla Jan 19, 2026
de7e19d
Fix centralized config
IAvecilla Jan 19, 2026
95c8a83
Add new NoUpload to avoid checkpointing in tests
IAvecilla Jan 19, 2026
84c02b3
Merge branch 'dy/gcp-new-version' into cooldown-rework
IAvecilla Jan 19, 2026
49669ff
Fix train solana test script
IAvecilla Jan 19, 2026
e048b25
CooldownStep.checkpoint_complete
pefontana Jan 19, 2026
581d87f
Fix crash after credential errors
IAvecilla Jan 20, 2026
5ff3184
Update only safetensors for checkpointing
IAvecilla Jan 20, 2026
a8444b2
Refactor hub repo upload
IAvecilla Jan 20, 2026
b6802b5
Uncomment docker cleanup
IAvecilla Jan 20, 2026
da2fab6
Remove NoCheckpoint variant and use a client flag instead
IAvecilla Jan 20, 2026
d6a1201
Send cooldown witness on skip checkpoint
IAvecilla Jan 20, 2026
816ba5b
Skip local save with skip upload flag
IAvecilla Jan 20, 2026
3b83b2c
Merge branch 'cooldown-rework' into run-manager/gcs-credentials
IAvecilla Jan 20, 2026
c6016b7
Remove comment and add early return on skip upload check
IAvecilla Jan 26, 2026
98c1abe
Fix centralized permission check to upload
IAvecilla Jan 26, 2026
7d8bf2c
Fix cooldown checks and update comment on test flag
IAvecilla Jan 26, 2026
fb4810d
Merge branch 'gcs-2' into cooldown-rework
IAvecilla Jan 27, 2026
0134d01
Merge branch 'gcs-2' into cooldown-rework
IAvecilla Jan 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
509 changes: 367 additions & 142 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ indicatif = "0.17.5"
tokenizers = { version = "0.20.0", default-features = false, features = [
"onig",
] }
google-cloud-storage = "1.6.0"
tch = { git = "https://github.com/jquesnelle/tch-rs.git", rev = "11d1ca2ef6dbd3f1e5b0986fab0a90fbb6734496" }
torch-sys = { git = "https://github.com/jquesnelle/tch-rs.git", rev = "11d1ca2ef6dbd3f1e5b0986fab0a90fbb6734496" }
pyo3-tch = { git = "https://github.com/jquesnelle/tch-rs.git", rev = "11d1ca2ef6dbd3f1e5b0986fab0a90fbb6734496" }
Expand Down
2 changes: 2 additions & 0 deletions architectures/centralized/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ time.workspace = true
bytemuck.workspace = true
clap-markdown.workspace = true
hex = "0.4.3"
bytes.workspace = true
google-cloud-storage.workspace = true
psyche-python-extension-impl = { workspace = true, optional = true }

[features]
Expand Down
82 changes: 61 additions & 21 deletions architectures/centralized/client/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use anyhow::{Error, Result};
use bytemuck::Zeroable;
use hf_hub::Repo;
use google_cloud_storage::client::{Storage, StorageControl};
use psyche_centralized_shared::{ClientId, ClientToServerMessage, ServerToClientMessage};
use psyche_client::HubUploadInfo;
use psyche_client::UploadInfo;
use psyche_client::{
Client, ClientTUI, ClientTUIState, NC, RunInitConfig, TrainArgs, read_identity_secret_key,
};
use psyche_coordinator::{Coordinator, HealthChecks, model};
use psyche_client::{GcsUploadInfo, HubUploadInfo, UploadInfo};
use psyche_coordinator::model::Checkpoint;
use psyche_coordinator::{Coordinator, HealthChecks};
use psyche_metrics::ClientMetrics;
use psyche_network::{
AuthenticatableIdentity, EndpointId, NetworkTUIState, NetworkTui, SecretKey, TcpClient,
Expand All @@ -31,7 +31,7 @@ pub type TabsData = <Tabs as CustomWidget>::Data;
pub enum ToSend {
Witness(Box<OpportunisticData>),
HealthCheck(HealthChecks<ClientId>),
Checkpoint(model::Checkpoint),
Checkpoint(Checkpoint),
}

struct Backend {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl WatcherBackend<ClientId> for Backend {
Ok(())
}

async fn send_checkpoint(&mut self, checkpoint: model::Checkpoint) -> Result<()> {
async fn send_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<()> {
self.tx.send(ToSend::Checkpoint(checkpoint))?;
Ok(())
}
Expand All @@ -84,13 +84,15 @@ pub struct App {
server_conn: TcpClient<ClientId, ClientToServerMessage, ServerToClientMessage>,

metrics: Arc<ClientMetrics>,
skip_upload_check: bool,
}

pub async fn build_app(
cancel: CancellationToken,
server_addr: String,
tx_tui_state: Option<Sender<TabsData>>,
p: TrainArgs,
is_test: bool,
) -> Result<(
App,
allowlist::AllowDynamic,
Expand Down Expand Up @@ -162,6 +164,7 @@ pub async fn build_app(
server_conn,
run_id: p.run_id,
metrics,
skip_upload_check: is_test,
};
Ok((app, allowlist, p2p, state_options))
}
Expand All @@ -173,22 +176,59 @@ impl App {
p2p: NC,
state_options: RunInitConfig<ClientId, ClientId>,
) -> Result<()> {
// sanity checks
if let Some(checkpoint_config) = &state_options.checkpoint_config {
if let Some(UploadInfo::Hub(HubUploadInfo {
hub_repo,
hub_token,
})) = &checkpoint_config.upload_info
{
let api = hf_hub::api::tokio::ApiBuilder::new()
.with_token(Some(hub_token.clone()))
.build()?;
let repo_api = api.repo(Repo::new(hub_repo.clone(), hf_hub::RepoType::Model));
if !repo_api.is_writable().await {
// Sanity checks using the checkpoint config from state_options, not the zeroed coordinator state.
// The coordinator_state is only populated after receiving the first ServerToClientMessage::Coordinator.
if !self.skip_upload_check {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can invert the check and return early

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to execute the rest of the code in that function, so I don’t think we can return early there, unless you meant something else?

let upload_info = match &state_options.checkpoint_config {
config if config.skip_upload => Some(UploadInfo::Dummy()),
config => {
// Use HF_TOKEN from checkpoint_config for Hub uploads
if let Some(ref hub_token) = config.hub_token {
Some(UploadInfo::Hub(HubUploadInfo {
hub_repo: String::new(), // Will be validated when actual checkpoint is received
hub_token: hub_token.clone(),
}))
} else {
// Check if GCS credentials are available by attempting to create a client
match Storage::builder().build().await {
Ok(_) => Some(UploadInfo::Gcs(GcsUploadInfo {
gcs_bucket: String::new(), // Will be validated when actual checkpoint is received
gcs_prefix: None,
})),
Err(_) => None,
}
}
}
};

match upload_info {
Some(UploadInfo::Hub(HubUploadInfo {
hub_repo: _,
hub_token,
})) => {
let _api = hf_hub::api::tokio::ApiBuilder::new()
.with_token(Some(hub_token.clone()))
.build()?;
}
Some(UploadInfo::Gcs(_gcs_info)) => {
let _storage = Storage::builder()
.build()
.await
.map_err(|e| anyhow::anyhow!("Failed to create GCS client: {}", e))?;

let _storage_control =
StorageControl::builder().build().await.map_err(|e| {
anyhow::anyhow!("Failed to create GCS control client: {}", e)
})?;
// GCS credentials are valid - actual bucket writability will be checked during checkpoint
}
Some(UploadInfo::Dummy()) => {
// In test mode or skip_upload mode, we skip upload checks
}
None => {
anyhow::bail!(
"Checkpoint upload repo {} is not writable with the passed API key.",
hub_repo
)
"No upload credentials found for checkpointing. Set HF_TOKEN for HuggingFace Hub or configure GCS credentials."
);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion architectures/centralized/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn async_main() -> Result<()> {
)?;

let (mut app, allowlist, p2p, state_options) =
build_app(cancel, server_addr, tx_tui_state, args)
build_app(cancel, server_addr, tx_tui_state, args, false)
.await
.unwrap();

Expand Down
7 changes: 5 additions & 2 deletions architectures/centralized/server/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Result, anyhow, bail};
use async_trait::async_trait;
use psyche_centralized_shared::{ClientId, ClientToServerMessage, ServerToClientMessage};
use psyche_coordinator::model::{self, Checkpoint, LLM, LLMTrainingDataLocation, Model};
use psyche_coordinator::model::{Checkpoint, LLM, LLMTrainingDataLocation, Model};
use psyche_coordinator::{
Client, ClientState, Coordinator, CoordinatorError, HealthChecks, Round, RunState,
SOLANA_MAX_NUM_CLIENTS, TickResult,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl psyche_watcher::Backend<ClientId> for ChannelCoordinatorBackend {
bail!("Server does not send health checks");
}

async fn send_checkpoint(&mut self, _checkpoint: model::Checkpoint) -> Result<()> {
async fn send_checkpoint(&mut self, _checkpoint: Checkpoint) -> Result<()> {
bail!("Server does not send checkpoints");
}
}
Expand Down Expand Up @@ -402,6 +402,9 @@ impl App {
Self::get_timestamp(),
rand::rng().next_u64(),
),
OpportunisticData::CooldownStep(witness) => {
self.coordinator.cooldown_witness(&from, witness)
}
} {
warn!("Error when processing witness: {error}");
};
Expand Down
2 changes: 2 additions & 0 deletions architectures/centralized/testing/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl Client {
client_app_params.server_addr,
None,
client_app_params.train_args,
true,
)
.await
.unwrap();
Expand All @@ -57,6 +58,7 @@ impl Client {
client_app_params.server_addr,
None,
client_app_params.train_args,
true,
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion architectures/centralized/testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pub mod test_utils;
pub const WARMUP_TIME: u64 = 60;
pub const MAX_ROUND_TRAIN_TIME: u64 = 5;
pub const ROUND_WITNESS_TIME: u64 = 2;
pub const COOLDOWN_TIME: u64 = 3;
pub const COOLDOWN_TIME: u64 = 5;
2 changes: 2 additions & 0 deletions architectures/centralized/testing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub fn dummy_client_app_params_with_training_delay(
run_id: &str,
training_delay_secs: u64,
) -> AppParams {
std::env::set_var("HF_TOKEN", "dummy_token");
AppParams {
cancel: CancellationToken::default(),
server_addr: format!("localhost:{server_port}").to_string(),
Expand All @@ -141,6 +142,7 @@ pub fn dummy_client_app_params_with_training_delay(
"--max-concurrent-parameter-requests", "10",
"--hub-max-concurrent-downloads", "1",
"--dummy-training-delay-secs", training_delay_secs.to_string().as_str(),
"--skip-checkpoint-upload",
])
.train_args,
}
Expand Down
12 changes: 10 additions & 2 deletions architectures/decentralized/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ set working-directory := '../../'
# In case a recipe is not found here, it will fallback to the root justfile.

AUTHORIZER := env_var_or_default("AUTHORIZER", "11111111111111111111111111111111")
HF_TOKEN := env_var_or_default("HF_TOKEN", "")
GOOGLE_APPLICATION_CREDENTIALS := env_var_or_default("GOOGLE_APPLICATION_CREDENTIALS", "")

set fallback := true

Expand Down Expand Up @@ -37,10 +39,16 @@ setup-solana-localnet-permissioned-light-test-run-treasurer run_id="test" *args=
RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh --treasurer {{ args }}

start-training-localnet-client run_id="test" *args='':
AUTHORIZER={{ AUTHORIZER }} RUN_ID={{ run_id }} ./scripts/train-solana-test.sh {{ args }}
HF_TOKEN={{ HF_TOKEN }} GOOGLE_APPLICATION_CREDENTIALS={{ GOOGLE_APPLICATION_CREDENTIALS }} AUTHORIZER={{ AUTHORIZER }} CHECKPOINT="false" RUN_ID={{ run_id }} ./scripts/train-solana-test.sh {{ args }}

start-training-localnet-light-client run_id="test" *args='':
AUTHORIZER={{ AUTHORIZER }} RUN_ID={{ run_id }} BATCH_SIZE=1 DP=1 ./scripts/train-solana-test.sh {{ args }}
HF_TOKEN={{ HF_TOKEN }} GOOGLE_APPLICATION_CREDENTIALS={{ GOOGLE_APPLICATION_CREDENTIALS }} AUTHORIZER={{ AUTHORIZER }} CHECKPOINT="false" RUN_ID={{ run_id }} BATCH_SIZE=1 DP=1 ./scripts/train-solana-test.sh {{ args }}

start-training-localnet-light-client-checkpoint run_id="test" *args='':
HF_TOKEN={{ HF_TOKEN }} GOOGLE_APPLICATION_CREDENTIALS={{ GOOGLE_APPLICATION_CREDENTIALS }} AUTHORIZER={{ AUTHORIZER }} CHECKPOINT="true" RUN_ID={{ run_id }} BATCH_SIZE=1 DP=1 ./scripts/train-solana-test.sh {{ args }}

start-training-localnet-client-checkpoint run_id="test" *args='':
HF_TOKEN={{ HF_TOKEN }} GOOGLE_APPLICATION_CREDENTIALS={{ GOOGLE_APPLICATION_CREDENTIALS }} AUTHORIZER={{ AUTHORIZER }} CHECKPOINT="true" RUN_ID={{ run_id }} ./scripts/train-solana-test.sh {{ args }}

OTLP_METRICS_URL := "http://localhost:4318/v1/metrics"
OTLP_LOGS_URL := "http://localhost:4318/v1/logs"
Expand Down
2 changes: 2 additions & 0 deletions architectures/decentralized/solana-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ time.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
google-cloud-storage.workspace = true
hf-hub.workspace = true
psyche-python-extension-impl = { workspace = true, optional = true }

[features]
Expand Down
81 changes: 79 additions & 2 deletions architectures/decentralized/solana-client/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::network_identity::NetworkIdentity;
use google_cloud_storage::client::StorageControl;
use hf_hub::Repo;
use psyche_solana_rpc::SolanaBackend;

use anchor_client::{
Expand All @@ -11,9 +13,13 @@ use anchor_client::{
};
use anyhow::{Result, anyhow};
use psyche_client::{
Client, ClientTUI, ClientTUIState, NC, RunInitConfig, TrainArgs, read_identity_secret_key,
Client, ClientTUI, ClientTUIState, GcsUploadInfo, HubUploadInfo, NC, RunInitConfig, TrainArgs,
UploadInfo, read_identity_secret_key,
};
use psyche_coordinator::{
ClientState, Coordinator, CoordinatorError, RunState,
model::{self, GcsRepo, HubRepo, LLM, Model},
};
use psyche_coordinator::{ClientState, Coordinator, CoordinatorError, RunState};
use psyche_core::sha256;
use psyche_metrics::ClientMetrics;

Expand Down Expand Up @@ -226,6 +232,77 @@ impl App {
let mut joined_run_this_epoch = None;
let mut ever_joined_run = false;

// sanity checks
let Model::LLM(LLM { checkpoint, .. }) = start_coordinator_state.model;
let upload_info = match checkpoint {
model::Checkpoint::Hub(HubRepo { repo_id, revision })
| model::Checkpoint::P2P(HubRepo { repo_id, revision }) => {
Some(UploadInfo::Hub(HubUploadInfo {
hub_repo: (&repo_id).into(),
hub_token: (&revision.unwrap_or_default()).into(),
}))
}
model::Checkpoint::Gcs(GcsRepo { bucket, prefix })
| model::Checkpoint::P2PGcs(model::GcsRepo { bucket, prefix }) => {
Some(UploadInfo::Gcs(GcsUploadInfo {
gcs_bucket: (&bucket).into(),
gcs_prefix: Some((&prefix.unwrap_or_default()).into()),
}))
}
_ => None,
};
match upload_info {
Some(UploadInfo::Hub(hub_info)) => {
let api = hf_hub::api::tokio::ApiBuilder::new()
.with_token(Some(hub_info.hub_token))
.build()?;
let repo_api = api.repo(Repo::new(
hub_info.hub_repo.clone(),
hf_hub::RepoType::Model,
));
if !repo_api.is_writable().await {
anyhow::bail!(
"Checkpoint upload repo {} is not writable with the passed API key.",
hub_info.hub_repo
)
}
}
Some(UploadInfo::Gcs(gcs_info)) => {
let client = StorageControl::builder().build().await?;

let permissions_to_test = vec![
"storage.objects.list",
"storage.objects.get",
"storage.objects.create",
"storage.objects.delete",
];

let resource = format!("projects/_/buckets/{}", gcs_info.gcs_bucket);
let perms_vec: Vec<String> =
permissions_to_test.iter().map(|s| s.to_string()).collect();
let response = client
.test_iam_permissions()
.set_resource(&resource)
.set_permissions(perms_vec)
.send()
.await?;

let correct_permissions = permissions_to_test
.into_iter()
.all(|p| response.permissions.contains(&p.to_string()));
if !correct_permissions {
anyhow::bail!(
"GCS bucket {} does not have the required permissions for checkpoint upload make sure to set GOOGLE_APPLICATION_CREDENTIALS environment variable correctly and have the correct permissions to the bucket.",
gcs_info.gcs_bucket
)
}
}
Some(UploadInfo::Dummy()) => {
// In test mode, we skip upload checks
}
None => {}
}

// if we're already in "WaitingForMembers" we won't get an update saying that
// (subscription is on change), so check if it's in that state right at boot
// and join the run if so
Expand Down
10 changes: 8 additions & 2 deletions architectures/decentralized/solana-common/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use anchor_client::{
};
use anyhow::{Context, Result, anyhow};
use futures_util::StreamExt;
use psyche_coordinator::model::{self, Checkpoint};
use psyche_coordinator::model::Checkpoint;
use psyche_coordinator::{CommitteeProof, Coordinator, HealthChecks};
use psyche_core::IntegrationTestLogMarker;
use psyche_watcher::{Backend as WatcherBackend, OpportunisticData};
Expand Down Expand Up @@ -309,6 +309,12 @@ impl SolanaBackend {
&user,
witness,
),
OpportunisticData::CooldownStep(witness) => instructions::coordinator_cooldown_witness(
&coordinator_instance,
&coordinator_account,
&user,
witness,
),
};
self.spawn_scheduled_send("Witness", &[instruction], &[]);
}
Expand Down Expand Up @@ -605,7 +611,7 @@ impl WatcherBackend<psyche_solana_coordinator::ClientId> for SolanaBackendRunner
Ok(())
}

async fn send_checkpoint(&mut self, checkpoint: model::Checkpoint) -> Result<()> {
async fn send_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<()> {
self.backend
.send_checkpoint(self.instance, self.account, checkpoint);
Ok(())
Expand Down
Loading