Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 134 additions & 23 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,35 +213,61 @@ where
}

/// Read previously persisted payments information from the store.
pub(crate) fn read_payments<L: Deref>(
pub(crate) fn read_payments<L: Deref + Clone + Send>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<Vec<PaymentDetails>, std::io::Error>
where
L::Target: LdkLogger,
{
let mut res = Vec::new();

for stored_key in KVStoreSync::list(
let keys = KVStoreSync::list(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
let mut reader = Cursor::new(KVStoreSync::read(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?);
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PaymentDetails",
)
})?;
res.push(payment);
}
Ok(res)
)?;

// Read all payments in parallel using scoped threads.
// This significantly improves performance for network-backed stores like VSS,
// where sequential reads would incur per-key network latency.
let results: Vec<Result<PaymentDetails, std::io::Error>> = std::thread::scope(|s| {
let handles: Vec<_> = keys
.iter()
.map(|key| {
let kv_store = Arc::clone(&kv_store);
let logger = logger.clone();
s.spawn(move || {
let mut reader = Cursor::new(KVStoreSync::read(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
key,
)?);
PaymentDetails::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PaymentDetails",
)
})
})
})
.collect();

handles
.into_iter()
.map(|h| {
h.join()
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Thread panicked while reading payment",
)
})
.and_then(|r| r)
})
.collect()
});

results.into_iter().collect()
}

/// Read `OutputSweeper` state from the store.
Expand Down Expand Up @@ -578,8 +604,22 @@ pub(crate) fn read_bdk_wallet_change_set(

#[cfg(test)]
mod tests {
use super::read_or_generate_seed_file;
use super::test_utils::random_storage_path;
use std::sync::Arc;

use lightning::ln::channelmanager::PaymentId;
use lightning::util::persist::KVStoreSync;
use lightning::util::ser::Writeable;
use lightning::util::test_utils::TestLogger;
use lightning_types::payment::PaymentHash;

use super::test_utils::{random_storage_path, InMemoryStore};
use super::{
read_or_generate_seed_file, read_payments, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::hex_utils;
use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus};
use crate::types::{DynStore, DynStoreWrapper};

#[test]
fn generated_seed_is_readable() {
Expand All @@ -589,4 +629,75 @@ mod tests {
let read_seed_bytes = read_or_generate_seed_file(&rand_path.to_str().unwrap()).unwrap();
assert_eq!(expected_seed_bytes, read_seed_bytes);
}

#[test]
fn read_payments_parallel_returns_all_payments() {
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
let logger = Arc::new(TestLogger::new());

// Create and persist multiple payments
let num_payments = 50;
let mut expected_payments = Vec::with_capacity(num_payments);

for i in 0..num_payments {
let mut id_bytes = [0u8; 32];
id_bytes[0] = i as u8;
id_bytes[1] = (i >> 8) as u8;
let payment_id = PaymentId(id_bytes);

let payment = PaymentDetails::new(
payment_id,
PaymentKind::Spontaneous { hash: PaymentHash(id_bytes), preimage: None },
Some(1000 * (i as u64 + 1)),
None,
PaymentDirection::Outbound,
PaymentStatus::Succeeded,
);

// Write payment to store
let key = hex_utils::to_string(&payment_id.0);
KVStoreSync::write(
&*store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
payment.encode(),
)
.unwrap();

expected_payments.push(payment);
}

// Read all payments back using the parallel implementation
let read_back = read_payments(Arc::clone(&store), Arc::clone(&logger)).unwrap();

// Verify count matches
assert_eq!(
read_back.len(),
num_payments,
"Expected {} payments but got {}",
num_payments,
read_back.len()
);

// Verify all expected payments are present (order may differ due to parallelism)
for expected in &expected_payments {
assert!(
read_back
.iter()
.any(|p| p.id == expected.id && p.amount_msat == expected.amount_msat),
"Payment {:?} not found in read results",
expected.id
);
}
}

#[test]
fn read_payments_empty_store_returns_empty_vec() {
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
let logger = Arc::new(TestLogger::new());

let payments = read_payments(Arc::clone(&store), Arc::clone(&logger)).unwrap();
assert!(payments.is_empty(), "Expected empty vec for empty store");
}
}