diff --git a/Cargo.lock b/Cargo.lock index c20c1197..fe358dca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "libc", ] +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "autocfg" version = "1.1.0" @@ -47,6 +53,16 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base58ck" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8d66485a3a2ea485c1913c4572ce0256067a5377ac8c75c4960e1cda98605f" +dependencies = [ + "bitcoin-internals", + "bitcoin_hashes 0.14.1", +] + [[package]] name = "base64" version = "0.13.1" @@ -61,21 +77,26 @@ checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" [[package]] name = "bech32" -version = "0.9.1" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" +checksum = "32637268377fc7b10a8c6d51de3e7fba1ce5dd371a96e342b34e6078db558e7f" [[package]] name = "bitcoin" -version = "0.30.2" +version = "0.32.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1945a5048598e4189e239d3f809b19bdad4845c4b2ba400d304d2dcf26d2c462" +checksum = "1e499f9fc0407f50fe98af744ab44fa67d409f76b6772e1689ec8485eb0c0f66" dependencies = [ - "bech32 0.9.1", - "bitcoin-private", - "bitcoin_hashes", + "base58ck", + "bech32 0.11.1", + "bitcoin-internals", + "bitcoin-io", + "bitcoin-units", + "bitcoin_hashes 0.14.1", + "hex-conservative", "hex_lit", "secp256k1", + "serde", ] [[package]] @@ -87,12 +108,37 @@ dependencies = [ "bech32 0.8.1", ] +[[package]] +name = "bitcoin-internals" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bdbe14aa07b06e6cfeffc529a1f099e5fbe249524f8125358604df99a4bed2" +dependencies = [ + "serde", +] + +[[package]] +name = "bitcoin-io" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dee39a0ee5b4095224a0cfc6bf4cc1baf0f9624b96b367e53b66d974e51d953" + [[package]] name = "bitcoin-private" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73290177011694f38ec25e165d0387ab7ea749a4b81cd4c80dae5988229f7a57" +[[package]] +name = "bitcoin-units" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5285c8bcaa25876d07f37e3d30c303f2609179716e11d688f51e8f1fe70063e2" +dependencies = [ + "bitcoin-internals", + "serde", +] + [[package]] name = "bitcoin_hashes" version = "0.12.0" @@ -102,6 +148,17 @@ dependencies = [ "bitcoin-private", ] +[[package]] +name = "bitcoin_hashes" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26ec84b80c482df901772e931a9a681e26a1b9ee2302edeff23cb30328745c8b" +dependencies = [ + "bitcoin-io", + "hex-conservative", + "serde", +] + [[package]] name = "bumpalo" version = "3.11.1" @@ -128,14 +185,14 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", - "num-integer", "num-traits", - "winapi", + "serde", + "windows-link", ] [[package]] @@ -204,18 +261,45 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "dnssec-prover" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec4f825369fc7134da70ca4040fddc8e03b80a46d249ae38d9c1c39b7b4476bf" +dependencies = [ + "bitcoin_hashes 0.14.1", + "tokio", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hermit-abi" version = "0.2.6" @@ -227,9 +311,12 @@ dependencies = [ [[package]] name = "hex-conservative" -version = "0.1.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed443af458ccb6d81c1e7e661545f94d3176752fb1df2f543b902a1e0f51e2" +checksum = "fda06d18ac606267c40c04e41b9947729bf8b9efe74bd4e82b61a5f26a510b9f" +dependencies = [ + "arrayvec", +] [[package]] name = "hex_lit" @@ -289,7 +376,9 @@ dependencies = [ "lightning", "lightning-background-processor", "lightning-block-sync", + "lightning-dns-resolver", "lightning-invoice", + "lightning-macros", "lightning-net-tokio", "lightning-persister", "lightning-rapid-gossip-sync", @@ -300,62 +389,117 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.151" +version = "0.2.179" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f" + +[[package]] +name = "libm" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "lightning" -version = "0.0.123" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd92d4aa159374be430c7590e169b4a6c0fb79018f5bc4ea1bffde536384db3" +checksum = "4342d07db2b3fe7c9a73849e94d012ebcfa3588c25097daf0b5ff2857c04e0e1" dependencies = [ + "bech32 0.11.1", "bitcoin", - "hex-conservative", + "dnssec-prover", + "hashbrown", + "libm", + "lightning-invoice", + "lightning-macros", + "lightning-types", + "possiblyrandom", ] [[package]] name = "lightning-background-processor" -version = "0.0.123" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb1c2c64050e37cee7c3b6b022106523784055ac3ee572d360780a1d6fe8062c" +checksum = "abdc5450264184deba88b1dc61fa8d2ca905e21748bad556915757ac73d91103" dependencies = [ "bitcoin", + "bitcoin-io", + "bitcoin_hashes 0.14.1", "lightning", + "lightning-liquidity", "lightning-rapid-gossip-sync", + "possiblyrandom", ] [[package]] name = "lightning-block-sync" -version = "0.0.123" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e1e70fa351daccede0c366cf16320b16a3e42b05ae3c7ec9c0df6b5d3a3e18" +checksum = "ee5069846b07a62aaecdaf25233e067bc69f245b7c8fd00cc9c217053221f875" dependencies = [ "bitcoin", "chunked_transfer", - "hex-conservative", "lightning", "serde_json", "tokio", ] +[[package]] +name = "lightning-dns-resolver" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2feda8a39d2ce9cc3e7a5d7be0fc2d0b820b4b648048a1b88e88ccfcdd5a6196" +dependencies = [ + "dnssec-prover", + "lightning", + "lightning-types", + "tokio", +] + [[package]] name = "lightning-invoice" -version = "0.31.0" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b85e5e14bcdb30d746e9785b04f27938292e8944f78f26517e01e91691f6b3f2" +dependencies = [ + "bech32 0.11.1", + "bitcoin", + "lightning-types", + "serde", +] + +[[package]] +name = "lightning-liquidity" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d07d01cf197bf2184b929b7dc94aa70d935aac6df896c256a3a9475b7e9d40" +checksum = "58a6480d4d7726c49b4cd170b18a39563bbe897d0b8960be11d5e4a0cebd43b0" dependencies = [ - "bech32 0.9.1", "bitcoin", + "chrono", "lightning", - "secp256k1", + "lightning-invoice", + "lightning-macros", + "lightning-types", + "serde", + "serde_json", +] + +[[package]] +name = "lightning-macros" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80bd6063f4d0c34320f1db9193138c878e64142e6d1c42bd5f0124936e8764ec" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.47", ] [[package]] name = "lightning-net-tokio" -version = "0.0.123" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9e6a4d49c50a1344916d080dc8c012ce3a778cdd45de8def75350b2b40fe018" +checksum = "8055737e3d2d06240a3fdf10e26b2716110fcea90011a0839e8e82fc6e58ff5e" dependencies = [ "bitcoin", "lightning", @@ -364,9 +508,9 @@ dependencies = [ [[package]] name = "lightning-persister" -version = "0.0.123" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a8dd33971815fa074b05678e09a6d4b15c78225ea34d66ed4f17c35a53467a9" +checksum = "e6d78990de56ca75c5535c3f8e6f86b183a1aa8f521eb32afb9e8181f3bd91d7" dependencies = [ "bitcoin", "lightning", @@ -375,14 +519,25 @@ dependencies = [ [[package]] name = "lightning-rapid-gossip-sync" -version = "0.0.123" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d861b0f0cd5f8fe8c63760023c4fd4fd32c384881b41780b62ced2a8a619f91" +checksum = "b094f79f22713aa95194a166c77b2f6c7d68f9d76622a43552a29b8fe6fa92d0" dependencies = [ "bitcoin", + "bitcoin-io", + "bitcoin_hashes 0.14.1", "lightning", ] +[[package]] +name = "lightning-types" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5681708d3075bdff3a1b4daa400590e2703e7871bdc14e94ee7334fb6314ae40" +dependencies = [ + "bitcoin", +] + [[package]] name = "link-cplusplus" version = "1.0.8" @@ -427,16 +582,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "num-integer" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" -dependencies = [ - "autocfg", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.15" @@ -477,6 +622,15 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "possiblyrandom" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b122a615d72104fb3d8b26523fdf9232cd8ee06949fb37e4ce3ff964d15dffd" +dependencies = [ + "getrandom", +] + [[package]] name = "proc-macro2" version = "1.0.75" @@ -552,19 +706,20 @@ checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" [[package]] name = "secp256k1" -version = "0.27.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25996b82292a7a57ed3508f052cfff8640d38d32018784acd714758b43da9c8f" +checksum = "9465315bc9d4566e1724f0fffcbcc446268cb522e60f9a27bcded6b19c108113" dependencies = [ - "bitcoin_hashes", + "bitcoin_hashes 0.12.0", "secp256k1-sys", + "serde", ] [[package]] name = "secp256k1-sys" -version = "0.8.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70a129b9e9efbfb223753b9163c4ab3b13cff7fd9c7f010fbac25ab4099fa07e" +checksum = "d4387882333d3aa8cb20530a17c69a3752e97837832f34f6dccc760e715001d9" dependencies = [ "cc", ] @@ -574,6 +729,20 @@ name = "serde" version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.107", +] [[package]] name = "serde_json" @@ -758,6 +927,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 20ebddb9..91750b0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,14 +8,15 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lightning = { version = "0.1.0", features = ["dnssec"] } -lightning-block-sync = { version = "0.1.0", features = [ "rpc-client", "tokio" ] } -lightning-dns-resolver = { version = "0.2.0" } -lightning-invoice = { version = "0.33.0" } -lightning-net-tokio = { version = "0.1.0" } -lightning-persister = { version = "0.1.0" } -lightning-background-processor = { version = "0.1.0", features = [ "futures" ] } -lightning-rapid-gossip-sync = { version = "0.1.0" } +lightning = { version = "0.2.0", features = ["dnssec"] } +lightning-block-sync = { version = "0.2.0", features = [ "rpc-client", "tokio" ] } +lightning-dns-resolver = { version = "0.3.0" } +lightning-invoice = { version = "0.34.0" } +lightning-net-tokio = { version = "0.2.0" } +lightning-persister = { version = "0.2.0", features = [ "tokio" ] } +lightning-background-processor = { version = "0.2.0" } +lightning-rapid-gossip-sync = { version = "0.2.0" } +lightning-macros = { version = "0.2.0" } base64 = "0.13.0" bitcoin = "0.32" @@ -26,7 +27,7 @@ libc = "0.2" chrono = { version = "0.4", default-features = false, features = ["clock"] } rand = "0.4" serde_json = { version = "1.0" } -tokio = { version = "1", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] } +tokio = { version = "1", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time", "io-std" ] } [profile.release] panic = "abort" diff --git a/src/bitcoind_client.rs b/src/bitcoind_client.rs index f16c5f41..71dfc09f 100644 --- a/src/bitcoind_client.rs +++ b/src/bitcoind_client.rs @@ -19,6 +19,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, use lightning::events::bump_transaction::{Utxo, WalletSource}; use lightning::log_error; use lightning::sign::ChangeDestinationSource; +use lightning::util::async_poll::AsyncResult; use lightning::util::logger::Logger; use lightning_block_sync::http::HttpEndpoint; use lightning_block_sync::rpc::RpcClient; @@ -31,7 +32,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::{self, Runtime}; +use tokio::runtime::Handle; pub struct BitcoindClient { pub(crate) bitcoind_rpc_client: Arc, @@ -41,8 +42,7 @@ pub struct BitcoindClient { rpc_user: String, rpc_password: String, fees: Arc>, - main_runtime_handle: runtime::Handle, - inner_runtime: Arc, + main_runtime_handle: Handle, logger: Arc, } @@ -70,7 +70,7 @@ const MIN_FEERATE: u32 = 253; impl BitcoindClient { pub(crate) async fn new( host: String, port: u16, rpc_user: String, rpc_password: String, network: Network, - handle: runtime::Handle, logger: Arc, + handle: Handle, logger: Arc, ) -> std::io::Result { let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); let rpc_credentials = @@ -99,15 +99,6 @@ impl BitcoindClient { fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE)); fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE)); - let mut builder = runtime::Builder::new_multi_thread(); - let runtime = - builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap(); - let inner_runtime = Arc::new(runtime); - // Tokio will panic if we drop a runtime while in another runtime. Because the entire - // application runs inside a tokio runtime, we have to ensure this runtime is never - // `drop`'d, which we do by leaking an Arc reference. - std::mem::forget(Arc::clone(&inner_runtime)); - let client = Self { bitcoind_rpc_client: Arc::new(bitcoind_rpc_client), host, @@ -117,7 +108,6 @@ impl BitcoindClient { network, fees: Arc::new(fees), main_runtime_handle: handle.clone(), - inner_runtime, logger, }; BitcoindClient::poll_for_fee_estimates( @@ -130,7 +120,7 @@ impl BitcoindClient { fn poll_for_fee_estimates( fees: Arc>, rpc_client: Arc, - handle: tokio::runtime::Handle, + handle: Handle, ) { handle.spawn(async move { loop { @@ -240,39 +230,6 @@ impl BitcoindClient { }); } - fn run_future_in_blocking_context(&self, future: F) -> F::Output - where - F::Output: Send + 'static, - { - // Tokio deliberately makes it nigh impossible to block on a future in a sync context that - // is running in an async task (which makes it really hard to interact with sync code that - // has callbacks in an async project). - // - // Reading the docs, it *seems* like - // `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the - // trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if - // the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go - // into a `block_in_place` call, and the inner future requires I/O (which of course it - // does, its a future!), the whole thing will come to a grinding halt as no other thread is - // allowed to poll I/O until the blocked one finishes. - // - // This is, of course, nuts, and an almost trivial performance penalty of occasional - // additional wakeups would solve this, but tokio refuses to do so because any performance - // penalty at all would be too much (tokio issue #4730). - // - // Instead, we have to do a rather insane dance - we have to spawn the `future` we want to - // run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid - // blocking too many threads on the main runtime). We want to block on that `future` being - // run on the other runtime's threads, but tokio only provides `block_on` to do so, which - // runs the `future` itself on the current thread, panicing if this thread is already a - // part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we - // have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting - // `JoinHandle` on the main runtime. - tokio::task::block_in_place(move || { - self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap() - }) - } - pub fn get_new_rpc_client(&self) -> RpcClient { let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port); let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password)); @@ -406,59 +363,59 @@ impl BroadcasterInterface for BitcoindClient { } impl ChangeDestinationSource for BitcoindClient { - fn get_change_destination_script(&self) -> Result { - let future = self.get_new_address(); - Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() })) + fn get_change_destination_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf, ()> { + Box::pin(async move { Ok(self.get_new_address().await.script_pubkey()) }) } } impl WalletSource for BitcoindClient { - fn list_confirmed_utxos(&self) -> Result, ()> { - let future = self.list_unspent(); - let utxos = self.run_future_in_blocking_context(async move { future.await.0 }); - Ok(utxos - .into_iter() - .filter_map(|utxo| { - let outpoint = OutPoint { txid: utxo.txid, vout: utxo.vout }; - let value = bitcoin::Amount::from_sat(utxo.amount); - match utxo.address.witness_program() { - Some(prog) if prog.is_p2wpkh() => { - WPubkeyHash::from_slice(prog.program().as_bytes()) - .map(|wpkh| Utxo::new_v0_p2wpkh(outpoint, value, &wpkh)) - .ok() - }, - Some(prog) if prog.is_p2tr() => { - // TODO: Add `Utxo::new_v1_p2tr` upstream. - XOnlyPublicKey::from_slice(prog.program().as_bytes()) - .map(|_| Utxo { - outpoint, - output: TxOut { - value, - script_pubkey: utxo.address.script_pubkey(), - }, - satisfaction_weight: 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 + - 1 /* witness items */ + 1 /* schnorr sig len */ + 64, /* schnorr sig */ - }) - .ok() - }, - _ => None, - } - }) - .collect()) + fn list_confirmed_utxos<'a>(&'a self) -> AsyncResult<'a, Vec, ()> { + Box::pin(async move { + let utxos = self.list_unspent().await.0; + Ok(utxos + .into_iter() + .filter_map(|utxo| { + let outpoint = OutPoint { txid: utxo.txid, vout: utxo.vout }; + let value = bitcoin::Amount::from_sat(utxo.amount); + match utxo.address.witness_program() { + Some(prog) if prog.is_p2wpkh() => { + WPubkeyHash::from_slice(prog.program().as_bytes()) + .map(|wpkh| Utxo::new_v0_p2wpkh(outpoint, value, &wpkh)) + .ok() + }, + Some(prog) if prog.is_p2tr() => { + // TODO: Add `Utxo::new_v1_p2tr` upstream. + XOnlyPublicKey::from_slice(prog.program().as_bytes()) + .map(|_| Utxo { + outpoint, + output: TxOut { + value, + script_pubkey: utxo.address.script_pubkey(), + }, + satisfaction_weight: 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 + + 1 /* witness items */ + 1 /* schnorr sig len */ + 64, /* schnorr sig */ + }) + .ok() + }, + _ => None, + } + }) + .collect()) + }) } - fn get_change_script(&self) -> Result { - let future = self.get_new_address(); - Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() })) + fn get_change_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf, ()> { + Box::pin(async move { Ok(self.get_new_address().await.script_pubkey()) }) } - fn sign_psbt(&self, tx: Psbt) -> Result { - let mut tx_bytes = Vec::new(); - let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ()); - let tx_hex = hex_utils::hex_str(&tx_bytes); - let future = self.sign_raw_transaction_with_wallet(tx_hex); - let signed_tx = self.run_future_in_blocking_context(async move { future.await }); - let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?; - Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ()) + fn sign_psbt<'a>(&'a self, tx: Psbt) -> AsyncResult<'a, Transaction, ()> { + Box::pin(async move { + let mut tx_bytes = Vec::new(); + let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ()); + let tx_hex = hex_utils::hex_str(&tx_bytes); + let signed_tx = self.sign_raw_transaction_with_wallet(tx_hex).await; + let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?; + Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ()) + }) } } diff --git a/src/cli.rs b/src/cli.rs index 528cc73f..183eec5b 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,4 +1,4 @@ -use crate::disk::{self, INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME}; +use crate::disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME}; use crate::hex_utils; use crate::{ ChainMonitor, ChannelManager, HTLCStatus, InboundPaymentInfoStorage, MillisatAmount, @@ -9,10 +9,8 @@ use bitcoin::hashes::Hash; use bitcoin::network::Network; use bitcoin::secp256k1::PublicKey; use lightning::chain::channelmonitor::Balance; -use lightning::ln::bolt11_payment::payment_parameters_from_invoice; -use lightning::ln::bolt11_payment::payment_parameters_from_variable_amount_invoice; use lightning::ln::channelmanager::{ - Bolt11InvoiceParameters, PaymentId, RecipientOnionFields, Retry, + Bolt11InvoiceParameters, OptionalOfferPaymentParams, PaymentId, RecipientOnionFields, Retry, }; use lightning::ln::msgs::SocketAddress; use lightning::ln::types::ChannelId; @@ -20,7 +18,7 @@ use lightning::offers::offer::{self, Offer}; use lightning::onion_message::dns_resolution::HumanReadableName; use lightning::onion_message::messenger::Destination; use lightning::routing::gossip::NodeId; -use lightning::routing::router::{PaymentParameters, RouteParameters}; +use lightning::routing::router::{PaymentParameters, RouteParameters, RouteParametersConfig}; use lightning::sign::{EntropySource, KeysManager}; use lightning::types::payment::{PaymentHash, PaymentPreimage}; use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig}; @@ -31,11 +29,12 @@ use lightning_persister::fs_store::FilesystemStore; use std::env; use std::io::Write; use std::net::{SocketAddr, ToSocketAddrs}; -use std::path::Path; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::io::{AsyncBufReadExt, BufReader}; + pub(crate) struct LdkUserInfo { pub(crate) bitcoind_rpc_username: String, pub(crate) bitcoind_rpc_password: String, @@ -48,30 +47,31 @@ pub(crate) struct LdkUserInfo { pub(crate) network: Network, } -pub(crate) fn poll_for_user_input( +pub(crate) async fn poll_for_user_input( peer_manager: Arc, channel_manager: Arc, chain_monitor: Arc, keys_manager: Arc, network_graph: Arc, inbound_payments: Arc>, - outbound_payments: Arc>, ldk_data_dir: String, - fs_store: Arc, + outbound_payments: Arc>, fs_store: Arc, ) { println!( "LDK startup successful. Enter \"help\" to view available commands. Press Ctrl-D to quit." ); println!("LDK logs are available at /.ldk/logs"); println!("Local Node ID is {}.", channel_manager.get_our_node_id()); + + let mut input = BufReader::new(tokio::io::stdin()).lines(); 'read_command: loop { print!("> "); std::io::stdout().flush().unwrap(); // Without flushing, the `>` doesn't print - let mut line = String::new(); - if let Err(e) = std::io::stdin().read_line(&mut line) { - break println!("ERROR: {}", e); - } - - if line.len() == 0 { - // We hit EOF / Ctrl-D - break; - } + let line = match input.next_line().await { + Ok(Some(l)) => l, + Err(e) => { + break println!("ERROR: {}", e); + }, + Ok(None) => { + break println!("ERROR: End of stdin"); + }, + }; let mut words = line.split_whitespace(); if let Some(word) = words.next() { @@ -111,12 +111,8 @@ pub(crate) fn poll_for_user_input( }, }; - if tokio::runtime::Handle::current() - .block_on(connect_peer_if_necessary( - pubkey, - peer_addr, - peer_manager.clone(), - )) + if connect_peer_if_necessary(pubkey, peer_addr, peer_manager.clone()) + .await .is_err() { continue; @@ -142,24 +138,13 @@ pub(crate) fn poll_for_user_input( } } - if open_channel( + let _ = open_channel( pubkey, chan_amt_sat.unwrap(), announce_channel, with_anchors, channel_manager.clone(), - ) - .is_ok() - { - if peer_addr_str.is_some() { - let peer_data_path = - format!("{}/channel_peer_data", ldk_data_dir.clone()); - let _ = disk::persist_channel_peer( - Path::new(&peer_data_path), - peer_pubkey_and_ip_addr, - ); - } - } + ); }, "sendpayment" => { let invoice_str = words.next(); @@ -201,15 +186,17 @@ pub(crate) fn poll_for_user_input( print!("Paying offer for {} msat. Continue (Y/N)? >", amt_msat); std::io::stdout().flush().unwrap(); - if let Err(e) = std::io::stdin().read_line(&mut line) { - println!("ERROR: {}", e); - break 'read_command; - } - - if line.len() == 0 { - // We hit EOF / Ctrl-D - break 'read_command; - } + let line = match input.next_line().await { + Ok(Some(l)) => l, + Err(e) => { + println!("ERROR: {}", e); + break 'read_command; + }, + Ok(None) => { + println!("ERROR: End of stdin"); + break 'read_command; + }, + }; if line.starts_with("Y") { break; @@ -229,13 +216,16 @@ pub(crate) fn poll_for_user_input( }, ); fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()) + .write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.encode()) + .await .unwrap(); - let retry = Retry::Timeout(Duration::from_secs(10)); + let params = OptionalOfferPaymentParams { + retry_strategy: Retry::Timeout(Duration::from_secs(10)), + ..Default::default() + }; let amt = Some(amt_msat); - let pay = channel_manager - .pay_for_offer(&offer, None, amt, None, payment_id, retry, None); + let pay = channel_manager.pay_for_offer(&offer, amt, payment_id, params); if pay.is_ok() { println!("Payment in flight"); } else { @@ -292,14 +282,18 @@ pub(crate) fn poll_for_user_input( }, ); fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()) + .write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.encode()) + .await .unwrap(); - let retry = Retry::Timeout(Duration::from_secs(10)); - let pay = |a, b, c, d, e, f| { - channel_manager.pay_for_offer_from_human_readable_name(a, b, c, d, e, f) + let params = OptionalOfferPaymentParams { + retry_strategy: Retry::Timeout(Duration::from_secs(10)), + ..Default::default() + }; + let pay = |a, b, c, d, e| { + channel_manager.pay_for_offer_from_human_readable_name(a, b, c, d, e) }; - let pay = pay(hrn, amt_msat, payment_id, retry, None, dns_resolvers); + let pay = pay(hrn, amt_msat, payment_id, params, dns_resolvers); if pay.is_ok() { println!("Payment in flight"); } else { @@ -307,13 +301,16 @@ pub(crate) fn poll_for_user_input( } } else { match Bolt11Invoice::from_str(invoice_str) { - Ok(invoice) => send_payment( - &channel_manager, - &invoice, - user_provided_amt, - &mut outbound_payments.lock().unwrap(), - Arc::clone(&fs_store), - ), + Ok(invoice) => { + send_payment( + &channel_manager, + &invoice, + user_provided_amt, + &outbound_payments, + &*fs_store, + ) + .await + }, Err(e) => { println!("ERROR: invalid invoice: {:?}", e); }, @@ -353,12 +350,13 @@ pub(crate) fn poll_for_user_input( dest_pubkey, amt_msat, &*keys_manager, - &mut outbound_payments.lock().unwrap(), - Arc::clone(&fs_store), - ); + &outbound_payments, + &*fs_store, + ) + .await; }, "getoffer" => { - let offer_builder = channel_manager.create_offer_builder(None); + let offer_builder = channel_manager.create_offer_builder(); if let Err(e) = offer_builder { println!("ERROR: Failed to initiate offer building: {:?}", e); continue; @@ -410,16 +408,17 @@ pub(crate) fn poll_for_user_input( continue; } - let mut inbound_payments = inbound_payments.lock().unwrap(); - get_invoice( - amt_msat.unwrap(), - &mut inbound_payments, - &channel_manager, - expiry_secs.unwrap(), - ); - fs_store - .write("", "", INBOUND_PAYMENTS_FNAME, &inbound_payments.encode()) - .unwrap(); + let write_future = { + let mut inbound_payments = inbound_payments.lock().unwrap(); + get_invoice( + amt_msat.unwrap(), + &mut inbound_payments, + &channel_manager, + expiry_secs.unwrap(), + ); + fs_store.write("", "", INBOUND_PAYMENTS_FNAME, inbound_payments.encode()) + }; + write_future.await.unwrap(); }, "connectpeer" => { let peer_pubkey_and_ip_addr = words.next(); @@ -435,12 +434,8 @@ pub(crate) fn poll_for_user_input( continue; }, }; - if tokio::runtime::Handle::current() - .block_on(connect_peer_if_necessary( - pubkey, - peer_addr, - peer_manager.clone(), - )) + if connect_peer_if_necessary(pubkey, peer_addr, peer_manager.clone()) + .await .is_ok() { println!("SUCCESS: connected to peer {}", pubkey); @@ -618,9 +613,11 @@ fn node_info( let local_balance_sat = balances.iter().map(|b| b.claimable_amount_satoshis()).sum::(); println!("\t\t local_balance_sats: {}", local_balance_sat); let close_fees_map = |b| match b { - &Balance::ClaimableOnChannelClose { transaction_fee_satoshis, .. } => { - transaction_fee_satoshis - }, + &Balance::ClaimableOnChannelClose { + ref balance_candidates, + confirmed_balance_candidate_index, + .. + } => balance_candidates[confirmed_balance_candidate_index].transaction_fee_satoshis, _ => 0, }; let close_fees_sats = balances.iter().map(close_fees_map).sum::(); @@ -738,10 +735,8 @@ fn list_payments( pub(crate) async fn connect_peer_if_necessary( pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc, ) -> Result<(), ()> { - for peer_details in peer_manager.list_peers() { - if peer_details.counterparty_node_id == pubkey { - return Ok(()); - } + if peer_manager.peer_by_node_id(&pubkey).is_some() { + return Ok(()); } let res = do_connect_peer(pubkey, peer_addr, peer_manager).await; if res.is_err() { @@ -823,78 +818,76 @@ fn open_channel( } } -fn send_payment( +async fn send_payment( channel_manager: &ChannelManager, invoice: &Bolt11Invoice, required_amount_msat: Option, - outbound_payments: &mut OutboundPaymentInfoStorage, fs_store: Arc, + outbound_payments: &Mutex, fs_store: &FilesystemStore, ) { let payment_id = PaymentId((*invoice.payment_hash()).to_byte_array()); let payment_secret = Some(*invoice.payment_secret()); - let zero_amt_invoice = - invoice.amount_milli_satoshis().is_none() || invoice.amount_milli_satoshis() == Some(0); - let pay_params_opt = if zero_amt_invoice { - if let Some(amt_msat) = required_amount_msat { - payment_parameters_from_variable_amount_invoice(invoice, amt_msat) - } else { - println!("Need an amount for the given 0-value invoice"); - print!("> "); - return; - } - } else { - if required_amount_msat.is_some() && invoice.amount_milli_satoshis() != required_amount_msat - { + let amt_msat = match (invoice.amount_milli_satoshis(), required_amount_msat) { + // pay_for_bolt11_invoice only validates that the amount we pay is >= the invoice's + // required amount, not that its equal (to allow for overpayment). As that is somewhat + // surprising, here we check and reject all disagreements in amount. + (Some(inv_amt), Some(req_amt)) if inv_amt != req_amt => { println!( "Amount didn't match invoice value of {}msat", invoice.amount_milli_satoshis().unwrap_or(0) ); print!("> "); return; - } - payment_parameters_from_invoice(invoice) - }; - let (payment_hash, recipient_onion, route_params) = match pay_params_opt { - Ok(res) => res, - Err(e) => { - println!("Failed to parse invoice: {:?}", e); + }, + (Some(inv_amt), _) => inv_amt, + (_, Some(req_amt)) => req_amt, + (None, None) => { + println!("Need an amount to pay an amountless invoice"); print!("> "); return; }, }; - outbound_payments.payments.insert( - payment_id, - PaymentInfo { - preimage: None, - secret: payment_secret, - status: HTLCStatus::Pending, - amt_msat: MillisatAmount(invoice.amount_milli_satoshis()), - }, - ); - fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()).unwrap(); + let write_future = { + let mut outbound_payments = outbound_payments.lock().unwrap(); + outbound_payments.payments.insert( + payment_id, + PaymentInfo { + preimage: None, + secret: payment_secret, + status: HTLCStatus::Pending, + amt_msat: MillisatAmount(Some(amt_msat)), + }, + ); + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.encode()) + }; + write_future.await.unwrap(); - match channel_manager.send_payment( - payment_hash, - recipient_onion, + match channel_manager.pay_for_bolt11_invoice( + invoice, payment_id, - route_params, + required_amount_msat, + RouteParametersConfig::default(), Retry::Timeout(Duration::from_secs(10)), ) { Ok(_) => { let payee_pubkey = invoice.recover_payee_pub_key(); - let amt_msat = invoice.amount_milli_satoshis().unwrap(); println!("EVENT: initiated sending {} msats to {}", amt_msat, payee_pubkey); print!("> "); }, Err(e) => { println!("ERROR: failed to send payment: {:?}", e); print!("> "); - outbound_payments.payments.get_mut(&payment_id).unwrap().status = HTLCStatus::Failed; - fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()).unwrap(); + let write_future = { + let mut outbound_payments = outbound_payments.lock().unwrap(); + outbound_payments.payments.get_mut(&payment_id).unwrap().status = + HTLCStatus::Failed; + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.encode()) + }; + write_future.await.unwrap(); }, }; } -fn keysend( +async fn keysend( channel_manager: &ChannelManager, payee_pubkey: PublicKey, amt_msat: u64, entropy_source: &E, - outbound_payments: &mut OutboundPaymentInfoStorage, fs_store: Arc, + outbound_payments: &Mutex, fs_store: &FilesystemStore, ) { let payment_preimage = PaymentPreimage(entropy_source.get_secure_random_bytes()); let payment_id = PaymentId(Sha256::hash(&payment_preimage.0[..]).to_byte_array()); @@ -903,16 +896,20 @@ fn keysend( PaymentParameters::for_keysend(payee_pubkey, 40, false), amt_msat, ); - outbound_payments.payments.insert( - payment_id, - PaymentInfo { - preimage: None, - secret: None, - status: HTLCStatus::Pending, - amt_msat: MillisatAmount(Some(amt_msat)), - }, - ); - fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()).unwrap(); + let write_future = { + let mut outbound_payments = outbound_payments.lock().unwrap(); + outbound_payments.payments.insert( + payment_id, + PaymentInfo { + preimage: None, + secret: None, + status: HTLCStatus::Pending, + amt_msat: MillisatAmount(Some(amt_msat)), + }, + ); + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.encode()) + }; + write_future.await.unwrap(); match channel_manager.send_spontaneous_payment( Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), @@ -927,8 +924,13 @@ fn keysend( Err(e) => { println!("ERROR: failed to send payment: {:?}", e); print!("> "); - outbound_payments.payments.get_mut(&payment_id).unwrap().status = HTLCStatus::Failed; - fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.encode()).unwrap(); + let write_future = { + let mut outbound_payments = outbound_payments.lock().unwrap(); + outbound_payments.payments.get_mut(&payment_id).unwrap().status = + HTLCStatus::Failed; + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.encode()) + }; + write_future.await.unwrap(); }, }; } diff --git a/src/disk.rs b/src/disk.rs index 00f0ea4d..19a77584 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -1,15 +1,13 @@ -use crate::{cli, InboundPaymentInfoStorage, NetworkGraph, OutboundPaymentInfoStorage}; -use bitcoin::secp256k1::PublicKey; +use crate::{InboundPaymentInfoStorage, NetworkGraph, OutboundPaymentInfoStorage}; use bitcoin::Network; use chrono::Utc; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; -use lightning::util::hash_tables::{new_hash_map, HashMap}; +use lightning::util::hash_tables::new_hash_map; use lightning::util::logger::{Level, Logger, Record}; use lightning::util::ser::{Readable, ReadableArgs}; use std::fs; use std::fs::File; -use std::io::{BufRead, BufReader, Write}; -use std::net::SocketAddr; +use std::io::{BufReader, Write}; use std::path::Path; use std::sync::Arc; @@ -54,30 +52,6 @@ impl Logger for FilesystemLogger { .unwrap(); } } -pub(crate) fn persist_channel_peer(path: &Path, peer_info: &str) -> std::io::Result<()> { - let mut file = fs::OpenOptions::new().create(true).append(true).open(path)?; - file.write_all(format!("{}\n", peer_info).as_bytes()) -} - -pub(crate) fn read_channel_peer_data( - path: &Path, -) -> Result, std::io::Error> { - let mut peer_data = new_hash_map(); - if !Path::new(&path).exists() { - return Ok(new_hash_map()); - } - let file = File::open(path)?; - let reader = BufReader::new(file); - for line in reader.lines() { - match cli::parse_peer_info(line.unwrap()) { - Ok((pubkey, socket_addr)) => { - peer_data.insert(pubkey, socket_addr); - }, - Err(e) => return Err(e), - } - } - Ok(peer_data) -} pub(crate) fn read_network( path: &Path, network: Network, logger: Arc, diff --git a/src/main.rs b/src/main.rs index 370838da..1b1a2ac7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use bitcoin::BlockHash; use bitcoin_bech32::WitnessProgram; use disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME}; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; -use lightning::chain::{BestBlock, Filter, Watch}; +use lightning::chain::{BestBlock, Filter}; use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; use lightning::events::{Event, PaymentFailureReason, PaymentPurpose}; use lightning::ln::channelmanager::{self, RecentPaymentDetails}; @@ -35,23 +35,21 @@ use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::ProbabilisticScoringFeeParameters; -use lightning::sign::{EntropySource, InMemorySigner, KeysManager}; +use lightning::sign::{EntropySource, InMemorySigner, KeysManager, NodeSigner}; use lightning::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::util::config::UserConfig; use lightning::util::hash_tables::hash_map::Entry; use lightning::util::hash_tables::HashMap; use lightning::util::persist::{ - self, KVStore, MonitorUpdatingPersister, OUTPUT_SWEEPER_PERSISTENCE_KEY, + self, KVStore, MonitorUpdatingPersisterAsync, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use lightning::util::sweep as ldk_sweep; use lightning::{chain, impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; -use lightning_background_processor::{process_events_async, GossipSync}; -use lightning_block_sync::init; -use lightning_block_sync::poll; -use lightning_block_sync::SpvClient; -use lightning_block_sync::UnboundedCache; +use lightning_background_processor::{process_events_async, GossipSync, NO_LIQUIDITY_MANAGER}; +use lightning_block_sync::gossip::TokioSpawner; +use lightning_block_sync::{init, poll, SpvClient, UnboundedCache}; use lightning_dns_resolver::OMDomainResolver; use lightning_net_tokio::SocketDescriptor; use lightning_persister::fs_store::FilesystemStore; @@ -141,20 +139,20 @@ type ChainMonitor = chainmonitor::ChainMonitor< Arc, Arc, Arc, - Arc< - MonitorUpdatingPersister< - Arc, - Arc, - Arc, - Arc, - Arc, - Arc, - >, + chainmonitor::AsyncPersister< + Arc, + TokioSpawner, + Arc, + Arc, + Arc, + Arc, + Arc, >, + Arc, >; pub(crate) type GossipVerifier = lightning_block_sync::gossip::GossipVerifier< - lightning_block_sync::gossip::TokioSpawner, + TokioSpawner, Arc, Arc, >; @@ -169,6 +167,7 @@ pub(crate) type PeerManager = LdkPeerManager< Arc, IgnoringMessageHandler, Arc, + Arc, >; pub(crate) type ChannelManager = @@ -210,339 +209,373 @@ pub(crate) type OutputSweeper = ldk_sweep::OutputSweeper< // Needed due to rust-lang/rust#63033. struct OutputSweeperWrapper(Arc); -async fn handle_ldk_events( - channel_manager: Arc, bitcoind_client: &BitcoindClient, - network_graph: &NetworkGraph, keys_manager: &KeysManager, - bump_tx_event_handler: &BumpTxEventHandler, peer_manager: Arc, +fn handle_ldk_events<'a>( + channel_manager: Arc, bitcoind_client: &'a BitcoindClient, + network_graph: &'a NetworkGraph, keys_manager: &'a KeysManager, + bump_tx_event_handler: &'a BumpTxEventHandler, peer_manager: Arc, inbound_payments: Arc>, outbound_payments: Arc>, fs_store: Arc, output_sweeper: OutputSweeperWrapper, network: Network, event: Event, -) { - match event { - Event::FundingGenerationReady { - temporary_channel_id, - counterparty_node_id, - channel_value_satoshis, - output_script, - .. - } => { - // Construct the raw transaction with one output, that is paid the amount of the - // channel. - let addr = WitnessProgram::from_scriptpubkey( - &output_script.as_bytes(), - match network { - Network::Bitcoin => bitcoin_bech32::constants::Network::Bitcoin, - Network::Regtest => bitcoin_bech32::constants::Network::Regtest, - Network::Signet => bitcoin_bech32::constants::Network::Signet, - Network::Testnet | _ => bitcoin_bech32::constants::Network::Testnet, - }, - ) - .expect("Lightning funding tx should always be to a SegWit output") - .to_address(); - let mut outputs = vec![StdHashMap::new()]; - outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0); - let raw_tx = bitcoind_client.create_raw_transaction(outputs).await; - - // Have your wallet put the inputs into the transaction such that the output is - // satisfied. - let funded_tx = bitcoind_client.fund_raw_transaction(raw_tx).await; - - // Sign the final funding transaction and give it to LDK, who will eventually broadcast it. - let signed_tx = bitcoind_client.sign_raw_transaction_with_wallet(funded_tx.hex).await; - assert_eq!(signed_tx.complete, true); - let final_tx: Transaction = - encode::deserialize(&hex_utils::to_vec(&signed_tx.hex).unwrap()).unwrap(); - // Give the funding transaction back to LDK for opening the channel. - if channel_manager - .funding_transaction_generated(temporary_channel_id, counterparty_node_id, final_tx) - .is_err() - { - println!( - "\nERROR: Channel went away before we could fund it. The peer disconnected or refused the channel."); - print!("> "); - std::io::stdout().flush().unwrap(); - } - }, - Event::FundingTxBroadcastSafe { .. } => { - // We don't use the manual broadcasting feature, so this event should never be seen. - }, - Event::PaymentClaimable { payment_hash, purpose, amount_msat, .. } => { - println!( - "\nEVENT: received payment from payment hash {} of {} millisatoshis", - payment_hash, amount_msat, - ); - print!("> "); - std::io::stdout().flush().unwrap(); - let payment_preimage = match purpose { - PaymentPurpose::Bolt11InvoicePayment { payment_preimage, .. } => payment_preimage, - PaymentPurpose::Bolt12OfferPayment { payment_preimage, .. } => payment_preimage, - PaymentPurpose::Bolt12RefundPayment { payment_preimage, .. } => payment_preimage, - PaymentPurpose::SpontaneousPayment(preimage) => Some(preimage), - }; - channel_manager.claim_funds(payment_preimage.unwrap()); - }, - Event::PaymentClaimed { payment_hash, purpose, amount_msat, .. } => { - println!( - "\nEVENT: claimed payment from payment hash {} of {} millisatoshis", - payment_hash, amount_msat, - ); - print!("> "); - std::io::stdout().flush().unwrap(); - let (payment_preimage, payment_secret) = match purpose { - PaymentPurpose::Bolt11InvoicePayment { - payment_preimage, payment_secret, .. - } => (payment_preimage, Some(payment_secret)), - PaymentPurpose::Bolt12OfferPayment { payment_preimage, payment_secret, .. } => { - (payment_preimage, Some(payment_secret)) - }, - PaymentPurpose::Bolt12RefundPayment { - payment_preimage, payment_secret, .. - } => (payment_preimage, Some(payment_secret)), - PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None), - }; - let mut inbound = inbound_payments.lock().unwrap(); - match inbound.payments.entry(payment_hash) { - Entry::Occupied(mut e) => { - let payment = e.get_mut(); - payment.status = HTLCStatus::Succeeded; - payment.preimage = payment_preimage; - payment.secret = payment_secret; - }, - Entry::Vacant(e) => { - e.insert(PaymentInfo { - preimage: payment_preimage, - secret: payment_secret, - status: HTLCStatus::Succeeded, - amt_msat: MillisatAmount(Some(amount_msat)), - }); - }, - } - fs_store.write("", "", INBOUND_PAYMENTS_FNAME, &inbound.encode()).unwrap(); - }, - Event::PaymentSent { - payment_preimage, payment_hash, fee_paid_msat, payment_id, .. - } => { - let mut outbound = outbound_payments.lock().unwrap(); - for (id, payment) in outbound.payments.iter_mut() { - if *id == payment_id.unwrap() { - payment.preimage = Some(payment_preimage); - payment.status = HTLCStatus::Succeeded; +) -> impl core::future::Future + 'a { + async move { + match event { + Event::FundingGenerationReady { + temporary_channel_id, + counterparty_node_id, + channel_value_satoshis, + output_script, + .. + } => { + // Construct the raw transaction with one output, that is paid the amount of the + // channel. + let addr = WitnessProgram::from_scriptpubkey( + &output_script.as_bytes(), + match network { + Network::Bitcoin => bitcoin_bech32::constants::Network::Bitcoin, + Network::Regtest => bitcoin_bech32::constants::Network::Regtest, + Network::Signet => bitcoin_bech32::constants::Network::Signet, + Network::Testnet | _ => bitcoin_bech32::constants::Network::Testnet, + }, + ) + .expect("Lightning funding tx should always be to a SegWit output") + .to_address(); + let mut outputs = vec![StdHashMap::new()]; + outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0); + let raw_tx = bitcoind_client.create_raw_transaction(outputs).await; + + // Have your wallet put the inputs into the transaction such that the output is + // satisfied. + let funded_tx = bitcoind_client.fund_raw_transaction(raw_tx).await; + + // Sign the final funding transaction and give it to LDK, who will eventually broadcast it. + let signed_tx = + bitcoind_client.sign_raw_transaction_with_wallet(funded_tx.hex).await; + assert_eq!(signed_tx.complete, true); + let final_tx: Transaction = + encode::deserialize(&hex_utils::to_vec(&signed_tx.hex).unwrap()).unwrap(); + // Give the funding transaction back to LDK for opening the channel. + if channel_manager + .funding_transaction_generated( + temporary_channel_id, + counterparty_node_id, + final_tx, + ) + .is_err() + { println!( - "\nEVENT: successfully sent payment of {} millisatoshis{} from \ - payment hash {} with preimage {}", - payment.amt_msat, - if let Some(fee) = fee_paid_msat { - format!(" (fee {} msat)", fee) - } else { - "".to_string() - }, - payment_hash, - payment_preimage - ); + "\nERROR: Channel went away before we could fund it. The peer disconnected or refused the channel."); print!("> "); std::io::stdout().flush().unwrap(); } - } - fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()).unwrap(); - }, - Event::OpenChannelRequest { - ref temporary_channel_id, ref counterparty_node_id, .. - } => { - let mut random_bytes = [0u8; 16]; - random_bytes.copy_from_slice(&keys_manager.get_secure_random_bytes()[..16]); - let user_channel_id = u128::from_be_bytes(random_bytes); - let res = channel_manager.accept_inbound_channel( - temporary_channel_id, - counterparty_node_id, - user_channel_id, - ); - - if let Err(e) = res { - print!( - "\nEVENT: Failed to accept inbound channel ({}) from {}: {:?}", - temporary_channel_id, - hex_utils::hex_str(&counterparty_node_id.serialize()), - e, - ); - } else { - print!( - "\nEVENT: Accepted inbound channel ({}) from {}", - temporary_channel_id, - hex_utils::hex_str(&counterparty_node_id.serialize()), + }, + Event::FundingTxBroadcastSafe { .. } => { + // We don't use the manual broadcasting feature, so this event should never be seen. + }, + Event::PaymentClaimable { payment_hash, purpose, amount_msat, .. } => { + println!( + "\nEVENT: received payment from payment hash {} of {} millisatoshis", + payment_hash, amount_msat, ); - } - print!("> "); - std::io::stdout().flush().unwrap(); - }, - Event::PaymentPathSuccessful { .. } => {}, - Event::PaymentPathFailed { .. } => {}, - Event::ProbeSuccessful { .. } => {}, - Event::ProbeFailed { .. } => {}, - Event::PaymentFailed { payment_hash, reason, payment_id, .. } => { - if let Some(hash) = payment_hash { - print!( - "\nEVENT: Failed to send payment to payment ID {}, payment hash {}: {:?}", - payment_id, - hash, - if let Some(r) = reason { r } else { PaymentFailureReason::RetriesExhausted } + print!("> "); + std::io::stdout().flush().unwrap(); + let payment_preimage = match purpose { + PaymentPurpose::Bolt11InvoicePayment { payment_preimage, .. } => { + payment_preimage + }, + PaymentPurpose::Bolt12OfferPayment { payment_preimage, .. } => payment_preimage, + PaymentPurpose::Bolt12RefundPayment { payment_preimage, .. } => { + payment_preimage + }, + PaymentPurpose::SpontaneousPayment(preimage) => Some(preimage), + }; + channel_manager.claim_funds(payment_preimage.unwrap()); + }, + Event::PaymentClaimed { payment_hash, purpose, amount_msat, .. } => { + println!( + "\nEVENT: claimed payment from payment hash {} of {} millisatoshis", + payment_hash, amount_msat, ); - } else { - print!( - "\nEVENT: Failed fetch invoice for payment ID {}: {:?}", - payment_id, - if let Some(r) = reason { r } else { PaymentFailureReason::RetriesExhausted } + print!("> "); + std::io::stdout().flush().unwrap(); + let (payment_preimage, payment_secret) = match purpose { + PaymentPurpose::Bolt11InvoicePayment { + payment_preimage, + payment_secret, + .. + } => (payment_preimage, Some(payment_secret)), + PaymentPurpose::Bolt12OfferPayment { + payment_preimage, payment_secret, .. + } => (payment_preimage, Some(payment_secret)), + PaymentPurpose::Bolt12RefundPayment { + payment_preimage, + payment_secret, + .. + } => (payment_preimage, Some(payment_secret)), + PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None), + }; + let write_future = { + let mut inbound = inbound_payments.lock().unwrap(); + match inbound.payments.entry(payment_hash) { + Entry::Occupied(mut e) => { + let payment = e.get_mut(); + payment.status = HTLCStatus::Succeeded; + payment.preimage = payment_preimage; + payment.secret = payment_secret; + }, + Entry::Vacant(e) => { + e.insert(PaymentInfo { + preimage: payment_preimage, + secret: payment_secret, + status: HTLCStatus::Succeeded, + amt_msat: MillisatAmount(Some(amount_msat)), + }); + }, + } + fs_store.write("", "", INBOUND_PAYMENTS_FNAME, inbound.encode()) + }; + write_future.await.unwrap(); + }, + Event::PaymentSent { + payment_preimage, + payment_hash, + fee_paid_msat, + payment_id, + .. + } => { + let write_future = { + let mut outbound = outbound_payments.lock().unwrap(); + for (id, payment) in outbound.payments.iter_mut() { + if *id == payment_id.unwrap() { + payment.preimage = Some(payment_preimage); + payment.status = HTLCStatus::Succeeded; + println!( + "\nEVENT: successfully sent payment of {} millisatoshis{} from \ + payment hash {} with preimage {}", + payment.amt_msat, + if let Some(fee) = fee_paid_msat { + format!(" (fee {} msat)", fee) + } else { + "".to_string() + }, + payment_hash, + payment_preimage + ); + print!("> "); + std::io::stdout().flush().unwrap(); + } + } + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, outbound.encode()) + }; + write_future.await.unwrap(); + }, + Event::OpenChannelRequest { + ref temporary_channel_id, + ref counterparty_node_id, + .. + } => { + let mut random_bytes = [0u8; 16]; + random_bytes.copy_from_slice(&keys_manager.get_secure_random_bytes()[..16]); + let user_channel_id = u128::from_be_bytes(random_bytes); + let res = channel_manager.accept_inbound_channel( + temporary_channel_id, + counterparty_node_id, + user_channel_id, + None, ); - } - print!("> "); - std::io::stdout().flush().unwrap(); - let mut outbound = outbound_payments.lock().unwrap(); - if outbound.payments.contains_key(&payment_id) { - let payment = outbound.payments.get_mut(&payment_id).unwrap(); - payment.status = HTLCStatus::Failed; - } - fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()).unwrap(); - }, - Event::InvoiceReceived { .. } => { - // We don't use the manual invoice payment logic, so this event should never be seen. - }, - Event::PaymentForwarded { - prev_channel_id, - next_channel_id, - total_fee_earned_msat, - claim_from_onchain_tx, - outbound_amount_forwarded_msat, - .. - } => { - let read_only_network_graph = network_graph.read_only(); - let nodes = read_only_network_graph.nodes(); - let channels = channel_manager.list_channels(); - - let node_str = |channel_id: &Option| match channel_id { - None => String::new(), - Some(channel_id) => match channels.iter().find(|c| c.channel_id == *channel_id) { + if let Err(e) = res { + print!( + "\nEVENT: Failed to accept inbound channel ({}) from {}: {:?}", + temporary_channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), + e, + ); + } else { + print!( + "\nEVENT: Accepted inbound channel ({}) from {}", + temporary_channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), + ); + } + print!("> "); + std::io::stdout().flush().unwrap(); + }, + Event::PaymentPathSuccessful { .. } => {}, + Event::PaymentPathFailed { .. } => {}, + Event::ProbeSuccessful { .. } => {}, + Event::ProbeFailed { .. } => {}, + Event::PaymentFailed { payment_hash, reason, payment_id, .. } => { + if let Some(hash) = payment_hash { + print!( + "\nEVENT: Failed to send payment to payment ID {}, payment hash {}: {:?}", + payment_id, + hash, + if let Some(r) = reason { + r + } else { + PaymentFailureReason::RetriesExhausted + } + ); + } else { + print!( + "\nEVENT: Failed fetch invoice for payment ID {}: {:?}", + payment_id, + if let Some(r) = reason { + r + } else { + PaymentFailureReason::RetriesExhausted + } + ); + } + print!("> "); + std::io::stdout().flush().unwrap(); + + let write_future = { + let mut outbound = outbound_payments.lock().unwrap(); + if outbound.payments.contains_key(&payment_id) { + let payment = outbound.payments.get_mut(&payment_id).unwrap(); + payment.status = HTLCStatus::Failed; + } + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, outbound.encode()) + }; + write_future.await.unwrap(); + }, + Event::InvoiceReceived { .. } => { + // We don't use the manual invoice payment logic, so this event should never be seen. + }, + Event::PaymentForwarded { + prev_channel_id, + next_channel_id, + total_fee_earned_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + .. + } => { + let read_only_network_graph = network_graph.read_only(); + let nodes = read_only_network_graph.nodes(); + let channels = channel_manager.list_channels(); + + let node_str = |channel_id: &Option| match channel_id { None => String::new(), - Some(channel) => { - match nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id)) { - None => "private node".to_string(), - Some(node) => match &node.announcement_info { - None => "unnamed node".to_string(), - Some(announcement) => { - format!("node {}", announcement.alias()) + Some(channel_id) => match channels.iter().find(|c| c.channel_id == *channel_id) + { + None => String::new(), + Some(channel) => { + match nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id)) { + None => "private node".to_string(), + Some(node) => match &node.announcement_info { + None => "unnamed node".to_string(), + Some(announcement) => { + format!("node {}", announcement.alias()) + }, }, - }, - } + } + }, }, - }, - }; - let channel_str = |channel_id: &Option| { - channel_id - .map(|channel_id| format!(" with channel {}", channel_id)) - .unwrap_or_default() - }; - let from_prev_str = - format!(" from {}{}", node_str(&prev_channel_id), channel_str(&prev_channel_id)); - let to_next_str = - format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id)); - - let from_onchain_str = if claim_from_onchain_tx { - "from onchain downstream claim" - } else { - "from HTLC fulfill message" - }; - let amt_args = if let Some(v) = outbound_amount_forwarded_msat { - format!("{}", v) - } else { - "?".to_string() - }; - if let Some(fee_earned) = total_fee_earned_msat { + }; + let channel_str = |channel_id: &Option| { + channel_id + .map(|channel_id| format!(" with channel {}", channel_id)) + .unwrap_or_default() + }; + let from_prev_str = format!( + " from {}{}", + node_str(&prev_channel_id), + channel_str(&prev_channel_id), + ); + let to_next_str = + format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id)); + + let from_onchain_str = if claim_from_onchain_tx { + "from onchain downstream claim" + } else { + "from HTLC fulfill message" + }; + let amt_args = if let Some(v) = outbound_amount_forwarded_msat { + format!("{}", v) + } else { + "?".to_string() + }; + if let Some(fee_earned) = total_fee_earned_msat { + println!( + "\nEVENT: Forwarded payment for {} msat{}{}, earning {} msat {}", + amt_args, from_prev_str, to_next_str, fee_earned, from_onchain_str + ); + } else { + println!( + "\nEVENT: Forwarded payment for {} msat{}{}, claiming onchain {}", + amt_args, from_prev_str, to_next_str, from_onchain_str + ); + } + print!("> "); + std::io::stdout().flush().unwrap(); + }, + Event::HTLCHandlingFailed { .. } => {}, + Event::SpendableOutputs { outputs, channel_id } => { + output_sweeper + .0 + .track_spendable_outputs(outputs, channel_id, false, None) + .await + .unwrap(); + }, + Event::ChannelPending { channel_id, counterparty_node_id, .. } => { println!( - "\nEVENT: Forwarded payment for {} msat{}{}, earning {} msat {}", - amt_args, from_prev_str, to_next_str, fee_earned, from_onchain_str + "\nEVENT: Channel {} with peer {} is pending awaiting funding lock-in!", + channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), ); - } else { + print!("> "); + std::io::stdout().flush().unwrap(); + }, + Event::ChannelReady { ref channel_id, ref counterparty_node_id, .. } => { println!( - "\nEVENT: Forwarded payment for {} msat{}{}, claiming onchain {}", - amt_args, from_prev_str, to_next_str, from_onchain_str + "\nEVENT: Channel {} with peer {} is ready to be used!", + channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), ); - } - print!("> "); - std::io::stdout().flush().unwrap(); - }, - Event::HTLCHandlingFailed { .. } => {}, - Event::PendingHTLCsForwardable { time_forwardable } => { - let forwarding_channel_manager = channel_manager.clone(); - let min = time_forwardable.as_millis() as u64; - tokio::spawn(async move { - let millis_to_sleep = thread_rng().gen_range(min, min * 5) as u64; - tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; - forwarding_channel_manager.process_pending_htlc_forwards(); - }); - }, - Event::SpendableOutputs { outputs, channel_id } => { - output_sweeper.0.track_spendable_outputs(outputs, channel_id, false, None).unwrap(); - }, - Event::ChannelPending { channel_id, counterparty_node_id, .. } => { - println!( - "\nEVENT: Channel {} with peer {} is pending awaiting funding lock-in!", - channel_id, - hex_utils::hex_str(&counterparty_node_id.serialize()), - ); - print!("> "); - std::io::stdout().flush().unwrap(); - }, - Event::ChannelReady { - ref channel_id, - user_channel_id: _, - ref counterparty_node_id, - channel_type: _, - } => { - println!( - "\nEVENT: Channel {} with peer {} is ready to be used!", - channel_id, - hex_utils::hex_str(&counterparty_node_id.serialize()), - ); - print!("> "); - std::io::stdout().flush().unwrap(); - }, - Event::ChannelClosed { channel_id, reason, counterparty_node_id, .. } => { - println!( - "\nEVENT: Channel {} with counterparty {} closed due to: {:?}", - channel_id, - counterparty_node_id.map(|id| format!("{}", id)).unwrap_or("".to_owned()), - reason - ); - print!("> "); - std::io::stdout().flush().unwrap(); - }, - Event::DiscardFunding { .. } => { - // A "real" node should probably "lock" the UTXOs spent in funding transactions until - // the funding transaction either confirms, or this event is generated. - }, - Event::HTLCIntercepted { .. } => {}, - Event::OnionMessageIntercepted { .. } => { - // We don't use the onion message interception feature, so this event should never be - // seen. - }, - Event::OnionMessagePeerConnected { .. } => { - // We don't use the onion message interception feature, so we have no use for this - // event. - }, - Event::BumpTransaction(event) => bump_tx_event_handler.handle_event(&event), - Event::ConnectionNeeded { node_id, addresses } => { - tokio::spawn(async move { - for address in addresses { - if let Ok(sockaddrs) = address.to_socket_addrs() { - for addr in sockaddrs { - let pm = Arc::clone(&peer_manager); - if cli::connect_peer_if_necessary(node_id, addr, pm).await.is_ok() { - return; + print!("> "); + std::io::stdout().flush().unwrap(); + }, + Event::ChannelClosed { channel_id, reason, counterparty_node_id, .. } => { + println!( + "\nEVENT: Channel {} with counterparty {} closed due to: {:?}", + channel_id, + counterparty_node_id.map(|id| format!("{}", id)).unwrap_or("".to_owned()), + reason + ); + print!("> "); + std::io::stdout().flush().unwrap(); + }, + Event::DiscardFunding { .. } => { + // A "real" node should probably "lock" the UTXOs spent in funding transactions until + // the funding transaction either confirms, or this event is generated. + }, + Event::HTLCIntercepted { .. } => {}, + Event::OnionMessageIntercepted { .. } => { + // We don't use the onion message interception feature, so this event should never be + // seen. + }, + Event::OnionMessagePeerConnected { .. } => { + // We don't use the onion message interception feature, so we have no use for this + // event. + }, + Event::BumpTransaction(event) => bump_tx_event_handler.handle_event(&event).await, + Event::ConnectionNeeded { node_id, addresses } => { + tokio::spawn(async move { + for address in addresses { + if let Ok(sockaddrs) = address.to_socket_addrs() { + for addr in sockaddrs { + let pm = Arc::clone(&peer_manager); + if cli::connect_peer_if_necessary(node_id, addr, pm).await.is_ok() { + return; + } } } } - } - }); - }, + }); + }, + _ => {}, + } } } @@ -633,7 +666,8 @@ async fn start_ldk() { key }; let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); - let keys_manager = Arc::new(KeysManager::new(&keys_seed, cur.as_secs(), cur.subsec_nanos())); + let keys_manager = + Arc::new(KeysManager::new(&keys_seed, cur.as_secs(), cur.subsec_nanos(), true)); let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&broadcaster), @@ -644,34 +678,37 @@ async fn start_ldk() { // Step 5: Initialize Persistence let fs_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); - let persister = Arc::new(MonitorUpdatingPersister::new( + let persister = MonitorUpdatingPersisterAsync::new( Arc::clone(&fs_store), + TokioSpawner, Arc::clone(&logger), 1000, Arc::clone(&keys_manager), Arc::clone(&keys_manager), Arc::clone(&bitcoind_client), Arc::clone(&bitcoind_client), - )); + ); // Alternatively, you can use the `FilesystemStore` as a `Persist` directly, at the cost of // larger `ChannelMonitor` update writes (but no deletion or cleanup): //let persister = Arc::clone(&fs_store); - // Step 6: Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( + // Step 6: Read ChannelMonitor state from disk + let mut channelmonitors = persister.read_all_channel_monitors_with_updates().await.unwrap(); + // If you are using the `FilesystemStore` as a `Persist` directly, use + // `lightning::util::persist::read_channel_monitors` like this: + // read_channel_monitors(Arc::clone(&persister), Arc::clone(&keys_manager), Arc::clone(&keys_manager)).unwrap(); + + // Step 7: Initialize the ChainMonitor + let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new_async_beta( None, Arc::clone(&broadcaster), Arc::clone(&logger), Arc::clone(&fee_estimator), - Arc::clone(&persister), + persister, + Arc::clone(&keys_manager), + keys_manager.get_peer_storage_key(), )); - // Step 7: Read ChannelMonitor state from disk - let mut channelmonitors = persister.read_all_channel_monitors_with_updates().unwrap(); - // If you are using the `FilesystemStore` as a `Persist` directly, use - // `lightning::util::persist::read_channel_monitors` like this: - //read_channel_monitors(Arc::clone(&persister), Arc::clone(&keys_manager), Arc::clone(&keys_manager)).unwrap(); - // Step 8: Poll for the best chain tip, which may be used by the channel manager & spv client let polled_chain_tip = init::validate_best_block_header(bitcoind_client.as_ref()) .await @@ -755,11 +792,14 @@ async fn start_ldk() { }; // Step 12: Initialize the OutputSweeper. - let (sweeper_best_block, output_sweeper) = match fs_store.read( - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, - ) { + let (sweeper_best_block, output_sweeper) = match fs_store + .read( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + ) + .await + { Err(e) if e.kind() == io::ErrorKind::NotFound => { let sweeper = OutputSweeper::new( channel_manager.current_best_block(), @@ -800,11 +840,11 @@ async fn start_ldk() { ]; for (blockhash, channel_monitor) in channelmonitors.drain(..) { - let outpoint = channel_monitor.get_funding_txo().0; + let funding_txo = channel_monitor.get_funding_txo(); chain_listener_channel_monitors.push(( blockhash, (channel_monitor, broadcaster.clone(), fee_estimator.clone(), logger.clone()), - outpoint, + funding_txo, )); } @@ -828,11 +868,12 @@ async fn start_ldk() { }; // Step 14: Give ChannelMonitors to ChainMonitor - for item in chain_listener_channel_monitors.drain(..) { - let channel_monitor = item.1 .0; - let funding_outpoint = item.2; + for (_, (channel_monitor, _, _, _), _) in chain_listener_channel_monitors { + let channel_id = channel_monitor.channel_id(); + // Note that this may not return `Completed` for ChannelMonitors which were last written by + // a version of LDK prior to 0.1. assert_eq!( - chain_monitor.watch_channel(funding_outpoint, channel_monitor), + chain_monitor.load_existing_monitor(channel_id, channel_monitor), Ok(ChannelMonitorUpdateStatus::Completed) ); } @@ -867,10 +908,11 @@ async fn start_ldk() { let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); rand::thread_rng().fill_bytes(&mut ephemeral_bytes); let lightning_msg_handler = MessageHandler { - chan_handler: channel_manager.clone(), - route_handler: gossip_sync.clone(), - onion_message_handler: onion_messenger.clone(), + chan_handler: Arc::clone(&channel_manager), + route_handler: Arc::clone(&gossip_sync), + onion_message_handler: Arc::clone(&onion_messenger), custom_message_handler: IgnoringMessageHandler {}, + send_only_message_handler: Arc::clone(&chain_monitor), }; let peer_manager: Arc = Arc::new(PeerManager::new( lightning_msg_handler, @@ -883,7 +925,7 @@ async fn start_ldk() { // Install a GossipVerifier in in the P2PGossipSync let utxo_lookup = GossipVerifier::new( Arc::clone(&bitcoind_client.bitcoind_rpc_client), - lightning_block_sync::gossip::TokioSpawner, + TokioSpawner, Arc::clone(&gossip_sync), Arc::clone(&peer_manager), ); @@ -962,7 +1004,8 @@ async fn start_ldk() { } } fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.lock().unwrap().encode()) + .write("", "", OUTBOUND_PAYMENTS_FNAME, outbound_payments.lock().unwrap().encode()) + .await .unwrap(); // Step 20: Handle LDK Events @@ -1007,21 +1050,20 @@ async fn start_ldk() { } }; - // Step 21: Persist ChannelManager and NetworkGraph - let persister = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); - - // Step 22: Background Processing + // Step 21: Background Processing let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(()); let mut background_processor = tokio::spawn(process_events_async( - Arc::clone(&persister), + Arc::clone(&fs_store), event_handler, - chain_monitor.clone(), - channel_manager.clone(), + Arc::clone(&chain_monitor), + Arc::clone(&channel_manager), Some(onion_messenger), - GossipSync::p2p(gossip_sync.clone()), - peer_manager.clone(), - logger.clone(), - Some(scorer.clone()), + GossipSync::p2p(Arc::clone(&gossip_sync)), + Arc::clone(&peer_manager), + NO_LIQUIDITY_MANAGER, + Some(Arc::clone(&output_sweeper)), + Arc::clone(&logger), + Some(Arc::clone(&scorer)), move |t| { let mut bp_exit_fut_check = bp_exit_check.clone(); Box::pin(async move { @@ -1038,37 +1080,47 @@ async fn start_ldk() { // Regularly reconnect to channel peers. let connect_cm = Arc::clone(&channel_manager); let connect_pm = Arc::clone(&peer_manager); - let peer_data_path = format!("{}/channel_peer_data", ldk_data_dir); let stop_connect = Arc::clone(&stop_listen_connect); + let graph_connect = Arc::clone(&network_graph); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(1)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { interval.tick().await; - match disk::read_channel_peer_data(Path::new(&peer_data_path)) { - Ok(info) => { - for node_id in connect_cm - .list_channels() - .iter() - .map(|chan| chan.counterparty.node_id) - .filter(|id| connect_pm.peer_by_node_id(id).is_none()) - { - if stop_connect.load(Ordering::Acquire) { - return; - } - for (pubkey, peer_addr) in info.iter() { - if *pubkey == node_id { - let _ = cli::do_connect_peer( - *pubkey, - peer_addr.clone(), - Arc::clone(&connect_pm), - ) - .await; - } - } + for node_id in connect_cm + .list_channels() + .iter() + .map(|chan| chan.counterparty.node_id) + .filter(|id| connect_pm.peer_by_node_id(id).is_none()) + { + if stop_connect.load(Ordering::Acquire) { + return; + } + let id = NodeId::from_pubkey(&node_id); + let addrs = if let Some(node) = graph_connect.read_only().node(&id) { + if let Some(ann) = &node.announcement_info { + let non_onion = |addr| match addr { + &lightning::ln::msgs::SocketAddress::OnionV2(_) => None, + &lightning::ln::msgs::SocketAddress::OnionV3 { .. } => None, + _ => Some(addr.clone()), + }; + ann.addresses().iter().filter_map(non_onion).collect::>() + } else { + Vec::new() + } + } else { + Vec::new() + }; + for addr in addrs { + let sockaddrs = addr.to_socket_addrs(); + if sockaddrs.is_err() { + continue; } - }, - Err(e) => println!("ERROR: errored reading channel peer info from disk: {:?}", e), + for sockaddr in sockaddrs.unwrap() { + let _ = + cli::do_connect_peer(node_id, sockaddr, Arc::clone(&connect_pm)).await; + } + } } } }); @@ -1102,28 +1154,25 @@ async fn start_ldk() { ldk_data_dir.clone(), Arc::clone(&keys_manager), Arc::clone(&logger), - Arc::clone(&persister), + Arc::clone(&fs_store), Arc::clone(&output_sweeper), )); // Start the CLI. let cli_channel_manager = Arc::clone(&channel_manager); let cli_chain_monitor = Arc::clone(&chain_monitor); - let cli_persister = Arc::clone(&persister); + let cli_fs_store = Arc::clone(&fs_store); let cli_peer_manager = Arc::clone(&peer_manager); - let cli_poll = tokio::task::spawn_blocking(move || { - cli::poll_for_user_input( - cli_peer_manager, - cli_channel_manager, - cli_chain_monitor, - keys_manager, - network_graph, - inbound_payments, - outbound_payments, - ldk_data_dir, - cli_persister, - ) - }); + let cli_poll = tokio::task::spawn(cli::poll_for_user_input( + cli_peer_manager, + cli_channel_manager, + cli_chain_monitor, + keys_manager, + network_graph, + inbound_payments, + outbound_payments, + cli_fs_store, + )); // Exit if either CLI polling exits or the background processor exits (which shouldn't happen // unless we fail to write to the filesystem). @@ -1141,13 +1190,14 @@ async fn start_ldk() { peer_manager.disconnect_all_peers(); if let Err(e) = bg_res { - let persist_res = persister + let persist_res = fs_store .write( persist::CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, persist::CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, persist::CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.encode(), + channel_manager.encode(), ) + .await .unwrap(); use lightning::util::logger::Logger; lightning::log_error!( diff --git a/src/sweep.rs b/src/sweep.rs index 7ab3b824..4dd21ee4 100644 --- a/src/sweep.rs +++ b/src/sweep.rs @@ -62,7 +62,8 @@ pub(crate) async fn migrate_deprecated_spendable_outputs( if !outputs.is_empty() { let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes()); persister - .write("spendable_outputs", "", &key, &WithoutLength(&outputs).encode()) + .write("spendable_outputs", "", &key, WithoutLength(&outputs).encode()) + .await .unwrap(); fs::remove_dir_all(&processing_spendables_dir).unwrap(); } @@ -90,7 +91,7 @@ pub(crate) async fn migrate_deprecated_spendable_outputs( } let spend_delay = Some(best_block.height + 2); - sweeper.track_spendable_outputs(outputs.clone(), None, false, spend_delay).unwrap(); + sweeper.track_spendable_outputs(outputs.clone(), None, false, spend_delay).await.unwrap(); fs::remove_dir_all(&spendables_dir).unwrap(); fs::remove_dir_all(&pending_spendables_dir).unwrap();