]> Untitled Git - bdk-cli/commitdiff
refactor: use BlockchainClient as references
authorMshehu5 <musheu@gmail.com>
Sat, 20 Dec 2025 22:54:46 +0000 (23:54 +0100)
committerMshehu5 <musheu@gmail.com>
Mon, 2 Feb 2026 10:56:21 +0000 (11:56 +0100)
Update function signatures to accept &BlockchainClient instead of taking
ownership. This refactoring to allow the client to be Referenced across
multiple operations including repeated calls to sync_kyoto_client.

- Update handle_online_wallet_subcommand signature
- Update all PayjoinManager methods to use &BlockchainClient
- Fix parameter dereferencing in full_scan calls
- Update all call sites to pass references

src/handlers.rs
src/payjoin/mod.rs

index 1f867b41928f821a450007d953d7c755da35267e..47402456d6302dd8f0e836e747ebac1083ac6bed 100644 (file)
@@ -15,6 +15,8 @@ use crate::config::{WalletConfig, WalletConfigInner};
 use crate::error::BDKCliError as Error;
 #[cfg(any(feature = "sqlite", feature = "redb"))]
 use crate::persister::Persister;
+#[cfg(feature = "cbf")]
+use crate::utils::BlockchainClient::KyotoClient;
 use crate::utils::*;
 #[cfg(feature = "redb")]
 use bdk_redb::Store as RedbStore;
@@ -46,8 +48,6 @@ use bdk_wallet::{
 };
 use cli_table::{Cell, CellStruct, Style, Table, format::Justify};
 use serde_json::json;
-#[cfg(feature = "cbf")]
-use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select};
 
 #[cfg(feature = "electrum")]
 use crate::utils::BlockchainClient::Electrum;
@@ -605,7 +605,7 @@ pub fn handle_offline_wallet_subcommand(
 ))]
 pub(crate) async fn handle_online_wallet_subcommand(
     wallet: &mut Wallet,
-    client: BlockchainClient,
+    client: &BlockchainClient,
     online_subcommand: OnlineWalletSubCommand,
 ) -> Result<String, Error> {
     match online_subcommand {
@@ -632,7 +632,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
                     client
                         .populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
 
-                    let update = client.full_scan(request, _stop_gap, batch_size, false)?;
+                    let update = client.full_scan(request, _stop_gap, *batch_size, false)?;
                     wallet.apply_update(update)?;
                 }
                 #[cfg(feature = "esplora")]
@@ -641,7 +641,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
                     parallel_requests,
                 } => {
                     let update = client
-                        .full_scan(request, _stop_gap, parallel_requests)
+                        .full_scan(request, _stop_gap, *parallel_requests)
                         .await
                         .map_err(|e| *e)?;
                     wallet.apply_update(update)?;
@@ -658,7 +658,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
                         hash: genesis_block.block_hash(),
                     });
                     let mut emitter = Emitter::new(
-                        &*client,
+                        client.as_ref(),
                         genesis_cp.clone(),
                         genesis_cp.height(),
                         NO_EXPECTED_MEMPOOL_TXS,
@@ -1246,7 +1246,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
 
                 let result = handle_online_wallet_subcommand(
                     &mut wallet,
-                    blockchain_client,
+                    &blockchain_client,
                     online_subcommand,
                 )
                 .await?;
@@ -1258,7 +1258,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
                 let mut wallet = new_wallet(network, wallet_opts)?;
                 let blockchain_client =
                     crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
-                handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
+                handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand)
                     .await?
             };
             Ok(result)
@@ -1452,7 +1452,7 @@ async fn respond(
         } => {
             let blockchain =
                 new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
-            let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
+            let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand)
                 .await
                 .map_err(|e| e.to_string())?;
             Some(value)
@@ -1508,7 +1508,7 @@ async fn respond(
     feature = "rpc"
 ))]
 /// Syncs a given wallet using the blockchain client.
-pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
+pub async fn sync_wallet(client: &BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
     #[cfg(any(feature = "electrum", feature = "esplora"))]
     let request = wallet
         .start_sync_with_revealed_spks()
@@ -1523,7 +1523,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
             // already have.
             client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
 
-            let update = client.sync(request, batch_size, false)?;
+            let update = client.sync(request, *batch_size, false)?;
             wallet
                 .apply_update(update)
                 .map_err(|e| Error::Generic(e.to_string()))
@@ -1534,7 +1534,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
             parallel_requests,
         } => {
             let update = client
-                .sync(request, parallel_requests)
+                .sync(request, *parallel_requests)
                 .await
                 .map_err(|e| *e)?;
             wallet
@@ -1549,7 +1549,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
             // reload the last 200 blocks in case of a reorg
             let emitter_height = wallet_cp.height().saturating_sub(200);
             let mut emitter = Emitter::new(
-                &*client,
+                client.as_ref(),
                 wallet_cp,
                 emitter_height,
                 wallet
@@ -1600,7 +1600,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
 ))]
 /// Broadcasts a given transaction using the blockchain client.
 pub async fn broadcast_transaction(
-    client: BlockchainClient,
+    client: &BlockchainClient,
     tx: Transaction,
 ) -> Result<Txid, Error> {
     match client {
@@ -1627,38 +1627,15 @@ pub async fn broadcast_transaction(
 
         #[cfg(feature = "cbf")]
         KyotoClient { client } => {
-            let LightClient {
-                requester,
-                mut info_subscriber,
-                mut warning_subscriber,
-                update_subscriber: _,
-                node,
-            } = *client;
-
-            let subscriber = tracing_subscriber::FmtSubscriber::new();
-            tracing::subscriber::set_global_default(subscriber)
-                .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;
-
-            tokio::task::spawn(async move { node.run().await });
-            tokio::task::spawn(async move {
-                select! {
-                    info = info_subscriber.recv() => {
-                        if let Some(info) = info {
-                            tracing::info!("{info}");
-                        }
-                    },
-                    warn = warning_subscriber.recv() => {
-                        if let Some(warn) = warn {
-                            tracing::warn!("{warn}");
-                        }
-                    }
-                }
-            });
             let txid = tx.compute_txid();
-            let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
-                tracing::warn!("Broadcast was unsuccessful");
-                Error::Generic("Transaction broadcast timed out after 30 seconds".into())
-            })?;
+            let wtxid = client
+                .requester
+                .broadcast_random(tx.clone())
+                .await
+                .map_err(|_| {
+                    tracing::warn!("Broadcast was unsuccessful");
+                    Error::Generic("Transaction broadcast timed out after 30 seconds".into())
+                })?;
             tracing::info!("Successfully broadcast WTXID: {wtxid}");
             Ok(txid)
         }
index f5e1274ca1acfa0a5085da1e0296fadcbc281798..64bd6309580f2f0b753e7d87c5e7299caa099884 100644 (file)
@@ -51,7 +51,7 @@ impl<'a> PayjoinManager<'a> {
         directory: String,
         max_fee_rate: Option<u64>,
         ohttp_relays: Vec<String>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<String, Error> {
         let address = self
             .wallet
@@ -119,7 +119,7 @@ impl<'a> PayjoinManager<'a> {
         uri: String,
         fee_rate: u64,
         ohttp_relays: Vec<String>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<String, Error> {
         let uri = payjoin::Uri::try_from(uri)
             .map_err(|e| Error::Generic(format!("Failed parsing to Payjoin URI: {}", e)))?;
@@ -237,7 +237,7 @@ impl<'a> PayjoinManager<'a> {
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         relay: impl payjoin::IntoUrl,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         match session {
             ReceiveSession::Initialized(proposal) => {
@@ -306,7 +306,7 @@ impl<'a> PayjoinManager<'a> {
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         relay: impl payjoin::IntoUrl,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let mut current_receiver_typestate = receiver;
         let next_receiver_typestate = loop {
@@ -353,7 +353,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<UncheckedOriginalPayload>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let next_receiver_typestate = receiver
             .assume_interactive_receiver()
@@ -386,7 +386,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<MaybeInputsOwned>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let next_receiver_typestate = receiver
             .check_inputs_not_owned(&mut |input| {
@@ -411,7 +411,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<MaybeInputsSeen>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI
         // yet. If there is support either in the BDK persister or Payjoin persister, this can be
@@ -437,7 +437,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<OutputsUnknown>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| {
             Ok(self.wallet.is_mine(output_script.to_owned()))
@@ -459,7 +459,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<WantsOutputs>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         // This is a typestate to modify existing receiver-owned outputs in case the receiver wants
         // to do that. This is a very simple implementation of Payjoin so we are just going
@@ -483,7 +483,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<WantsInputs>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let candidate_inputs: Vec<InputPair> = self
             .wallet
@@ -533,7 +533,7 @@ impl<'a> PayjoinManager<'a> {
         receiver: Receiver<WantsFeeRange>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         max_fee_rate: FeeRate,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| {
             Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
@@ -546,7 +546,7 @@ impl<'a> PayjoinManager<'a> {
         &mut self,
         receiver: Receiver<ProvisionalProposal>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let next_receiver_typestate = receiver
             .finalize_proposal(|psbt| {
@@ -580,7 +580,7 @@ impl<'a> PayjoinManager<'a> {
         &mut self,
         receiver: Receiver<PayjoinProposal>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let (req, ctx) = receiver.create_post_request(
             self.relay_manager
@@ -619,7 +619,7 @@ impl<'a> PayjoinManager<'a> {
         &mut self,
         receiver: Receiver<Monitor>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let wait_time_for_sync = 3;
         let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
@@ -734,7 +734,7 @@ impl<'a> PayjoinManager<'a> {
         session: SendSession,
         persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
         relay: impl payjoin::IntoUrl,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<Txid, Error> {
         match session {
             SendSession::WithReplyKey(context) => {
@@ -757,7 +757,7 @@ impl<'a> PayjoinManager<'a> {
         sender: Sender<WithReplyKey>,
         relay: impl payjoin::IntoUrl,
         persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<Txid, Error> {
         let (req, ctx) = sender.create_v2_post_request(relay.as_str()).map_err(|e| {
             Error::Generic(format!(
@@ -780,7 +780,7 @@ impl<'a> PayjoinManager<'a> {
         sender: Sender<PollingForProposal>,
         relay: impl payjoin::IntoUrl,
         persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<Txid, Error> {
         let mut sender = sender.clone();
         loop {
@@ -815,7 +815,7 @@ impl<'a> PayjoinManager<'a> {
     async fn process_payjoin_proposal(
         &self,
         mut psbt: Psbt,
-        blockchain_client: BlockchainClient,
+        blockchain_client: &BlockchainClient,
     ) -> Result<Txid, Error> {
         if !self.wallet.sign(&mut psbt, SignOptions::default())? {
             return Err(Error::Generic(