]> Untitled Git - bdk-cli/commitdiff
Integrate payjoin session persistence into workflow
authorMshehu5 <musheu@gmail.com>
Mon, 16 Feb 2026 13:48:33 +0000 (14:48 +0100)
committerMshehu5 <musheu@gmail.com>
Thu, 2 Jul 2026 11:30:27 +0000 (12:30 +0100)
Wire the payjoin persistence layer into the existing send and receive
flows so session state is saved during normal operation and can be
replayed later. Initialize the database from the handlers, replace
the noop persisters with SQLite-backed persisters, resume existing
sessions, and record seen inputs in the receiver flow for replay
protection.

This moves persistence from a standalone storage layer into the
runtime payjoin workflow so interrupted sessions can continue from
saved state instead of starting over. It also simplifies error
handling by relying on `?` once the storage errors map cleanly into
the CLI error type.

src/handlers.rs
src/payjoin/mod.rs
src/utils.rs

index e1e640fe60b45809b49bf0d2a79bc66be0f47f1e..4f7900a02a00a70e0db6eb78fa3fe85206f016ce 100644 (file)
@@ -77,14 +77,7 @@ use std::convert::TryFrom;
 use std::io::Write;
 use std::path::Path;
 use std::str::FromStr;
-#[cfg(any(
-    feature = "redb",
-    feature = "compiler",
-    feature = "electrum",
-    feature = "esplora",
-    feature = "cbf",
-    feature = "rpc"
-))]
+#[cfg(any(feature = "redb", feature = "compiler"))]
 use std::sync::Arc;
 
 #[cfg(feature = "bip322")]
@@ -100,9 +93,8 @@ use bdk_bip322::{BIP322, MessageProof, MessageVerificationResult};
 ))]
 use {
     crate::commands::OnlineWalletSubCommand::*,
-    crate::payjoin::{PayjoinManager, ohttp::RelayManager},
+    crate::payjoin::PayjoinManager,
     bdk_wallet::bitcoin::{Transaction, consensus::Decodable, hex::FromHex},
-    std::sync::Mutex,
 };
 #[cfg(feature = "esplora")]
 use {crate::utils::BlockchainClient::Esplora, bdk_esplora::EsploraAsyncExt};
@@ -863,6 +855,8 @@ pub(crate) async fn handle_online_wallet_subcommand(
     wallet: &mut Wallet,
     client: &BlockchainClient,
     online_subcommand: OnlineWalletSubCommand,
+    datadir: Option<std::path::PathBuf>,
+    wallet_name: &str,
 ) -> Result<String, Error> {
     match online_subcommand {
         FullScan {
@@ -989,8 +983,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
             ohttp_relay,
             max_fee_rate,
         } => {
-            let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
-            let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager);
+            let mut payjoin_manager = PayjoinManager::new(wallet, datadir.clone(), wallet_name)?;
             return payjoin_manager
                 .receive_payjoin(amount, directory, max_fee_rate, ohttp_relay, client)
                 .await;
@@ -1000,12 +993,22 @@ pub(crate) async fn handle_online_wallet_subcommand(
             ohttp_relay,
             fee_rate,
         } => {
-            let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
-            let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager);
+            let mut payjoin_manager = PayjoinManager::new(wallet, datadir.clone(), wallet_name)?;
             return payjoin_manager
                 .send_payjoin(uri, fee_rate, ohttp_relay, client)
                 .await;
         }
+        ResumePayjoin {
+            directory,
+            ohttp_relay,
+            session_id,
+        } => {
+            let mut payjoin_manager = PayjoinManager::new(wallet, datadir, wallet_name)?;
+            return payjoin_manager
+                .resume_payjoins(directory, ohttp_relay, session_id, client)
+                .await;
+        }
+        PayjoinHistory => crate::payjoin::PayjoinManager::history(datadir, wallet_name),
     }
 }
 
@@ -1511,14 +1514,14 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
             feature = "rpc"
         ))]
         CliSubCommand::Wallet {
-            wallet,
+            wallet: wallet_name,
             subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
         } => {
-            let home_dir = prepare_home_dir(cli_opts.datadir)?;
+            let home_dir = prepare_home_dir(cli_opts.datadir.clone())?;
 
-            let (wallet_opts, network) = load_wallet_config(&home_dir, &wallet)?;
+            let (wallet_opts, network) = load_wallet_config(&home_dir, &wallet_name)?;
 
-            let database_path = prepare_wallet_db_dir(&home_dir, &wallet)?;
+            let database_path = prepare_wallet_db_dir(&home_dir, &wallet_name)?;
 
             #[cfg(any(feature = "sqlite", feature = "redb"))]
             let result = {
@@ -1532,13 +1535,16 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
                     }
                     #[cfg(feature = "redb")]
                     DatabaseType::Redb => {
-                        let wallet_name = &wallet_opts.wallet;
+                        let redb_store_wallet_name = &wallet_opts.wallet;
                         let db = Arc::new(bdk_redb::redb::Database::create(
                             home_dir.join("wallet.redb"),
                         )?);
                         let store = RedbStore::new(
                             db,
-                            wallet_name.as_deref().unwrap_or("wallet").to_string(),
+                            redb_store_wallet_name
+                                .as_deref()
+                                .unwrap_or("wallet")
+                                .to_string(),
                         )?;
                         log::debug!("Redb database opened successfully");
                         Persister::RedbStore(store)
@@ -1553,6 +1559,8 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
                     &mut wallet,
                     &blockchain_client,
                     online_subcommand,
+                    cli_opts.datadir.clone(),
+                    &wallet_name,
                 )
                 .await?;
                 wallet.persist(&mut persister)?;
@@ -1563,8 +1571,14 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
                 let mut wallet = new_wallet(network, wallet_opts)?;
                 let blockchain_client =
                     crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
-                handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand)
-                    .await?
+                handle_online_wallet_subcommand(
+                    &mut wallet,
+                    &blockchain_client,
+                    online_subcommand,
+                    cli_opts.datadir.clone(),
+                    &wallet_name,
+                )
+                .await?
             };
             Ok(result)
         }
@@ -1751,7 +1765,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
 async fn respond(
     network: Network,
     wallet: &mut Wallet,
-    wallet_name: &String,
+    wallet_name: &str,
     wallet_opts: &mut WalletOpts,
     line: &str,
     _datadir: std::path::PathBuf,
@@ -1773,9 +1787,15 @@ async fn respond(
         } => {
             let blockchain =
                 new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
-            let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand)
-                .await
-                .map_err(|e| e.to_string())?;
+            let value = handle_online_wallet_subcommand(
+                wallet,
+                &blockchain,
+                online_subcommand,
+                cli_opts.datadir.clone(),
+                wallet_name,
+            )
+            .await
+            .map_err(|e| e.to_string())?;
             Some(value)
         }
         ReplSubCommand::Wallet {
index 19beb7d2b8662ccab5c65f843fcf65fabee49384..a01203738308d1da36e1a4c8865b7903ef0c9eec 100644 (file)
@@ -5,44 +5,61 @@ use bdk_wallet::{
     SignOptions, Wallet,
     bitcoin::{FeeRate, Psbt, Txid, consensus::encode::serialize_hex},
 };
+use cli_table::{Cell, CellStruct, Style, Table};
 use payjoin::bitcoin::TxIn;
 use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
 use payjoin::receive::InputPair;
 use payjoin::receive::v2::{
     HasReplyableError, Initialized, MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown,
     PayjoinProposal, ProvisionalProposal, ReceiveSession, Receiver,
-    SessionEvent as ReceiverSessionEvent, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
-    WantsOutputs,
+    SessionEvent as ReceiverSessionEvent, SessionOutcome as ReceiverSessionOutcome,
+    UncheckedOriginalPayload, WantsFeeRange, WantsInputs, WantsOutputs,
+    replay_event_log as replay_receiver_event_log,
 };
 use payjoin::send::v2::{
     PollingForProposal, SendSession, Sender, SessionEvent as SenderSessionEvent,
     SessionOutcome as SenderSessionOutcome, WithReplyKey,
+    replay_event_log as replay_sender_event_log,
 };
-use payjoin::{ImplementationError, UriExt};
+use payjoin::{HpkePublicKey, ImplementationError, UriExt};
 use serde_json::{json, to_string_pretty};
-use std::sync::{Arc, Mutex};
+use std::{
+    path::PathBuf,
+    sync::{Arc, Mutex},
+};
 
+use crate::payjoin::db::{ReceiverPersister, SenderPersister, open_payjoin_db};
 use crate::payjoin::ohttp::{RelayManager, fetch_ohttp_keys};
 
+pub mod db;
 pub mod ohttp;
 
-/// Implements all of the functions required to go through the Payjoin receive and send processes.
+/// Coordinates Payjoin receive and send flows.
 ///
-/// TODO: At the time of writing, this struct is written to make a Persister implementation easier
-/// but the persister is not implemented yet! For instance [`PayjoinManager::proceed_sender_session`] and
-/// [`PayjoinManager::proceed_receiver_session`] are designed such that the manager can enable
-/// resuming ongoing payjoins are well. So... this is a TODO for implementing persister.
+/// The manager owns the wallet, relay manager, and database handles needed to persist session
+/// events and resume ongoing Payjoins through [`PayjoinManager::proceed_sender_session`] and
+/// [`PayjoinManager::proceed_receiver_session`].
 pub(crate) struct PayjoinManager<'a> {
     wallet: &'a mut Wallet,
     relay_manager: Arc<Mutex<RelayManager>>,
+    db: Arc<crate::payjoin::db::Database>,
+}
 }
 
 impl<'a> PayjoinManager<'a> {
-    pub fn new(wallet: &'a mut Wallet, relay_manager: Arc<Mutex<RelayManager>>) -> Self {
-        Self {
+    pub fn new(
+        wallet: &'a mut Wallet,
+        datadir: Option<PathBuf>,
+        wallet_name: &str,
+    ) -> Result<Self, Error> {
+        let db = open_payjoin_db(datadir, wallet_name)?;
+        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
+
+        Ok(Self {
             wallet,
             relay_manager,
-        }
+            db,
+        })
     }
 
     pub async fn receive_payjoin(
@@ -71,8 +88,8 @@ impl<'a> PayjoinManager<'a> {
 
         let ohttp_keys =
             fetch_ohttp_keys(ohttp_relays, &directory, self.relay_manager.clone()).await?;
-        // TODO: Implement proper persister.
-        let persister = payjoin::persist::NoopSessionPersister::<ReceiverSessionEvent>::default();
+
+        let persister = crate::payjoin::db::ReceiverPersister::new(self.db.clone())?;
 
         let checked_max_fee_rate = max_fee_rate
             .map(FeeRate::from_sat_per_kwu)
@@ -161,7 +178,7 @@ impl<'a> PayjoinManager<'a> {
                 self.process_payjoin_proposal(psbt, blockchain_client)
                     .await?
             }
-            payjoin::PjParam::V2(_) => {
+            payjoin::PjParam::V2(v2_param) => {
                 let ohttp_relays: Vec<url::Url> = ohttp_relays
                     .into_iter()
                     .map(|s| url::Url::parse(&s))
@@ -175,29 +192,54 @@ impl<'a> PayjoinManager<'a> {
                         "At least one valid OHTTP relay must be provided.".into(),
                     ));
                 }
-
-                // TODO: Implement proper persister.
-                let persister =
-                    payjoin::persist::NoopSessionPersister::<SenderSessionEvent>::default();
-
-                let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
-                    .build_recommended(fee_rate)?
-                    .save(&persister)
-                    .map_err(|e| {
-                        Error::Generic(format!(
-                            "Failed to save the Payjoin v2 sender in the persister: {e}"
-                        ))
-                    })?;
-
-                let selected_relay =
-                    fetch_ohttp_keys(ohttp_relays, &sender.endpoint(), self.relay_manager.clone())
-                        .await?
-                        .relay_url;
+                // Check for existing session with the same receiver pubkey
+                let receiver_pubkey = v2_param.receiver_pubkey();
+                let existing_session =
+                    self.db
+                        .get_send_session_ids()?
+                        .into_iter()
+                        .find_map(|session_id| {
+                            let session_receiver_pubkey = self
+                                .db
+                                .get_send_session_receiver_pk(&session_id)
+                                .expect("Receiver pubkey should exist if session id exists");
+                            if session_receiver_pubkey == *receiver_pubkey {
+                                Some(session_id)
+                            } else {
+                                None
+                            }
+                        });
+
+                let (sender_state, persister) = if let Some(session_id) = existing_session {
+                    let sender_persister = SenderPersister::from_id(self.db.clone(), session_id);
+                    let (send_session, _) =
+                        replay_sender_event_log(&sender_persister).map_err(|e| {
+                            Error::Generic(format!("Failed to replay sender event log: {e:?}"))
+                        })?;
+                    println!("Resuming existing sender session");
+                    (send_session, sender_persister)
+                } else {
+                    let persister = {
+                        let receiver_pubkey: HpkePublicKey = v2_param.receiver_pubkey().clone();
+                        SenderPersister::new(self.db.clone(), receiver_pubkey)?
+                    };
+
+                    let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
+                        .build_recommended(fee_rate)?
+                        .save(&persister)
+                        .map_err(|e| {
+                            Error::Generic(format!(
+                                "Failed to save the Payjoin v2 sender in the persister: {e}"
+                            ))
+                        })?;
+
+                    (SendSession::WithReplyKey(sender), persister)
+                };
 
                 self.proceed_sender_session(
-                    SendSession::WithReplyKey(sender),
+                    sender_state,
                     &persister,
-                    selected_relay.to_string(),
+                    ohttp_relays,
                     blockchain_client,
                 )
                 .await?
@@ -360,13 +402,8 @@ impl<'a> PayjoinManager<'a> {
         blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
         let next_receiver_typestate = receiver
-            .check_inputs_not_owned(&mut |input| {
-                Ok(self.wallet.is_mine(input.to_owned()))
-            })
-            .save(persister)
-            .map_err(|e| {
-                Error::Generic(format!("Error occurred when saving after checking if inputs in the original proposal are not owned: {e}"))
-            })?;
+            .check_inputs_not_owned(&mut |input| Ok(self.wallet.is_mine(input.to_owned())))
+            .save(persister)?;
 
         self.check_no_inputs_seen_before(
             next_receiver_typestate,
@@ -384,16 +421,11 @@ impl<'a> PayjoinManager<'a> {
         max_fee_rate: FeeRate,
         blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
-        // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI
-        // yet. If there is support either in the BDK persister or Payjoin persister, this can be
-        // implemented, but it is not a concern as the use cases of the CLI does not warrant
-        // protection against probing attacks.
-        println!(
-            "Checking whether the inputs in the proposal were seen before to protect from probing attacks is not supported. Skipping the check..."
-        );
-        let next_receiver_typestate = receiver.check_no_inputs_seen_before(&mut |_| Ok(false)).save(persister).map_err(|e| {
-            Error::Generic(format!("Error occurred when saving after checking if the inputs in the proposal were seen before: {e}"))
-        })?;
+        let db = self.db.clone();
+        let next_receiver_typestate = receiver
+            .check_no_inputs_seen_before(&mut |input| Ok(db.insert_input_seen_before(*input)?))
+            .save(persister)?;
+
         self.identify_receiver_outputs(
             next_receiver_typestate,
             persister,
@@ -410,11 +442,11 @@ impl<'a> PayjoinManager<'a> {
         max_fee_rate: FeeRate,
         blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
-        let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| {
-            Ok(self.wallet.is_mine(output_script.to_owned()))
-        }).save(persister).map_err(|e| {
-            Error::Generic(format!("Error occurred when saving after checking if the outputs in the original proposal are owned by the receiver: {e}"))
-        })?;
+        let next_receiver_typestate = receiver
+            .identify_receiver_outputs(&mut |output_script| {
+                Ok(self.wallet.is_mine(output_script.to_owned()))
+            })
+            .save(persister)?;
 
         self.commit_outputs(
             next_receiver_typestate,
@@ -498,9 +530,9 @@ impl<'a> PayjoinManager<'a> {
         max_fee_rate: FeeRate,
         blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
-        let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| {
-            Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
-        })?;
+        let next_receiver_typestate = receiver
+            .apply_fee_range(None, Some(max_fee_rate))
+            .save(persister)?;
         self.finalize_proposal(next_receiver_typestate, persister, blockchain_client)
             .await
     }
@@ -528,12 +560,7 @@ impl<'a> PayjoinManager<'a> {
 
                 Ok(psbt_clone)
             })
-            .save(persister)
-            .map_err(|e| {
-                Error::Generic(format!(
-                    "Error occurred when saving after signing the Payjoin proposal: {e}"
-                ))
-            })?;
+            .save(persister)?;
 
         self.send_payjoin_proposal(next_receiver_typestate, persister, blockchain_client)
             .await
@@ -545,22 +572,21 @@ impl<'a> PayjoinManager<'a> {
         persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
         blockchain_client: &BlockchainClient,
     ) -> Result<(), Error> {
-        let (req, ctx) = receiver.create_post_request(
-            self.relay_manager
-                .lock()
-                .expect("Lock should not be poisoned")
-                .get_selected_relay()
-                .expect("A relay should already be selected")
-                .as_str(),
-        ).map_err(|e| {
+        let (req, ctx) = receiver
+            .create_post_request(
+                self.unwrap_relay_or_else_fetch(vec![], None::<&str>)
+                    .await?
+                    .as_str(),
+            )
+            .map_err(|e| {
                 Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}"))
             })?;
 
         let res = self.send_payjoin_post_request(req).await?;
         let payjoin_psbt = receiver.psbt().clone();
-        let next_receiver_typestate = receiver.process_response(&res.bytes().await?, ctx).save(persister).map_err(|e| {
-            Error::Generic(format!("Error occurred when saving after processing the response to the Payjoin proposal send: {e}"))
-        })?;
+        let next_receiver_typestate = receiver
+            .process_response(&res.bytes().await?, ctx)
+            .save(persister)?;
         println!(
             "Response successful. TXID: {}",
             payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
@@ -623,10 +649,7 @@ impl<'a> PayjoinManager<'a> {
                                     }
                                 }
                             )
-                            .save(persister)
-                            .map_err(|e| {
-                                Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
-                            });
+                            .save(persister);
 
                         if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result {
                             println!("Payjoin transaction detected in the mempool!");
@@ -659,11 +682,8 @@ impl<'a> PayjoinManager<'a> {
     ) -> Result<(), Error> {
         let (err_req, err_ctx) = receiver
             .create_error_request(
-                self.relay_manager
-                    .lock()
-                    .expect("Lock should not be poisoned")
-                    .get_selected_relay()
-                    .expect("A relay should already be selected")
+                self.unwrap_relay_or_else_fetch(vec![], None::<&str>)
+                    .await?
                     .as_str(),
             )
             .map_err(|e| {
@@ -710,17 +730,33 @@ impl<'a> PayjoinManager<'a> {
         &self,
         session: SendSession,
         persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
-        relay: impl payjoin::IntoUrl,
+        ohttp_relays: Vec<url::Url>,
         blockchain_client: &BlockchainClient,
     ) -> Result<Txid, Error> {
         match session {
             SendSession::WithReplyKey(context) => {
-                self.post_original_proposal(context, relay, persister, blockchain_client)
-                    .await
+                let relay = self
+                    .unwrap_relay_or_else_fetch(ohttp_relays, Some(context.endpoint()))
+                    .await?;
+                self.post_original_proposal(
+                    context,
+                    persister,
+                    blockchain_client,
+                    relay.to_string(),
+                )
+                .await
             }
             SendSession::PollingForProposal(context) => {
-                self.get_proposed_payjoin_proposal(context, relay, persister, blockchain_client)
-                    .await
+                let relay = self
+                    .unwrap_relay_or_else_fetch(ohttp_relays, Some(context.endpoint()))
+                    .await?;
+                self.get_proposed_payjoin_proposal(
+                    context,
+                    persister,
+                    blockchain_client,
+                    relay.to_string(),
+                )
+                .await
             }
             SendSession::Closed(SenderSessionOutcome::Success(psbt)) => {
                 self.process_payjoin_proposal(psbt, blockchain_client).await
@@ -729,31 +765,53 @@ impl<'a> PayjoinManager<'a> {
         }
     }
 
+    async fn unwrap_relay_or_else_fetch(
+        &self,
+        ohttp_relays: Vec<url::Url>,
+        directory: Option<impl payjoin::IntoUrl>,
+    ) -> Result<url::Url, Error> {
+        let selected_relay = self
+            .relay_manager
+            .lock()
+            .expect("Lock should not be poisoned")
+            .get_selected_relay();
+        match selected_relay {
+            Some(relay) => Ok(relay),
+            None => {
+                let directory = directory.ok_or_else(|| {
+                    Error::Generic("No directory URL provided and no relay selected".to_string())
+                })?;
+                Ok(
+                    fetch_ohttp_keys(ohttp_relays, directory, self.relay_manager.clone())
+                        .await?
+                        .relay_url,
+                )
+            }
+        }
+    }
+
     async fn post_original_proposal(
         &self,
         sender: Sender<WithReplyKey>,
-        relay: impl payjoin::IntoUrl,
         persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
         blockchain_client: &BlockchainClient,
+        relay: impl payjoin::IntoUrl,
     ) -> Result<Txid, Error> {
         let (req, ctx) = sender.create_v2_post_request(relay.as_str())?;
         let response = self.send_payjoin_post_request(req).await?;
         let sender = sender
             .process_response(&response.bytes().await?, ctx)
-            .save(persister)
-        .map_err(|e| {
-                Error::Generic(format!("Failed to persist the Payjoin send after successfully sending original proposal: {e}"))
-            })?;
-        self.get_proposed_payjoin_proposal(sender, relay, persister, blockchain_client)
+            .save(persister)?;
+        self.get_proposed_payjoin_proposal(sender, persister, blockchain_client, relay)
             .await
     }
 
     async fn get_proposed_payjoin_proposal(
         &self,
         sender: Sender<PollingForProposal>,
-        relay: impl payjoin::IntoUrl,
         persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
         blockchain_client: &BlockchainClient,
+        relay: impl payjoin::IntoUrl,
     ) -> Result<Txid, Error> {
         let mut sender = sender.clone();
         loop {
@@ -807,4 +865,252 @@ impl<'a> PayjoinManager<'a> {
             .send()
             .await
     }
+
+    /// Resume pending payjoin sessions from the database
+    pub async fn resume_payjoins(
+        &mut self,
+        directory: String,
+        ohttp_relays: Vec<String>,
+        session_id: Option<i64>,
+        blockchain_client: &BlockchainClient,
+    ) -> Result<String, Error> {
+        let db = self.db.clone();
+        let mut recv_session_ids = db.get_recv_session_ids()?;
+        let mut send_session_ids = db.get_send_session_ids()?;
+
+        if let Some(session_id) = session_id {
+            recv_session_ids.retain(|id| id.as_i64() == session_id);
+            send_session_ids.retain(|id| id.as_i64() == session_id);
+
+            if recv_session_ids.is_empty() && send_session_ids.is_empty() {
+                return Ok(serde_json::to_string_pretty(&json!({
+                    "message": format!("No active session found for session_id {}.", session_id)
+                }))?);
+            }
+        }
+
+        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
+            return Ok(serde_json::to_string_pretty(&json!({
+                "message": "No sessions to resume."
+            }))?);
+        }
+
+        let ohttp_relays: Vec<url::Url> = ohttp_relays
+            .into_iter()
+            .map(|s| url::Url::parse(&s))
+            .collect::<Result<_, _>>()
+            .map_err(|e| Error::Generic(format!("Failed to parse OHTTP URLs: {e}")))?;
+
+        let relay = self
+            .unwrap_relay_or_else_fetch(ohttp_relays, Some(&directory))
+            .await?;
+
+        let max_fee_rate = FeeRate::BROADCAST_MIN;
+        let total_sessions = recv_session_ids.len() + send_session_ids.len();
+        let mut completed = 0usize;
+        let mut timed_out = 0usize;
+        let mut failed = 0usize;
+
+        println!("Resuming {} payjoin session(s)...\n", total_sessions);
+
+        // Resume receiver sessions
+        for session_id in recv_session_ids {
+            let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
+            match replay_receiver_event_log(&persister) {
+                Ok((receiver_state, _)) => {
+                    println!("Resuming receiver session {}", session_id);
+                    match tokio::time::timeout(
+                        std::time::Duration::from_secs(30),
+                        self.proceed_receiver_session(
+                            receiver_state,
+                            &persister,
+                            relay.as_str(),
+                            max_fee_rate,
+                            blockchain_client,
+                        ),
+                    )
+                    .await
+                    {
+                        Ok(Ok(_)) => {
+                            completed += 1;
+                        }
+                        Ok(Err(e)) => {
+                            failed += 1;
+                            println!("Receiver session {} failed: {}", session_id, e);
+                        }
+                        Err(_) => {
+                            timed_out += 1;
+                            println!("Receiver session {} timed out", session_id);
+                        }
+                    }
+                }
+                Err(e) => {
+                    failed += 1;
+                    println!("Failed to replay receiver session {}: {:?}", session_id, e);
+                }
+            }
+        }
+
+        // Resume sender sessions
+        for session_id in send_session_ids {
+            let persister = SenderPersister::from_id(db.clone(), session_id.clone());
+            match replay_sender_event_log(&persister) {
+                Ok((sender_state, _)) => {
+                    println!("Resuming sender session {}", session_id);
+                    match tokio::time::timeout(
+                        std::time::Duration::from_secs(30),
+                        self.proceed_sender_session(
+                            sender_state,
+                            &persister,
+                            vec![relay.clone()],
+                            blockchain_client,
+                        ),
+                    )
+                    .await
+                    {
+                        Ok(Ok(_)) => {
+                            completed += 1;
+                        }
+                        Ok(Err(e)) => {
+                            failed += 1;
+                            println!("Sender session {} failed: {}", session_id, e);
+                        }
+                        Err(_) => {
+                            timed_out += 1;
+                            println!("Sender session {} timed out", session_id);
+                        }
+                    }
+                }
+                Err(e) => {
+                    failed += 1;
+                    println!("Failed to replay sender session {}: {:?}", session_id, e);
+                }
+            }
+        }
+
+        Ok(serde_json::to_string_pretty(&json!({
+            "message": format!("Resumed polling for {} session(s).", total_sessions),
+            "outcome": format!(
+                "Completed: {}, timed out: {}, failed: {}.",
+                completed, timed_out, failed
+            )
+        }))?)
+    }
+
+    /// Show payjoin session history
+    pub fn history(
+        datadir: Option<std::path::PathBuf>,
+        wallet_name: &str,
+    ) -> Result<String, Error> {
+        let db = open_payjoin_db(datadir, wallet_name)?;
+        let mut send_rows: Vec<SessionHistoryRow> = Vec::new();
+        let mut recv_rows: Vec<SessionHistoryRow> = Vec::new();
+
+        // Active send sessions
+        for session_id in db
+            .get_send_session_ids()
+            .map_err(|e| Error::Generic(format!("{e}")))?
+        {
+            let persister = SenderPersister::from_id(db.clone(), session_id.clone());
+            let status = match replay_sender_event_log(&persister) {
+                Ok((state, _)) => state.status_text().to_string(),
+                Err(e) => e.to_string(),
+            };
+            send_rows.push(SessionHistoryRow {
+                id: session_id.to_string(),
+                role: "Sender",
+                status,
+                completed_at: None,
+            });
+        }
+
+        // Active receive sessions
+        for session_id in db
+            .get_recv_session_ids()
+            .map_err(|e| Error::Generic(format!("{e}")))?
+        {
+            let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
+            let status = match replay_receiver_event_log(&persister) {
+                Ok((state, _)) => state.status_text().to_string(),
+                Err(e) => e.to_string(),
+            };
+            recv_rows.push(SessionHistoryRow {
+                id: session_id.to_string(),
+                role: "Receiver",
+                status,
+                completed_at: None,
+            });
+        }
+
+        // Completed send sessions
+        for (session_id, completed_at) in db
+            .get_inactive_send_session_ids()
+            .map_err(|e| Error::Generic(format!("{e}")))?
+        {
+            let persister = SenderPersister::from_id(db.clone(), session_id.clone());
+            let status = match replay_sender_event_log(&persister) {
+                Ok((state, _)) => state.status_text().to_string(),
+                Err(e) => e.to_string(),
+            };
+            let completed_at = db
+                .format_unix_timestamp(completed_at)
+                .map_err(|e| Error::Generic(format!("{e}")))?;
+            send_rows.push(SessionHistoryRow {
+                id: session_id.to_string(),
+                role: "Sender",
+                status,
+                completed_at: Some(completed_at),
+            });
+        }
+
+        // Completed receive sessions
+        for (session_id, completed_at) in db
+            .get_inactive_recv_session_ids()
+            .map_err(|e| Error::Generic(format!("{e}")))?
+        {
+            let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
+            let status = match replay_receiver_event_log(&persister) {
+                Ok((state, _)) => state.status_text().to_string(),
+                Err(e) => e.to_string(),
+            };
+            let completed_at = db
+                .format_unix_timestamp(completed_at)
+                .map_err(|e| Error::Generic(format!("{e}")))?;
+            recv_rows.push(SessionHistoryRow {
+                id: session_id.to_string(),
+                role: "Receiver",
+                status,
+                completed_at: Some(completed_at),
+            });
+        }
+
+        let rows: Vec<Vec<CellStruct>> = send_rows
+            .iter()
+            .chain(recv_rows.iter())
+            .map(|row| {
+                vec![
+                    row.id.as_str().cell(),
+                    row.role.cell(),
+                    row.completed_at
+                        .clone()
+                        .unwrap_or_else(|| "Not Completed".to_string())
+                        .cell(),
+                    row.status.as_str().cell(),
+                ]
+            })
+            .collect();
+
+        let table = rows
+            .table()
+            .title(vec![
+                "Session ID".cell().bold(true),
+                "Sender/Receiver".cell().bold(true),
+                "Completed At".cell().bold(true),
+                "Status".cell().bold(true),
+            ])
+            .display()
+            .map_err(|e| Error::Generic(e.to_string()))?;
+
+        Ok(format!("{table}"))
+    }
 }
index 7d357e818b65b40ab2d2bb0058e1f96e89fb9a92..47139fbf9d4575520d6640b50b5291ef91e114ff 100644 (file)
@@ -175,7 +175,6 @@ pub(crate) fn prepare_wallet_db_dir(
 
     Ok(dir)
 }
-
 #[cfg(any(
     feature = "electrum",
     feature = "esplora",