diff --git a/Cargo.toml b/Cargo.toml index e8364c909..60ebccf02 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,18 +40,18 @@ default = [] #lightning-macros = { version = "0.2.0" } #lightning-dns-resolver = { version = "0.3.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } -lightning-dns-resolver = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } +lightning = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["std"] } +lightning-types = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05" } +lightning-invoice = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05" } +lightning-persister = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05" } +lightning-rapid-gossip-sync = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05" } +lightning-block-sync = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["std"] } +lightning-macros = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05" } +lightning-dns-resolver = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -81,13 +81,13 @@ async-trait = { version = "0.1", default-features = false } vss-client = { package = "vss-client-ng", version = "0.5" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/jkczyz/bitcoin-payment-instructions", rev = "a7b32d5fded9bb45f73bf82e6d7187adf705171c" } +bitcoin-payment-instructions = { git = "https://github.com/jkczyz/bitcoin-payment-instructions", rev = "91b60116d87e19b42c06bcdf1cbf1affb566ffc2" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/jkczyz/rust-lightning", rev = "86dcedebe380737cbed0dd1d4230b4bc1e90dd05", features = ["std", "_test_utils"] } rand = { version = "0.9.2", default-features = false, features = ["std", "thread_rng", "os_rng"] } proptest = "1.0.0" regex = "1.5.6" diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c32604708..6138e6d54 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -123,6 +123,8 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] + void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason); @@ -233,12 +235,6 @@ enum NodeError { typedef dictionary NodeStatus; -[Remote] -dictionary BestBlock { - BlockHash block_hash; - u32 height; -}; - typedef enum BuildError; [Trait, WithForeign] diff --git a/src/builder.rs b/src/builder.rs index b0ff1d03b..f45a32c25 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -19,10 +19,10 @@ use bdk_wallet::{KeychainKind, Wallet as BdkWallet}; use bitcoin::bip32::{ChildNumber, Xpriv}; use bitcoin::key::Secp256k1; use bitcoin::secp256k1::PublicKey; -use bitcoin::{BlockHash, Network}; +use bitcoin::Network; use bitcoin_payment_instructions::dns_resolver::DNSHrnResolver; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; -use lightning::chain::{chainmonitor, BestBlock}; +use lightning::chain::{chainmonitor, BestBlock as BlockLocator}; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; @@ -1496,6 +1496,8 @@ fn build_with_store_internal( Arc::clone(&pending_payment_store), )); + tx_broadcaster.set_wallet(Arc::downgrade(&wallet)); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); @@ -1657,11 +1659,6 @@ fn build_with_store_internal( // If we act as an LSPS2 service, we allow forwarding to unannounced channels. user_config.accept_forwards_to_priv_channels = true; - - // If we act as an LSPS2 service, set the HTLC-value-in-flight to 100% of the channel value - // to ensure we can forward the initial payment. - user_config.channel_handshake_config.max_inbound_htlc_value_in_flight_percent_of_channel = - 100; } if let Some(role) = async_payments_role { @@ -1695,8 +1692,8 @@ fn build_with_store_internal( user_config, channel_monitor_references, ); - let (_hash, channel_manager) = - <(BlockHash, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| { + let (_best_block, channel_manager) = + <(BlockLocator, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| { log_error!(logger, "Failed to read channel manager from store: {}", e); BuildError::ReadFailed })?; @@ -1704,7 +1701,7 @@ fn build_with_store_internal( } else { // We're starting a fresh node. let best_block = - chain_tip_opt.unwrap_or_else(|| BestBlock::from_network(config.network)); + chain_tip_opt.unwrap_or_else(|| BlockLocator::from_network(config.network)); let chain_params = ChainParameters { network: config.network.into(), best_block }; channelmanager::ChannelManager::new( diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 2bf059f4e..7ece757ae 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt; use std::future::Future; use std::sync::atomic::{AtomicU64, Ordering}; @@ -16,7 +16,7 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; use bitcoin::{BlockHash, FeeRate, Network, OutPoint, Transaction, Txid}; use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; -use lightning::chain::{BestBlock, Listen}; +use lightning::chain::{BestBlock as BlockLocator, Listen}; use lightning::util::ser::Writeable; use lightning_block_sync::gossip::UtxoSource; use lightning_block_sync::http::{HttpClientError, JsonResponse}; @@ -25,7 +25,7 @@ use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; use lightning_block_sync::rest::RestClient; use lightning_block_sync::rpc::{RpcClient, RpcClientError}; use lightning_block_sync::{ - BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceErrorKind, Cache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceErrorKind, HeaderCache, SpvClient, }; use serde::Serialize; @@ -49,7 +49,6 @@ const CHAIN_POLLING_TIMEOUT_SECS: u64 = 10; pub(super) struct BitcoindChainSource { api_client: Arc, - header_cache: tokio::sync::Mutex, latest_chain_tip: RwLock>, wallet_polling_status: Mutex, fee_estimator: Arc, @@ -72,12 +71,10 @@ impl BitcoindChainSource { rpc_password.clone(), )); - let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); let latest_chain_tip = RwLock::new(None); let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); Self { api_client, - header_cache, latest_chain_tip, wallet_polling_status, fee_estimator, @@ -103,13 +100,11 @@ impl BitcoindChainSource { rpc_password, )); - let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); let latest_chain_tip = RwLock::new(None); let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); Self { api_client, - header_cache, latest_chain_tip, wallet_polling_status, fee_estimator, @@ -153,14 +148,14 @@ impl BitcoindChainSource { return; } - let channel_manager_best_block_hash = channel_manager.current_best_block().block_hash; - let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash; - let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash; + let onchain_wallet_best_block = onchain_wallet.current_best_block(); + let channel_manager_best_block = channel_manager.current_best_block(); + let sweeper_best_block = output_sweeper.current_best_block(); let mut chain_listeners = vec![ - (onchain_wallet_best_block_hash, &*onchain_wallet as &(dyn Listen + Send + Sync)), - (channel_manager_best_block_hash, &*channel_manager as &(dyn Listen + Send + Sync)), - (sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)), + (onchain_wallet_best_block, &*onchain_wallet as &(dyn Listen + Send + Sync)), + (channel_manager_best_block, &*channel_manager as &(dyn Listen + Send + Sync)), + (sweeper_best_block, &*output_sweeper as &(dyn Listen + Send + Sync)), ]; // TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s @@ -168,31 +163,28 @@ impl BitcoindChainSource { // trivial as we load them on initialization (in the `Builder`) and only gain // network access during `start`. For now, we just make sure we get the worst known // block hash and sychronize them via `ChainMonitor`. - if let Some(worst_channel_monitor_block_hash) = chain_monitor + if let Some(worst_channel_monitor_best_block) = chain_monitor .list_monitors() .iter() .flat_map(|channel_id| chain_monitor.get_monitor(*channel_id)) .map(|m| m.current_best_block()) .min_by_key(|b| b.height) - .map(|b| b.block_hash) { chain_listeners.push(( - worst_channel_monitor_block_hash, + worst_channel_monitor_best_block, &*chain_monitor as &(dyn Listen + Send + Sync), )); } - let mut locked_header_cache = self.header_cache.lock().await; let now = SystemTime::now(); match synchronize_listeners( self.api_client.as_ref(), self.config.network, - &mut *locked_header_cache, chain_listeners.clone(), ) .await { - Ok(chain_tip) => { + Ok((_header_cache, chain_tip)) => { { let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); log_info!( @@ -333,7 +325,7 @@ impl BitcoindChainSource { } } - pub(super) async fn poll_best_block(&self) -> Result { + pub(super) async fn poll_best_block(&self) -> Result { self.poll_chain_tip().await.map(|tip| tip.to_best_block()) } @@ -400,7 +392,6 @@ impl BitcoindChainSource { let chain_tip = if let Some(tip) = latest_chain_tip_opt { tip } else { self.poll_chain_tip().await? }; - let mut locked_header_cache = self.header_cache.lock().await; let chain_poller = ChainPoller::new(Arc::clone(&self.api_client), self.config.network); let chain_listener = ChainListener { onchain_wallet: Arc::clone(&onchain_wallet), @@ -409,7 +400,7 @@ impl BitcoindChainSource { output_sweeper, }; let mut spv_client = - SpvClient::new(chain_tip, chain_poller, &mut *locked_header_cache, &chain_listener); + SpvClient::new(chain_tip, chain_poller, HeaderCache::new(), &chain_listener); let now = SystemTime::now(); match spv_client.poll_best_tip().await { @@ -1350,46 +1341,6 @@ pub(crate) enum FeeRateEstimationMode { Conservative, } -const MAX_HEADER_CACHE_ENTRIES: usize = 100; - -pub(crate) struct BoundedHeaderCache { - header_map: HashMap, - recently_seen: VecDeque, -} - -impl BoundedHeaderCache { - pub(crate) fn new() -> Self { - let header_map = HashMap::new(); - let recently_seen = VecDeque::new(); - Self { header_map, recently_seen } - } -} - -impl Cache for BoundedHeaderCache { - fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.header_map.get(block_hash) - } - - fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.recently_seen.push_back(block_hash); - self.header_map.insert(block_hash, block_header); - - if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES { - // Keep dropping old entries until we've actually removed a header entry. - while let Some(oldest_entry) = self.recently_seen.pop_front() { - if self.header_map.remove(&oldest_entry).is_some() { - break; - } - } - } - } - - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { - self.recently_seen.retain(|e| e != block_hash); - self.header_map.remove(block_hash) - } -} - pub(crate) struct ChainListener { pub(crate) onchain_wallet: Arc, pub(crate) channel_manager: Arc, diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 537ee04d3..00b868d2b 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use bitcoin::{Script, Txid}; -use lightning::chain::{BestBlock, Filter}; +use lightning::chain::{BestBlock as BlockLocator, Filter}; use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; use crate::chain::electrum::ElectrumChainSource; @@ -24,7 +24,7 @@ use crate::config::{ WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -101,7 +101,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Result<(Self, Option), ()> { + ) -> Result<(Self, Option), ()> { let esplora_chain_source = EsploraChainSource::new( server_url, headers, @@ -122,7 +122,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> (Self, Option) { + ) -> (Self, Option) { let electrum_chain_source = ElectrumChainSource::new( server_url, sync_config, @@ -142,7 +142,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> (Self, Option) { + ) -> (Self, Option) { let bitcoind_chain_source = BitcoindChainSource::new_rpc( rpc_host, rpc_port, @@ -165,7 +165,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, rest_client_config: BitcoindRestClientConfig, logger: Arc, node_metrics: Arc>, - ) -> (Self, Option) { + ) -> (Self, Option) { let bitcoind_chain_source = BitcoindChainSource::new_rest( rpc_host, rpc_port, @@ -453,15 +453,26 @@ impl ChainSource { return; } Some(next_package) = receiver.recv() => { + let txs = match self.tx_broadcaster.classify_package(next_package).await { + Ok(txs) => txs, + Err(e) => { + log_error!( + tx_bcast_logger, + "Skipping broadcast: failed to persist payment records: {:?}", + e, + ); + continue; + }, + }; match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.process_broadcast_package(next_package).await + esplora_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.process_broadcast_package(next_package).await + electrum_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { - bitcoind_chain_source.process_broadcast_package(next_package).await + bitcoind_chain_source.process_broadcast_package(txs).await }, } } diff --git a/src/event.rs b/src/event.rs index 3161daa2a..3b4205165 100644 --- a/src/event.rs +++ b/src/event.rs @@ -26,9 +26,7 @@ use lightning::ln::channelmanager::{PaymentId, TrustedChannelFeatures}; use lightning::ln::types::ChannelId; use lightning::routing::gossip::NodeId; use lightning::sign::EntropySource; -use lightning::util::config::{ - ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate, -}; +use lightning::util::config::{ChannelConfigOverrides, ChannelConfigUpdate}; use lightning::util::errors::APIError; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; @@ -1273,19 +1271,12 @@ where if lsp_node_id == counterparty_node_id { // When we're an LSPS2 client, allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll // check that they don't take too much before claiming. - // - // We also set maximum allowed inbound HTLC value in flight - // to 100%. We should eventually be able to set this on a per-channel basis, but for - // now we just bump the default for all channels. channel_override_config = Some(ChannelConfigOverrides { - handshake_overrides: Some(ChannelHandshakeConfigUpdate { - max_inbound_htlc_value_in_flight_percent_of_channel: Some(100), - ..Default::default() - }), update_overrides: Some(ChannelConfigUpdate { accept_underpaying_htlcs: Some(true), ..Default::default() }), + ..Default::default() }); } } @@ -1527,6 +1518,18 @@ where ); } + if let Err(e) = + self.wallet.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid)) + { + log_error!( + self.logger, + "Failed to graduate funding payment on ChannelReady for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) @@ -1556,6 +1559,16 @@ where } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + if let Err(e) = self.wallet.handle_channel_closed(channel_id) { + log_error!( + self.logger, + "Failed to handle ChannelClosed for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/lib.rs b/src/lib.rs index faeb6d339..d51389f1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,6 +117,7 @@ pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; pub use bip39; pub use bitcoin; use bitcoin::secp256k1::PublicKey; +use bitcoin::BlockHash; #[cfg(feature = "uniffi")] pub use bitcoin::FeeRate; #[cfg(not(feature = "uniffi"))] @@ -145,7 +146,7 @@ use gossip::GossipSource; use graph::NetworkGraph; use io::utils::update_and_persist_node_metrics; pub use lightning; -use lightning::chain::BestBlock; +use lightning::chain::BestBlock as BlockLocator; use lightning::impl_writeable_tlv_based; use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; @@ -1194,14 +1195,6 @@ impl Node { let mut user_config = default_user_config(&self.config); user_config.channel_handshake_config.announce_for_forwarding = announce_for_forwarding; user_config.channel_config = (channel_config.unwrap_or_default()).clone().into(); - // We set the max inflight to 100% for private channels. - // FIXME: LDK will default to this behavior soon, too, at which point we should drop this - // manual override. - if !announce_for_forwarding { - user_config - .channel_handshake_config - .max_inbound_htlc_value_in_flight_percent_of_channel = 100; - } let push_msat = push_to_counterparty_msat.unwrap_or(0); let user_channel_id: u128 = u128::from_ne_bytes( @@ -1577,6 +1570,14 @@ impl Node { Error::ChannelSplicingFailed })?; + if funding_template.min_rbf_feerate().is_some() { + log_error!( + self.logger, + "Failed to splice channel: pending splice requires RBF, use rbf_channel instead" + ); + return Err(Error::ChannelSplicingFailed); + } + let contribution = self .runtime .block_on(funding_template.splice_in( @@ -1690,6 +1691,14 @@ impl Node { Error::ChannelSplicingFailed })?; + if funding_template.min_rbf_feerate().is_some() { + log_error!( + self.logger, + "Failed to splice channel: pending splice requires RBF, use rbf_channel instead" + ); + return Err(Error::ChannelSplicingFailed); + } + let outputs = vec![bitcoin::TxOut { value: Amount::from_sat(splice_amount_sats), script_pubkey: address.script_pubkey(), @@ -1729,6 +1738,69 @@ impl Node { } } + /// Replace a pending splice's funding transaction with a higher-feerate version. + /// + /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior + /// contribution is reused when possible; otherwise, coin selection is re-run. + /// + /// # Experimental API + /// + /// This API is experimental and may change in the future. + pub fn rbf_channel( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + ) -> Result<(), Error> { + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + let min_feerate = + self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + let funding_template = self + .channel_manager + .splice_channel(&channel_details.channel_id, &counterparty_node_id) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + })?; + + if funding_template.min_rbf_feerate().is_none() { + log_error!(self.logger, "Failed to RBF channel: no pending splice to replace"); + return Err(Error::ChannelSplicingFailed); + } + + let contribution = self + .runtime + .block_on(funding_template.rbf(max_feerate, Arc::clone(&self.wallet))) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {}", e); + Error::ChannelSplicingFailed + })?; + + self.channel_manager + .funding_contributed( + &channel_details.channel_id, + &counterparty_node_id, + contribution, + None, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + }) + } else { + log_error!( + self.logger, + "Channel not found for user_channel_id {} and counterparty {}", + user_channel_id, + counterparty_node_id + ); + Err(Error::ChannelSplicingFailed) + } + } + /// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate /// cache. /// @@ -2056,6 +2128,22 @@ impl Drop for Node { } } +/// The best known block as identified by its hash and height. +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct BestBlock { + /// The block's hash. + pub block_hash: BlockHash, + /// The height at which the block was confirmed. + pub height: u32, +} + +impl From for BestBlock { + fn from(locator: BlockLocator) -> Self { + Self { block_hash: locator.block_hash, height: locator.height } + } +} + /// Represents the status of the [`Node`]. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "uniffi", derive(uniffi::Record))] diff --git a/src/liquidity.rs b/src/liquidity.rs index 9f02af886..224bceab0 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -781,11 +781,12 @@ where let mut config = self.channel_manager.get_current_config().clone(); - // We set these LSP-specific values during Node building, here we're making sure it's actually set. + // If we act as an LSPS2 service, the HTLC-value-in-flight must be 100% of the + // channel value to ensure we can forward the initial payment. debug_assert_eq!( config .channel_handshake_config - .max_inbound_htlc_value_in_flight_percent_of_channel, + .unannounced_channel_max_inbound_htlc_value_in_flight_percentage, 100 ); debug_assert!(config.accept_forwards_to_priv_channels); diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index eb72f89ec..425f9182c 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -5,14 +5,82 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use bitcoin::secp256k1::PublicKey; use bitcoin::Txid; use lightning::impl_writeable_tlv_based; +use lightning::impl_writeable_tlv_based_enum; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; +use lightning::ln::types::ChannelId; use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::payment::store::PaymentDetailsUpdate; use crate::payment::PaymentDetails; +/// Identifies which channel lifecycle event a [`FundingDetails`] record tracks. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum FundingPurpose { + /// The funding transaction opens a channel. + Establishment, + /// The funding transaction splices an already-open channel. + Splice, +} + +impl_writeable_tlv_based_enum!(FundingPurpose, + (0, Establishment) => {}, + (2, Splice) => {} +); + +/// One broadcast of a funding transaction (channel open or splice) in which this node +/// had a stake. +/// +/// When an RBF produces multiple candidates for the same [`FundingDetails`], each +/// broadcast is recorded as its own [`FundingCandidate`] so that whichever candidate +/// actually confirms can be identified and its contribution values restored onto the +/// outer [`PaymentDetails`]. +/// +/// `contribution` is set for dual-funded opens and splices, where the local node submits +/// a [`FundingContribution`] describing its inputs, outputs, and fee share. It is `None` +/// for single-funded opens, which have exactly one candidate and no alternative to swap +/// to. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingCandidate { + /// Transaction ID of this broadcast. + pub txid: Txid, + /// The contribution used to build this candidate, if any. + pub contribution: Option, +} + +impl_writeable_tlv_based!(FundingCandidate, { + (0, txid, required), + (2, contribution, option), +}); + +/// Marks an on-chain payment as belonging to a channel's funding lifecycle (open or +/// splice), and carries the per-candidate state needed to react to RBF replacements. +/// +/// The candidate whose `txid` matches the outer [`PaymentDetails`]`::kind.txid` is the +/// one currently reflected by the payment's `amount_msat` and `fee_paid_msat`. On RBF, a +/// new candidate is appended and becomes active. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingDetails { + /// The channel whose funding is being tracked. + pub channel_id: ChannelId, + /// The channel's counterparty. + pub counterparty_node_id: PublicKey, + /// Whether this funding is for a channel open or a splice. + pub purpose: FundingPurpose, + /// Broadcast candidates, in the order they were observed. + pub candidates: Vec, +} + +impl_writeable_tlv_based!(FundingDetails, { + (0, channel_id, required), + (2, counterparty_node_id, required), + (4, purpose, required), + (6, candidates, optional_vec), +}); + /// Represents a pending payment #[derive(Clone, Debug, PartialEq, Eq)] pub struct PendingPaymentDetails { @@ -20,11 +88,24 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// Set when the payment's transaction is a channel funding (open or splice). The + /// record transitions to [`PaymentStatus::Succeeded`] on `ChannelReady` instead of + /// after [`ANTI_REORG_DELAY`] confirmations. + /// + /// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + pub funding_details: Option, } impl PendingPaymentDetails { pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + Self { details, conflicting_txids, funding_details: None } + } + + pub(crate) fn with_funding_details( + details: PaymentDetails, conflicting_txids: Vec, funding_details: FundingDetails, + ) -> Self { + Self { details, conflicting_txids, funding_details: Some(funding_details) } } /// Convert to finalized payment for the main payment store @@ -36,6 +117,7 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (4, funding_details, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +125,7 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub funding_details: Option>, } impl StorableObject for PendingPaymentDetails { @@ -68,6 +151,13 @@ impl StorableObject for PendingPaymentDetails { } } + if let Some(new_funding_details) = update.funding_details { + if self.funding_details != new_funding_details { + self.funding_details = new_funding_details; + updated = true; + } + } + updated } @@ -89,6 +179,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { } else { Some(value.conflicting_txids.clone()) }; - Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids, + funding_details: Some(value.funding_details.clone()), + } } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 7084135b0..58045a62f 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -6,21 +6,35 @@ // accordance with one or both of these licenses. use std::ops::Deref; +use std::sync::{Mutex as StdMutex, Weak}; use bitcoin::Transaction; use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType}; use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; +use crate::types::Wallet; +use crate::Error; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; +/// A package of transactions that LDK handed to the broadcaster in one +/// `broadcast_transactions` call, along with each transaction's type. Queued until the +/// background task classifies and broadcasts it. +pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>; + pub(crate) struct TransactionBroadcaster where L::Target: LdkLogger, { - queue_sender: mpsc::Sender>, - queue_receiver: Mutex>>, + queue_sender: mpsc::Sender, + queue_receiver: Mutex>, + /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts + /// (channel opens and splices) into payment records. Remains `None` while the + /// builder is wiring the node up, during which broadcasts are forwarded to the + /// queue but no payment record is written. [`Self::set_wallet`] installs the handle + /// once the [`Wallet`] exists. + wallet: StdMutex>>, logger: L, } @@ -30,14 +44,49 @@ where { pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } + Self { + queue_sender, + queue_receiver: Mutex::new(queue_receiver), + wallet: StdMutex::new(None), + logger, + } + } + + /// Installs the [`Wallet`] handle used to classify funding broadcasts (channel + /// opens and splices) into payment records. Called once the builder has constructed + /// both the broadcaster and the wallet. + pub(crate) fn set_wallet(&self, wallet: Weak) { + *self.wallet.lock().expect("lock") = Some(wallet); } pub(crate) async fn get_broadcast_queue( &self, - ) -> MutexGuard<'_, mpsc::Receiver>> { + ) -> MutexGuard<'_, mpsc::Receiver> { self.queue_receiver.lock().await } + + /// Classifies a queued package into payment records and returns the raw + /// transactions ready for the chain client. Returns `Err` if any classification + /// fails; callers must not broadcast the package in that case, since a crash would + /// leave the tx on-chain without a record. + pub(crate) async fn classify_package( + &self, package: BroadcastPackage, + ) -> Result, Error> { + let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet_opt { + let package = tokio::task::spawn_blocking(move || { + for (tx, tx_type) in &package { + wallet.classify_broadcast(tx, tx_type)?; + } + Ok::<_, Error>(package) + }) + .await + .map_err(|_| Error::PersistenceFailed)??; + Ok(package.into_iter().map(|(tx, _)| tx).collect()) + } else { + Ok(package.into_iter().map(|(tx, _)| tx).collect()) + } + } } impl BroadcasterInterface for TransactionBroadcaster @@ -45,7 +94,8 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { - let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); + let package: BroadcastPackage = + txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); }); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index cb982e303..2c93a2d52 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::future::Future; use std::ops::Deref; use std::str::FromStr; @@ -32,15 +33,16 @@ use bitcoin::{ WitnessProgram, WitnessVersion, }; use lightning::chain::chaininterface::{ - BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, + BroadcasterInterface, TransactionType, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; -use lightning::chain::{BestBlock, ClaimId, Listen}; +use lightning::chain::{BestBlock as BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; -use lightning::ln::funding::FundingTxInput; +use lightning::ln::funding::{FundingContribution, FundingTxInput}; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; use lightning::ln::script::ShutdownScript; +use lightning::ln::types::ChannelId as LnChannelId; use lightning::sign::{ ChangeDestinationSource, EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, PeerStorageKey, Recipient, SignerProvider, SpendableOutputDescriptor, @@ -55,6 +57,7 @@ use persist::KVStoreWalletPersister; use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::payment::pending_payment_store::{FundingCandidate, FundingDetails, FundingPurpose}; use crate::payment::store::ConfirmationStatus; use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, @@ -136,9 +139,16 @@ impl Wallet { .collect() } - pub(crate) fn current_best_block(&self) -> BestBlock { + pub(crate) fn current_best_block(&self) -> BlockLocator { let checkpoint = self.inner.lock().expect("lock").latest_checkpoint(); - BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() } + let mut current_block = Some(checkpoint.clone()); + let previous_blocks = std::array::from_fn(|_| { + let child = current_block.take()?; + let parent = child.prev().filter(|cp| cp.height() + 1 == child.height())?; + current_block = Some(parent.clone()); + Some(parent.hash()) + }); + BlockLocator { block_hash: checkpoint.hash(), height: checkpoint.height(), previous_blocks } } pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { @@ -243,18 +253,9 @@ impl Wallet { for event in events { match event { WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { - let cur_height = locked_wallet.latest_checkpoint().height(); - let confirmation_height = block_time.block_id.height; - let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 - { - PaymentStatus::Succeeded - } else { - PaymentStatus::Pending - }; - let confirmation_status = ConfirmationStatus::Confirmed { block_hash: block_time.block_id.hash, - height: confirmation_height, + height: block_time.block_id.height, timestamp: block_time.confirmation_time, }; @@ -262,6 +263,23 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + confirmation_status, + )? { + continue; + } + + let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; + let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 + { + PaymentStatus::Succeeded + } else { + PaymentStatus::Pending + }; + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -271,13 +289,12 @@ impl Wallet { confirmation_status, ); - self.payment_store.insert_or_update(payment.clone())?; - if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment)?; + self.persist_pending(pending_payment)?; + } else { + self.payment_store.insert_or_update(payment)?; } }, WalletEvent::ChainTipChanged { new_tip, .. } => { @@ -288,8 +305,11 @@ impl Wallet { "Non-pending payment {:?} found in pending store", p.details.id, ); + // Funding records complete on `ChannelReady`, not after + // `ANTI_REORG_DELAY` confirmations. p.details.status == PaymentStatus::Pending && matches!(p.details.kind, PaymentKind::Onchain { .. }) + && p.funding_details.is_none() }); let mut unconfirmed_outbound_txids: Vec = Vec::new(); @@ -350,6 +370,14 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -358,10 +386,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -397,6 +423,15 @@ impl Wallet { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -405,10 +440,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, _ => { continue; @@ -1076,9 +1109,12 @@ impl Wallet { let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); })?; + // Use list_output rather than get_utxo to include outputs spent by unconfirmed + // transactions (e.g., a prior splice being replaced via RBF). + let mut wallet_outputs: HashMap = + locked_wallet.list_output().map(|o| (o.outpoint, o)).collect(); for (i, txin) in psbt.unsigned_tx.input.iter().enumerate() { - if let Some(utxo) = locked_wallet.get_utxo(txin.previous_output) { - debug_assert!(!utxo.is_spent); + if let Some(utxo) = wallet_outputs.remove(&txin.previous_output) { psbt.inputs[i] = locked_wallet.get_psbt_input(utxo, None, true).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT input: {}", e); })?; @@ -1136,6 +1172,41 @@ impl Wallet { Ok(tx) } + /// Computes the amount, fee, and direction of an on-chain payment from the + /// wallet's view of the transaction. Used by [`TransactionBroadcaster`] to + /// describe a single-funded channel-open, for which no [`FundingContribution`] + /// is available. + /// + /// [`TransactionBroadcaster`]: crate::tx_broadcaster::TransactionBroadcaster + /// [`FundingContribution`]: lightning::ln::funding::FundingContribution + pub(crate) fn onchain_payment_fields( + &self, tx: &Transaction, + ) -> (Option, Option, PaymentDirection) { + let locked_wallet = self.inner.lock().expect("lock"); + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + (amount_msat, Some(fee_sat * 1000), direction) + } + fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, @@ -1192,6 +1263,219 @@ impl Wallet { PendingPaymentDetails::new(payment, conflicting_txids) } + /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their + /// respective stores in a fixed order. Callers that need to keep the two stores in + /// sync should always go through this. + fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + self.payment_store.insert_or_update(pending.details.clone())?; + self.pending_payment_store.insert_or_update(pending)?; + Ok(()) + } + + /// Called on `ChannelReady` to mark a funding payment (channel open or splice) as + /// succeeded. + /// + /// If `funding_txo_txid` matches a candidate other than the currently-active one, + /// that candidate is promoted to active first and the outer [`PaymentDetails`] is + /// updated from its contribution. If no candidate matches (the confirmed funding + /// txid belongs to a broadcast this node didn't contribute to), the pending record + /// is left in place for later handling. + pub(crate) fn handle_channel_ready( + &self, channel_id: LnChannelId, funding_txo_txid: Option, + ) -> Result<(), Error> { + let funding_txo_txid = match funding_txo_txid { + Some(t) => t, + None => return Ok(()), + }; + + let mut pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details.as_ref().map(|fd| fd.channel_id == channel_id).unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + let funding_details = match pending.funding_details.clone() { + Some(fd) => fd, + None => return Ok(()), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == funding_txo_txid) + { + Some(c) => c.clone(), + None => { + // Confirmed `funding_txo` wasn't produced by any of our broadcasts. The + // record is left alone; some higher-level flow decides what to do. + log_debug!( + self.logger, + "ChannelReady for channel {}: confirmed funding_txo {} is not one of our candidates", + channel_id, + funding_txo_txid, + ); + return Ok(()); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + if old_txid != funding_txo_txid { + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != funding_txo_txid); + + if let Some(contribution) = candidate.contribution.as_ref() { + pending.details.amount_msat = contribution_amount_msat(contribution); + pending.details.fee_paid_msat = Some(our_actual_fee_msat(contribution)); + } + } + + // Preserve the confirmation status already on the record (set by wallet sync if + // it's seen the tx confirm). `ChannelReady` alone doesn't carry block details. + let existing_status = match pending.details.kind { + PaymentKind::Onchain { status, .. } => status, + _ => ConfirmationStatus::Unconfirmed, + }; + pending.details.kind = + PaymentKind::Onchain { txid: funding_txo_txid, status: existing_status }; + + pending.details.status = PaymentStatus::Succeeded; + let payment_id = pending.details.id; + self.payment_store.insert_or_update(pending.details)?; + self.pending_payment_store.remove(&payment_id)?; + + Ok(()) + } + + /// Called on `ChannelClosed`. Removes any funding record (channel open or splice) + /// for `channel_id` whose candidates never reached confirmed — e.g. a funding + /// transaction that never made it on-chain. A record that does reflect a confirmed + /// transaction is left alone and will transition to `Succeeded` normally. + pub(crate) fn handle_channel_closed(&self, channel_id: LnChannelId) -> Result<(), Error> { + let pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details.as_ref().map(|fd| fd.channel_id == channel_id).unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + + let is_confirmed = matches!( + pending.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } + ); + if is_confirmed { + return Ok(()); + } + + let payment_id = pending.details.id; + self.pending_payment_store.remove(&payment_id)?; + self.payment_store.remove(&payment_id)?; + Ok(()) + } + + /// Updates a funding record's `kind` in response to a wallet-sync event, swapping + /// the active candidate when `event_txid` differs from the current one. + /// + /// Amount, fee, and direction are not recomputed from the wallet's view: they were + /// set at broadcast time from the `FundingContribution` and must persist until + /// `ChannelReady`. + /// + /// Returns `true` when a funding record was updated (so the caller skips the + /// default Onchain create/update path), `false` otherwise. + fn apply_funding_details_status_update( + &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, + ) -> Result { + // `ChannelReady` may move the payment to the main store before wallet sync + // sees the tx confirm. In that case, update `kind` directly; recomputing from + // the wallet's view would overwrite the per-node fee set at broadcast time. + if let Some(mut existing) = self.payment_store.get(&payment_id) { + if existing.status == PaymentStatus::Succeeded + && matches!(existing.kind, PaymentKind::Onchain { .. }) + && self.pending_payment_store.get(&payment_id).is_none() + { + let needs_update = match existing.kind { + PaymentKind::Onchain { txid, status } => { + txid != event_txid || status != confirmation_status + }, + _ => false, + }; + if needs_update { + existing.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + self.payment_store.insert_or_update(existing)?; + } + return Ok(true); + } + } + + let mut pending = match self.pending_payment_store.get(&payment_id) { + Some(p) => p, + None => return Ok(false), + }; + let funding_details = match pending.funding_details.as_ref() { + Some(fd) => fd, + None => return Ok(false), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == event_txid) { + Some(c) => c.clone(), + None => { + log_debug!( + self.logger, + "Event txid {} resolved to funding_details payment {} but is not in candidates", + event_txid, + payment_id, + ); + return Ok(false); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding_details record must use PaymentKind::Onchain"); + return Ok(false); + }, + }; + + if old_txid != event_txid { + // A different candidate confirmed. Move the previous active txid onto + // `conflicting_txids` and re-derive amount/fee from the new candidate's + // contribution. + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != event_txid); + + if let Some(contribution) = candidate.contribution.as_ref() { + pending.details.amount_msat = contribution_amount_msat(contribution); + pending.details.fee_paid_msat = Some(our_actual_fee_msat(contribution)); + } + } + + pending.details.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + + self.persist_pending(pending)?; + + Ok(true) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); if self.pending_payment_store.contains_key(&direct_payment_id) { @@ -1203,12 +1487,28 @@ impl Wallet { .list_filter(|p| { matches!(p.details.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid) || p.conflicting_txids.contains(&target_txid) + || p.funding_details + .as_ref() + .map(|fd| fd.candidates.iter().any(|c| c.txid == target_txid)) + .unwrap_or(false) }) .first() { return Some(replaced_details.details.id); } + // Once moved to the main store, a funding payment is still matched by its + // confirmed txid so late wallet events resolve correctly. + if let Some(p) = self + .payment_store + .list_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid), + ) + .first() + { + return Some(p.id); + } + None } @@ -1407,16 +1707,284 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); - let pending_payment_store = - self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment_store)?; - self.payment_store.insert_or_update(new_payment)?; + let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); + self.persist_pending(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); Ok(new_txid) } + + // TODO: `classify_funding` and `classify_splice` assume they are invoked once per + // transaction. LDK currently calls `broadcast_transactions` exactly once per signed + // funding or splice tx, so the assumption holds. If upstream adds rebroadcasting of + // unconfirmed funding/splice txs, add idempotency guards (e.g. skip when the txid is + // already tracked via `find_payment_by_txid`, and broaden the "last candidate" check + // in `classify_splice` to "any candidate"). + pub(crate) fn classify_broadcast( + &self, tx: &Transaction, tx_type: &TransactionType, + ) -> Result<(), Error> { + match tx_type { + TransactionType::Funding { channels } => self.classify_funding(tx, channels), + TransactionType::Splice { + counterparty_node_id, + channel_id, + contribution, + replaced_txid, + } => self.classify_splice( + tx, + *channel_id, + *counterparty_node_id, + contribution.as_ref(), + *replaced_txid, + ), + _ => Ok(()), + } + } + + fn classify_funding( + &self, tx: &Transaction, channels: &[(PublicKey, LnChannelId)], + ) -> Result<(), Error> { + // Batch funding (one transaction funding multiple channels) isn't supported; let + // wallet sync record the payment normally so graduation still runs through + // ANTI_REORG_DELAY. + if channels.len() != 1 { + if channels.len() > 1 { + log_trace!( + self.logger, + "Skipping funding classification for batched broadcast ({} channels)", + channels.len() + ); + } + return Ok(()); + } + + let (counterparty_node_id, channel_id) = channels[0]; + let txid = tx.compute_txid(); + let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx); + + let candidate = FundingCandidate { txid, contribution: None }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { + channel_id, + counterparty_node_id, + purpose: FundingPurpose::Establishment, + candidates: vec![candidate], + }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded channel-funding broadcast {} for channel {}", + txid, + channel_id, + ); + Ok(()) + } + + fn classify_splice( + &self, tx: &Transaction, channel_id: LnChannelId, counterparty_node_id: PublicKey, + contribution: Option<&FundingContribution>, replaced_txid: Option, + ) -> Result<(), Error> { + // Only record splices where this node contributed. A counterparty-only candidate + // that gets replaced by one of ours is captured via `replaced_txid` on our first + // contributing broadcast. + let contribution = match contribution { + Some(c) => c.clone(), + None => return Ok(()), + }; + + let txid = tx.compute_txid(); + + // Skip broadcasts that don't move funds in or out of our on-chain wallet — e.g. a + // splice-out we initiated toward an external address. Recording such a tx would + // surface a zero-valued payment that doesn't correspond to any wallet activity. + let (wallet_amount_msat, _wallet_fee_msat, wallet_direction) = + self.onchain_payment_fields(tx); + if wallet_amount_msat == Some(0) { + log_trace!( + self.logger, + "Skipping splice broadcast {} for channel {}: no wallet-level activity", + txid, + channel_id, + ); + return Ok(()); + } + // A splice that both adds and removes value in the same transaction isn't + // currently reachable from ldk-node's API; skip it so we don't record a + // misleading direction/amount. + if contribution_amount_msat(&contribution).is_none() { + log_trace!( + self.logger, + "Skipping mixed splice-in-and-out broadcast {} for channel {}", + txid, + channel_id, + ); + return Ok(()); + } + // Use the wallet's view for direction and amount so a splice-out paid to our own + // address lands as Inbound with the received amount. The fee is computed from the + // `FundingContribution` itself (see [`our_actual_fee_msat`]). + let amount_msat = wallet_amount_msat.unwrap_or(0); + let fee_paid_msat = our_actual_fee_msat(&contribution); + let direction = wallet_direction; + + let existing = find_splice_pending_for_channel(&self.pending_payment_store, channel_id); + + match existing { + None => { + let candidate = FundingCandidate { txid, contribution: Some(contribution) }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + Some(amount_msat), + Some(fee_paid_msat), + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { + channel_id, + counterparty_node_id, + purpose: FundingPurpose::Splice, + candidates: vec![candidate], + }; + + let conflicting_txids = replaced_txid.into_iter().collect(); + let pending = PendingPaymentDetails::with_funding_details( + details, + conflicting_txids, + funding_details, + ); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded splice broadcast {} for channel {}", + txid, + channel_id, + ); + }, + Some(mut pending) => { + let mut funding_details = pending.funding_details.clone().expect("present"); + if funding_details.candidates.last().map(|c| c.txid) == Some(txid) { + return Ok(()); + } + + let old_txid = match &pending.details.kind { + PaymentKind::Onchain { txid, .. } => *txid, + _ => { + debug_assert!(false, "splice record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + funding_details + .candidates + .push(FundingCandidate { txid, contribution: Some(contribution) }); + + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + + pending.details.kind = + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }; + pending.details.amount_msat = Some(amount_msat); + pending.details.fee_paid_msat = Some(fee_paid_msat); + pending.funding_details = Some(funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded splice RBF broadcast {} for channel {} (replaces {})", + txid, + channel_id, + old_txid, + ); + }, + } + + Ok(()) + } +} + +/// Returns this node's share of the on-chain fee for a funding transaction (channel +/// open or splice), in millisatoshis. +/// +/// When the contribution includes wallet inputs, the fee is whatever's left after the +/// contribution's outputs, change, and value added to the channel: +/// +/// ```text +/// our_fee = sum(inputs) - sum(outputs) - change - value_added +/// ``` +/// +/// This is exact: the change output was picked during coin selection so the identity +/// holds, and LDK re-balances it whenever the contribution's role (initiator vs. +/// acceptor) is finalized. +/// +/// A pure splice-out contributes no wallet inputs (the fee comes out of the channel +/// balance instead), so the identity above doesn't apply; fall back to the +/// [`FundingContribution::estimated_fee`] LDK computed for that case. +fn our_actual_fee_msat(contribution: &FundingContribution) -> u64 { + if contribution.inputs().is_empty() { + return contribution.estimated_fee().to_sat() * 1000; + } + let inputs_sum: Amount = contribution.inputs().iter().map(|i| i.output().value).sum(); + let outputs_sum: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + let change: Amount = contribution.change_output().map(|o| o.value).unwrap_or(Amount::ZERO); + let value_added = contribution.value_added(); + inputs_sum + .checked_sub(outputs_sum) + .and_then(|a| a.checked_sub(change)) + .and_then(|a| a.checked_sub(value_added)) + .map(|a| a.to_sat() * 1000) + .unwrap_or(0) +} + +/// Returns the amount a [`FundingContribution`] moves between this node's on-chain +/// wallet and its channel balance, in millisatoshis. `None` for a mixed contribution +/// (both adding and removing value) or an empty one, which can't be classified as a +/// single inbound or outbound payment. +fn contribution_amount_msat(contribution: &FundingContribution) -> Option { + let value_added = contribution.value_added(); + let outputs_total: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + + if value_added > Amount::ZERO && outputs_total == Amount::ZERO { + Some(value_added.to_sat() * 1000) + } else if value_added == Amount::ZERO && outputs_total > Amount::ZERO { + Some(outputs_total.to_sat() * 1000) + } else { + None + } +} + +fn find_splice_pending_for_channel( + store: &PendingPaymentStore, channel_id: LnChannelId, +) -> Option { + store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| { + fd.channel_id == channel_id && matches!(fd.purpose, FundingPurpose::Splice) + }) + .unwrap_or(false) + }) + .into_iter() + .next() } impl Listen for Wallet { @@ -1491,7 +2059,7 @@ impl Listen for Wallet { }; } - fn blocks_disconnected(&self, _fork_point_block: BestBlock) { + fn blocks_disconnected(&self, _fork_point_block: BlockLocator) { // This is a no-op as we don't have to tell BDK about disconnections. According to the BDK // team, it's sufficient in case of a reorg to always connect blocks starting from the last // point of disagreement. diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index d2c057a16..613e7b3c2 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,7 +26,7 @@ use common::{ setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, }; -use electrsd::corepc_node::Node as BitcoinD; +use electrsd::corepc_node::{self, Node as BitcoinD}; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -1067,6 +1067,9 @@ async fn splice_channel() { expect_channel_ready_event!(node_a, node_b.node_id()); expect_channel_ready_event!(node_b, node_a.node_id()); + // Our per-node fee contribution, computed exactly from the `FundingContribution` + // (inputs − outputs − change − value_added). For a sole-contributor splice-in + // that equals the whole-tx fee BDK pays from our wallet. let expected_splice_in_fee_sat = 255; let payments = node_b.list_payments(); @@ -1127,6 +1130,171 @@ async fn splice_channel() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn rbf_splice_channel() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // rbf_channel should fail when there's no pending splice + assert_eq!( + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + Err(NodeError::ChannelSplicingFailed), + ); + + // Initiate a splice-in to create a pending splice + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let original_txo = expect_splice_pending_event!(node_a, node_b.node_id()); + expect_splice_pending_event!(node_b, node_a.node_id()); + + // splice_in should fail when there's a pending splice (RBF guard) + assert_eq!( + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // splice_out should fail when there's a pending splice (RBF guard) + let address = node_a.onchain_payment().new_address().unwrap(); + assert_eq!( + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 100_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // rbf_channel should succeed when there's a pending splice + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + + let rbf_txo = expect_splice_pending_event!(node_a, node_b.node_id()); + expect_splice_pending_event!(node_b, node_a.node_id()); + + assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + + // After RBF but before confirmation, node_b (the initiator) should have a single + // on-chain payment covering both candidates: id anchored to the first broadcast, + // `kind.txid` pointing at the latest (RBF) candidate, and the original candidate + // recorded as a replaced one on the pending record. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment exists"); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Unconfirmed, got {:?}", other), + } + assert_eq!(payment.status, PaymentStatus::Pending); + // Only one Onchain Pending payment for this splice attempt (not one per candidate). + let splice_payments = node_b.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + && p.status == PaymentStatus::Pending + }); + assert_eq!( + splice_payments.len(), + 1, + "expected exactly one pending Onchain payment for the splice, got {}: {:#?}", + splice_payments.len(), + splice_payments, + ); + } + + // Wait for the RBF transaction to replace the original in the mempool + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + + // Mine blocks and confirm the RBF splice + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the RBF transaction is the one that locked, not the original + match node_a.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_b.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_a.event_handled().unwrap(); + }, + ref e => panic!("node_a got unexpected event: {:?}", e), + } + match node_b.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_a.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_b.event_handled().unwrap(); + }, + ref e => panic!("node_b got unexpected event: {:?}", e), + } + + // After `ChannelReady` we should have graduated to `Succeeded` — even though + // `ANTI_REORG_DELAY` may not have elapsed yet — and the `kind.txid` should + // reflect the winning RBF candidate, with `fee_paid_msat` matching our + // per-node `FundingContribution::estimated_fee` for that candidate. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment graduated"); + assert_eq!(payment.status, PaymentStatus::Succeeded); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Confirmed { .. } } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Confirmed, got {:?}", other), + } + assert!( + payment.fee_paid_msat.is_some(), + "splice payment should carry a fee from its FundingContribution", + ); + } + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();