Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 76 additions & 46 deletions magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::sync::atomic::AtomicU16;

use dlp::state::DelegationRecord;
use futures_util::future::join_all;
use magicblock_accounts_db::traits::AccountsBank;
use magicblock_core::{
logger::log_trace_warn, token_programs::try_derive_eata_address_and_bump,
};
use magicblock_core::token_programs::try_derive_eata_address_and_bump;
use magicblock_metrics::metrics;
use solana_account::AccountSharedData;
use solana_pubkey::Pubkey;
Expand All @@ -13,7 +11,9 @@ use tracing::*;
use super::{delegation, types::AccountWithCompanion, FetchCloner};
use crate::{
cloner::{AccountCloneRequest, Cloner},
remote_account_provider::{ChainPubsubClient, ChainRpcClient},
remote_account_provider::{
ChainPubsubClient, ChainRpcClient, ResolvedAccountSharedData,
},
};

/// Resolves ATAs with eATA projection.
Expand Down Expand Up @@ -45,35 +45,18 @@ where
let mut accounts_to_clone = vec![];
let mut ata_join_set = JoinSet::new();

// Subscribe first so subsequent fetches are kept up-to-date
// Collect all pubkeys to subscribe to and spawn fetch tasks
let mut pubkeys_to_subscribe = vec![];

for (ata_pubkey, _, ata_info, ata_account_slot) in &atas {
if let Err(err) = this.subscribe_to_account(ata_pubkey).await {
static ATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 =
AtomicU16::new(0);
log_trace_warn(
"Failed to subscribe to ATA",
"Failed to subscribe to ATAs",
&ata_pubkey,
&err,
1000,
&ATA_SUBSCRIPTION_FAILURE_COUNT,
);
}
// Collect ATA pubkey for subscription
pubkeys_to_subscribe.push(*ata_pubkey);

if let Some((eata, _)) =
try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint)
{
if let Err(err) = this.subscribe_to_account(&eata).await {
static EATA_SUBSCRIPTION_FAILURE_COUNT: AtomicU16 =
AtomicU16::new(0);
log_trace_warn(
"Failed to subscribe to derived eATA",
"Failed to subscribe to derived eATAs",
&eata,
&err,
1000,
&EATA_SUBSCRIPTION_FAILURE_COUNT,
);
}
// Collect eATA pubkey for subscription
pubkeys_to_subscribe.push(eata);

let effective_slot = if let Some(min_slot) = min_context_slot {
min_slot.max(*ata_account_slot)
Expand All @@ -90,8 +73,38 @@ where
}
}

// Subscribe to all ATA and eATA accounts in parallel
let subscription_results = join_all(
pubkeys_to_subscribe
.iter()
.map(|pk| this.subscribe_to_account(pk)),
)
.await;

for (pubkey, result) in
pubkeys_to_subscribe.iter().zip(subscription_results)
{
if let Err(err) = result {
warn!(
pubkey = %pubkey,
err = ?err,
"Failed to subscribe to ATA/eATA account"
);
}
}

let ata_results = ata_join_set.join_all().await;

// Phase 1: Collect successfully resolved ATAs
struct AtaResolutionInput {
ata_pubkey: Pubkey,
ata_account: ResolvedAccountSharedData,
eata_pubkey: Pubkey,
eata_shared: Option<AccountSharedData>,
}

let mut ata_inputs: Vec<AtaResolutionInput> = Vec::new();

for result in ata_results {
let AccountWithCompanion {
pubkey: ata_pubkey,
Expand All @@ -110,31 +123,48 @@ where
}
};

// Defaults: clone the ATA as-is
let mut account_to_clone = ata_account.account_shared_data_cloned();
let mut commit_frequency_ms = None;
let mut delegated_to_other = None;

// If there's an eATA, try to use it + delegation record to project the ATA
if let Some(eata_acc) = maybe_eata_account {
let eata_shared = eata_acc.account_shared_data_cloned();
let eata_shared =
maybe_eata_account.map(|e| e.account_shared_data_cloned());
ata_inputs.push(AtaResolutionInput {
ata_pubkey,
ata_account,
eata_pubkey,
eata_shared,
});
}

if let Some(deleg) = delegation::fetch_and_parse_delegation_record(
// Phase 2: Fetch delegation records in parallel for all eATAs
let deleg_futures = ata_inputs.iter().filter_map(|input| {
input.eata_shared.as_ref().map(|_| {
delegation::fetch_and_parse_delegation_record(
this,
eata_pubkey,
input.eata_pubkey,
this.remote_account_provider.chain_slot(),
fetch_origin,
)
.await
{
})
});
let deleg_results: Vec<Option<DelegationRecord>> =
join_all(deleg_futures).await;

// Phase 3: Combine results
let mut deleg_iter = deleg_results.into_iter();
for input in ata_inputs {
let mut account_to_clone =
input.ata_account.account_shared_data_cloned();
let mut commit_frequency_ms = None;
let mut delegated_to_other = None;

if let Some(eata_shared) = &input.eata_shared {
if let Some(Some(deleg)) = deleg_iter.next() {
delegated_to_other =
delegation::get_delegated_to_other(this, &deleg);
commit_frequency_ms = Some(deleg.commit_frequency_ms);

if let Some(projected_ata) = this
.maybe_project_delegated_ata_from_eata(
ata_account.account_shared_data(),
&eata_shared,
input.ata_account.account_shared_data(),
eata_shared,
&deleg,
)
{
Expand All @@ -144,7 +174,7 @@ where
}

accounts_to_clone.push(AccountCloneRequest {
pubkey: ata_pubkey,
pubkey: input.ata_pubkey,
account: account_to_clone,
commit_frequency_ms,
delegated_to_other,
Expand Down
Loading
Loading