]> Untitled Git - bdk-cli/commitdiff
refactor(payjoin): implement polling-based monitoring with timeout
authorMshehu5 <musheu@gmail.com>
Sun, 21 Dec 2025 10:50:12 +0000 (11:50 +0100)
committerMshehu5 <musheu@gmail.com>
Mon, 2 Feb 2026 10:56:22 +0000 (11:56 +0100)
Replace single sync-and-check with periodic polling loop.
This allows multiple sync operations since sync_wallet now accepts
a reference to BlockchainClient, enabling proper long-running
monitoring instead of a one-time check.

src/payjoin/mod.rs

index 64bd6309580f2f0b753e7d87c5e7299caa099884..2107a640692046613af88ed5c2d091a7bc3293df 100644 (file)
@@ -607,72 +607,90 @@ impl<'a> PayjoinManager<'a> {
             .await;
     }
 
-    /// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the
+    /// Polls the blockchain periodically and checks whether the Payjoin was broadcasted by the
     /// sender.
     ///
-    /// The currenty implementation does not support checking for the Payjoin broadcast in a loop
-    /// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`]
-    /// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and
-    /// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running
-    /// tasks.
+    /// This function syncs the wallet at regular intervals and checks for the Payjoin transaction
+    /// in a loop until it is detected or a timeout is reached. Since [`sync_wallet`] now accepts
+    /// a reference to the BlockchainClient, we can call it multiple times in a loop.
     async fn monitor_payjoin_proposal(
         &mut self,
         receiver: Receiver<Monitor>,
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
-        let wait_time_for_sync = 3;
-        let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
+        let poll_interval = tokio::time::Duration::from_millis(200);
+        let sync_interval = tokio::time::Duration::from_secs(3);
+        let timeout_duration = tokio::time::Duration::from_secs(15);
 
         println!(
-            "Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..."
+            "Polling for Payjoin transaction broadcast. This may take up to {} seconds...",
+            timeout_duration.as_secs()
         );
-        tokio::time::sleep(poll_internal).await;
-        sync_wallet(blockchain_client, self.wallet).await?;
-
-        let check_result = receiver
-            .check_payment(
-                |txid| {
-                    let Some(tx_details) = self.wallet.tx_details(txid) else {
-                        return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
-                    };
-
-                    let is_seen = match tx_details.chain_position {
-                        bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
-                        bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
-                        _ => false
-                    };
-
-                    if is_seen {
-                        return Ok(Some(tx_details.tx.as_ref().clone()));
+        let result = tokio::time::timeout(timeout_duration, async {
+            let mut poll_timer = tokio::time::interval(poll_interval);
+            let mut sync_timer = tokio::time::interval(sync_interval);
+            poll_timer.tick().await;
+            sync_timer.tick().await;
+            sync_wallet(blockchain_client, self.wallet).await?;
+
+            loop {
+                tokio::select! {
+                    _ = poll_timer.tick() => {
+                        // Time to check payment
+                        let check_result = receiver
+                            .check_payment(
+                                |txid| {
+                                    let Some(tx_details) = self.wallet.tx_details(txid) else {
+                                        return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
+                                    };
+
+                                    let is_seen = match tx_details.chain_position {
+                                        bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
+                                        bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
+                                        _ => false
+                                    };
+
+                                    if is_seen {
+                                        return Ok(Some(tx_details.tx.as_ref().clone()));
+                                    }
+                                    return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
+                                },
+                                |outpoint| {
+                                    let utxo = self.wallet.get_utxo(outpoint);
+                                    match utxo {
+                                        Some(_) => Ok(false),
+                                        None => Ok(true),
+                                    }
+                                }
+                            )
+                            .save(persister)
+                            .map_err(|e| {
+                                Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
+                            });
+
+                        if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result {
+                            println!("Payjoin transaction detected in the mempool!");
+                            return Ok(());
+                        }
+                        // For Stasis or Err, continue polling (implicit - falls through to next loop iteration)
                     }
-                    return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
-                },
-                |outpoint| {
-                    let utxo = self.wallet.get_utxo(outpoint);
-                    match utxo {
-                        Some(_) => Ok(false),
-                        None => Ok(true),
+                    _ = sync_timer.tick() => {
+                        // Time to sync wallet
+                        sync_wallet(blockchain_client, self.wallet).await?;
                     }
                 }
-            )
-            .save(persister)
-            .map_err(|e| {
-                Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
-            });
-
-        match check_result {
-            Ok(_) => {
-                println!("Payjoin transaction detected in the mempool!");
-            }
-            Err(_) => {
-                println!(
-                    "Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command."
-                );
             }
+        })
+        .await;
+
+        match result {
+            Ok(ok) => ok,
+            Err(_) => Err(Error::Generic(format!(
+                "Timeout waiting for Payjoin transaction broadcast after {:?}. Check the state of the transaction manually after running the sync command.",
+                timeout_duration
+            ))),
         }
-
-        Ok(())
     }
 
     async fn handle_error(