From 677ec224507dfdd1c3dd7caf703bd4f9b71de915 Mon Sep 17 00:00:00 2001 From: Martin Saposnic Date: Fri, 2 Jan 2026 17:12:15 -0300 Subject: [PATCH] Load payments in parallel using std::thread::scope This significantly improves startup time for network-backed stores like VSS, where sequential reads incur per-key network latency. --- src/io/utils.rs | 157 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 134 insertions(+), 23 deletions(-) diff --git a/src/io/utils.rs b/src/io/utils.rs index 928d4031b..4c9ac1cd5 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -213,35 +213,61 @@ where } /// Read previously persisted payments information from the store. -pub(crate) fn read_payments( +pub(crate) fn read_payments( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut res = Vec::new(); - - for stored_key in KVStoreSync::list( + let keys = KVStoreSync::list( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - )? { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - )?); - let payment = PaymentDetails::read(&mut reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", - ) - })?; - res.push(payment); - } - Ok(res) + )?; + + // Read all payments in parallel using scoped threads. + // This significantly improves performance for network-backed stores like VSS, + // where sequential reads would incur per-key network latency. + let results: Vec> = std::thread::scope(|s| { + let handles: Vec<_> = keys + .iter() + .map(|key| { + let kv_store = Arc::clone(&kv_store); + let logger = logger.clone(); + s.spawn(move || { + let mut reader = Cursor::new(KVStoreSync::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + )?); + PaymentDetails::read(&mut reader).map_err(|e| { + log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to deserialize PaymentDetails", + ) + }) + }) + }) + .collect(); + + handles + .into_iter() + .map(|h| { + h.join() + .map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::Other, + "Thread panicked while reading payment", + ) + }) + .and_then(|r| r) + }) + .collect() + }); + + results.into_iter().collect() } /// Read `OutputSweeper` state from the store. @@ -578,8 +604,22 @@ pub(crate) fn read_bdk_wallet_change_set( #[cfg(test)] mod tests { - use super::read_or_generate_seed_file; - use super::test_utils::random_storage_path; + use std::sync::Arc; + + use lightning::ln::channelmanager::PaymentId; + use lightning::util::persist::KVStoreSync; + use lightning::util::ser::Writeable; + use lightning::util::test_utils::TestLogger; + use lightning_types::payment::PaymentHash; + + use super::test_utils::{random_storage_path, InMemoryStore}; + use super::{ + read_or_generate_seed_file, read_payments, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use crate::hex_utils; + use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; + use crate::types::{DynStore, DynStoreWrapper}; #[test] fn generated_seed_is_readable() { @@ -589,4 +629,75 @@ mod tests { let read_seed_bytes = read_or_generate_seed_file(&rand_path.to_str().unwrap()).unwrap(); assert_eq!(expected_seed_bytes, read_seed_bytes); } + + #[test] + fn read_payments_parallel_returns_all_payments() { + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); + let logger = Arc::new(TestLogger::new()); + + // Create and persist multiple payments + let num_payments = 50; + let mut expected_payments = Vec::with_capacity(num_payments); + + for i in 0..num_payments { + let mut id_bytes = [0u8; 32]; + id_bytes[0] = i as u8; + id_bytes[1] = (i >> 8) as u8; + let payment_id = PaymentId(id_bytes); + + let payment = PaymentDetails::new( + payment_id, + PaymentKind::Spontaneous { hash: PaymentHash(id_bytes), preimage: None }, + Some(1000 * (i as u64 + 1)), + None, + PaymentDirection::Outbound, + PaymentStatus::Succeeded, + ); + + // Write payment to store + let key = hex_utils::to_string(&payment_id.0); + KVStoreSync::write( + &*store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + payment.encode(), + ) + .unwrap(); + + expected_payments.push(payment); + } + + // Read all payments back using the parallel implementation + let read_back = read_payments(Arc::clone(&store), Arc::clone(&logger)).unwrap(); + + // Verify count matches + assert_eq!( + read_back.len(), + num_payments, + "Expected {} payments but got {}", + num_payments, + read_back.len() + ); + + // Verify all expected payments are present (order may differ due to parallelism) + for expected in &expected_payments { + assert!( + read_back + .iter() + .any(|p| p.id == expected.id && p.amount_msat == expected.amount_msat), + "Payment {:?} not found in read results", + expected.id + ); + } + } + + #[test] + fn read_payments_empty_store_returns_empty_vec() { + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); + let logger = Arc::new(TestLogger::new()); + + let payments = read_payments(Arc::clone(&store), Arc::clone(&logger)).unwrap(); + assert!(payments.is_empty(), "Expected empty vec for empty store"); + } }