From: Mshehu5 Date: Sat, 20 Dec 2025 22:54:46 +0000 (+0100) Subject: refactor: use BlockchainClient as references X-Git-Url: http://internal-gitweb-vhost/parse/%22https:/struct.EncoderStringWriter.html?a=commitdiff_plain;h=b88426adcdafa7c327465be6dfd9d8a6d03d89be;p=bdk-cli refactor: use BlockchainClient as references Update function signatures to accept &BlockchainClient instead of taking ownership. This refactoring to allow the client to be Referenced across multiple operations including repeated calls to sync_kyoto_client. - Update handle_online_wallet_subcommand signature - Update all PayjoinManager methods to use &BlockchainClient - Fix parameter dereferencing in full_scan calls - Update all call sites to pass references --- diff --git a/src/handlers.rs b/src/handlers.rs index 1f867b4..4740245 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -15,6 +15,8 @@ use crate::config::{WalletConfig, WalletConfigInner}; use crate::error::BDKCliError as Error; #[cfg(any(feature = "sqlite", feature = "redb"))] use crate::persister::Persister; +#[cfg(feature = "cbf")] +use crate::utils::BlockchainClient::KyotoClient; use crate::utils::*; #[cfg(feature = "redb")] use bdk_redb::Store as RedbStore; @@ -46,8 +48,6 @@ use bdk_wallet::{ }; use cli_table::{Cell, CellStruct, Style, Table, format::Justify}; use serde_json::json; -#[cfg(feature = "cbf")] -use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select}; #[cfg(feature = "electrum")] use crate::utils::BlockchainClient::Electrum; @@ -605,7 +605,7 @@ pub fn handle_offline_wallet_subcommand( ))] pub(crate) async fn handle_online_wallet_subcommand( wallet: &mut Wallet, - client: BlockchainClient, + client: &BlockchainClient, online_subcommand: OnlineWalletSubCommand, ) -> Result { match online_subcommand { @@ -632,7 +632,7 @@ pub(crate) async fn handle_online_wallet_subcommand( client .populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx)); - let update = client.full_scan(request, _stop_gap, batch_size, false)?; + let update = client.full_scan(request, _stop_gap, *batch_size, false)?; wallet.apply_update(update)?; } #[cfg(feature = "esplora")] @@ -641,7 +641,7 @@ pub(crate) async fn handle_online_wallet_subcommand( parallel_requests, } => { let update = client - .full_scan(request, _stop_gap, parallel_requests) + .full_scan(request, _stop_gap, *parallel_requests) .await .map_err(|e| *e)?; wallet.apply_update(update)?; @@ -658,7 +658,7 @@ pub(crate) async fn handle_online_wallet_subcommand( hash: genesis_block.block_hash(), }); let mut emitter = Emitter::new( - &*client, + client.as_ref(), genesis_cp.clone(), genesis_cp.height(), NO_EXPECTED_MEMPOOL_TXS, @@ -1246,7 +1246,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { let result = handle_online_wallet_subcommand( &mut wallet, - blockchain_client, + &blockchain_client, online_subcommand, ) .await?; @@ -1258,7 +1258,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { let mut wallet = new_wallet(network, wallet_opts)?; let blockchain_client = crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?; - handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand) + handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand) .await? }; Ok(result) @@ -1452,7 +1452,7 @@ async fn respond( } => { let blockchain = new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?; - let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand) + let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand) .await .map_err(|e| e.to_string())?; Some(value) @@ -1508,7 +1508,7 @@ async fn respond( feature = "rpc" ))] /// Syncs a given wallet using the blockchain client. -pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> { +pub async fn sync_wallet(client: &BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> { #[cfg(any(feature = "electrum", feature = "esplora"))] let request = wallet .start_sync_with_revealed_spks() @@ -1523,7 +1523,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul // already have. client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx)); - let update = client.sync(request, batch_size, false)?; + let update = client.sync(request, *batch_size, false)?; wallet .apply_update(update) .map_err(|e| Error::Generic(e.to_string())) @@ -1534,7 +1534,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul parallel_requests, } => { let update = client - .sync(request, parallel_requests) + .sync(request, *parallel_requests) .await .map_err(|e| *e)?; wallet @@ -1549,7 +1549,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul // reload the last 200 blocks in case of a reorg let emitter_height = wallet_cp.height().saturating_sub(200); let mut emitter = Emitter::new( - &*client, + client.as_ref(), wallet_cp, emitter_height, wallet @@ -1600,7 +1600,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul ))] /// Broadcasts a given transaction using the blockchain client. pub async fn broadcast_transaction( - client: BlockchainClient, + client: &BlockchainClient, tx: Transaction, ) -> Result { match client { @@ -1627,38 +1627,15 @@ pub async fn broadcast_transaction( #[cfg(feature = "cbf")] KyotoClient { client } => { - let LightClient { - requester, - mut info_subscriber, - mut warning_subscriber, - update_subscriber: _, - node, - } = *client; - - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber) - .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?; - - tokio::task::spawn(async move { node.run().await }); - tokio::task::spawn(async move { - select! { - info = info_subscriber.recv() => { - if let Some(info) = info { - tracing::info!("{info}"); - } - }, - warn = warning_subscriber.recv() => { - if let Some(warn) = warn { - tracing::warn!("{warn}"); - } - } - } - }); let txid = tx.compute_txid(); - let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| { - tracing::warn!("Broadcast was unsuccessful"); - Error::Generic("Transaction broadcast timed out after 30 seconds".into()) - })?; + let wtxid = client + .requester + .broadcast_random(tx.clone()) + .await + .map_err(|_| { + tracing::warn!("Broadcast was unsuccessful"); + Error::Generic("Transaction broadcast timed out after 30 seconds".into()) + })?; tracing::info!("Successfully broadcast WTXID: {wtxid}"); Ok(txid) } diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index f5e1274..64bd630 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -51,7 +51,7 @@ impl<'a> PayjoinManager<'a> { directory: String, max_fee_rate: Option, ohttp_relays: Vec, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result { let address = self .wallet @@ -119,7 +119,7 @@ impl<'a> PayjoinManager<'a> { uri: String, fee_rate: u64, ohttp_relays: Vec, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result { let uri = payjoin::Uri::try_from(uri) .map_err(|e| Error::Generic(format!("Failed parsing to Payjoin URI: {}", e)))?; @@ -237,7 +237,7 @@ impl<'a> PayjoinManager<'a> { persister: &impl SessionPersister, relay: impl payjoin::IntoUrl, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { match session { ReceiveSession::Initialized(proposal) => { @@ -306,7 +306,7 @@ impl<'a> PayjoinManager<'a> { persister: &impl SessionPersister, relay: impl payjoin::IntoUrl, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let mut current_receiver_typestate = receiver; let next_receiver_typestate = loop { @@ -353,7 +353,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver .assume_interactive_receiver() @@ -386,7 +386,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver .check_inputs_not_owned(&mut |input| { @@ -411,7 +411,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI // yet. If there is support either in the BDK persister or Payjoin persister, this can be @@ -437,7 +437,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| { Ok(self.wallet.is_mine(output_script.to_owned())) @@ -459,7 +459,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { // This is a typestate to modify existing receiver-owned outputs in case the receiver wants // to do that. This is a very simple implementation of Payjoin so we are just going @@ -483,7 +483,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let candidate_inputs: Vec = self .wallet @@ -533,7 +533,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| { Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}")) @@ -546,7 +546,7 @@ impl<'a> PayjoinManager<'a> { &mut self, receiver: Receiver, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver .finalize_proposal(|psbt| { @@ -580,7 +580,7 @@ impl<'a> PayjoinManager<'a> { &mut self, receiver: Receiver, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let (req, ctx) = receiver.create_post_request( self.relay_manager @@ -619,7 +619,7 @@ impl<'a> PayjoinManager<'a> { &mut self, receiver: Receiver, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result<(), Error> { let wait_time_for_sync = 3; let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync); @@ -734,7 +734,7 @@ impl<'a> PayjoinManager<'a> { session: SendSession, persister: &impl SessionPersister, relay: impl payjoin::IntoUrl, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result { match session { SendSession::WithReplyKey(context) => { @@ -757,7 +757,7 @@ impl<'a> PayjoinManager<'a> { sender: Sender, relay: impl payjoin::IntoUrl, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result { let (req, ctx) = sender.create_v2_post_request(relay.as_str()).map_err(|e| { Error::Generic(format!( @@ -780,7 +780,7 @@ impl<'a> PayjoinManager<'a> { sender: Sender, relay: impl payjoin::IntoUrl, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result { let mut sender = sender.clone(); loop { @@ -815,7 +815,7 @@ impl<'a> PayjoinManager<'a> { async fn process_payjoin_proposal( &self, mut psbt: Psbt, - blockchain_client: BlockchainClient, + blockchain_client: &BlockchainClient, ) -> Result { if !self.wallet.sign(&mut psbt, SignOptions::default())? { return Err(Error::Generic(