Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 52 additions & 25 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use lightning::ln::msgs::DecodeError;
use lightning::routing::gossip::NetworkGraph;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
use lightning::util::persist::{
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY,
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::{Readable, ReadableArgs, Writeable};
use lightning::util::string::PrintableString;
Expand Down Expand Up @@ -203,33 +204,59 @@ where
}

/// Read previously persisted payments information from the store.
pub(crate) fn read_payments<L: Deref>(
pub(crate) fn read_payments<L: Deref + Clone + Send>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<Vec<PaymentDetails>, std::io::Error>
where
L::Target: LdkLogger,
{
let mut res = Vec::new();

for stored_key in kv_store.list(
let keys = kv_store.list(
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
let mut reader = Cursor::new(kv_store.read(
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?);
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PaymentDetails",
)
})?;
res.push(payment);
}
Ok(res)
)?;

// Read all payments in parallel using scoped threads.
// This significantly improves performance for network-backed stores like VSS,
// where sequential reads would incur per-key network latency.
let results: Vec<Result<PaymentDetails, std::io::Error>> = std::thread::scope(|s| {
let handles: Vec<_> = keys
.iter()
.map(|key| {
let kv_store = Arc::clone(&kv_store);
let logger = logger.clone();
s.spawn(move || {
let mut reader = Cursor::new(kv_store.read(
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
key,
)?);
PaymentDetails::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PaymentDetails",
)
})
})
})
.collect();

handles
.into_iter()
.map(|h| {
h.join()
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Thread panicked while reading payment",
)
})
.and_then(|r| r)
})
.collect()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the docs:

All threads spawned within the scope that haven’t been manually joined will be automatically joined before this function returns.

Does that mean we do not need to manually join here? Can errors be mapped on line 259?

});

results.into_iter().collect()
}

/// Read `OutputSweeper` state from the store.
Expand Down
Loading