perf: parallelize subscribe/unsubscribe calls in chainlink#1002
Conversation
Amp-Thread-ID: https://ampcode.com/threads/T-019c9925-c886-7351-b769-7a5523b4dc9d Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c9963-e3e2-7586-b020-4da38d680d7c Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c9969-8b24-73cc-8613-af33967c16ed Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c9986-8736-738c-9cdd-832b80351f1e Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c998b-bb61-7326-8f9e-b41a03d09aff Co-authored-by: Amp <amp@ampcode.com>
📝 Walkthrough

This pull request refactors the account cloning and subscription handling pipeline from sequential, inline operations to parallel, multi-phase execution patterns. Changes include: (1) in `ata_projection.rs`, restructuring ATA and eATA subscription and delegation handling into three distinct phases — input collection, parallel delegation fetch, and delegation application; (2) in `fetch_cloner/mod.rs`, introducing a manual `Clone` implementation, converting the subscription listener to `JoinSet`-based concurrency with a new `process_subscription_update` helper, and separating undelegation checks into parallel phases; (3) in `remote_account_provider/mod.rs`, switching from sequential to parallel subscription calls with consolidated error reporting. The changes emphasize concurrent execution across multiple stages rather than blocking on individual operations.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs (2)
48-83: 🧹 Nitpick | 🔵 Trivial

Deduplicate `pubkeys_to_subscribe` before batching subscriptions.

The current vector can contain duplicates, causing redundant subscribe calls and avoidable contention.

♻️ Suggested dedup before `join_all`:

```diff
+use std::collections::HashSet;
+
 use dlp::state::DelegationRecord;
 use futures_util::future::join_all;
@@
     let mut pubkeys_to_subscribe = vec![];
@@
+    let mut seen_pubkeys = HashSet::new();
+    pubkeys_to_subscribe.retain(|pk| seen_pubkeys.insert(*pk));
+
     // Subscribe to all ATA and eATA accounts in parallel
     let subscription_results = join_all(
         pubkeys_to_subscribe
             .iter()
             .map(|pk| this.subscribe_to_account(pk)),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs` around lines 48 - 83, pubkeys_to_subscribe can contain duplicates (same ATA and eATA) causing redundant subscribe calls; before calling join_all/this.subscribe_to_account, deduplicate the vector (e.g., convert to a HashSet or sort_unstable() + dedup()) and then collect back into a Vec to iterate. Update the code around pubkeys_to_subscribe (the collection before join_all) so you dedupe after the for loop and before join_all/this.subscribe_to_account is invoked, leaving try_derive_eata_address_and_bump, ata_join_set.spawn, and FetchCloner::task_to_fetch_with_companion unchanged.
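The order-preserving dedup the comment suggests can be sketched in isolation. This is a minimal, standalone version using plain integers in place of `Pubkey` values; the helper name is made up for illustration:

```rust
use std::collections::HashSet;

// Order-preserving dedup: `retain` keeps the first occurrence of each key,
// because `HashSet::insert` returns false for values already seen.
fn dedup_preserving_order<T: std::hash::Hash + Eq + Copy>(keys: &mut Vec<T>) {
    let mut seen = HashSet::new();
    keys.retain(|k| seen.insert(*k));
}

fn main() {
    // Stand-ins for ATA/eATA pubkeys; the same account can appear twice.
    let mut pubkeys_to_subscribe = vec![1u32, 2, 1, 3, 2];
    dedup_preserving_order(&mut pubkeys_to_subscribe);
    assert_eq!(pubkeys_to_subscribe, vec![1, 2, 3]);
    println!("{:?}", pubkeys_to_subscribe);
}
```

Unlike `sort_unstable()` + `dedup()`, this keeps the original insertion order, which matters if the subscribe batch should mirror the ATA iteration order.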
55-74: ⚠️ Potential issue | 🟡 Minor

Handle the `None` eATA-derivation branch to avoid dropping ATA clones.

When `try_derive_eata_address_and_bump` returns `None`, this ATA is not queued for fetch/projection and never added to `accounts_to_clone`.

💡 Suggested fallback:

```diff
-    for (ata_pubkey, _, ata_info, ata_account_slot) in &atas {
+    for (ata_pubkey, ata_account, ata_info, ata_account_slot) in &atas {
         // Collect ATA pubkey for subscription
         pubkeys_to_subscribe.push(*ata_pubkey);
         if let Some((eata, _)) =
             try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint)
         {
             // Collect eATA pubkey for subscription
             pubkeys_to_subscribe.push(eata);
             let effective_slot = if let Some(min_slot) = min_context_slot {
                 min_slot.max(*ata_account_slot)
             } else {
                 *ata_account_slot
             };
             ata_join_set.spawn(FetchCloner::task_to_fetch_with_companion(
                 this,
                 *ata_pubkey,
                 eata,
                 effective_slot,
                 fetch_origin,
             ));
+        } else {
+            accounts_to_clone.push(AccountCloneRequest {
+                pubkey: *ata_pubkey,
+                account: ata_account.clone(),
+                commit_frequency_ms: None,
+                delegated_to_other: None,
+            });
         }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs` around lines 55 - 74, the current code ignores ATAs when try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint) returns None, dropping them from processing; add an else branch that ensures those ATAs are still queued for cloning/subscription: push the ATA pubkey into accounts_to_clone (and/or pubkeys_to_subscribe if appropriate) and spawn a fetch task via ata_join_set.spawn using FetchCloner::task_to_fetch_with_companion (or an equivalent fetch task) with the same effective_slot and fetch_origin but without an eATA companion; ensure references to ata_pubkey, ata_account_slot, fetch_origin, pubkeys_to_subscribe, accounts_to_clone, ata_join_set.spawn, and FetchCloner::task_to_fetch_with_companion are used so ATAs are not dropped when derivation returns None.

magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs (1)
1074-1168: 🧹 Nitpick | 🔵 Trivial

Use `HashSet` for `in_bank` membership to avoid O(n²) filtering.

`pubkeys.retain(|p| !in_bank.contains(p))` on a `Vec` is quadratic for larger batches.

♻️ Suggested membership refactor:

```diff
-    let mut in_bank = vec![];
+    let mut in_bank: HashSet<Pubkey> = HashSet::new();
@@
-    in_bank.push(**pubkey);
+    in_bank.insert(**pubkey);
@@
-    in_bank.push(pubkey);
+    in_bank.insert(pubkey);
@@
-    pubkeys.retain(|p| !in_bank.contains(p));
+    pubkeys.retain(|p| !in_bank.contains(*p));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs` around lines 1074 - 1168, The membership check is currently O(n²) because in_bank is a Vec; change in_bank to a HashSet<Pubkey> (use std::collections::HashSet) and replace all in_bank.push(...) calls with in_bank.insert(...), i.e. update the synchronous branch that currently does in_bank.push(**pubkey) and the undelegating JoinSet result branch that does in_bank.push(pubkey) to insert the Pubkey into the set, then change pubkeys.retain(|p| !in_bank.contains(p)) to use the HashSet contains as-is; ensure type conversions match Pubkey ownership when inserting and when checking contains.
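The complexity difference is easy to demonstrate in isolation. A minimal sketch, with integers standing in for `Pubkey` values:

```rust
use std::collections::HashSet;

fn main() {
    // Keys already present in the bank; integers stand in for `Pubkey`.
    let in_bank: HashSet<u64> = [2, 4, 6].into_iter().collect();

    let mut pubkeys: Vec<u64> = vec![1, 2, 3, 4, 5, 6];
    // With a HashSet, each `contains` check is O(1) on average, so the
    // whole retain pass is O(n); with a Vec it would be O(n) per check,
    // O(n^2) overall.
    pubkeys.retain(|p| !in_bank.contains(p));
    assert_eq!(pubkeys, vec![1, 3, 5]);
    println!("{:?}", pubkeys);
}
```

For small batches the Vec version can still win on constant factors, but the HashSet version degrades gracefully as batch sizes grow.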
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs`:
- Around line 188-206: The loop currently spawns a new task into pending_tasks
for every subscription update (pending_tasks.spawn(...) inside the
subscription_updates.try_recv loop), which can let the JoinSet grow unbounded
under sustained load; fix it by adding a bounded-concurrency guard (e.g., an
Arc<Semaphore> or explicit cap on pending_tasks.len()) that must acquire a
permit before spawning and releases it when the spawned
process_subscription_update future completes, or by waiting when the cap is
reached (await permit or await try_join_next) so you never spawn beyond the
limit; apply the same bounded-concurrency change to the other identical spawn
site that uses pending_tasks.spawn(...) and subscription_updates.try_recv so
both locations use the shared semaphore/cap before spawning.
In `@magicblock-chainlink/src/remote_account_provider/mod.rs`:
- Around line 839-849: The failure return path that constructs
RemoteAccountProviderError::AccountSubscriptionsTaskFailed currently returns
while leaving entries inserted into fetching_accounts (from try_get_multi),
which can leave accounts stuck pending and accumulate dead request senders;
before returning the Err (where errors.is_empty() is checked and errors.join is
used), explicitly rollback those inserts by removing the corresponding keys from
fetching_accounts (and drop/close any pending request senders if applicable) so
the pending state and senders are cleaned up prior to returning the error.
---
Outside diff comments:
In `@magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs`:
- Around line 48-83: pubkeys_to_subscribe can contain duplicates (same ATA and
eATA) causing redundant subscribe calls; before calling
join_all/this.subscribe_to_account, deduplicate the vector (e.g., convert to a
HashSet or sort_unstable() + dedup()) and then collect back into a Vec to
iterate. Update the code around pubkeys_to_subscribe (the collection before
join_all) so you dedupe after the for loop and before
join_all/this.subscribe_to_account is invoked, leaving
try_derive_eata_address_and_bump, ata_join_set.spawn, and
FetchCloner::task_to_fetch_with_companion unchanged.
- Around line 55-74: The current code ignores ATAs when
try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint) returns None,
dropping them from processing; add an else branch that ensures those ATAs are
still queued for cloning/subscription: push the ATA pubkey into
accounts_to_clone (and/or pubkeys_to_subscribe if appropriate) and spawn a fetch
task via ata_join_set.spawn using FetchCloner::task_to_fetch_with_companion (or
an equivalent fetch task) with the same effective_slot and fetch_origin but
without an eATA companion; ensure references to ata_pubkey, ata_account_slot,
fetch_origin, pubkeys_to_subscribe, accounts_to_clone, ata_join_set.spawn, and
FetchCloner::task_to_fetch_with_companion are used so ATAs are not dropped when
derivation returns None.
In `@magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs`:
- Around line 1074-1168: The membership check is currently O(n²) because in_bank
is a Vec; change in_bank to a HashSet<Pubkey> (use std::collections::HashSet)
and replace all in_bank.push(...) calls with in_bank.insert(...), i.e. update
the synchronous branch that currently does in_bank.push(**pubkey) and the
undelegating JoinSet result branch that does in_bank.push(pubkey) to insert the
Pubkey into the set, then change pubkeys.retain(|p| !in_bank.contains(p)) to use
the HashSet contains as-is; ensure type conversions match Pubkey ownership when
inserting and when checking contains.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
- magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs
- magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
- magicblock-chainlink/src/remote_account_provider/mod.rs
```rust
loop {
    match subscription_updates.try_recv() {
        Ok(update) => {
            let pubkey = update.pubkey;
            trace!(
                pubkey = %pubkey,
                "FetchCloner received subscription update"
            );
            // ...
            if in_bank.undelegating() {
                // We expect the account to still be delegated, but with the delegation
                // program owner
                debug!(
                    pubkey = %pubkey,
                    in_bank_delegated = in_bank.delegated(),
                    in_bank_owner = %in_bank.owner(),
                    in_bank_slot = in_bank.remote_slot(),
                    chain_delegated = account.delegated(),
                    chain_owner = %account.owner(),
                    chain_slot = account.remote_slot(),
                    "Received update for undelegating account"
                );
                let this = Arc::clone(&self);
                pending_tasks.spawn(async move {
                    Self::process_subscription_update(
                        &this, pubkey, update,
                    )
                    .await;
                });
                // ...
                // This will only be true in the following case:
                // 1. a commit was triggered for the account
                // 2. a commit + undelegate was triggered for the account -> undelegating
                // 3. we receive the update for (1.)
                //
                // Thus our state is more up to date and we don't need to update our
                // bank.
                if account_still_undelegating_on_chain(
                    &pubkey,
                    account.delegated(),
                    in_bank.remote_slot(),
                    deleg_record,
                    &this.validator_pubkey,
                ) {
                    return;
                }
            } else if in_bank.owner().eq(&dlp::id()) {
                debug!(
                    pubkey = %pubkey,
                    "Received update for account owned by delegation program but not marked as undelegating"
                // ...
            while let Some(result) = pending_tasks.try_join_next() {
                if let Err(err) = result {
```
Task backlog is still effectively unbounded under sustained update load.
pending_tasks.spawn(...) runs for each drained update without a concurrency cap, so pressure shifts from the channel buffer to JoinSet growth.
💡 Suggested bounded-concurrency guard

```diff
 pub fn start_subscription_listener(
     self: Arc<Self>,
     mut subscription_updates: mpsc::Receiver<ForwardedSubscriptionUpdate>,
 ) {
     tokio::spawn(async move {
+        const MAX_PENDING_SUB_UPDATE_TASKS: usize = 512;
         let mut pending_tasks: JoinSet<()> = JoinSet::new();
         loop {
             match subscription_updates.try_recv() {
                 Ok(update) => {
+                    while pending_tasks.len() >= MAX_PENDING_SUB_UPDATE_TASKS {
+                        if let Some(result) = pending_tasks.join_next().await {
+                            if let Err(err) = result {
+                                error!(error = ?err, "Subscription update task panicked");
+                            }
+                        }
+                    }
                     let pubkey = update.pubkey;
                     trace!(
                         pubkey = %pubkey,
                         "FetchCloner received subscription update"
                     );
@@
                     let Some(update) = maybe_update else {
@@
                         break;
                     };
+                    while pending_tasks.len() >= MAX_PENDING_SUB_UPDATE_TASKS {
+                        if let Some(result) = pending_tasks.join_next().await {
+                            if let Err(err) = result {
+                                error!(error = ?err, "Subscription update task panicked");
+                            }
+                        }
+                    }
                     let pubkey = update.pubkey;
                     let this = Arc::clone(&self);
                     pending_tasks.spawn(async move {
```

Also applies to: 229-234
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs` around lines 188 -
206, The loop currently spawns a new task into pending_tasks for every
subscription update (pending_tasks.spawn(...) inside the
subscription_updates.try_recv loop), which can let the JoinSet grow unbounded
under sustained load; fix it by adding a bounded-concurrency guard (e.g., an
Arc<Semaphore> or explicit cap on pending_tasks.len()) that must acquire a
permit before spawning and releases it when the spawned
process_subscription_update future completes, or by waiting when the cap is
reached (await permit or await try_join_next) so you never spawn beyond the
limit; apply the same bounded-concurrency change to the other identical spawn
site that uses pending_tasks.spawn(...) and subscription_updates.try_recv so
both locations use the shared semaphore/cap before spawning.
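The acquire-before-spawn, release-on-completion behavior the prompt describes can be sketched without the codebase. This standalone version uses std threads and a hand-rolled counting semaphore in place of tokio's `JoinSet` and `Arc<Semaphore>`; all names here are illustrative:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A tiny counting semaphore built from Mutex + Condvar; in the async
// version, tokio's Arc<Semaphore> plays the same role.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

fn main() {
    const CAP: usize = 4; // never more than 4 update tasks in flight
    let sem = Arc::new(Semaphore::new(CAP));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));

    let mut handles = vec![];
    for _update in 0..16 {
        sem.acquire(); // block here instead of spawning beyond the cap
        let (sem, in_flight, max_seen) =
            (Arc::clone(&sem), Arc::clone(&in_flight), Arc::clone(&max_seen));
        handles.push(thread::spawn(move || {
            let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            max_seen.fetch_max(cur, Ordering::SeqCst);
            // ... a process_subscription_update equivalent would run here ...
            in_flight.fetch_sub(1, Ordering::SeqCst);
            sem.release(); // free the permit when the task completes
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    assert!(max_seen.load(Ordering::SeqCst) <= CAP);
    println!("max concurrent tasks: {}", max_seen.load(Ordering::SeqCst));
}
```

The key property is that the producer blocks when the cap is reached, so backpressure propagates to the update source instead of accumulating unbounded tasks.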
```rust
// Fail if ANY subscription failed
if !errors.is_empty() {
    return Err(
        RemoteAccountProviderError::AccountSubscriptionsTaskFailed(
            format!(
                "{} subscription(s) failed: [{}]",
                errors.len(),
                errors.join(", ")
            ),
        ),
    );
```
Subscription setup failure path still risks stale pending state.
This error return can leave entries previously inserted in fetching_accounts (from try_get_multi) without immediate rollback, which can keep accounts falsely pending and accumulate dead request senders across repeated failures.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-chainlink/src/remote_account_provider/mod.rs` around lines 839 -
849, The failure return path that constructs
RemoteAccountProviderError::AccountSubscriptionsTaskFailed currently returns
while leaving entries inserted into fetching_accounts (from try_get_multi),
which can leave accounts stuck pending and accumulate dead request senders;
before returning the Err (where errors.is_empty() is checked and errors.join is
used), explicitly rollback those inserts by removing the corresponding keys from
fetching_accounts (and drop/close any pending request senders if applicable) so
the pending state and senders are cleaned up prior to returning the error.
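A minimal sketch of the rollback the comment asks for. The registry and function names here are hypothetical stand-ins (a `HashMap` of `mpsc` senders in place of the real `fetching_accounts` map of oneshot senders):

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Hypothetical stand-in for the pending-fetch registry: on a failed
// subscription setup, remove the keys we inserted so no account stays
// stuck "pending" and the request senders are dropped.
fn rollback_pending(
    fetching: &mut HashMap<u64, mpsc::Sender<Result<(), String>>>,
    inserted: &[u64],
) {
    for key in inserted {
        // Dropping the sender closes the channel, waking any receiver
        // with a RecvError instead of leaving it waiting forever.
        fetching.remove(key);
    }
}

fn main() {
    let mut fetching = HashMap::new();
    let (tx, rx) = mpsc::channel::<Result<(), String>>();
    fetching.insert(7u64, tx);

    // Subscription setup failed: roll back before returning the error.
    rollback_pending(&mut fetching, &[7]);
    assert!(fetching.is_empty());
    assert!(rx.recv().is_err()); // sender dropped -> receiver observes closure
    println!("pending state cleaned up");
}
```

The important detail is that removal alone suffices: dropping the sender is what unblocks any waiter, so no explicit error needs to be pushed through the channel.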
Summary

Parallelizes sequential subscribe/unsubscribe calls in `magicblock-chainlink` to reduce latency when fetching and cloning multiple accounts. Each subscribe call involves a network roundtrip; doing them sequentially causes multiplicative delays that grow with the number of accounts in a batch.
Details

**`RemoteAccountProvider::setup_subscriptions` — `join_all`**

Replaced the sequential `for` loop (with early `?` abort) with `futures_util::future::join_all` so all subscription requests fly in parallel. Uses `join_all` (not `try_join_all`) to ensure every subscribe attempt completes — preventing resource leaks in `fetching_accounts` and ensuring all oneshot receivers get a response. Errors are collected and reported as a single `AccountSubscriptionsTaskFailed` error.

**`resolve_ata_with_eata_projection` — parallel ATA/eATA subscriptions + delegation record fetches**

Collected ATA and eATA pubkeys first, then subscribed to all of them in a single `join_all` batch instead of awaiting each inside the loop.
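The run-everything-then-consolidate-errors pattern used in `setup_subscriptions` can be sketched without the async machinery. Account names and error strings below are made up:

```rust
// Hypothetical sketch: every subscribe attempt has already run to
// completion (as join_all guarantees), and failures are folded into a
// single error message rather than aborting on the first one.
fn main() {
    let results: Vec<Result<&str, String>> = vec![
        Ok("acc1"),
        Err("acc2: rpc timeout".to_string()),
        Ok("acc3"),
        Err("acc4: connection reset".to_string()),
    ];

    let errors: Vec<String> =
        results.into_iter().filter_map(|r| r.err()).collect();

    if !errors.is_empty() {
        let msg = format!(
            "{} subscription(s) failed: [{}]",
            errors.len(),
            errors.join(", ")
        );
        assert_eq!(
            msg,
            "2 subscription(s) failed: [acc2: rpc timeout, acc4: connection reset]"
        );
        println!("{}", msg);
    }
}
```

This is why `join_all` is preferred over `try_join_all` here: `try_join_all` would cancel the remaining futures on the first error, leaving some attempts incomplete, whereas this pattern always observes every result.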
Additionally, after joining the ATA/eATA fetch tasks (already parallel via `JoinSet`), the delegation record fetches — each a full subscribe→fetch→unsubscribe RPC cycle — are now batched with `join_all` instead of being called sequentially per ATA result.

**Undelegation refresh checks — sync check + parallel `JoinSet`**

Split the per-account loop into two phases: (1) a synchronous bank lookup that immediately resolves non-undelegating accounts, and (2) a `JoinSet` that runs all undelegating-account refresh checks (each with a 5-second timeout RPC call) in parallel. This avoids multiplicative timeout costs when a batch contains several undelegating accounts.
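The two-phase split can be sketched in miniature. This standalone version uses std threads in place of tokio's `JoinSet`, integers in place of pubkeys, and an invented `Status` enum for the bank lookup result:

```rust
use std::thread;

#[derive(Clone, Copy, PartialEq)]
enum Status {
    Settled,
    Undelegating,
}

// Phase 1: a cheap synchronous pass resolves accounts that need no
// remote check and collects the rest for the parallel phase.
fn partition(accounts: &[(u64, Status)]) -> (Vec<u64>, Vec<u64>) {
    let mut resolved = vec![];
    let mut needs_refresh = vec![];
    for &(key, status) in accounts {
        match status {
            Status::Settled => resolved.push(key),
            Status::Undelegating => needs_refresh.push(key),
        }
    }
    (resolved, needs_refresh)
}

fn main() {
    let accounts = [
        (1u64, Status::Settled),
        (2, Status::Undelegating),
        (3, Status::Settled),
        (4, Status::Undelegating),
    ];
    let (mut resolved, needs_refresh) = partition(&accounts);

    // Phase 2: run the remaining (potentially slow, timeout-bounded)
    // checks concurrently; std threads stand in for tokio's JoinSet, so
    // the total wait is one timeout, not one timeout per account.
    let handles: Vec<_> = needs_refresh
        .into_iter()
        .map(|key| {
            thread::spawn(move || {
                // a real impl would do the 5-second-timeout RPC call here
                key
            })
        })
        .collect();
    for h in handles {
        resolved.push(h.join().unwrap());
    }

    resolved.sort();
    assert_eq!(resolved, vec![1, 2, 3, 4]);
    println!("{:?}", resolved);
}
```

With sequential checks, a batch of k undelegating accounts could cost up to k timeouts back to back; running them concurrently bounds the wall-clock cost at roughly one timeout.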
Also replaced `FetchCloner`'s `derive(Clone)` with a manual `Clone` impl — the derive adds `V: Clone, C: Clone` bounds that `AccountsBank` and `Cloner` don't satisfy; all fields are behind `Arc` so the manual impl is trivially correct.

**`start_subscription_listener` — managed `JoinSet` loop**

Replaced unbounded `tokio::spawn` (no tracking, no backpressure) with a `JoinSet`-based loop using `try_recv`/`select!`. This provides task lifecycle management and natural backpressure under high subscription-update volume. Extracted the closure body into a `process_subscription_update` method for readability.
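The `derive(Clone)` pitfall described above can be reproduced in miniature. The field names here are illustrative, not the real `FetchCloner` layout:

```rust
use std::sync::Arc;

// derive(Clone) on this struct would add `V: Clone, C: Clone` bounds,
// even though only the Arc pointers are ever cloned.
struct FetchCloner<V, C> {
    bank: Arc<V>,
    cloner: Arc<C>,
}

// Manual impl: clone the Arcs, so no bounds on V or C are required.
impl<V, C> Clone for FetchCloner<V, C> {
    fn clone(&self) -> Self {
        FetchCloner {
            bank: Arc::clone(&self.bank),
            cloner: Arc::clone(&self.cloner),
        }
    }
}

// A type that is deliberately not Clone, standing in for AccountsBank/Cloner.
struct NotClone(u8);

fn main() {
    let fc = FetchCloner {
        bank: Arc::new(NotClone(1)),
        cloner: Arc::new(NotClone(2)),
    };
    let fc2 = fc.clone(); // compiles even though NotClone does not impl Clone
    assert_eq!(Arc::strong_count(&fc.bank), 2);
    drop(fc2);
    assert_eq!(Arc::strong_count(&fc.bank), 1);
    println!("manual Clone works without V: Clone / C: Clone bounds");
}
```

With `#[derive(Clone)]` instead of the manual impl, the `fc.clone()` call above would fail to compile because `NotClone` does not satisfy the derived bounds.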