diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index 11bd4c322..b74dcc62c 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -1,9 +1,7 @@ -use std::sync::atomic::AtomicU16; - +use dlp::state::DelegationRecord; +use futures_util::future::join_all; use magicblock_accounts_db::traits::AccountsBank; -use magicblock_core::{ - logger::log_trace_warn, token_programs::try_derive_eata_address_and_bump, -}; +use magicblock_core::token_programs::try_derive_eata_address_and_bump; use magicblock_metrics::metrics; use solana_account::AccountSharedData; use solana_pubkey::Pubkey; @@ -13,7 +11,9 @@ use tracing::*; use super::{delegation, types::AccountWithCompanion, FetchCloner}; use crate::{ cloner::{AccountCloneRequest, Cloner}, - remote_account_provider::{ChainPubsubClient, ChainRpcClient}, + remote_account_provider::{ + ChainPubsubClient, ChainRpcClient, ResolvedAccountSharedData, + }, }; /// Resolves ATAs with eATA projection. 
@@ -45,35 +45,18 @@ where let mut accounts_to_clone = vec![]; let mut ata_join_set = JoinSet::new(); - // Subscribe first so subsequent fetches are kept up-to-date + // Collect all pubkeys to subscribe to and spawn fetch tasks + let mut pubkeys_to_subscribe = vec![]; + for (ata_pubkey, _, ata_info, ata_account_slot) in &atas { - if let Err(err) = this.subscribe_to_account(ata_pubkey).await { - static ATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 = - AtomicU16::new(0); - log_trace_warn( - "Failed to subscribe to ATA", - "Failed to subscribe to ATAs", - &ata_pubkey, - &err, - 1000, - &ATA_SUBSCRIPTION_FAILURE_COUNT, - ); - } + // Collect ATA pubkey for subscription + pubkeys_to_subscribe.push(*ata_pubkey); + if let Some((eata, _)) = try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint) { - if let Err(err) = this.subscribe_to_account(&eata).await { - static EATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 = - AtomicU16::new(0); - log_trace_warn( - "Failed to subscribe to derived eATA", - "Failed to subscribe to derived eATAs", - &eata, - &err, - 1000, - &EATA_SUBSCRIPTION_FAILURE_COUNT, - ); - } + // Collect eATA pubkey for subscription + pubkeys_to_subscribe.push(eata); let effective_slot = if let Some(min_slot) = min_context_slot { min_slot.max(*ata_account_slot) @@ -90,8 +73,38 @@ where } } + // Subscribe to all ATA and eATA accounts in parallel + let subscription_results = join_all( + pubkeys_to_subscribe + .iter() + .map(|pk| this.subscribe_to_account(pk)), + ) + .await; + + for (pubkey, result) in + pubkeys_to_subscribe.iter().zip(subscription_results) + { + if let Err(err) = result { + warn!( + pubkey = %pubkey, + err = ?err, + "Failed to subscribe to ATA/eATA account" + ); + } + } + let ata_results = ata_join_set.join_all().await; + // Phase 1: Collect successfully resolved ATAs + struct AtaResolutionInput { + ata_pubkey: Pubkey, + ata_account: ResolvedAccountSharedData, + eata_pubkey: Pubkey, + eata_shared: Option<AccountSharedData>, + } + + let mut ata_inputs: Vec<AtaResolutionInput> = 
Vec::new(); + for result in ata_results { let AccountWithCompanion { pubkey: ata_pubkey, @@ -110,31 +123,48 @@ where } }; - // Defaults: clone the ATA as-is - let mut account_to_clone = ata_account.account_shared_data_cloned(); - let mut commit_frequency_ms = None; - let mut delegated_to_other = None; - - // If there's an eATA, try to use it + delegation record to project the ATA - if let Some(eata_acc) = maybe_eata_account { - let eata_shared = eata_acc.account_shared_data_cloned(); + let eata_shared = + maybe_eata_account.map(|e| e.account_shared_data_cloned()); + ata_inputs.push(AtaResolutionInput { + ata_pubkey, + ata_account, + eata_pubkey, + eata_shared, + }); + } - if let Some(deleg) = delegation::fetch_and_parse_delegation_record( + // Phase 2: Fetch delegation records in parallel for all eATAs + let deleg_futures = ata_inputs.iter().filter_map(|input| { + input.eata_shared.as_ref().map(|_| { + delegation::fetch_and_parse_delegation_record( this, - eata_pubkey, + input.eata_pubkey, this.remote_account_provider.chain_slot(), fetch_origin, ) - .await - { + }) + }); + let deleg_results: Vec<Option<DelegationRecord>> = + join_all(deleg_futures).await; + + // Phase 3: Combine results + let mut deleg_iter = deleg_results.into_iter(); + for input in ata_inputs { + let mut account_to_clone = + input.ata_account.account_shared_data_cloned(); + let mut commit_frequency_ms = None; + let mut delegated_to_other = None; + + if let Some(eata_shared) = &input.eata_shared { + if let Some(Some(deleg)) = deleg_iter.next() { delegated_to_other = delegation::get_delegated_to_other(this, &deleg); commit_frequency_ms = Some(deleg.commit_frequency_ms); if let Some(projected_ata) = this .maybe_project_delegated_ata_from_eata( - ata_account.account_shared_data(), - &eata_shared, + input.ata_account.account_shared_data(), + eata_shared, &deleg, ) { @@ -144,7 +174,7 @@ where } accounts_to_clone.push(AccountCloneRequest { - pubkey: ata_pubkey, + pubkey: input.ata_pubkey, account: account_to_clone, 
commit_frequency_ms, delegated_to_other, diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index 8b94864d8..31f3e5193 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -61,7 +61,6 @@ use crate::{ type RemoteAccountRequests = Vec>; -#[derive(Clone)] pub struct FetchCloner<T, U, V, C> where T: ChainRpcClient, @@ -90,6 +89,30 @@ where allowed_programs: Option>, } +/// Manual Clone impl: `#[derive(Clone)]` would add `V: Clone, C: Clone` +/// bounds that are not satisfied (`AccountsBank` and `Cloner` don't +/// require `Clone`). All fields are behind `Arc` so Clone is not needed. +impl<T, U, V, C> Clone for FetchCloner<T, U, V, C> +where + T: ChainRpcClient, + U: ChainPubsubClient, + V: AccountsBank, + C: Cloner, +{ + fn clone(&self) -> Self { + Self { + remote_account_provider: self.remote_account_provider.clone(), + pending_requests: self.pending_requests.clone(), + fetch_count: self.fetch_count.clone(), + accounts_bank: self.accounts_bank.clone(), + cloner: self.cloner.clone(), + validator_pubkey: self.validator_pubkey, + blacklisted_accounts: self.blacklisted_accounts.clone(), + allowed_programs: self.allowed_programs.clone(), + } + } +} + impl<T, U, V, C> FetchCloner<T, U, V, C> where T: ChainRpcClient, @@ -152,161 +175,218 @@ where } } - /// Start listening to subscription updates + /// Start listening to subscription updates. + /// Uses a JoinSet-based loop with try_recv/select! for backpressure + /// and task lifecycle management instead of unbounded tokio::spawn. 
pub fn start_subscription_listener( self: Arc<Self>, mut subscription_updates: mpsc::Receiver<ForwardedSubscriptionUpdate>, ) { tokio::spawn(async move { - while let Some(update) = subscription_updates.recv().await { - let pubkey = update.pubkey; - let slot = update.account.slot(); - trace!(pubkey = %pubkey, slot, "FetchCloner received subscription update"); - - // Process each subscription update concurrently to avoid blocking on delegation - // record fetches. This allows multiple updates to be processed in parallel. - let this = Arc::clone(&self); - tokio::spawn(async move { - let (resolved_account, deleg_record) = - this.resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(update) - .await; - if let Some(account) = resolved_account { - // Ensure that the subscription update isn't out of order, i.e. we don't already - // hold a newer version of the account in our bank - let out_of_order_slot = this - .accounts_bank - .get_account(&pubkey) - .and_then(|in_bank| { - if in_bank.remote_slot() - >= account.remote_slot() - { - Some(in_bank.remote_slot()) - } else { - None - } - }); - if let Some(in_bank_slot) = out_of_order_slot { - let update_slot = account.remote_slot(); - trace!( - pubkey = %pubkey, - bank_slot = in_bank_slot, - update_slot, - "Ignoring out-of-order subscription update" - ); - return; - } + let mut pending_tasks: JoinSet<()> = JoinSet::new(); - if let Some(in_bank) = - this.accounts_bank.get_account(&pubkey) - { - if in_bank.delegated() && !in_bank.undelegating() { - this.unsubscribe_from_delegated_account(pubkey) - .await; - return; - } + loop { + match subscription_updates.try_recv() { + Ok(update) => { + let pubkey = update.pubkey; + trace!( + pubkey = %pubkey, + "FetchCloner received subscription update" + ); - if in_bank.undelegating() { - // We expect the account to still be delegated, but with the delegation - // program owner - debug!( - pubkey = %pubkey, - in_bank_delegated = in_bank.delegated(), - in_bank_owner = %in_bank.owner(), - in_bank_slot = 
in_bank.remote_slot(), - chain_delegated = account.delegated(), - chain_owner = %account.owner(), - chain_slot = account.remote_slot(), - "Received update for undelegating account" - ); + let this = Arc::clone(&self); + pending_tasks.spawn(async move { + Self::process_subscription_update( + &this, pubkey, update, + ) + .await; + }); - // This will only be true in the following case: - // 1. a commit was triggered for the account - // 2. a commit + undelegate was triggered for the account -> undelegating - // 3. we receive the update for (1.) - // - // Thus our state is more up to date and we don't need to update our - // bank. - if account_still_undelegating_on_chain( - &pubkey, - account.delegated(), - in_bank.remote_slot(), - deleg_record, - &this.validator_pubkey, - ) { - return; - } - } else if in_bank.owner().eq(&dlp::id()) { - debug!( - pubkey = %pubkey, - "Received update for account owned by delegation program but not marked as undelegating" + while let Some(result) = pending_tasks.try_join_next() { + if let Err(err) = result { + error!( + error = ?err, + "Subscription update task panicked" ); } - } else { - warn!( - pubkey = %pubkey, - "Received update for account not in bank" - ); - } - - // Determine if delegated to another validator - let delegated_to_other = deleg_record - .as_ref() - .and_then(|dr| this.get_delegated_to_other(dr)); - let projected_ata_clone_request = this - .maybe_build_projected_ata_clone_request_from_eata_sub_update( - pubkey, - &account, - deleg_record.as_ref(), - ); - - // Once we clone an account that is delegated to us we no longer need - // to receive updates for it from chain - // The subscription will be turned back on once the committor service schedules - // a commit for it that includes undelegation - if account.delegated() { - this.unsubscribe_from_delegated_account(pubkey) - .await; } - - if account.executable() { - this.handle_executable_sub_update(pubkey, account) - .await; - } else if let Err(err) = this - .cloner - 
.clone_account(AccountCloneRequest { - pubkey, - account, - commit_frequency_ms: None, - delegated_to_other, - }) - .await - { - error!( - pubkey = %pubkey, - error = %err, - "Failed to clone account into bank" - ); - } else if let Some(projected_ata_clone_request) = - projected_ata_clone_request - { - if let Err(err) = this - .cloner - .clone_account(projected_ata_clone_request) - .await + } + Err(mpsc::error::TryRecvError::Empty) => { + tokio::select! { + maybe_update = + subscription_updates.recv() => { - error!( - pubkey = %pubkey, - error = %err, - "Failed to clone projected ATA from delegated eATA update" - ); + let Some(update) = maybe_update else { + while pending_tasks + .join_next() + .await + .is_some() + {} + break; + }; + let pubkey = update.pubkey; + let this = Arc::clone(&self); + pending_tasks.spawn(async move { + Self::process_subscription_update( + &this, pubkey, update, + ) + .await; + }); + } + Some(result) = pending_tasks.join_next(), + if !pending_tasks.is_empty() => + { + if let Err(err) = result { + error!( + error = ?err, + "Subscription update task panicked" + ); + } } } } - }); + Err(mpsc::error::TryRecvError::Disconnected) => { + while pending_tasks.join_next().await.is_some() {} + break; + } + } } }); } + async fn process_subscription_update( + &self, + pubkey: Pubkey, + update: ForwardedSubscriptionUpdate, + ) { + let (resolved_account, deleg_record) = self + .resolve_account_to_clone_from_forwarded_sub_with_unsubscribe( + update, + ) + .await; + let Some(account) = resolved_account else { + return; + }; + + // Ensure that the subscription update isn't out of order, i.e. 
+ // we don't already hold a newer version of the account in our bank + let out_of_order_slot = + self.accounts_bank.get_account(&pubkey).and_then(|in_bank| { + if in_bank.remote_slot() >= account.remote_slot() { + Some(in_bank.remote_slot()) + } else { + None + } + }); + if let Some(in_bank_slot) = out_of_order_slot { + let update_slot = account.remote_slot(); + trace!( + pubkey = %pubkey, + bank_slot = in_bank_slot, + update_slot, + "Ignoring out-of-order subscription update" + ); + return; + } + + if let Some(in_bank) = self.accounts_bank.get_account(&pubkey) { + if in_bank.delegated() && !in_bank.undelegating() { + self.unsubscribe_from_delegated_account(pubkey).await; + return; + } + + if in_bank.undelegating() { + debug!( + pubkey = %pubkey, + in_bank_delegated = in_bank.delegated(), + in_bank_owner = %in_bank.owner(), + in_bank_slot = in_bank.remote_slot(), + chain_delegated = account.delegated(), + chain_owner = %account.owner(), + chain_slot = account.remote_slot(), + "Received update for undelegating account" + ); + + // This will only be true in the following case: + // 1. a commit was triggered for the account + // 2. a commit + undelegate was triggered for the account -> undelegating + // 3. we receive the update for (1.) + // + // Thus our state is more up to date and we don't + // need to update our bank. 
+ if account_still_undelegating_on_chain( + &pubkey, + account.delegated(), + in_bank.remote_slot(), + deleg_record, + &self.validator_pubkey, + ) { + return; + } + } else if in_bank.owner().eq(&dlp::id()) { + debug!( + pubkey = %pubkey, + "Received update for account owned by delegation program but not marked as undelegating" + ); + } + } else { + warn!( + pubkey = %pubkey, + "Received update for account not in bank" + ); + } + + // Determine if delegated to another validator + let delegated_to_other = deleg_record + .as_ref() + .and_then(|dr| self.get_delegated_to_other(dr)); + let projected_ata_clone_request = self + .maybe_build_projected_ata_clone_request_from_eata_sub_update( + pubkey, + &account, + deleg_record.as_ref(), + ); + + // Once we clone an account that is delegated to us we no + // longer need to receive updates for it from chain. + // The subscription will be turned back on once the committor + // service schedules a commit for it that includes undelegation. + if account.delegated() { + self.unsubscribe_from_delegated_account(pubkey).await; + } + + if account.executable() { + self.handle_executable_sub_update(pubkey, account).await; + } else if let Err(err) = self + .cloner + .clone_account(AccountCloneRequest { + pubkey, + account, + commit_frequency_ms: None, + delegated_to_other, + }) + .await + { + error!( + pubkey = %pubkey, + error = %err, + "Failed to clone account into bank" + ); + } else if let Some(projected_ata_clone_request) = + projected_ata_clone_request + { + if let Err(err) = + self.cloner.clone_account(projected_ata_clone_request).await + { + error!( + pubkey = %pubkey, + error = %err, + "Failed to clone projected ATA from delegated eATA update" + ); + } + } + } + async fn handle_executable_sub_update( &self, pubkey: Pubkey, @@ -995,30 +1075,70 @@ where let mut fetch_new = vec![]; let mut in_bank = vec![]; let mut extra_mark_empty = vec![]; + + // Phase 1: Sync bank check — separate undelegating accounts + // (which need async 
RPC) from non-undelegating (handled + // synchronously) + let mut undelegating_checks: Vec<(Pubkey, AccountSharedData)> = vec![]; for pubkey in pubkeys.iter() { if let Some(account_in_bank) = self.accounts_bank.get_account(pubkey) { - let decision = match tokio::time::timeout( - Duration::from_secs(5), - self.should_refresh_undelegating_in_bank_account( - pubkey, - &account_in_bank, - fetch_origin, - ), - ) - .await - { - Ok(decision) => decision, - Err(_timeout) => { - warn!( + if account_in_bank.undelegating() { + undelegating_checks.push((**pubkey, account_in_bank)); + } else { + if account_in_bank.owner().eq(&dlp::id()) { + debug!( pubkey = %pubkey, - "Timeout checking if account is still undelegating after 5 seconds" + "Account owned by deleg program not marked as undelegating" ); - RefreshDecision::No } - }; + if tracing::enabled!(tracing::Level::TRACE) { + let delegated = account_in_bank.delegated(); + let owner = account_in_bank.owner(); + trace!( + pubkey = %pubkey, + undelegating = false, + delegated, + owner = %owner, + "Account found in bank in valid state, no fetch needed" + ); + } + in_bank.push(**pubkey); + } + } + } + + // Phase 2: Parallel undelegation checks via JoinSet + if !undelegating_checks.is_empty() { + let mut join_set = JoinSet::new(); + for (pubkey, account_in_bank) in undelegating_checks { + let this = self.clone(); + join_set.spawn(async move { + let decision = match tokio::time::timeout( + Duration::from_secs(5), + this.should_refresh_undelegating_in_bank_account( + &pubkey, + &account_in_bank, + fetch_origin, + ), + ) + .await + { + Ok(decision) => decision, + Err(_timeout) => { + warn!( + pubkey = %pubkey, + "Timeout checking if account is still undelegating after 5 seconds" + ); + RefreshDecision::No + } + }; + (pubkey, decision) + }); + } + for (pubkey, decision) in join_set.join_all().await { match decision { RefreshDecision::Yes | RefreshDecision::YesAndMarkEmptyIfNotFound => { @@ -1030,24 +1150,17 @@ where if let 
RefreshDecision::YesAndMarkEmptyIfNotFound = decision { - extra_mark_empty.push(*pubkey); + extra_mark_empty.push(pubkey); } } RefreshDecision::No => { - // Account is in bank and subscribed correctly - no fetch needed if tracing::enabled!(tracing::Level::TRACE) { - let undelegating = account_in_bank.undelegating(); - let delegated = account_in_bank.delegated(); - let owner = account_in_bank.owner(); trace!( pubkey = %pubkey, - undelegating, - delegated, - owner = %owner, - "Account found in bank in valid state, no fetch needed" + "Undelegating account still valid, no fetch needed" ); } - in_bank.push(*pubkey); + in_bank.push(pubkey); } } } diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index c8e9189df..68d9da3dc 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -809,10 +809,46 @@ impl RemoteAccountProvider { .join(", "); trace!(pubkeys = pubkeys, "Subscribing to accounts"); } - for (pubkey, _) in subscribe_and_fetch.iter() { - // Register the subscription for the pubkey (handles LRU cache and eviction first) - self.subscribe(pubkey).await?; + // Send all subscription requests in parallel (non-fail-fast) + // We use join_all instead of try_join_all to ensure ALL + // subscribe attempts complete, even if some fail. This + // prevents resource leaks in fetching_accounts and ensures + // all oneshot receivers get a response (either success or + // error). 
+ let subscription_results = join_all( + subscribe_and_fetch + .iter() + .map(|(pubkey, _)| self.subscribe(pubkey)), + ) + .await; + + // Collect errors and log each individual failure + let mut errors = Vec::new(); + for (result, (pubkey, _)) in + subscription_results.iter().zip(subscribe_and_fetch.iter()) + { + if let Err(err) = result { + error!( + pubkey = %pubkey, err = ?err, + "Failed to subscribe to account" + ); + errors.push(format!("{}: {}", pubkey, err)); + } + } + + // Fail if ANY subscription failed + if !errors.is_empty() { + return Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!( + "{} subscription(s) failed: [{}]", + errors.len(), + errors.join(", ") + ), + ), + ); } + Ok(()) }