]> Untitled Git - bdk-cli/commitdiff
feat(cbf): implement transaction broadcasting
authorrustaceanrob <rob.netzke@gmail.com>
Thu, 8 May 2025 19:30:07 +0000 (20:30 +0100)
committerSteve Myers <steve@notmandatory.org>
Fri, 16 May 2025 14:39:47 +0000 (09:39 -0500)
For the highest reliability, we wait for the connection requirement to
be met by the node. Once met, we can broadcast and wait for
confirmation. The function will either timeout after 15 seconds or
successfully finish with gossip confirmation.

src/commands.rs
src/handlers.rs

index 67f845374e93cac6904c221d7f2380a2eddaa9ba..52790bce763adac2925e3724f341211ee89d6752 100644 (file)
@@ -239,7 +239,7 @@ pub struct ProxyOpts {
 #[derive(Debug, Args, Clone, PartialEq, Eq)]
 pub struct CompactFilterOpts {
     /// Sets the number of parallel node connections.
-    #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))]
+    #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))]
     pub conn_count: u8,
 
     /// Optionally skip initial `skip_blocks` blocks.
index 1ef05268df9e3b7be9d3174a013c32b56081aaa4..35c881a3e0e355a65ec4cf0fab417ed55f8d0d4a 100644 (file)
@@ -48,7 +48,11 @@ use std::str::FromStr;
 
 #[cfg(feature = "electrum")]
 use crate::utils::BlockchainClient::Electrum;
+#[cfg(feature = "cbf")]
+use bdk_kyoto::{Info, LightClient};
 use bdk_wallet::bitcoin::base64::prelude::*;
+#[cfg(feature = "cbf")]
+use tokio::select;
 #[cfg(any(
     feature = "electrum",
     feature = "esplora",
@@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand(
                 (Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
                 (None, None) => panic!("Missing `psbt` and `tx` option"),
             };
-
             let txid = match client {
                 #[cfg(feature = "electrum")]
                 Electrum {
@@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand(
                     .map_err(|e| Error::Generic(e.to_string()))?,
 
                 #[cfg(feature = "cbf")]
-                KyotoClient { client: _ } => {
-                    unimplemented!()
+                KyotoClient { client } => {
+                    let LightClient {
+                        requester,
+                        mut log_subscriber,
+                        mut info_subscriber,
+                        mut warning_subscriber,
+                        update_subscriber: _,
+                        node,
+                    } = client;
+
+                    let subscriber = tracing_subscriber::FmtSubscriber::new();
+                    tracing::subscriber::set_global_default(subscriber)
+                        .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
+
+                    tokio::task::spawn(async move { node.run().await });
+                    tokio::task::spawn(async move {
+                        select! {
+                            log = log_subscriber.recv() => {
+                                if let Some(log) = log {
+                                    tracing::info!("{log}");
+                                }
+                            },
+                            warn = warning_subscriber.recv() => {
+                                if let Some(warn) = warn {
+                                    tracing::warn!("{warn}");
+                                }
+                            }
+                        }
+                    });
+                    let txid = tx.compute_txid();
+                    tracing::info!("Waiting for connections to broadcast...");
+                    while let Some(info) = info_subscriber.recv().await {
+                        match info {
+                            Info::ConnectionsMet => {
+                                requester
+                                    .broadcast_random(tx.clone())
+                                    .map_err(|e| Error::Generic(format!("{}", e)))?;
+                                break;
+                            }
+                            _ => tracing::info!("{info}"),
+                        }
+                    }
+                    tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
+                        while let Some(info) = info_subscriber.recv().await {
+                            match info {
+                                Info::TxGossiped(wtxid) => {
+                                    tracing::info!("Succuessfully broadcast WTXID: {wtxid}");
+                                    break;
+                                }
+                                Info::ConnectionsMet => {
+                                    tracing::info!("Rebroadcasting to new connections");
+                                    requester.broadcast_random(tx.clone()).unwrap();
+                                }
+                                _ => tracing::info!("{info}"),
+                            }
+                        }
+                    })
+                    .await
+                    .map_err(|_| {
+                        tracing::warn!("Broadcast was unsuccessful");
+                        Error::Generic("Transaction broadcast timed out after 15 seconds".into())
+                    })?;
+                    txid
                 }
             };
             Ok(json!({ "txid": txid }))
@@ -857,7 +921,7 @@ async fn respond(
             subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
         } => {
             let blockchain =
-                new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?;
+                new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
             let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
                 .await
                 .map_err(|e| e.to_string())?;