From 30125cbc61d87ed2aa5271914671ade1e22df288 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 26 Feb 2026 15:57:41 +0700 Subject: [PATCH 01/10] perf: parallelize setup_subscriptions Amp-Thread-ID: https://ampcode.com/threads/T-019c9925-c886-7351-b769-7a5523b4dc9d Co-authored-by: Amp --- .../src/remote_account_provider/mod.rs | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index c8e9189df..68d9da3dc 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -809,10 +809,46 @@ impl RemoteAccountProvider { .join(", "); trace!(pubkeys = pubkeys, "Subscribing to accounts"); } - for (pubkey, _) in subscribe_and_fetch.iter() { - // Register the subscription for the pubkey (handles LRU cache and eviction first) - self.subscribe(pubkey).await?; + // Send all subscription requests in parallel (non-fail-fast) + // We use join_all instead of try_join_all to ensure ALL + // subscribe attempts complete, even if some fail. This + // prevents resource leaks in fetching_accounts and ensures + // all oneshot receivers get a response (either success or + // error). + let subscription_results = join_all( + subscribe_and_fetch + .iter() + .map(|(pubkey, _)| self.subscribe(pubkey)), + ) + .await; + + // Collect errors and log each individual failure + let mut errors = Vec::new(); + for (result, (pubkey, _)) in + subscription_results.iter().zip(subscribe_and_fetch.iter()) + { + if let Err(err) = result { + error!( + pubkey = %pubkey, err = ?err, + "Failed to subscribe to account" + ); + errors.push(format!("{}: {}", pubkey, err)); + } + } + + // Fail if ANY subscription failed + if !errors.is_empty() { + return Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!( + "{} subscription(s) failed: [{}]", + errors.len(), + errors.join(", ") + ), + ), + ); } + Ok(()) } From c3d5dcdc2eca75cef9e071c561c24e98b8d990c9 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 26 Feb 2026 17:05:09 +0700 Subject: [PATCH 02/10] perf: parallelize ATA/eATA subscriptions Amp-Thread-ID: https://ampcode.com/threads/T-019c9963-e3e2-7586-b020-4da38d680d7c Co-authored-by: Amp --- .../chainlink/fetch_cloner/ata_projection.rs | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index 11bd4c322..97b0ea353 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -1,9 +1,6 @@ -use std::sync::atomic::AtomicU16; - +use futures_util::future::join_all; use magicblock_accounts_db::traits::AccountsBank; -use magicblock_core::{ - logger::log_trace_warn, token_programs::try_derive_eata_address_and_bump, -}; +use magicblock_core::token_programs::try_derive_eata_address_and_bump; use magicblock_metrics::metrics; use solana_account::AccountSharedData; use solana_pubkey::Pubkey; @@ -45,35 +42,18 @@ where let mut accounts_to_clone = vec![]; let mut ata_join_set = JoinSet::new(); - // Subscribe first so subsequent fetches are kept up-to-date + // Collect all pubkeys to subscribe to and spawn fetch tasks + let mut pubkeys_to_subscribe = vec![]; + for (ata_pubkey, _, ata_info, ata_account_slot) in &atas { - if let Err(err) = this.subscribe_to_account(ata_pubkey).await { - static ATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 = - AtomicU16::new(0); - log_trace_warn( - "Failed to subscribe to ATA", - "Failed to subscribe to ATAs", - &ata_pubkey, - &err, - 1000, - &ATA_SUBSCRIPTION_FAILURE_COUNT, - ); - } + // Collect ATA pubkey for subscription + pubkeys_to_subscribe.push(*ata_pubkey); + if let Some((eata, _)) = try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint) { - if let Err(err) = this.subscribe_to_account(&eata).await { - static EATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 = - AtomicU16::new(0); - log_trace_warn( - "Failed to subscribe to derived eATA", - "Failed to subscribe to derived eATAs", - &eata, - &err, - 1000, - &EATA_SUBSCRIPTION_FAILURE_COUNT, - ); - } + // Collect eATA pubkey for subscription + pubkeys_to_subscribe.push(eata); let effective_slot = if let Some(min_slot) = min_context_slot { min_slot.max(*ata_account_slot) @@ -90,6 +70,26 @@ where } } + // Subscribe to all ATA and eATA accounts in parallel + let subscription_results = join_all( + pubkeys_to_subscribe + .iter() + .map(|pk| this.subscribe_to_account(pk)), + ) + .await; + + for (pubkey, result) in + pubkeys_to_subscribe.iter().zip(subscription_results) + { + if let Err(err) = result { + warn!( + pubkey = %pubkey, + err = ?err, + "Failed to subscribe to ATA/eATA account" + ); + } + } + let ata_results = ata_join_set.join_all().await; for result in ata_results { From 79bda11bf759fb33d3c32555059b53914a6c0b59 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 26 Feb 2026 17:36:59 +0700 Subject: [PATCH 03/10] perf: parallelize undelegation refresh checks via JoinSet Amp-Thread-ID: https://ampcode.com/threads/T-019c9969-8b24-73cc-8613-af33967c16ed Co-authored-by: Amp --- .../src/chainlink/fetch_cloner/mod.rs | 110 +++++++++++++----- 1 file changed, 83 insertions(+), 27 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index 8b94864d8..9768bd14c 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -61,7 +61,6 @@ use crate::{ type RemoteAccountRequests = Vec>; -#[derive(Clone)] pub struct FetchCloner where T: ChainRpcClient, @@ -90,6 +89,30 @@ where allowed_programs: Option>, } +/// Manual Clone impl: `#[derive(Clone)]` would add `V: Clone, C: Clone` +/// bounds that are not satisfied (`AccountsBank` and `Cloner` don't +/// require `Clone`). All fields are behind `Arc` so Clone is not needed. +impl Clone for FetchCloner +where + T: ChainRpcClient, + U: ChainPubsubClient, + V: AccountsBank, + C: Cloner, +{ + fn clone(&self) -> Self { + Self { + remote_account_provider: self.remote_account_provider.clone(), + pending_requests: self.pending_requests.clone(), + fetch_count: self.fetch_count.clone(), + accounts_bank: self.accounts_bank.clone(), + cloner: self.cloner.clone(), + validator_pubkey: self.validator_pubkey, + blacklisted_accounts: self.blacklisted_accounts.clone(), + allowed_programs: self.allowed_programs.clone(), + } + } +} + impl FetchCloner where T: ChainRpcClient, @@ -995,30 +1018,70 @@ where let mut fetch_new = vec![]; let mut in_bank = vec![]; let mut extra_mark_empty = vec![]; + + // Phase 1: Sync bank check — separate undelegating accounts + // (which need async RPC) from non-undelegating (handled + // synchronously) + let mut undelegating_checks: Vec<(Pubkey, AccountSharedData)> = vec![]; for pubkey in pubkeys.iter() { if let Some(account_in_bank) = self.accounts_bank.get_account(pubkey) { - let decision = match tokio::time::timeout( - Duration::from_secs(5), - self.should_refresh_undelegating_in_bank_account( - pubkey, - &account_in_bank, - fetch_origin, - ), - ) - .await - { - Ok(decision) => decision, - Err(_timeout) => { - warn!( + if account_in_bank.undelegating() { + undelegating_checks.push((**pubkey, account_in_bank)); + } else { + if account_in_bank.owner().eq(&dlp::id()) { + debug!( pubkey = %pubkey, - "Timeout checking if account is still undelegating after 5 seconds" + "Account owned by deleg program not marked as undelegating" ); - RefreshDecision::No } - }; + if tracing::enabled!(tracing::Level::TRACE) { + let delegated = account_in_bank.delegated(); + let owner = account_in_bank.owner(); + trace!( + pubkey = %pubkey, + undelegating = false, + delegated, + owner = %owner, + "Account found in bank in valid state, no fetch needed" + ); + } + in_bank.push(**pubkey); + } + } + } + + // Phase 2: Parallel undelegation checks via JoinSet + if !undelegating_checks.is_empty() { + let mut join_set = JoinSet::new(); + for (pubkey, account_in_bank) in undelegating_checks { + let this = self.clone(); + join_set.spawn(async move { + let decision = match tokio::time::timeout( + Duration::from_secs(5), + this.should_refresh_undelegating_in_bank_account( + &pubkey, + &account_in_bank, + fetch_origin, + ), + ) + .await + { + Ok(decision) => decision, + Err(_timeout) => { + warn!( + pubkey = %pubkey, + "Timeout checking if account is still undelegating after 5 seconds" + ); + RefreshDecision::No + } + }; + (pubkey, decision) + }); + } + for (pubkey, decision) in join_set.join_all().await { match decision { RefreshDecision::Yes | RefreshDecision::YesAndMarkEmptyIfNotFound => { @@ -1030,24 +1093,17 @@ where if let RefreshDecision::YesAndMarkEmptyIfNotFound = decision { - extra_mark_empty.push(*pubkey); + extra_mark_empty.push(pubkey); } } RefreshDecision::No => { - // Account is in bank and subscribed correctly - no fetch needed if tracing::enabled!(tracing::Level::TRACE) { - let undelegating = account_in_bank.undelegating(); - let delegated = account_in_bank.delegated(); - let owner = account_in_bank.owner(); trace!( pubkey = %pubkey, - undelegating, - delegated, - owner = %owner, - "Account found in bank in valid state, no fetch needed" + "Undelegating account still valid, no fetch needed" ); } - in_bank.push(*pubkey); + in_bank.push(pubkey); } } } From 06ec7f9dc9c733aa7c56b620d70721fd4fb5ff45 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 26 Feb 2026 17:42:47 +0700 Subject: [PATCH 04/10] perf: parallelize ATA delegation record fetches Amp-Thread-ID: https://ampcode.com/threads/T-019c9986-8736-738c-9cdd-832b80351f1e Co-authored-by: Amp --- .../chainlink/fetch_cloner/ata_projection.rs | 62 ++++++++++++++----- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index 97b0ea353..b74dcc62c 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -1,3 +1,4 @@ +use dlp::state::DelegationRecord; use futures_util::future::join_all; use magicblock_accounts_db::traits::AccountsBank; use magicblock_core::token_programs::try_derive_eata_address_and_bump; @@ -10,7 +11,9 @@ use tracing::*; use super::{delegation, types::AccountWithCompanion, FetchCloner}; use crate::{ cloner::{AccountCloneRequest, Cloner}, - remote_account_provider::{ChainPubsubClient, ChainRpcClient}, + remote_account_provider::{ + ChainPubsubClient, ChainRpcClient, ResolvedAccountSharedData, + }, }; /// Resolves ATAs with eATA projection. @@ -92,6 +95,16 @@ where let ata_results = ata_join_set.join_all().await; + // Phase 1: Collect successfully resolved ATAs + struct AtaResolutionInput { + ata_pubkey: Pubkey, + ata_account: ResolvedAccountSharedData, + eata_pubkey: Pubkey, + eata_shared: Option, + } + + let mut ata_inputs: Vec = Vec::new(); + for result in ata_results { let AccountWithCompanion { pubkey: ata_pubkey, @@ -110,31 +123,48 @@ where } }; - // Defaults: clone the ATA as-is - let mut account_to_clone = ata_account.account_shared_data_cloned(); - let mut commit_frequency_ms = None; - let mut delegated_to_other = None; - - // If there's an eATA, try to use it + delegation record to project the ATA - if let Some(eata_acc) = maybe_eata_account { - let eata_shared = eata_acc.account_shared_data_cloned(); + let eata_shared = + maybe_eata_account.map(|e| e.account_shared_data_cloned()); + ata_inputs.push(AtaResolutionInput { + ata_pubkey, + ata_account, + eata_pubkey, + eata_shared, + }); + } - if let Some(deleg) = delegation::fetch_and_parse_delegation_record( + // Phase 2: Fetch delegation records in parallel for all eATAs + let deleg_futures = ata_inputs.iter().filter_map(|input| { + input.eata_shared.as_ref().map(|_| { + delegation::fetch_and_parse_delegation_record( this, - eata_pubkey, + input.eata_pubkey, this.remote_account_provider.chain_slot(), fetch_origin, ) - .await - { + }) + }); + let deleg_results: Vec> = + join_all(deleg_futures).await; + + // Phase 3: Combine results + let mut deleg_iter = deleg_results.into_iter(); + for input in ata_inputs { + let mut account_to_clone = + input.ata_account.account_shared_data_cloned(); + let mut commit_frequency_ms = None; + let mut delegated_to_other = None; + + if let Some(eata_shared) = &input.eata_shared { + if let Some(Some(deleg)) = deleg_iter.next() { delegated_to_other = delegation::get_delegated_to_other(this, &deleg); commit_frequency_ms = Some(deleg.commit_frequency_ms); if let Some(projected_ata) = this .maybe_project_delegated_ata_from_eata( - ata_account.account_shared_data(), - &eata_shared, + input.ata_account.account_shared_data(), + eata_shared, &deleg, ) { @@ -144,7 +174,7 @@ where } accounts_to_clone.push(AccountCloneRequest { - pubkey: ata_pubkey, + pubkey: input.ata_pubkey, account: account_to_clone, commit_frequency_ms, delegated_to_other, From 860bfd71e6a2fa18340d58ed55f2163dc0294367 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 26 Feb 2026 17:54:56 +0700 Subject: [PATCH 05/10] chore: managed JoinSet loop for subscription updates Amp-Thread-ID: https://ampcode.com/threads/T-019c998b-bb61-7326-8f9e-b41a03d09aff Co-authored-by: Amp --- .../src/chainlink/fetch_cloner/mod.rs | 329 ++++++++++-------- 1 file changed, 193 insertions(+), 136 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index 9768bd14c..31f3e5193 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -175,161 +175,218 @@ where } } - /// Start listening to subscription updates + /// Start listening to subscription updates. + /// Uses a JoinSet-based loop with try_recv/select! for backpressure + /// and task lifecycle management instead of unbounded tokio::spawn. pub fn start_subscription_listener( self: Arc, mut subscription_updates: mpsc::Receiver, ) { tokio::spawn(async move { - while let Some(update) = subscription_updates.recv().await { - let pubkey = update.pubkey; - let slot = update.account.slot(); - trace!(pubkey = %pubkey, slot, "FetchCloner received subscription update"); - - // Process each subscription update concurrently to avoid blocking on delegation - // record fetches. This allows multiple updates to be processed in parallel. - let this = Arc::clone(&self); - tokio::spawn(async move { - let (resolved_account, deleg_record) = - this.resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(update) - .await; - if let Some(account) = resolved_account { - // Ensure that the subscription update isn't out of order, i.e. we don't already - // hold a newer version of the account in our bank - let out_of_order_slot = this - .accounts_bank - .get_account(&pubkey) - .and_then(|in_bank| { - if in_bank.remote_slot() - >= account.remote_slot() - { - Some(in_bank.remote_slot()) - } else { - None - } - }); - if let Some(in_bank_slot) = out_of_order_slot { - let update_slot = account.remote_slot(); - trace!( - pubkey = %pubkey, - bank_slot = in_bank_slot, - update_slot, - "Ignoring out-of-order subscription update" - ); - return; - } + let mut pending_tasks: JoinSet<()> = JoinSet::new(); - if let Some(in_bank) = - this.accounts_bank.get_account(&pubkey) - { - if in_bank.delegated() && !in_bank.undelegating() { - this.unsubscribe_from_delegated_account(pubkey) - .await; - return; - } + loop { + match subscription_updates.try_recv() { + Ok(update) => { + let pubkey = update.pubkey; + trace!( + pubkey = %pubkey, + "FetchCloner received subscription update" + ); - if in_bank.undelegating() { - // We expect the account to still be delegated, but with the delegation - // program owner - debug!( - pubkey = %pubkey, - in_bank_delegated = in_bank.delegated(), - in_bank_owner = %in_bank.owner(), - in_bank_slot = in_bank.remote_slot(), - chain_delegated = account.delegated(), - chain_owner = %account.owner(), - chain_slot = account.remote_slot(), - "Received update for undelegating account" - ); + let this = Arc::clone(&self); + pending_tasks.spawn(async move { + Self::process_subscription_update( + &this, pubkey, update, + ) + .await; + }); - // This will only be true in the following case: - // 1. a commit was triggered for the account - // 2. a commit + undelegate was triggered for the account -> undelegating - // 3. we receive the update for (1.) - // - // Thus our state is more up to date and we don't need to update our - // bank. - if account_still_undelegating_on_chain( - &pubkey, - account.delegated(), - in_bank.remote_slot(), - deleg_record, - &this.validator_pubkey, - ) { - return; - } - } else if in_bank.owner().eq(&dlp::id()) { - debug!( - pubkey = %pubkey, - "Received update for account owned by delegation program but not marked as undelegating" + while let Some(result) = pending_tasks.try_join_next() { + if let Err(err) = result { + error!( + error = ?err, + "Subscription update task panicked" ); } - } else { - warn!( - pubkey = %pubkey, - "Received update for account not in bank" - ); } - - // Determine if delegated to another validator - let delegated_to_other = deleg_record - .as_ref() - .and_then(|dr| this.get_delegated_to_other(dr)); - let projected_ata_clone_request = this - .maybe_build_projected_ata_clone_request_from_eata_sub_update( - pubkey, - &account, - deleg_record.as_ref(), - ); - - // Once we clone an account that is delegated to us we no longer need - // to receive updates for it from chain - // The subscription will be turned back on once the committor service schedules - // a commit for it that includes undelegation - if account.delegated() { - this.unsubscribe_from_delegated_account(pubkey) - .await; - } - - if account.executable() { - this.handle_executable_sub_update(pubkey, account) - .await; - } else if let Err(err) = this - .cloner - .clone_account(AccountCloneRequest { - pubkey, - account, - commit_frequency_ms: None, - delegated_to_other, - }) - .await - { - error!( - pubkey = %pubkey, - error = %err, - "Failed to clone account into bank" - ); - } else if let Some(projected_ata_clone_request) = - projected_ata_clone_request - { - if let Err(err) = this - .cloner - .clone_account(projected_ata_clone_request) - .await + } + Err(mpsc::error::TryRecvError::Empty) => { + tokio::select! { + maybe_update = + subscription_updates.recv() => { - error!( - pubkey = %pubkey, - error = %err, - "Failed to clone projected ATA from delegated eATA update" - ); + let Some(update) = maybe_update else { + while pending_tasks + .join_next() + .await + .is_some() + {} + break; + }; + let pubkey = update.pubkey; + let this = Arc::clone(&self); + pending_tasks.spawn(async move { + Self::process_subscription_update( + &this, pubkey, update, + ) + .await; + }); + } + Some(result) = pending_tasks.join_next(), + if !pending_tasks.is_empty() => + { + if let Err(err) = result { + error!( + error = ?err, + "Subscription update task panicked" + ); + } } } } - }); + Err(mpsc::error::TryRecvError::Disconnected) => { + while pending_tasks.join_next().await.is_some() {} + break; + } + } } }); } + async fn process_subscription_update( + &self, + pubkey: Pubkey, + update: ForwardedSubscriptionUpdate, + ) { + let (resolved_account, deleg_record) = self + .resolve_account_to_clone_from_forwarded_sub_with_unsubscribe( + update, + ) + .await; + let Some(account) = resolved_account else { + return; + }; + + // Ensure that the subscription update isn't out of order, i.e. + // we don't already hold a newer version of the account in our bank + let out_of_order_slot = + self.accounts_bank.get_account(&pubkey).and_then(|in_bank| { + if in_bank.remote_slot() >= account.remote_slot() { + Some(in_bank.remote_slot()) + } else { + None + } + }); + if let Some(in_bank_slot) = out_of_order_slot { + let update_slot = account.remote_slot(); + trace!( + pubkey = %pubkey, + bank_slot = in_bank_slot, + update_slot, + "Ignoring out-of-order subscription update" + ); + return; + } + + if let Some(in_bank) = self.accounts_bank.get_account(&pubkey) { + if in_bank.delegated() && !in_bank.undelegating() { + self.unsubscribe_from_delegated_account(pubkey).await; + return; + } + + if in_bank.undelegating() { + debug!( + pubkey = %pubkey, + in_bank_delegated = in_bank.delegated(), + in_bank_owner = %in_bank.owner(), + in_bank_slot = in_bank.remote_slot(), + chain_delegated = account.delegated(), + chain_owner = %account.owner(), + chain_slot = account.remote_slot(), + "Received update for undelegating account" + ); + + // This will only be true in the following case: + // 1. a commit was triggered for the account + // 2. a commit + undelegate was triggered for the account -> undelegating + // 3. we receive the update for (1.) + // + // Thus our state is more up to date and we don't + // need to update our bank. + if account_still_undelegating_on_chain( + &pubkey, + account.delegated(), + in_bank.remote_slot(), + deleg_record, + &self.validator_pubkey, + ) { + return; + } + } else if in_bank.owner().eq(&dlp::id()) { + debug!( + pubkey = %pubkey, + "Received update for account owned by delegation program but not marked as undelegating" + ); + } + } else { + warn!( + pubkey = %pubkey, + "Received update for account not in bank" + ); + } + + // Determine if delegated to another validator + let delegated_to_other = deleg_record + .as_ref() + .and_then(|dr| self.get_delegated_to_other(dr)); + let projected_ata_clone_request = self + .maybe_build_projected_ata_clone_request_from_eata_sub_update( + pubkey, + &account, + deleg_record.as_ref(), + ); + + // Once we clone an account that is delegated to us we no + // longer need to receive updates for it from chain. + // The subscription will be turned back on once the committor + // service schedules a commit for it that includes undelegation. + if account.delegated() { + self.unsubscribe_from_delegated_account(pubkey).await; + } + + if account.executable() { + self.handle_executable_sub_update(pubkey, account).await; + } else if let Err(err) = self + .cloner + .clone_account(AccountCloneRequest { + pubkey, + account, + commit_frequency_ms: None, + delegated_to_other, + }) + .await + { + error!( + pubkey = %pubkey, + error = %err, + "Failed to clone account into bank" + ); + } else if let Some(projected_ata_clone_request) = + projected_ata_clone_request + { + if let Err(err) = + self.cloner.clone_account(projected_ata_clone_request).await + { + error!( + pubkey = %pubkey, + error = %err, + "Failed to clone projected ATA from delegated eATA update" + ); + } + } + } + async fn handle_executable_sub_update( &self, pubkey: Pubkey, From 26877c4e7cc5bd454e652866e97b44eef6721465 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 27 Feb 2026 12:01:02 +0700 Subject: [PATCH 06/10] fix: rollback fetching on sub setup fail Amp-Thread-ID: https://ampcode.com/threads/T-019c9d76-c925-71e1-a221-6510804f323e Co-authored-by: Amp --- .../src/remote_account_provider/mod.rs | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 68d9da3dc..cc3d34b74 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -715,6 +715,10 @@ impl RemoteAccountProvider { let fetch_start_slot = fetch_start_slot.unwrap_or_else(|| self.chain_slot.load()); + // Track which pubkeys we created new entries for (Vacant) + // so we can roll them back if setup_subscriptions fails. + let mut newly_inserted = Vec::new(); + { let mut fetching = self.fetching_accounts.lock().unwrap(); for &pubkey in pubkeys { @@ -725,6 +729,7 @@ impl RemoteAccountProvider { } Entry::Vacant(entry) => { entry.insert((fetch_start_slot, vec![sender])); + newly_inserted.push(pubkey); } } subscription_overrides.push((pubkey, receiver)); @@ -732,7 +737,33 @@ impl RemoteAccountProvider { } // Setup subscriptions first (to catch updates during fetch) - self.setup_subscriptions(&subscription_overrides).await?; + if let Err(err) = + self.setup_subscriptions(&subscription_overrides).await + { + // Rollback fetching_accounts entries we created to prevent + // accounts being stuck as pending indefinitely. We only + // remove entries we created (Vacant); entries that already + // existed (Occupied) belong to other callers whose fetch + // will clean them up. + let mut fetching = self.fetching_accounts.lock().unwrap(); + for pubkey in &newly_inserted { + if let Some((_, senders)) = fetching.remove(pubkey) { + for sender in senders { + let _ = sender.send(Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!( + "{}: subscription setup failed, rolling back", + pubkey + ), + ), + )); + } + } + } + // Receivers in subscription_overrides are dropped here, + // closing any pending channels + return Err(err); + } // Start the fetch let min_context_slot = fetch_start_slot; From a992eb846a5a2098a8d91c7cf44d1d263b9b7fdd Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 27 Feb 2026 12:20:36 +0700 Subject: [PATCH 07/10] perf: deduplicate pubkeys before ATA subscription Amp-Thread-ID: https://ampcode.com/threads/T-019c9d83-feab-7369-88f4-8f80eb557a6c Co-authored-by: Amp --- .../src/chainlink/fetch_cloner/ata_projection.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index b74dcc62c..0a856a7f2 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -5,6 +5,7 @@ use magicblock_core::token_programs::try_derive_eata_address_and_bump; use magicblock_metrics::metrics; use solana_account::AccountSharedData; use solana_pubkey::Pubkey; +use std::collections::HashSet; use tokio::task::JoinSet; use tracing::*; @@ -73,6 +74,13 @@ where } } + // Deduplicate pubkeys to avoid redundant subscribe calls + pubkeys_to_subscribe = pubkeys_to_subscribe + .into_iter() + .collect::>() + .into_iter() + .collect(); + // Subscribe to all ATA and eATA accounts in parallel let subscription_results = join_all( pubkeys_to_subscribe From 1f7eae6baca2d1aace4acc4619cdde3ba77078ad Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 27 Feb 2026 12:25:24 +0700 Subject: [PATCH 08/10] fix: queue ATAs for cloning when eATA derivation fails Amp-Thread-ID: https://ampcode.com/threads/T-019c9d8c-8ed7-7688-8419-a546c109b5d6 Co-authored-by: Amp --- .../chainlink/fetch_cloner/ata_projection.rs | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index 0a856a7f2..fe03492c7 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -53,17 +53,18 @@ where // Collect ATA pubkey for subscription pubkeys_to_subscribe.push(*ata_pubkey); + let effective_slot = if let Some(min_slot) = min_context_slot { + min_slot.max(*ata_account_slot) + } else { + *ata_account_slot + }; + if let Some((eata, _)) = try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint) { // Collect eATA pubkey for subscription pubkeys_to_subscribe.push(eata); - let effective_slot = if let Some(min_slot) = min_context_slot { - min_slot.max(*ata_account_slot) - } else { - *ata_account_slot - }; ata_join_set.spawn(FetchCloner::task_to_fetch_with_companion( this, *ata_pubkey, @@ -71,6 +72,18 @@ where effective_slot, fetch_origin, )); + } else { + // eATA derivation failed, but still queue the ATA for cloning + // without a companion by using a dummy companion pubkey + // The resolve_account_with_companion logic handles the case + // where the companion is not found + ata_join_set.spawn(FetchCloner::task_to_fetch_with_companion( + this, + *ata_pubkey, + Pubkey::default(), // Dummy companion - will be marked as NotFound + effective_slot, + fetch_origin, + )); } } From 921e9aeab37064c64ed9368914e91d8b74740faa Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 27 Feb 2026 12:25:50 +0700 Subject: [PATCH 09/10] chore: fmt --- .../src/chainlink/fetch_cloner/ata_projection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index fe03492c7..a364d89f1 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use dlp::state::DelegationRecord; use futures_util::future::join_all; use magicblock_accounts_db::traits::AccountsBank; @@ -5,7 +7,6 @@ use magicblock_core::token_programs::try_derive_eata_address_and_bump; use magicblock_metrics::metrics; use solana_account::AccountSharedData; use solana_pubkey::Pubkey; -use std::collections::HashSet; use tokio::task::JoinSet; use tracing::*; From 1484c1f184766f86093c845242d91e7984688a61 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 27 Feb 2026 12:27:32 +0700 Subject: [PATCH 10/10] chore: expect mutex lock (instead of unwrap) --- magicblock-chainlink/src/remote_account_provider/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index cc3d34b74..aad27ab05 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -745,7 +745,10 @@ impl RemoteAccountProvider { // remove entries we created (Vacant); entries that already // existed (Occupied) belong to other callers whose fetch // will clean them up. - let mut fetching = self.fetching_accounts.lock().unwrap(); + let mut fetching = self + .fetching_accounts + .lock() + .expect("fetching_accounts lock poisoned"); for pubkey in &newly_inserted { if let Some((_, senders)) = fetching.remove(pubkey) { for sender in senders {