.await;
}
- /// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the
+ /// Polls the blockchain periodically and checks whether the Payjoin was broadcasted by the
/// sender.
///
- /// The currenty implementation does not support checking for the Payjoin broadcast in a loop
- /// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`]
- /// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and
- /// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running
- /// tasks.
+ /// This function syncs the wallet at regular intervals and checks for the Payjoin transaction
+ /// in a loop until it is detected or a timeout is reached. Since [`sync_wallet`] now accepts
+ /// a reference to the BlockchainClient, we can call it multiple times in a loop.
async fn monitor_payjoin_proposal(
&mut self,
receiver: Receiver<Monitor>,
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
blockchain_client: &BlockchainClient,
) -> Result<(), Error> {
- let wait_time_for_sync = 3;
- let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
+ let poll_interval = tokio::time::Duration::from_millis(200);
+ let sync_interval = tokio::time::Duration::from_secs(3);
+ let timeout_duration = tokio::time::Duration::from_secs(15);
println!(
- "Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..."
+ "Polling for Payjoin transaction broadcast. This may take up to {} seconds...",
+ timeout_duration.as_secs()
);
- tokio::time::sleep(poll_internal).await;
- sync_wallet(blockchain_client, self.wallet).await?;
-
- let check_result = receiver
- .check_payment(
- |txid| {
- let Some(tx_details) = self.wallet.tx_details(txid) else {
- return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
- };
-
- let is_seen = match tx_details.chain_position {
- bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
- bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
- _ => false
- };
-
- if is_seen {
- return Ok(Some(tx_details.tx.as_ref().clone()));
+ let result = tokio::time::timeout(timeout_duration, async {
+ let mut poll_timer = tokio::time::interval(poll_interval);
+ let mut sync_timer = tokio::time::interval(sync_interval);
+ poll_timer.tick().await;
+ sync_timer.tick().await;
+ sync_wallet(blockchain_client, self.wallet).await?;
+
+ loop {
+ tokio::select! {
+ _ = poll_timer.tick() => {
+ // Time to check payment
+ let check_result = receiver
+ .check_payment(
+ |txid| {
+ let Some(tx_details) = self.wallet.tx_details(txid) else {
+ return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
+ };
+
+ let is_seen = match tx_details.chain_position {
+ bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
+ bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
+ _ => false
+ };
+
+ if is_seen {
+ return Ok(Some(tx_details.tx.as_ref().clone()));
+ }
+ return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
+ },
+ |outpoint| {
+ let utxo = self.wallet.get_utxo(outpoint);
+ match utxo {
+ Some(_) => Ok(false),
+ None => Ok(true),
+ }
+ }
+ )
+ .save(persister)
+ .map_err(|e| {
+ Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
+ });
+
+ if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result {
+ println!("Payjoin transaction detected in the mempool!");
+ return Ok(());
+ }
+ // For Stasis or Err, continue polling (implicit - falls through to next loop iteration)
}
- return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
- },
- |outpoint| {
- let utxo = self.wallet.get_utxo(outpoint);
- match utxo {
- Some(_) => Ok(false),
- None => Ok(true),
+ _ = sync_timer.tick() => {
+ // Time to sync wallet
+ sync_wallet(blockchain_client, self.wallet).await?;
}
}
- )
- .save(persister)
- .map_err(|e| {
- Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
- });
-
- match check_result {
- Ok(_) => {
- println!("Payjoin transaction detected in the mempool!");
- }
- Err(_) => {
- println!(
- "Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command."
- );
}
+ })
+ .await;
+
+ match result {
+ Ok(ok) => ok,
+ Err(_) => Err(Error::Generic(format!(
+ "Timeout waiting for Payjoin transaction broadcast after {:?}. Check the state of the transaction manually after running the sync command.",
+ timeout_duration
+ ))),
}
-
- Ok(())
}
async fn handle_error(