From: rustaceanrob Date: Thu, 8 May 2025 19:30:07 +0000 (+0100) Subject: feat(cbf): implement transaction broadcasting X-Git-Tag: v1.0.0~4^2~1 X-Git-Url: http://internal-gitweb-vhost/?a=commitdiff_plain;h=b04fed2e8a58dd6b4e001c64d03d85ad46cf7d3c;p=bdk-cli feat(cbf): implement transaction broadcasting For the highest reliability, we wait for the connection requirement to be met by the node. Once met, we can broadcast and wait for confirmation. The function will either timeout after 15 seconds or successfully finish with gossip confirmation. --- diff --git a/src/commands.rs b/src/commands.rs index 67f8453..52790bc 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -239,7 +239,7 @@ pub struct ProxyOpts { #[derive(Debug, Args, Clone, PartialEq, Eq)] pub struct CompactFilterOpts { /// Sets the number of parallel node connections. - #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))] + #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))] pub conn_count: u8, /// Optionally skip initial `skip_blocks` blocks. diff --git a/src/handlers.rs b/src/handlers.rs index 1ef0526..35c881a 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -48,7 +48,11 @@ use std::str::FromStr; #[cfg(feature = "electrum")] use crate::utils::BlockchainClient::Electrum; +#[cfg(feature = "cbf")] +use bdk_kyoto::{Info, LightClient}; use bdk_wallet::bitcoin::base64::prelude::*; +#[cfg(feature = "cbf")] +use tokio::select; #[cfg(any( feature = "electrum", feature = "esplora", @@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand( (Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"), (None, None) => panic!("Missing `psbt` and `tx` option"), }; - let txid = match client { #[cfg(feature = "electrum")] Electrum { @@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand( .map_err(|e| Error::Generic(e.to_string()))?, #[cfg(feature = "cbf")] - KyotoClient { client: _ } => { - unimplemented!() + KyotoClient { client } => { + let LightClient { + requester, + mut log_subscriber, + mut info_subscriber, + mut warning_subscriber, + update_subscriber: _, + node, + } = client; + + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber) + .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?; + + tokio::task::spawn(async move { node.run().await }); + tokio::task::spawn(async move { + select! { + log = log_subscriber.recv() => { + if let Some(log) = log { + tracing::info!("{log}"); + } + }, + warn = warning_subscriber.recv() => { + if let Some(warn) = warn { + tracing::warn!("{warn}"); + } + } + } + }); + let txid = tx.compute_txid(); + tracing::info!("Waiting for connections to broadcast..."); + while let Some(info) = info_subscriber.recv().await { + match info { + Info::ConnectionsMet => { + requester + .broadcast_random(tx.clone()) + .map_err(|e| Error::Generic(format!("{}", e)))?; + break; + } + _ => tracing::info!("{info}"), + } + } + tokio::time::timeout(tokio::time::Duration::from_secs(15), async move { + while let Some(info) = info_subscriber.recv().await { + match info { + Info::TxGossiped(wtxid) => { + tracing::info!("Succuessfully broadcast WTXID: {wtxid}"); + break; + } + Info::ConnectionsMet => { + tracing::info!("Rebroadcasting to new connections"); + requester.broadcast_random(tx.clone()).unwrap(); + } + _ => tracing::info!("{info}"), + } + } + }) + .await + .map_err(|_| { + tracing::warn!("Broadcast was unsuccessful"); + Error::Generic("Transaction broadcast timed out after 15 seconds".into()) + })?; + txid } }; Ok(json!({ "txid": txid })) @@ -857,7 +921,7 @@ async fn respond( subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { let blockchain = - new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?; + new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?; let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand) .await .map_err(|e| e.to_string())?;