From 2d4a193c8af50bff5f28ad33c8e38d4568686e50 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Sun, 23 Nov 2025 13:05:33 -0500 Subject: [PATCH 1/4] parse ctoken accounts explicitly --- .gitignore | 3 +++ src/common/typedefs/account/v1.rs | 20 ++++++++++++++------ src/common/typedefs/account/v2.rs | 20 ++++++++++++++------ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 8196c671..2e546578 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ test-ledger/ minio test.db docker-compose.yml + +.cursor +**/photon.log \ No newline at end of file diff --git a/src/common/typedefs/account/v1.rs b/src/common/typedefs/account/v1.rs index 77e90793..c89e2f59 100644 --- a/src/common/typedefs/account/v1.rs +++ b/src/common/typedefs/account/v1.rs @@ -33,12 +33,20 @@ pub struct Account { impl Account { pub fn parse_token_data(&self) -> Result, IngesterError> { match self.data.as_ref() { - Some(data) if self.owner.0 == COMPRESSED_TOKEN_PROGRAM => { - let data_slice = data.data.0.as_slice(); - let token_data = TokenData::try_from_slice(data_slice).map_err(|e| { - IngesterError::ParserError(format!("Failed to parse token data: {:?}", e)) - })?; - Ok(Some(token_data)) + Some(data) => { + let is_v1_token = data.discriminator.0.to_le_bytes() == [2, 0, 0, 0, 0, 0, 0, 0]; + let is_v2_token = data.discriminator.0.to_le_bytes() == [0, 0, 0, 0, 0, 0, 0, 3]; + let is_sha_flat_token = data.discriminator.0.to_le_bytes() == [0, 0, 0, 0, 0, 0, 0, 4]; + + if self.owner.0 == COMPRESSED_TOKEN_PROGRAM && (is_v1_token || is_v2_token || is_sha_flat_token) { + let data_slice = data.data.0.as_slice(); + let token_data = TokenData::try_from_slice(data_slice).map_err(|e| { + IngesterError::ParserError(format!("Failed to parse token data: {:?}", e)) + })?; + Ok(Some(token_data)) + } else { + Ok(None) + } } _ => Ok(None), } diff --git a/src/common/typedefs/account/v2.rs b/src/common/typedefs/account/v2.rs index d377572b..dd7dfe3a 100644 --- a/src/common/typedefs/account/v2.rs +++ b/src/common/typedefs/account/v2.rs @@ -41,12 +41,20 @@ pub struct AccountV2 { impl AccountV2 { pub fn parse_token_data(&self) -> Result, IngesterError> { match self.data.as_ref() { - Some(data) if self.owner.0 == COMPRESSED_TOKEN_PROGRAM => { - let data_slice = data.data.0.as_slice(); - let token_data = TokenData::try_from_slice(data_slice).map_err(|e| { - IngesterError::ParserError(format!("Failed to parse token data: {:?}", e)) - })?; - Ok(Some(token_data)) + Some(data) => { + let is_v1_token = data.discriminator.0.to_le_bytes() == [2, 0, 0, 0, 0, 0, 0, 0]; + let is_v2_token = data.discriminator.0.to_le_bytes() == [0, 0, 0, 0, 0, 0, 0, 3]; + let is_sha_flat_token = data.discriminator.0.to_le_bytes() == [0, 0, 0, 0, 0, 0, 0, 4]; + + if self.owner.0 == COMPRESSED_TOKEN_PROGRAM && (is_v1_token || is_v2_token || is_sha_flat_token) { + let data_slice = data.data.0.as_slice(); + let token_data = TokenData::try_from_slice(data_slice).map_err(|e| { + IngesterError::ParserError(format!("Failed to parse token data: {:?}", e)) + })?; + Ok(Some(token_data)) + } else { + Ok(None) + } } _ => Ok(None), } From 21c40cb22d7a9cb2635dbd0d04dc807f85da370b Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Sun, 23 Nov 2025 13:49:09 -0500 Subject: [PATCH 2/4] bump --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e483cee..d72356ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4063,7 +4063,7 @@ dependencies = [ [[package]] name = "photon-indexer" -version = "0.51.0" +version = "0.51.1" dependencies = [ "anyhow", "ark-bn254 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 0c6f08a2..b74f4ef7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ name = "photon-indexer" publish = true readme = "README.md" repository = "https://github.com/helius-labs/photon" -version = "0.51.0" +version = "0.51.1" [[bin]] name = "photon" From 3dfde4fc05978068b55729bcf41a7b8b809e9382 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 27 Nov 2025 14:13:09 -0500 Subject: [PATCH 3/4] TEXT for sqlite --- .../method/get_compressed_account_balance.rs | 40 ++- .../method/get_compressed_balance_by_owner.rs | 38 ++- .../get_compressed_mint_token_holders.rs | 74 ++++-- .../get_compressed_token_account_balance.rs | 42 +++- .../get_compressed_token_balances_by_owner.rs | 71 ++++-- src/api/method/utils.rs | 27 +- src/ingester/persist/mod.rs | 236 +++++++++++++----- .../standard/m20220101_000001_init.rs | 12 +- .../standard/m20250206_000007_init.rs | 8 +- 9 files changed, 407 insertions(+), 141 deletions(-) diff --git a/src/api/method/get_compressed_account_balance.rs b/src/api/method/get_compressed_account_balance.rs index 418d5c4b..2ef8bf9a 100644 --- a/src/api/method/get_compressed_account_balance.rs +++ b/src/api/method/get_compressed_account_balance.rs @@ -1,11 +1,13 @@ use super::super::error::PhotonApiError; use super::utils::CompressedAccountRequest; -use super::utils::{parse_decimal, AccountBalanceResponse, AccountDataTable, LamportModel}; +use super::utils::{ + is_sqlite, parse_balance_string, parse_decimal, AccountBalanceResponse, AccountDataTable, + LamportModel, LamportModelString, +}; use crate::common::typedefs::context::Context; use crate::common::typedefs::unsigned_integer::UnsignedInteger; use crate::dao::generated::accounts; use sea_orm::{DatabaseConnection, EntityTrait, QueryFilter, QuerySelect}; -use sqlx::types::Decimal; pub async fn get_compressed_account_balance( conn: &DatabaseConnection, @@ -14,18 +16,32 @@ pub async fn get_compressed_account_balance( let context = Context::extract(conn).await?; let id = request.parse_id()?; - let balance = accounts::Entity::find() - .select_only() - .column(accounts::Column::Lamports) - .filter(id.filter(AccountDataTable::Accounts)) - .into_model::() - .one(conn) - .await? - .map(|x| x.lamports) - .unwrap_or(Decimal::from(0)); + let balance = if is_sqlite(conn) { + accounts::Entity::find() + .select_only() + .column(accounts::Column::Lamports) + .filter(id.filter(AccountDataTable::Accounts)) + .into_model::() + .one(conn) + .await? + .map(|x| parse_balance_string(&x.lamports)) + .transpose()? + .unwrap_or(0) + } else { + accounts::Entity::find() + .select_only() + .column(accounts::Column::Lamports) + .filter(id.filter(AccountDataTable::Accounts)) + .into_model::() + .one(conn) + .await? + .map(|x| parse_decimal(x.lamports)) + .transpose()? + .unwrap_or(0) + }; Ok(AccountBalanceResponse { - value: UnsignedInteger(parse_decimal(balance)?), + value: UnsignedInteger(balance), context, }) } diff --git a/src/api/method/get_compressed_balance_by_owner.rs b/src/api/method/get_compressed_balance_by_owner.rs index e5ebfebd..0268597c 100644 --- a/src/api/method/get_compressed_balance_by_owner.rs +++ b/src/api/method/get_compressed_balance_by_owner.rs @@ -1,5 +1,8 @@ use super::super::error::PhotonApiError; -use super::utils::{parse_decimal, AccountBalanceResponse, LamportModel}; +use super::utils::{ + is_sqlite, parse_balance_string, parse_decimal, AccountBalanceResponse, LamportModel, + LamportModelString, +}; use crate::common::typedefs::context::Context; use crate::common::typedefs::serializable_pubkey::SerializablePubkey; use crate::common::typedefs::unsigned_integer::UnsignedInteger; @@ -21,16 +24,29 @@ pub async fn get_compressed_balance_by_owner( let context = Context::extract(conn).await?; let owner = request.owner; - let balances = owner_balances::Entity::find() - .select_only() - .column(owner_balances::Column::Lamports) - .filter(owner_balances::Column::Owner.eq::>(owner.into())) - .into_model::() - .all(conn) - .await? - .iter() - .map(|x| parse_decimal(x.lamports)) - .collect::, PhotonApiError>>()?; + let balances = if is_sqlite(conn) { + owner_balances::Entity::find() + .select_only() + .column(owner_balances::Column::Lamports) + .filter(owner_balances::Column::Owner.eq::>(owner.into())) + .into_model::() + .all(conn) + .await? + .iter() + .map(|x| parse_balance_string(&x.lamports)) + .collect::, PhotonApiError>>()? + } else { + owner_balances::Entity::find() + .select_only() + .column(owner_balances::Column::Lamports) + .filter(owner_balances::Column::Owner.eq::>(owner.into())) + .into_model::() + .all(conn) + .await? + .iter() + .map(|x| parse_decimal(x.lamports)) + .collect::, PhotonApiError>>()? + }; let total_balance = balances.iter().sum::(); diff --git a/src/api/method/get_compressed_mint_token_holders.rs b/src/api/method/get_compressed_mint_token_holders.rs index c2ae1e98..d0750fca 100644 --- a/src/api/method/get_compressed_mint_token_holders.rs +++ b/src/api/method/get_compressed_mint_token_holders.rs @@ -1,6 +1,10 @@ use byteorder::{ByteOrder, LittleEndian}; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; +use sea_orm::{ + ColumnTrait, DatabaseConnection, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, + QuerySelect, +}; use serde::{Deserialize, Serialize}; +use sqlx::types::Decimal; use utoipa::ToSchema; use crate::common::typedefs::bs58_string::Base58String; @@ -11,7 +15,19 @@ use crate::common::typedefs::unsigned_integer::UnsignedInteger; use crate::dao::generated::token_owner_balances; use super::super::error::PhotonApiError; -use super::utils::{parse_decimal, PAGE_LIMIT}; +use super::utils::{is_sqlite, parse_balance_string, parse_decimal, PAGE_LIMIT}; + +#[derive(FromQueryResult)] +struct TokenHolderModel { + pub owner: Vec, + pub amount: Decimal, +} + +#[derive(FromQueryResult)] +struct TokenHolderModelString { + pub owner: Vec, + pub amount: String, +} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] pub struct OwnerBalance { @@ -78,21 +94,47 @@ pub async fn get_compressed_mint_token_holders( } let limit = limit.map(|l| l.value()).unwrap_or(PAGE_LIMIT); - let items = token_owner_balances::Entity::find() - .filter(filter) - .order_by_desc(token_owner_balances::Column::Amount) - .order_by_desc(token_owner_balances::Column::Owner) - .limit(limit) - .all(conn) - .await? - .drain(..) - .map(|token_owner_balance| { - Ok(OwnerBalance { - owner: token_owner_balance.owner.try_into()?, - balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?), + let items = if is_sqlite(conn) { + token_owner_balances::Entity::find() + .select_only() + .column(token_owner_balances::Column::Owner) + .column(token_owner_balances::Column::Amount) + .filter(filter) + .order_by_desc(token_owner_balances::Column::Amount) + .order_by_desc(token_owner_balances::Column::Owner) + .limit(limit) + .into_model::() + .all(conn) + .await? + .drain(..) + .map(|token_owner_balance| { + Ok(OwnerBalance { + owner: token_owner_balance.owner.try_into()?, + balance: UnsignedInteger(parse_balance_string(&token_owner_balance.amount)?), + }) }) - }) - .collect::, PhotonApiError>>()?; + .collect::, PhotonApiError>>()? + } else { + token_owner_balances::Entity::find() + .select_only() + .column(token_owner_balances::Column::Owner) + .column(token_owner_balances::Column::Amount) + .filter(filter) + .order_by_desc(token_owner_balances::Column::Amount) + .order_by_desc(token_owner_balances::Column::Owner) + .limit(limit) + .into_model::() + .all(conn) + .await? + .drain(..) + .map(|token_owner_balance| { + Ok(OwnerBalance { + owner: token_owner_balance.owner.try_into()?, + balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?), + }) + }) + .collect::, PhotonApiError>>()? + }; let mut cursor = items.last().map(|item| { Base58String({ diff --git a/src/api/method/get_compressed_token_account_balance.rs b/src/api/method/get_compressed_token_account_balance.rs index a6fbbdd0..0ba0c347 100644 --- a/src/api/method/get_compressed_token_account_balance.rs +++ b/src/api/method/get_compressed_token_account_balance.rs @@ -4,10 +4,11 @@ use sea_orm::{DatabaseConnection, EntityTrait, QueryFilter, QuerySelect}; use serde::{Deserialize, Serialize}; use super::super::error::PhotonApiError; -use super::utils::{parse_decimal, AccountDataTable}; -use super::utils::{BalanceModel, CompressedAccountRequest}; +use super::utils::{ + is_sqlite, parse_balance_string, parse_decimal, AccountDataTable, BalanceModel, + BalanceModelString, CompressedAccountRequest, +}; use crate::common::typedefs::context::Context; -use sqlx::types::Decimal; use utoipa::ToSchema; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] @@ -32,19 +33,34 @@ pub async fn get_compressed_token_account_balance( ) -> Result { let context = Context::extract(conn).await?; let id = request.parse_id()?; - let balance = token_accounts::Entity::find() - .select_only() - .column(token_accounts::Column::Amount) - .filter(id.filter(AccountDataTable::TokenAccounts)) - .into_model::() - .one(conn) - .await? - .map(|x| x.amount) - .unwrap_or(Decimal::from(0)); + + let balance = if is_sqlite(conn) { + token_accounts::Entity::find() + .select_only() + .column(token_accounts::Column::Amount) + .filter(id.filter(AccountDataTable::TokenAccounts)) + .into_model::() + .one(conn) + .await? + .map(|x| parse_balance_string(&x.amount)) + .transpose()? + .unwrap_or(0) + } else { + token_accounts::Entity::find() + .select_only() + .column(token_accounts::Column::Amount) + .filter(id.filter(AccountDataTable::TokenAccounts)) + .into_model::() + .one(conn) + .await? + .map(|x| parse_decimal(x.amount)) + .transpose()? + .unwrap_or(0) + }; Ok(GetCompressedTokenAccountBalanceResponse { value: TokenAccountBalance { - amount: UnsignedInteger(parse_decimal(balance)?), + amount: UnsignedInteger(balance), }, context, }) diff --git a/src/api/method/get_compressed_token_balances_by_owner.rs b/src/api/method/get_compressed_token_balances_by_owner.rs index 6298c610..57fe92cb 100644 --- a/src/api/method/get_compressed_token_balances_by_owner.rs +++ b/src/api/method/get_compressed_token_balances_by_owner.rs @@ -1,5 +1,9 @@ -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; +use sea_orm::{ + ColumnTrait, DatabaseConnection, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, + QuerySelect, +}; use serde::{Deserialize, Serialize}; +use sqlx::types::Decimal; use utoipa::ToSchema; use crate::common::typedefs::bs58_string::Base58String; @@ -10,7 +14,19 @@ use crate::common::typedefs::unsigned_integer::UnsignedInteger; use crate::dao::generated::token_owner_balances; use super::super::error::PhotonApiError; -use super::utils::{parse_decimal, PAGE_LIMIT}; +use super::utils::{is_sqlite, parse_balance_string, parse_decimal, PAGE_LIMIT}; + +#[derive(FromQueryResult)] +struct TokenOwnerBalanceModel { + pub mint: Vec, + pub amount: Decimal, +} + +#[derive(FromQueryResult)] +struct TokenOwnerBalanceModelString { + pub mint: Vec, + pub amount: String, +} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] pub struct TokenBalance { @@ -79,20 +95,45 @@ pub async fn get_compressed_token_balances_by_owner( } let limit = limit.map(|l| l.value()).unwrap_or(PAGE_LIMIT); - let items = token_owner_balances::Entity::find() - .filter(filter) - .order_by_asc(token_owner_balances::Column::Mint) - .limit(limit) - .all(conn) - .await? - .drain(..) - .map(|token_owner_balance| { - Ok(TokenBalance { - mint: token_owner_balance.mint.try_into()?, - balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?), + let items = if is_sqlite(conn) { + token_owner_balances::Entity::find() + .select_only() + .column(token_owner_balances::Column::Mint) + .column(token_owner_balances::Column::Amount) + .filter(filter) + .order_by_asc(token_owner_balances::Column::Mint) + .limit(limit) + .into_model::() + .all(conn) + .await? + .drain(..) + .map(|token_owner_balance| { + Ok(TokenBalance { + mint: token_owner_balance.mint.try_into()?, + balance: UnsignedInteger(parse_balance_string(&token_owner_balance.amount)?), + }) }) - }) - .collect::, PhotonApiError>>()?; + .collect::, PhotonApiError>>()? + } else { + token_owner_balances::Entity::find() + .select_only() + .column(token_owner_balances::Column::Mint) + .column(token_owner_balances::Column::Amount) + .filter(filter) + .order_by_asc(token_owner_balances::Column::Mint) + .limit(limit) + .into_model::() + .all(conn) + .await? + .drain(..) + .map(|token_owner_balance| { + Ok(TokenBalance { + mint: token_owner_balance.mint.try_into()?, + balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?), + }) + }) + .collect::, PhotonApiError>>()? + }; let mut cursor = items.last().map(|item| { Base58String({ diff --git a/src/api/method/utils.rs b/src/api/method/utils.rs index 1cf78467..9f874b08 100644 --- a/src/api/method/utils.rs +++ b/src/api/method/utils.rs @@ -10,8 +10,8 @@ use crate::dao::generated::{accounts, token_accounts}; use byteorder::{ByteOrder, LittleEndian}; use sea_orm::sea_query::SimpleExpr; use sea_orm::{ - ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, FromQueryResult, QueryFilter, - QueryOrder, QuerySelect, Statement, Value, + ColumnTrait, ConnectionTrait, DatabaseBackend, DatabaseConnection, EntityTrait, + FromQueryResult, QueryFilter, QueryOrder, QuerySelect, Statement, Value, }; use serde::{Deserialize, Serialize}; use solana_signature::Signature; @@ -35,6 +35,18 @@ pub fn parse_decimal(value: Decimal) -> Result { .map_err(|_| PhotonApiError::UnexpectedError("Invalid decimal value".to_string())) } +pub fn parse_balance_string(value: &str) -> Result { + // Handle potential decimal point from Decimal serialization + let value = value.split('.').next().unwrap_or(value); + value + .parse::() + .map_err(|_| PhotonApiError::UnexpectedError(format!("Invalid balance: {}", value))) +} + +pub fn is_sqlite(conn: &DatabaseConnection) -> bool { + conn.get_database_backend() == DatabaseBackend::Sqlite +} + pub(crate) fn parse_leaf_index(leaf_index: i64) -> Result { leaf_index .try_into() @@ -319,6 +331,17 @@ pub struct LamportModel { pub lamports: Decimal, } +// SQLite-specific models that read TEXT columns as String +#[derive(FromQueryResult)] +pub struct BalanceModelString { + pub amount: String, +} + +#[derive(FromQueryResult)] +pub struct LamportModelString { + pub lamports: String, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, Default)] #[serde(rename_all = "camelCase")] pub struct HashRequest { diff --git a/src/ingester/persist/mod.rs b/src/ingester/persist/mod.rs index 0f7db46a..f77cb663 100644 --- a/src/ingester/persist/mod.rs +++ b/src/ingester/persist/mod.rs @@ -313,6 +313,32 @@ fn bytes_to_sqlite_sql_format(bytes: Vec) -> String { format!("X'{}'", hex_string) // Properly formatted for SQLite BLOB } +fn option_bytes_to_sqlite(bytes: Option>) -> String { + match bytes { + Some(b) => bytes_to_sqlite_sql_format(b), + None => "NULL".to_string(), + } +} + +fn option_i64_to_sql(val: Option) -> String { + match val { + Some(v) => v.to_string(), + None => "NULL".to_string(), + } +} + +fn option_i32_to_sql(val: Option) -> String { + match val { + Some(v) => v.to_string(), + None => "NULL".to_string(), + } +} + +fn bool_to_sqlite(val: bool) -> &'static str { + if val { "1" } else { "0" } +} + + async fn execute_account_update_query_and_update_balances( txn: &DatabaseTransaction, mut query: Statement, @@ -344,11 +370,12 @@ async fn execute_account_update_query_and_update_balances( let prev_spent: Option = row.try_get("", "prev_spent")?; match (prev_spent, &modification_type) { (_, ModificationType::Append) | (Some(false), ModificationType::Spend) => { - let mut amount_of_interest = match db_backend { + let mut amount_of_interest: Decimal = match db_backend { DatabaseBackend::Postgres => row.try_get("", balance_column)?, DatabaseBackend::Sqlite => { - let amount: i64 = row.try_get("", balance_column)?; - Decimal::from(amount) + // SQLite stores as TEXT to preserve precision + let amount_str: String = row.try_get("", balance_column)?; + amount_str.parse().unwrap_or(Decimal::ZERO) } _ => panic!("Unsupported database backend"), }; @@ -375,16 +402,32 @@ async fn execute_account_update_query_and_update_balances( let values = balance_modifications .into_iter() .filter(|(_, value)| *value != Decimal::from(0)) - .map(|(key, value)| format!("({}, {})", key, value)) + .map(|(key, value)| { + if db_backend == DatabaseBackend::Sqlite { + // SQLite TEXT columns need quoted string values + format!("({}, '{}')", key, value) + } else { + format!("({}, {})", key, value) + } + }) .collect::>(); if !values.is_empty() { let values_string = values.join(", "); - let raw_sql = format!( - "INSERT INTO {owner_table_name} (owner {additional_columns}, {balance_column}) - VALUES {values_string} ON CONFLICT (owner{additional_columns}) - DO UPDATE SET {balance_column} = {owner_table_name}.{balance_column} + excluded.{balance_column}", - ); + let raw_sql = if db_backend == DatabaseBackend::Sqlite { + // SQLite: CAST TEXT to INTEGER for arithmetic, back to TEXT for storage + format!( + "INSERT INTO {owner_table_name} (owner {additional_columns}, {balance_column}) + VALUES {values_string} ON CONFLICT (owner{additional_columns}) + DO UPDATE SET {balance_column} = CAST(CAST({owner_table_name}.{balance_column} AS INTEGER) + CAST(excluded.{balance_column} AS INTEGER) AS TEXT)", + ) + } else { + format!( + "INSERT INTO {owner_table_name} (owner {additional_columns}, {balance_column}) + VALUES {values_string} ON CONFLICT (owner{additional_columns}) + DO UPDATE SET {balance_column} = {owner_table_name}.{balance_column} + excluded.{balance_column}", + ) + }; txn.execute(Statement::from_string(db_backend, raw_sql)) .await?; } @@ -422,37 +465,10 @@ async fn append_output_accounts( txn: &DatabaseTransaction, out_accounts: &[AccountWithContext], ) -> Result<(), IngesterError> { - let mut account_models = Vec::new(); + let db_backend = txn.get_database_backend(); let mut token_accounts = Vec::new(); for account in out_accounts { - account_models.push(accounts::ActiveModel { - hash: Set(account.account.hash.to_vec()), - address: Set(account.account.address.map(|x| x.to_bytes_vec())), - discriminator: Set(account - .account - .data - .as_ref() - .map(|x| Decimal::from(x.discriminator.0))), - data: Set(account.account.data.as_ref().map(|x| x.data.clone().0)), - data_hash: Set(account.account.data.as_ref().map(|x| x.data_hash.to_vec())), - tree: Set(account.account.tree.to_bytes_vec()), - queue: Set(Some(account.context.queue.to_bytes_vec())), - leaf_index: Set(account.account.leaf_index.0 as i64), - in_output_queue: Set(account.context.in_output_queue), - nullifier_queue_index: Set(account.context.nullifier_queue_index.map(|x| x.0 as i64)), - nullified_in_tree: Set(false), - tree_type: Set(Some(account.context.tree_type as i32)), - nullifier: Set(account.context.nullifier.as_ref().map(|x| x.to_vec())), - owner: Set(account.account.owner.to_bytes_vec()), - lamports: Set(Decimal::from(account.account.lamports.0)), - spent: Set(false), - slot_created: Set(account.account.slot_created.0 as i64), - seq: Set(account.account.seq.map(|x| x.0 as i64)), - prev_spent: Set(None), - tx_hash: Default::default(), // tx hashes are only set when an account is an input - }); - if let Some(token_data) = account.account.parse_token_data()? { token_accounts.push(EnrichedTokenAccount { token_data, @@ -462,13 +478,81 @@ async fn append_output_accounts( } if !out_accounts.is_empty() { - let query = accounts::Entity::insert_many(account_models) - .on_conflict( - OnConflict::column(accounts::Column::Hash) - .do_nothing() - .to_owned(), - ) - .build(txn.get_database_backend()); + let query = if db_backend == DatabaseBackend::Sqlite { + // SQLite: Use raw SQL with string values to preserve precision + let values: Vec = out_accounts + .iter() + .map(|account| { + let hash = bytes_to_sqlite_sql_format(account.account.hash.to_vec()); + let data = option_bytes_to_sqlite(account.account.data.as_ref().map(|x| x.data.clone().0)); + let data_hash = option_bytes_to_sqlite(account.account.data.as_ref().map(|x| x.data_hash.to_vec())); + let address = option_bytes_to_sqlite(account.account.address.map(|x| x.to_bytes_vec())); + let owner = bytes_to_sqlite_sql_format(account.account.owner.to_bytes_vec()); + let tree = bytes_to_sqlite_sql_format(account.account.tree.to_bytes_vec()); + let leaf_index = account.account.leaf_index.0 as i64; + let seq = option_i64_to_sql(account.account.seq.map(|x| x.0 as i64)); + let slot_created = account.account.slot_created.0 as i64; + let spent = bool_to_sqlite(false); + let prev_spent = "NULL"; + // Store lamports and discriminator as TEXT strings to preserve precision + let lamports = format!("'{}'", account.account.lamports.0); + let discriminator = match account.account.data.as_ref() { + Some(d) => format!("'{}'", d.discriminator.0), + None => "NULL".to_string(), + }; + let tree_type = option_i32_to_sql(Some(account.context.tree_type as i32)); + let nullified_in_tree = bool_to_sqlite(false); + let nullifier_queue_index = option_i64_to_sql(account.context.nullifier_queue_index.map(|x| x.0 as i64)); + let in_output_queue = bool_to_sqlite(account.context.in_output_queue); + let queue = option_bytes_to_sqlite(Some(account.context.queue.to_bytes_vec())); + let nullifier = option_bytes_to_sqlite(account.context.nullifier.as_ref().map(|x| x.to_vec())); + let tx_hash = "NULL"; + + format!( + "({hash}, {data}, {data_hash}, {address}, {owner}, {tree}, {leaf_index}, {seq}, {slot_created}, {spent}, {prev_spent}, {lamports}, {discriminator}, {tree_type}, {nullified_in_tree}, {nullifier_queue_index}, {in_output_queue}, {queue}, {nullifier}, {tx_hash})" + ) + }) + .collect(); + let values_string = values.join(", "); + let sql = format!( + "INSERT INTO accounts (hash, data, data_hash, address, owner, tree, leaf_index, seq, slot_created, spent, prev_spent, lamports, discriminator, tree_type, nullified_in_tree, nullifier_queue_index, in_output_queue, queue, nullifier, tx_hash) VALUES {values_string} ON CONFLICT (hash) DO NOTHING" + ); + Statement::from_string(db_backend, sql) + } else { + // PostgreSQL: Use SeaORM models + let account_models: Vec = out_accounts + .iter() + .map(|account| accounts::ActiveModel { + hash: Set(account.account.hash.to_vec()), + address: Set(account.account.address.map(|x| x.to_bytes_vec())), + discriminator: Set(account.account.data.as_ref().map(|x| Decimal::from(x.discriminator.0))), + data: Set(account.account.data.as_ref().map(|x| x.data.clone().0)), + data_hash: Set(account.account.data.as_ref().map(|x| x.data_hash.to_vec())), + tree: Set(account.account.tree.to_bytes_vec()), + queue: Set(Some(account.context.queue.to_bytes_vec())), + leaf_index: Set(account.account.leaf_index.0 as i64), + in_output_queue: Set(account.context.in_output_queue), + nullifier_queue_index: Set(account.context.nullifier_queue_index.map(|x| x.0 as i64)), + nullified_in_tree: Set(false), + tree_type: Set(Some(account.context.tree_type as i32)), + nullifier: Set(account.context.nullifier.as_ref().map(|x| x.to_vec())), + owner: Set(account.account.owner.to_bytes_vec()), + lamports: Set(Decimal::from(account.account.lamports.0)), + spent: Set(false), + slot_created: Set(account.account.slot_created.0 as i64), + seq: Set(account.account.seq.map(|x| x.0 as i64)), + prev_spent: Set(None), + tx_hash: Default::default(), + }) + .collect(); + accounts::Entity::insert_many(account_models) + .on_conflict( + OnConflict::column(accounts::Column::Hash) + .do_nothing() + .to_owned(), + ) + .build(db_backend) + }; execute_account_update_query_and_update_balances( txn, query, @@ -488,13 +572,42 @@ async fn append_output_accounts( pub async fn persist_token_accounts( txn: &DatabaseTransaction, - token_accounts: Vec, + enriched_token_accounts: Vec, ) -> Result<(), IngesterError> { - let token_models = token_accounts - .into_iter() - .map( - |EnrichedTokenAccount { token_data, hash }| token_accounts::ActiveModel { - hash: Set(hash.into()), + let db_backend = txn.get_database_backend(); + + let query = if db_backend == DatabaseBackend::Sqlite { + // SQLite: Use raw SQL with string values to preserve precision + let values: Vec = enriched_token_accounts + .iter() + .map(|EnrichedTokenAccount { token_data, hash }| { + let hash_sql = bytes_to_sqlite_sql_format(hash.clone().into()); + let owner = bytes_to_sqlite_sql_format(token_data.owner.to_bytes_vec()); + let mint = bytes_to_sqlite_sql_format(token_data.mint.to_bytes_vec()); + let delegate = option_bytes_to_sqlite(token_data.delegate.map(|d| d.to_bytes_vec())); + let state = token_data.state as i32; + let spent = bool_to_sqlite(false); + let prev_spent = "NULL"; + // Store amount as TEXT string to preserve precision + let amount = format!("'{}'", token_data.amount.0); + let tlv = option_bytes_to_sqlite(token_data.tlv.as_ref().map(|t| t.0.clone())); + + format!( + "({hash_sql}, {owner}, {mint}, {delegate}, {state}, {spent}, {prev_spent}, {amount}, {tlv})" + ) + }) + .collect(); + let values_string = values.join(", "); + let sql = format!( + "INSERT INTO token_accounts (hash, owner, mint, delegate, state, spent, prev_spent, amount, tlv) VALUES {values_string} ON CONFLICT (hash) DO NOTHING" + ); + Statement::from_string(db_backend, sql) + } else { + // PostgreSQL: Use SeaORM models + let token_models: Vec = enriched_token_accounts + .iter() + .map(|EnrichedTokenAccount { token_data, hash }| token_accounts::ActiveModel { + hash: Set(hash.clone().into()), mint: Set(token_data.mint.to_bytes_vec()), owner: Set(token_data.owner.to_bytes_vec()), amount: Set(Decimal::from(token_data.amount.0)), @@ -502,18 +615,17 @@ pub async fn persist_token_accounts( state: Set(token_data.state as i32), spent: Set(false), prev_spent: Set(None), - tlv: Set(token_data.tlv.map(|t| t.0)), - }, - ) - .collect::>(); - - let query = token_accounts::Entity::insert_many(token_models) - .on_conflict( - OnConflict::column(token_accounts::Column::Hash) - .do_nothing() - .to_owned(), - ) - .build(txn.get_database_backend()); + tlv: Set(token_data.tlv.as_ref().map(|t| t.0.clone())), + }) + .collect(); + token_accounts::Entity::insert_many(token_models) + .on_conflict( + OnConflict::column(token_accounts::Column::Hash) + .do_nothing() + .to_owned(), + ) + .build(db_backend) + }; execute_account_update_query_and_update_balances( txn, diff --git a/src/migration/migrations/standard/m20220101_000001_init.rs b/src/migration/migrations/standard/m20220101_000001_init.rs index 1f5fa7ff..e950ad7d 100644 --- a/src/migration/migrations/standard/m20220101_000001_init.rs +++ b/src/migration/migrations/standard/m20220101_000001_init.rs @@ -275,30 +275,30 @@ impl MigrationTrait for Migration { .await?; } DatabaseBackend::Sqlite => { - // HACK: SQLx Decimal is not compatible with INTEGER so we use REAL instead. - execute_sql(manager, "ALTER TABLE accounts ADD COLUMN lamports REAL;").await?; + // Use TEXT for large integers to preserve precision (f64/REAL loses precision beyond 2^53) + execute_sql(manager, "ALTER TABLE accounts ADD COLUMN lamports TEXT;").await?; execute_sql( manager, - "ALTER TABLE accounts ADD COLUMN discriminator REAL;", + "ALTER TABLE accounts ADD COLUMN discriminator TEXT;", ) .await?; execute_sql( manager, - "ALTER TABLE token_accounts ADD COLUMN amount REAL;", + "ALTER TABLE token_accounts ADD COLUMN amount TEXT;", ) .await?; execute_sql( manager, - "ALTER TABLE owner_balances ADD COLUMN lamports REAL;", + "ALTER TABLE owner_balances ADD COLUMN lamports TEXT;", ) .await?; execute_sql( manager, - "ALTER TABLE token_owner_balances ADD COLUMN amount REAL;", + "ALTER TABLE token_owner_balances ADD COLUMN amount TEXT;", ) .await?; } diff --git a/src/migration/migrations/standard/m20250206_000007_init.rs b/src/migration/migrations/standard/m20250206_000007_init.rs index ca777af6..ac758e8a 100644 --- a/src/migration/migrations/standard/m20250206_000007_init.rs +++ b/src/migration/migrations/standard/m20250206_000007_init.rs @@ -39,8 +39,8 @@ impl MigrationTrait for Migration { slot_created BIGINT NOT NULL, spent BOOLEAN NOT NULL, prev_spent BOOLEAN, - lamports REAL, - discriminator REAL, + lamports TEXT, + discriminator TEXT, in_output_queue BOOLEAN NOT NULL DEFAULT TRUE, nullifier BLOB, tx_hash BLOB, @@ -241,8 +241,8 @@ impl MigrationTrait for Migration { slot_created BIGINT NOT NULL, spent BOOLEAN NOT NULL, prev_spent BOOLEAN, - lamports REAL, - discriminator REAL + lamports TEXT, + discriminator TEXT ); INSERT INTO accounts_new From 743bebe2ba90f32446a64e5ccc4088487ba923b9 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 27 Nov 2025 14:16:55 -0500 Subject: [PATCH 4/4] clean --- .../get_compressed_mint_token_holders.rs | 39 +++++++------------ .../get_compressed_token_balances_by_owner.rs | 39 +++++++------------ src/api/method/utils.rs | 28 ++++++++++++- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/api/method/get_compressed_mint_token_holders.rs b/src/api/method/get_compressed_mint_token_holders.rs index d0750fca..75b5d519 100644 --- a/src/api/method/get_compressed_mint_token_holders.rs +++ b/src/api/method/get_compressed_mint_token_holders.rs @@ -1,10 +1,6 @@ use byteorder::{ByteOrder, LittleEndian}; -use sea_orm::{ - ColumnTrait, DatabaseConnection, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, - QuerySelect, -}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; use serde::{Deserialize, Serialize}; -use sqlx::types::Decimal; use utoipa::ToSchema; use crate::common::typedefs::bs58_string::Base58String; @@ -15,19 +11,10 @@ use crate::common::typedefs::unsigned_integer::UnsignedInteger; use crate::dao::generated::token_owner_balances; use super::super::error::PhotonApiError; -use super::utils::{is_sqlite, parse_balance_string, parse_decimal, PAGE_LIMIT}; - -#[derive(FromQueryResult)] -struct TokenHolderModel { - pub owner: Vec, - pub amount: Decimal, -} - -#[derive(FromQueryResult)] -struct TokenHolderModelString { - pub owner: Vec, - pub amount: String, -} +use super::utils::{ + is_sqlite, parse_balance_string, parse_decimal, OwnerBalanceModel, OwnerBalanceModelString, + PAGE_LIMIT, +}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] pub struct OwnerBalance { @@ -103,14 +90,14 @@ pub async fn get_compressed_mint_token_holders( .order_by_desc(token_owner_balances::Column::Amount) .order_by_desc(token_owner_balances::Column::Owner) .limit(limit) - .into_model::() + .into_model::() .all(conn) .await? .drain(..) - .map(|token_owner_balance| { + .map(|m| { Ok(OwnerBalance { - owner: token_owner_balance.owner.try_into()?, - balance: UnsignedInteger(parse_balance_string(&token_owner_balance.amount)?), + owner: m.owner.try_into()?, + balance: UnsignedInteger(parse_balance_string(&m.amount)?), }) }) .collect::, PhotonApiError>>()? @@ -123,14 +110,14 @@ pub async fn get_compressed_mint_token_holders( .order_by_desc(token_owner_balances::Column::Amount) .order_by_desc(token_owner_balances::Column::Owner) .limit(limit) - .into_model::() + .into_model::() .all(conn) .await? .drain(..) - .map(|token_owner_balance| { + .map(|m| { Ok(OwnerBalance { - owner: token_owner_balance.owner.try_into()?, - balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?), + owner: m.owner.try_into()?, + balance: UnsignedInteger(parse_decimal(m.amount)?), }) }) .collect::, PhotonApiError>>()? diff --git a/src/api/method/get_compressed_token_balances_by_owner.rs b/src/api/method/get_compressed_token_balances_by_owner.rs index 57fe92cb..7bd9bcc8 100644 --- a/src/api/method/get_compressed_token_balances_by_owner.rs +++ b/src/api/method/get_compressed_token_balances_by_owner.rs @@ -1,9 +1,5 @@ -use sea_orm::{ - ColumnTrait, DatabaseConnection, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, - QuerySelect, -}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; use serde::{Deserialize, Serialize}; -use sqlx::types::Decimal; use utoipa::ToSchema; use crate::common::typedefs::bs58_string::Base58String; @@ -14,19 +10,10 @@ use crate::common::typedefs::unsigned_integer::UnsignedInteger; use crate::dao::generated::token_owner_balances; use super::super::error::PhotonApiError; -use super::utils::{is_sqlite, parse_balance_string, parse_decimal, PAGE_LIMIT}; - -#[derive(FromQueryResult)] -struct TokenOwnerBalanceModel { - pub mint: Vec, - pub amount: Decimal, -} - -#[derive(FromQueryResult)] -struct TokenOwnerBalanceModelString { - pub mint: Vec, - pub amount: String, -} +use super::utils::{ + is_sqlite, parse_balance_string, parse_decimal, MintBalanceModel, MintBalanceModelString, + PAGE_LIMIT, +}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] pub struct TokenBalance { @@ -103,14 +90,14 @@ pub async fn get_compressed_token_balances_by_owner( .filter(filter) .order_by_asc(token_owner_balances::Column::Mint) .limit(limit) - .into_model::() + .into_model::() .all(conn) .await? .drain(..) - .map(|token_owner_balance| { + .map(|m| { Ok(TokenBalance { - mint: token_owner_balance.mint.try_into()?, - balance: UnsignedInteger(parse_balance_string(&token_owner_balance.amount)?), + mint: m.mint.try_into()?, + balance: UnsignedInteger(parse_balance_string(&m.amount)?), }) }) .collect::, PhotonApiError>>()? @@ -122,14 +109,14 @@ pub async fn get_compressed_token_balances_by_owner( .filter(filter) .order_by_asc(token_owner_balances::Column::Mint) .limit(limit) - .into_model::() + .into_model::() .all(conn) .await? .drain(..) - .map(|token_owner_balance| { + .map(|m| { Ok(TokenBalance { - mint: token_owner_balance.mint.try_into()?, - balance: UnsignedInteger(parse_decimal(token_owner_balance.amount)?), + mint: m.mint.try_into()?, + balance: UnsignedInteger(parse_decimal(m.amount)?), }) }) .collect::, PhotonApiError>>()? diff --git a/src/api/method/utils.rs b/src/api/method/utils.rs index 9f874b08..6a001376 100644 --- a/src/api/method/utils.rs +++ b/src/api/method/utils.rs @@ -321,6 +321,7 @@ impl CompressedAccountRequest { } } +// Query result models - Decimal for PostgreSQL, String for SQLite #[derive(FromQueryResult)] pub struct BalanceModel { pub amount: Decimal, @@ -331,7 +332,6 @@ pub struct LamportModel { pub lamports: Decimal, } -// SQLite-specific models that read TEXT columns as String #[derive(FromQueryResult)] pub struct BalanceModelString { pub amount: String, @@ -342,6 +342,32 @@ pub struct LamportModelString { pub lamports: String, } +// Generic query result with mint + balance +#[derive(FromQueryResult)] +pub struct MintBalanceModel { + pub mint: Vec, + pub amount: Decimal, +} + +#[derive(FromQueryResult)] +pub struct MintBalanceModelString { + pub mint: Vec, + pub amount: String, +} + +#[derive(FromQueryResult)] +pub struct OwnerBalanceModel { + pub owner: Vec, + pub amount: Decimal, +} + +#[derive(FromQueryResult)] +pub struct OwnerBalanceModelString { + pub owner: Vec, + pub amount: String, +} + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, Default)] #[serde(rename_all = "camelCase")] pub struct HashRequest {