perf: parallelize subscribe/unsubscribe calls in chainlink#1002

Draft
thlorenz wants to merge 5 commits into master from thlorenz/parallel-subs

Conversation


@thlorenz thlorenz commented Feb 26, 2026

Summary

Parallelizes sequential subscribe/unsubscribe calls in magicblock-chainlink to reduce latency when
fetching and cloning multiple accounts. Each subscribe call involves a network roundtrip; awaiting
them sequentially makes end-to-end latency grow linearly with the number of accounts in a batch.

Details

RemoteAccountProvider::setup_subscriptions — parallel via join_all

Replaced the sequential for loop (with early ? abort) with futures_util::future::join_all so
all subscription requests fly in parallel. Uses join_all (not try_join_all) to ensure every
subscribe attempt completes — preventing resource leaks in fetching_accounts and ensuring all
oneshot receivers get a response. Errors are collected and reported as a single
AccountSubscriptionsTaskFailed error.
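
A minimal sketch of the error-aggregation half of this pattern (function and message shapes are illustrative, not the actual chainlink API; in the real code the `Vec<Result<..>>` comes from `futures_util::future::join_all` driving every subscribe future to completion):

```rust
// Illustrative stand-in for collecting join_all results: every attempt has
// completed, so we can aggregate all failures into one error instead of
// aborting on the first Err as the old `?`-based loop did.
fn aggregate(results: Vec<Result<(), String>>) -> Result<(), String> {
    let errors: Vec<String> =
        results.into_iter().filter_map(|r| r.err()).collect();
    if errors.is_empty() {
        Ok(())
    } else {
        // Single consolidated error, analogous to AccountSubscriptionsTaskFailed
        Err(format!(
            "{} subscription(s) failed: [{}]",
            errors.len(),
            errors.join(", ")
        ))
    }
}

fn main() {
    let results = vec![
        Ok(()),
        Err("ws closed".to_string()),
        Err("timeout".to_string()),
    ];
    let err = aggregate(results).unwrap_err();
    assert_eq!(err, "2 subscription(s) failed: [ws closed, timeout]");
}
```

The key design point is that `join_all` (unlike `try_join_all`) never cancels in-flight siblings, so cleanup logic such as resolving oneshot receivers always runs.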

resolve_ata_with_eata_projection — parallel ATA/eATA subscriptions + delegation record fetches

Collected ATA and eATA pubkeys first, then subscribed to all of them in a single join_all batch
instead of awaiting each inside the loop.

Additionally, after joining the ATA/eATA fetch tasks (already parallel via JoinSet), the
delegation record fetches — each a full subscribe→fetch→unsubscribe RPC cycle — are now batched
with join_all instead of being called sequentially per ATA result.

Undelegation refresh checks — sync check + parallel JoinSet

Split the per-account loop into two phases: (1) a synchronous bank lookup that immediately resolves
non-undelegating accounts, and (2) a JoinSet that runs all undelegating-account refresh checks
(each with a 5-second timeout RPC call) in parallel. This prevents worst-case timeouts from stacking
up when a batch contains several undelegating accounts.

Also replaced FetchCloner's derive(Clone) with a manual Clone impl — the derive adds
V: Clone, C: Clone bounds that AccountsBank and Cloner don't satisfy; all fields are behind
Arc so the manual impl is trivially correct.
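
A minimal illustration of why the manual impl is needed (`Holder` and `NotClone` are simplified stand-ins for `FetchCloner` and its generic parameters): `derive(Clone)` on a generic struct adds `V: Clone, C: Clone` bounds even when every field is an `Arc`, which clones cheaply regardless of the inner type.

```rust
use std::sync::Arc;

struct NotClone; // deliberately does NOT implement Clone

struct Holder<V, C> {
    bank: Arc<V>,
    cloner: Arc<C>,
}

// Manual impl: only the Arcs are cloned, so no bounds on V or C are
// required. A `#[derive(Clone)]` here would demand V: Clone + C: Clone
// and fail to compile for NotClone.
impl<V, C> Clone for Holder<V, C> {
    fn clone(&self) -> Self {
        Self {
            bank: Arc::clone(&self.bank),
            cloner: Arc::clone(&self.cloner),
        }
    }
}

fn main() {
    let h = Holder {
        bank: Arc::new(NotClone),
        cloner: Arc::new(NotClone),
    };
    let h2 = h.clone(); // compiles even though NotClone is not Clone
    assert_eq!(Arc::strong_count(&h.bank), 2);
    let _ = h2;
}
```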

start_subscription_listener — managed JoinSet loop

Replaced unbounded tokio::spawn (no tracking, no backpressure) with a JoinSet-based loop using
try_recv/select!. This provides task lifecycle management and natural backpressure under high
subscription-update volume. Extracted the closure body into a process_subscription_update method
for readability.

Summary by CodeRabbit

  • Refactor
    • Enhanced account subscription and delegation handling with parallel execution
    • Improved error aggregation and consolidated error reporting across workflows
    • Optimized account cloning pipeline with staged processing phases
    • Strengthened concurrency patterns in subscription listener operations

@github-actions

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.


coderabbitai bot commented Feb 26, 2026

📝 Walkthrough

Walkthrough

This pull request refactors the account cloning and subscription handling pipeline from sequential, inline operations to parallel, multi-phase execution patterns. Changes include: (1) in ata_projection.rs, restructuring ATA and eATA subscription and delegation handling into three distinct phases—input collection, parallel delegation fetch, and delegation application; (2) in fetch_cloner/mod.rs, introducing a manual Clone implementation, converting the subscription listener to JoinSet-based concurrency with a new process_subscription_update helper, and separating undelegation checks into parallel phases; (3) in remote_account_provider/mod.rs, switching from sequential to parallel subscription calls with consolidated error reporting. The changes emphasize concurrent execution across multiple stages rather than blocking on individual operations.

Suggested reviewers

  • GabrielePicco
  • bmuddha
  • Dodecahedr0x

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs (2)

48-83: 🧹 Nitpick | 🔵 Trivial

Deduplicate pubkeys_to_subscribe before batching subscriptions.

The current vector can contain duplicates, causing redundant subscribe calls and avoidable contention.

♻️ Suggested dedup before join_all
+use std::collections::HashSet;
+
 use dlp::state::DelegationRecord;
 use futures_util::future::join_all;
@@
-    let mut pubkeys_to_subscribe = vec![];
+    let mut pubkeys_to_subscribe = vec![];
@@
+    let mut seen_pubkeys = HashSet::new();
+    pubkeys_to_subscribe.retain(|pk| seen_pubkeys.insert(*pk));
+
     // Subscribe to all ATA and eATA accounts in parallel
     let subscription_results = join_all(
         pubkeys_to_subscribe
             .iter()
             .map(|pk| this.subscribe_to_account(pk)),

55-74: ⚠️ Potential issue | 🟡 Minor

Handle the None eATA-derivation branch to avoid dropping ATA clones.

When try_derive_eata_address_and_bump returns None, this ATA is not queued for fetch/projection and never added to accounts_to_clone.

💡 Suggested fallback
-    for (ata_pubkey, _, ata_info, ata_account_slot) in &atas {
+    for (ata_pubkey, ata_account, ata_info, ata_account_slot) in &atas {
         // Collect ATA pubkey for subscription
         pubkeys_to_subscribe.push(*ata_pubkey);

         if let Some((eata, _)) =
             try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint)
         {
             // Collect eATA pubkey for subscription
             pubkeys_to_subscribe.push(eata);

             let effective_slot = if let Some(min_slot) = min_context_slot {
                 min_slot.max(*ata_account_slot)
             } else {
                 *ata_account_slot
             };
             ata_join_set.spawn(FetchCloner::task_to_fetch_with_companion(
                 this,
                 *ata_pubkey,
                 eata,
                 effective_slot,
                 fetch_origin,
             ));
+        } else {
+            accounts_to_clone.push(AccountCloneRequest {
+                pubkey: *ata_pubkey,
+                account: ata_account.clone(),
+                commit_frequency_ms: None,
+                delegated_to_other: None,
+            });
         }
     }
magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs (1)

1074-1168: 🧹 Nitpick | 🔵 Trivial

Use HashSet for in_bank membership to avoid O(n²) filtering.

pubkeys.retain(|p| !in_bank.contains(p)) on a Vec is quadratic for larger batches.

♻️ Suggested membership refactor
-        let mut in_bank = vec![];
+        let mut in_bank: HashSet<Pubkey> = HashSet::new();
@@
-                    in_bank.push(**pubkey);
+                    in_bank.insert(**pubkey);
@@
-                        in_bank.push(pubkey);
+                        in_bank.insert(pubkey);
@@
-        pubkeys.retain(|p| !in_bank.contains(p));
+        pubkeys.retain(|p| !in_bank.contains(*p));

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1271b9f and 860bfd7.

📒 Files selected for processing (3)
  • magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs
  • magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
  • magicblock-chainlink/src/remote_account_provider/mod.rs

Comment on lines +188 to +206
loop {
match subscription_updates.try_recv() {
Ok(update) => {
let pubkey = update.pubkey;
trace!(
pubkey = %pubkey,
"FetchCloner received subscription update"
);

if in_bank.undelegating() {
// We expect the account to still be delegated, but with the delegation
// program owner
debug!(
pubkey = %pubkey,
in_bank_delegated = in_bank.delegated(),
in_bank_owner = %in_bank.owner(),
in_bank_slot = in_bank.remote_slot(),
chain_delegated = account.delegated(),
chain_owner = %account.owner(),
chain_slot = account.remote_slot(),
"Received update for undelegating account"
);
let this = Arc::clone(&self);
pending_tasks.spawn(async move {
Self::process_subscription_update(
&this, pubkey, update,
)
.await;
});

// This will only be true in the following case:
// 1. a commit was triggered for the account
// 2. a commit + undelegate was triggered for the account -> undelegating
// 3. we receive the update for (1.)
//
// Thus our state is more up to date and we don't need to update our
// bank.
if account_still_undelegating_on_chain(
&pubkey,
account.delegated(),
in_bank.remote_slot(),
deleg_record,
&this.validator_pubkey,
) {
return;
}
} else if in_bank.owner().eq(&dlp::id()) {
debug!(
pubkey = %pubkey,
"Received update for account owned by delegation program but not marked as undelegating"
);
// …
while let Some(result) = pending_tasks.try_join_next() {
if let Err(err) = result {

⚠️ Potential issue | 🟠 Major

Task backlog is still effectively unbounded under sustained update load.

pending_tasks.spawn(...) runs for each drained update without a concurrency cap, so pressure shifts from the channel buffer to JoinSet growth.

💡 Suggested bounded-concurrency guard
     pub fn start_subscription_listener(
         self: Arc<Self>,
         mut subscription_updates: mpsc::Receiver<ForwardedSubscriptionUpdate>,
     ) {
         tokio::spawn(async move {
+            const MAX_PENDING_SUB_UPDATE_TASKS: usize = 512;
             let mut pending_tasks: JoinSet<()> = JoinSet::new();

             loop {
                 match subscription_updates.try_recv() {
                     Ok(update) => {
+                        while pending_tasks.len() >= MAX_PENDING_SUB_UPDATE_TASKS {
+                            if let Some(result) = pending_tasks.join_next().await {
+                                if let Err(err) = result {
+                                    error!(error = ?err, "Subscription update task panicked");
+                                }
+                            }
+                        }
                         let pubkey = update.pubkey;
                         trace!(
                             pubkey = %pubkey,
                             "FetchCloner received subscription update"
                         );
@@
                                 let Some(update) = maybe_update else {
@@
                                     break;
                                 };
+                                while pending_tasks.len() >= MAX_PENDING_SUB_UPDATE_TASKS {
+                                    if let Some(result) = pending_tasks.join_next().await {
+                                        if let Err(err) = result {
+                                            error!(error = ?err, "Subscription update task panicked");
+                                        }
+                                    }
+                                }
                                 let pubkey = update.pubkey;
                                 let this = Arc::clone(&self);
                                 pending_tasks.spawn(async move {

Also applies to: 229-234


Comment on lines +839 to +849
// Fail if ANY subscription failed
if !errors.is_empty() {
return Err(
RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
format!(
"{} subscription(s) failed: [{}]",
errors.len(),
errors.join(", ")
),
),
);

⚠️ Potential issue | 🟠 Major

Subscription setup failure path still risks stale pending state.

This error return can leave entries previously inserted in fetching_accounts (from try_get_multi) without immediate rollback, which can keep accounts falsely pending and accumulate dead request senders across repeated failures.

