#[derive(Debug, Args, Clone, PartialEq, Eq)]
pub struct CompactFilterOpts {
/// Sets the number of parallel node connections.
- #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))]
+ #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))]
pub conn_count: u8,
/// Optionally skip initial `skip_blocks` blocks.
#[cfg(feature = "electrum")]
use crate::utils::BlockchainClient::Electrum;
+#[cfg(feature = "cbf")]
+use bdk_kyoto::{Info, LightClient};
use bdk_wallet::bitcoin::base64::prelude::*;
+#[cfg(feature = "cbf")]
+use tokio::select;
#[cfg(any(
feature = "electrum",
feature = "esplora",
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
(None, None) => panic!("Missing `psbt` and `tx` option"),
};
-
let txid = match client {
#[cfg(feature = "electrum")]
Electrum {
.map_err(|e| Error::Generic(e.to_string()))?,
#[cfg(feature = "cbf")]
- KyotoClient { client: _ } => {
- unimplemented!()
+ KyotoClient { client } => {
+ let LightClient {
+ requester,
+ mut log_subscriber,
+ mut info_subscriber,
+ mut warning_subscriber,
+ update_subscriber: _,
+ node,
+ } = client;
+
+ let subscriber = tracing_subscriber::FmtSubscriber::new();
+ tracing::subscriber::set_global_default(subscriber)
+ .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
+
+ tokio::task::spawn(async move { node.run().await });
+ tokio::task::spawn(async move {
+ select! {
+ log = log_subscriber.recv() => {
+ if let Some(log) = log {
+ tracing::info!("{log}");
+ }
+ },
+ warn = warning_subscriber.recv() => {
+ if let Some(warn) = warn {
+ tracing::warn!("{warn}");
+ }
+ }
+ }
+ });
+ let txid = tx.compute_txid();
+ tracing::info!("Waiting for connections to broadcast...");
+ while let Some(info) = info_subscriber.recv().await {
+ match info {
+ Info::ConnectionsMet => {
+ requester
+ .broadcast_random(tx.clone())
+ .map_err(|e| Error::Generic(format!("{}", e)))?;
+ break;
+ }
+ _ => tracing::info!("{info}"),
+ }
+ }
+ tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
+ while let Some(info) = info_subscriber.recv().await {
+ match info {
+ Info::TxGossiped(wtxid) => {
+ tracing::info!("Succuessfully broadcast WTXID: {wtxid}");
+ break;
+ }
+ Info::ConnectionsMet => {
+ tracing::info!("Rebroadcasting to new connections");
+ requester.broadcast_random(tx.clone()).unwrap();
+ }
+ _ => tracing::info!("{info}"),
+ }
+ }
+ })
+ .await
+ .map_err(|_| {
+ tracing::warn!("Broadcast was unsuccessful");
+ Error::Generic("Transaction broadcast timed out after 15 seconds".into())
+ })?;
+ txid
}
};
Ok(json!({ "txid": txid }))
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
} => {
let blockchain =
- new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?;
+ new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
.await
.map_err(|e| e.to_string())?;