From: Mshehu5 Date: Sun, 21 Dec 2025 10:50:12 +0000 (+0100) Subject: refactor(payjoin): implement polling-based monitoring with timeout X-Git-Url: http://internal-gitweb-vhost/parse/%22https:/struct.EncoderStringWriter.html?a=commitdiff_plain;h=35d831329a9c52ad561ae7ee1a5a085c282adddb;p=bdk-cli refactor(payjoin): implement polling-based monitoring with timeout Replace single sync-and-check with periodic polling loop. This allows multiple sync operations since sync_wallet now accepts a reference to BlockchainClient, enabling proper long-running monitoring instead of a one-time check. --- diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index 64bd630..2107a64 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -607,72 +607,90 @@ impl<'a> PayjoinManager<'a> { .await; } - /// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the + /// Polls the blockchain periodically and checks whether the Payjoin was broadcasted by the /// sender. /// - /// The currenty implementation does not support checking for the Payjoin broadcast in a loop - /// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`] - /// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and - /// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running - /// tasks. + /// This function syncs the wallet at regular intervals and checks for the Payjoin transaction + /// in a loop until it is detected or a timeout is reached. Since [`sync_wallet`] now accepts + /// a reference to the BlockchainClient, we can call it multiple times in a loop. async fn monitor_payjoin_proposal( &mut self, receiver: Receiver, persister: &impl SessionPersister, blockchain_client: &BlockchainClient, ) -> Result<(), Error> { - let wait_time_for_sync = 3; - let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync); + let poll_interval = tokio::time::Duration::from_millis(200); + let sync_interval = tokio::time::Duration::from_secs(3); + let timeout_duration = tokio::time::Duration::from_secs(15); println!( - "Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..." + "Polling for Payjoin transaction broadcast. This may take up to {} seconds...", + timeout_duration.as_secs() ); - tokio::time::sleep(poll_internal).await; - sync_wallet(blockchain_client, self.wallet).await?; - - let check_result = receiver - .check_payment( - |txid| { - let Some(tx_details) = self.wallet.tx_details(txid) else { - return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); - }; - - let is_seen = match tx_details.chain_position { - bdk_wallet::chain::ChainPosition::Confirmed { .. } => true, - bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true, - _ => false - }; - - if is_seen { - return Ok(Some(tx_details.tx.as_ref().clone())); + let result = tokio::time::timeout(timeout_duration, async { + let mut poll_timer = tokio::time::interval(poll_interval); + let mut sync_timer = tokio::time::interval(sync_interval); + poll_timer.tick().await; + sync_timer.tick().await; + sync_wallet(blockchain_client, self.wallet).await?; + + loop { + tokio::select! { + _ = poll_timer.tick() => { + // Time to check payment + let check_result = receiver + .check_payment( + |txid| { + let Some(tx_details) = self.wallet.tx_details(txid) else { + return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); + }; + + let is_seen = match tx_details.chain_position { + bdk_wallet::chain::ChainPosition::Confirmed { .. } => true, + bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true, + _ => false + }; + + if is_seen { + return Ok(Some(tx_details.tx.as_ref().clone())); + } + return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); + }, + |outpoint| { + let utxo = self.wallet.get_utxo(outpoint); + match utxo { + Some(_) => Ok(false), + None => Ok(true), + } + } + ) + .save(persister) + .map_err(|e| { + Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}")) + }); + + if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result { + println!("Payjoin transaction detected in the mempool!"); + return Ok(()); + } + // For Stasis or Err, continue polling (implicit - falls through to next loop iteration) } - return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); - }, - |outpoint| { - let utxo = self.wallet.get_utxo(outpoint); - match utxo { - Some(_) => Ok(false), - None => Ok(true), + _ = sync_timer.tick() => { + // Time to sync wallet + sync_wallet(blockchain_client, self.wallet).await?; } } - ) - .save(persister) - .map_err(|e| { - Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}")) - }); - - match check_result { - Ok(_) => { - println!("Payjoin transaction detected in the mempool!"); - } - Err(_) => { - println!( - "Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command." - ); } + }) + .await; + + match result { + Ok(ok) => ok, + Err(_) => Err(Error::Generic(format!( + "Timeout waiting for Payjoin transaction broadcast after {:?}. Check the state of the transaction manually after running the sync command.", + timeout_duration + ))), } - - Ok(()) } async fn handle_error(