use std::io::Write;
use std::path::Path;
use std::str::FromStr;
-#[cfg(any(
- feature = "redb",
- feature = "compiler",
- feature = "electrum",
- feature = "esplora",
- feature = "cbf",
- feature = "rpc"
-))]
+#[cfg(any(feature = "redb", feature = "compiler"))]
use std::sync::Arc;
#[cfg(feature = "bip322")]
))]
use {
crate::commands::OnlineWalletSubCommand::*,
- crate::payjoin::{PayjoinManager, ohttp::RelayManager},
+ crate::payjoin::PayjoinManager,
bdk_wallet::bitcoin::{Transaction, consensus::Decodable, hex::FromHex},
- std::sync::Mutex,
};
#[cfg(feature = "esplora")]
use {crate::utils::BlockchainClient::Esplora, bdk_esplora::EsploraAsyncExt};
wallet: &mut Wallet,
client: &BlockchainClient,
online_subcommand: OnlineWalletSubCommand,
+ datadir: Option<std::path::PathBuf>,
+ wallet_name: &str,
) -> Result<String, Error> {
match online_subcommand {
FullScan {
ohttp_relay,
max_fee_rate,
} => {
- let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
- let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager);
+ let mut payjoin_manager = PayjoinManager::new(wallet, datadir.clone(), wallet_name)?;
return payjoin_manager
.receive_payjoin(amount, directory, max_fee_rate, ohttp_relay, client)
.await;
ohttp_relay,
fee_rate,
} => {
- let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
- let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager);
+ let mut payjoin_manager = PayjoinManager::new(wallet, datadir.clone(), wallet_name)?;
return payjoin_manager
.send_payjoin(uri, fee_rate, ohttp_relay, client)
.await;
}
+ ResumePayjoin {
+ directory,
+ ohttp_relay,
+ session_id,
+ } => {
+ let mut payjoin_manager = PayjoinManager::new(wallet, datadir, wallet_name)?;
+ return payjoin_manager
+ .resume_payjoins(directory, ohttp_relay, session_id, client)
+ .await;
+ }
+ PayjoinHistory => crate::payjoin::PayjoinManager::history(datadir, wallet_name),
}
}
feature = "rpc"
))]
CliSubCommand::Wallet {
- wallet,
+ wallet: wallet_name,
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
} => {
- let home_dir = prepare_home_dir(cli_opts.datadir)?;
+ let home_dir = prepare_home_dir(cli_opts.datadir.clone())?;
- let (wallet_opts, network) = load_wallet_config(&home_dir, &wallet)?;
+ let (wallet_opts, network) = load_wallet_config(&home_dir, &wallet_name)?;
- let database_path = prepare_wallet_db_dir(&home_dir, &wallet)?;
+ let database_path = prepare_wallet_db_dir(&home_dir, &wallet_name)?;
#[cfg(any(feature = "sqlite", feature = "redb"))]
let result = {
}
#[cfg(feature = "redb")]
DatabaseType::Redb => {
- let wallet_name = &wallet_opts.wallet;
+ let redb_store_wallet_name = &wallet_opts.wallet;
let db = Arc::new(bdk_redb::redb::Database::create(
home_dir.join("wallet.redb"),
)?);
let store = RedbStore::new(
db,
- wallet_name.as_deref().unwrap_or("wallet").to_string(),
+ redb_store_wallet_name
+ .as_deref()
+ .unwrap_or("wallet")
+ .to_string(),
)?;
log::debug!("Redb database opened successfully");
Persister::RedbStore(store)
&mut wallet,
&blockchain_client,
online_subcommand,
+ cli_opts.datadir.clone(),
+ &wallet_name,
)
.await?;
wallet.persist(&mut persister)?;
let mut wallet = new_wallet(network, wallet_opts)?;
let blockchain_client =
crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
- handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand)
- .await?
+ handle_online_wallet_subcommand(
+ &mut wallet,
+ &blockchain_client,
+ online_subcommand,
+ cli_opts.datadir.clone(),
+ &wallet_name,
+ )
+ .await?
};
Ok(result)
}
async fn respond(
network: Network,
wallet: &mut Wallet,
- wallet_name: &String,
+ wallet_name: &str,
wallet_opts: &mut WalletOpts,
line: &str,
_datadir: std::path::PathBuf,
} => {
let blockchain =
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
- let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand)
- .await
- .map_err(|e| e.to_string())?;
+ let value = handle_online_wallet_subcommand(
+ wallet,
+ &blockchain,
+ online_subcommand,
+ cli_opts.datadir.clone(),
+ wallet_name,
+ )
+ .await
+ .map_err(|e| e.to_string())?;
Some(value)
}
ReplSubCommand::Wallet {
SignOptions, Wallet,
bitcoin::{FeeRate, Psbt, Txid, consensus::encode::serialize_hex},
};
+use cli_table::{Cell, CellStruct, Style, Table};
use payjoin::bitcoin::TxIn;
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
use payjoin::receive::InputPair;
use payjoin::receive::v2::{
HasReplyableError, Initialized, MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown,
PayjoinProposal, ProvisionalProposal, ReceiveSession, Receiver,
- SessionEvent as ReceiverSessionEvent, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
- WantsOutputs,
+ SessionEvent as ReceiverSessionEvent, SessionOutcome as ReceiverSessionOutcome,
+ UncheckedOriginalPayload, WantsFeeRange, WantsInputs, WantsOutputs,
+ replay_event_log as replay_receiver_event_log,
};
use payjoin::send::v2::{
PollingForProposal, SendSession, Sender, SessionEvent as SenderSessionEvent,
SessionOutcome as SenderSessionOutcome, WithReplyKey,
+ replay_event_log as replay_sender_event_log,
};
-use payjoin::{ImplementationError, UriExt};
+use payjoin::{HpkePublicKey, ImplementationError, UriExt};
use serde_json::{json, to_string_pretty};
-use std::sync::{Arc, Mutex};
+use std::{
+ path::PathBuf,
+ sync::{Arc, Mutex},
+};
+use crate::payjoin::db::{ReceiverPersister, SenderPersister, open_payjoin_db};
use crate::payjoin::ohttp::{RelayManager, fetch_ohttp_keys};
+pub mod db;
pub mod ohttp;
-/// Implements all of the functions required to go through the Payjoin receive and send processes.
+/// Coordinates Payjoin receive and send flows.
///
-/// TODO: At the time of writing, this struct is written to make a Persister implementation easier
-/// but the persister is not implemented yet! For instance [`PayjoinManager::proceed_sender_session`] and
-/// [`PayjoinManager::proceed_receiver_session`] are designed such that the manager can enable
-/// resuming ongoing payjoins are well. So... this is a TODO for implementing persister.
+/// The manager owns the wallet, relay manager, and database handles needed to persist session
+/// events and resume ongoing Payjoins through [`PayjoinManager::proceed_sender_session`] and
+/// [`PayjoinManager::proceed_receiver_session`].
pub(crate) struct PayjoinManager<'a> {
wallet: &'a mut Wallet,
relay_manager: Arc<Mutex<RelayManager>>,
+ db: Arc<crate::payjoin::db::Database>,
+}
}
impl<'a> PayjoinManager<'a> {
- pub fn new(wallet: &'a mut Wallet, relay_manager: Arc<Mutex<RelayManager>>) -> Self {
- Self {
+ pub fn new(
+ wallet: &'a mut Wallet,
+ datadir: Option<PathBuf>,
+ wallet_name: &str,
+ ) -> Result<Self, Error> {
+ let db = open_payjoin_db(datadir, wallet_name)?;
+ let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
+
+ Ok(Self {
wallet,
relay_manager,
- }
+ db,
+ })
}
pub async fn receive_payjoin(
let ohttp_keys =
fetch_ohttp_keys(ohttp_relays, &directory, self.relay_manager.clone()).await?;
- // TODO: Implement proper persister.
- let persister = payjoin::persist::NoopSessionPersister::<ReceiverSessionEvent>::default();
+
+ let persister = crate::payjoin::db::ReceiverPersister::new(self.db.clone())?;
let checked_max_fee_rate = max_fee_rate
.map(FeeRate::from_sat_per_kwu)
self.process_payjoin_proposal(psbt, blockchain_client)
.await?
}
- payjoin::PjParam::V2(_) => {
+ payjoin::PjParam::V2(v2_param) => {
let ohttp_relays: Vec<url::Url> = ohttp_relays
.into_iter()
.map(|s| url::Url::parse(&s))
"At least one valid OHTTP relay must be provided.".into(),
));
}
-
- // TODO: Implement proper persister.
- let persister =
- payjoin::persist::NoopSessionPersister::<SenderSessionEvent>::default();
-
- let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
- .build_recommended(fee_rate)?
- .save(&persister)
- .map_err(|e| {
- Error::Generic(format!(
- "Failed to save the Payjoin v2 sender in the persister: {e}"
- ))
- })?;
-
- let selected_relay =
- fetch_ohttp_keys(ohttp_relays, &sender.endpoint(), self.relay_manager.clone())
- .await?
- .relay_url;
+ // Check for existing session with the same receiver pubkey
+ let receiver_pubkey = v2_param.receiver_pubkey();
+ let existing_session =
+ self.db
+ .get_send_session_ids()?
+ .into_iter()
+ .find_map(|session_id| {
+ let session_receiver_pubkey = self
+ .db
+ .get_send_session_receiver_pk(&session_id)
+ .expect("Receiver pubkey should exist if session id exists");
+ if session_receiver_pubkey == *receiver_pubkey {
+ Some(session_id)
+ } else {
+ None
+ }
+ });
+
+ let (sender_state, persister) = if let Some(session_id) = existing_session {
+ let sender_persister = SenderPersister::from_id(self.db.clone(), session_id);
+ let (send_session, _) =
+ replay_sender_event_log(&sender_persister).map_err(|e| {
+ Error::Generic(format!("Failed to replay sender event log: {e:?}"))
+ })?;
+ println!("Resuming existing sender session");
+ (send_session, sender_persister)
+ } else {
+ let persister = {
+ let receiver_pubkey: HpkePublicKey = v2_param.receiver_pubkey().clone();
+ SenderPersister::new(self.db.clone(), receiver_pubkey)?
+ };
+
+ let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
+ .build_recommended(fee_rate)?
+ .save(&persister)
+ .map_err(|e| {
+ Error::Generic(format!(
+ "Failed to save the Payjoin v2 sender in the persister: {e}"
+ ))
+ })?;
+
+ (SendSession::WithReplyKey(sender), persister)
+ };
self.proceed_sender_session(
- SendSession::WithReplyKey(sender),
+ sender_state,
&persister,
- selected_relay.to_string(),
+ ohttp_relays,
blockchain_client,
)
.await?
blockchain_client: &BlockchainClient,
) -> Result<(), Error> {
let next_receiver_typestate = receiver
- .check_inputs_not_owned(&mut |input| {
- Ok(self.wallet.is_mine(input.to_owned()))
- })
- .save(persister)
- .map_err(|e| {
- Error::Generic(format!("Error occurred when saving after checking if inputs in the original proposal are not owned: {e}"))
- })?;
+ .check_inputs_not_owned(&mut |input| Ok(self.wallet.is_mine(input.to_owned())))
+ .save(persister)?;
self.check_no_inputs_seen_before(
next_receiver_typestate,
max_fee_rate: FeeRate,
blockchain_client: &BlockchainClient,
) -> Result<(), Error> {
- // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI
- // yet. If there is support either in the BDK persister or Payjoin persister, this can be
- // implemented, but it is not a concern as the use cases of the CLI does not warrant
- // protection against probing attacks.
- println!(
- "Checking whether the inputs in the proposal were seen before to protect from probing attacks is not supported. Skipping the check..."
- );
- let next_receiver_typestate = receiver.check_no_inputs_seen_before(&mut |_| Ok(false)).save(persister).map_err(|e| {
- Error::Generic(format!("Error occurred when saving after checking if the inputs in the proposal were seen before: {e}"))
- })?;
+ let db = self.db.clone();
+ let next_receiver_typestate = receiver
+ .check_no_inputs_seen_before(&mut |input| Ok(db.insert_input_seen_before(*input)?))
+ .save(persister)?;
+
self.identify_receiver_outputs(
next_receiver_typestate,
persister,
max_fee_rate: FeeRate,
blockchain_client: &BlockchainClient,
) -> Result<(), Error> {
- let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| {
- Ok(self.wallet.is_mine(output_script.to_owned()))
- }).save(persister).map_err(|e| {
- Error::Generic(format!("Error occurred when saving after checking if the outputs in the original proposal are owned by the receiver: {e}"))
- })?;
+ let next_receiver_typestate = receiver
+ .identify_receiver_outputs(&mut |output_script| {
+ Ok(self.wallet.is_mine(output_script.to_owned()))
+ })
+ .save(persister)?;
self.commit_outputs(
next_receiver_typestate,
max_fee_rate: FeeRate,
blockchain_client: &BlockchainClient,
) -> Result<(), Error> {
- let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| {
- Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
- })?;
+ let next_receiver_typestate = receiver
+ .apply_fee_range(None, Some(max_fee_rate))
+ .save(persister)?;
self.finalize_proposal(next_receiver_typestate, persister, blockchain_client)
.await
}
Ok(psbt_clone)
})
- .save(persister)
- .map_err(|e| {
- Error::Generic(format!(
- "Error occurred when saving after signing the Payjoin proposal: {e}"
- ))
- })?;
+ .save(persister)?;
self.send_payjoin_proposal(next_receiver_typestate, persister, blockchain_client)
.await
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
blockchain_client: &BlockchainClient,
) -> Result<(), Error> {
- let (req, ctx) = receiver.create_post_request(
- self.relay_manager
- .lock()
- .expect("Lock should not be poisoned")
- .get_selected_relay()
- .expect("A relay should already be selected")
- .as_str(),
- ).map_err(|e| {
+ let (req, ctx) = receiver
+ .create_post_request(
+ self.unwrap_relay_or_else_fetch(vec![], None::<&str>)
+ .await?
+ .as_str(),
+ )
+ .map_err(|e| {
Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}"))
})?;
let res = self.send_payjoin_post_request(req).await?;
let payjoin_psbt = receiver.psbt().clone();
- let next_receiver_typestate = receiver.process_response(&res.bytes().await?, ctx).save(persister).map_err(|e| {
- Error::Generic(format!("Error occurred when saving after processing the response to the Payjoin proposal send: {e}"))
- })?;
+ let next_receiver_typestate = receiver
+ .process_response(&res.bytes().await?, ctx)
+ .save(persister)?;
println!(
"Response successful. TXID: {}",
payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
}
}
)
- .save(persister)
- .map_err(|e| {
- Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
- });
+ .save(persister);
if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result {
println!("Payjoin transaction detected in the mempool!");
) -> Result<(), Error> {
let (err_req, err_ctx) = receiver
.create_error_request(
- self.relay_manager
- .lock()
- .expect("Lock should not be poisoned")
- .get_selected_relay()
- .expect("A relay should already be selected")
+ self.unwrap_relay_or_else_fetch(vec![], None::<&str>)
+ .await?
.as_str(),
)
.map_err(|e| {
&self,
session: SendSession,
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
- relay: impl payjoin::IntoUrl,
+ ohttp_relays: Vec<url::Url>,
blockchain_client: &BlockchainClient,
) -> Result<Txid, Error> {
match session {
SendSession::WithReplyKey(context) => {
- self.post_original_proposal(context, relay, persister, blockchain_client)
- .await
+ let relay = self
+ .unwrap_relay_or_else_fetch(ohttp_relays, Some(context.endpoint()))
+ .await?;
+ self.post_original_proposal(
+ context,
+ persister,
+ blockchain_client,
+ relay.to_string(),
+ )
+ .await
}
SendSession::PollingForProposal(context) => {
- self.get_proposed_payjoin_proposal(context, relay, persister, blockchain_client)
- .await
+ let relay = self
+ .unwrap_relay_or_else_fetch(ohttp_relays, Some(context.endpoint()))
+ .await?;
+ self.get_proposed_payjoin_proposal(
+ context,
+ persister,
+ blockchain_client,
+ relay.to_string(),
+ )
+ .await
}
SendSession::Closed(SenderSessionOutcome::Success(psbt)) => {
self.process_payjoin_proposal(psbt, blockchain_client).await
}
}
+ async fn unwrap_relay_or_else_fetch(
+ &self,
+ ohttp_relays: Vec<url::Url>,
+ directory: Option<impl payjoin::IntoUrl>,
+ ) -> Result<url::Url, Error> {
+ let selected_relay = self
+ .relay_manager
+ .lock()
+ .expect("Lock should not be poisoned")
+ .get_selected_relay();
+ match selected_relay {
+ Some(relay) => Ok(relay),
+ None => {
+ let directory = directory.ok_or_else(|| {
+ Error::Generic("No directory URL provided and no relay selected".to_string())
+ })?;
+ Ok(
+ fetch_ohttp_keys(ohttp_relays, directory, self.relay_manager.clone())
+ .await?
+ .relay_url,
+ )
+ }
+ }
+ }
+
async fn post_original_proposal(
&self,
sender: Sender<WithReplyKey>,
- relay: impl payjoin::IntoUrl,
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
blockchain_client: &BlockchainClient,
+ relay: impl payjoin::IntoUrl,
) -> Result<Txid, Error> {
let (req, ctx) = sender.create_v2_post_request(relay.as_str())?;
let response = self.send_payjoin_post_request(req).await?;
let sender = sender
.process_response(&response.bytes().await?, ctx)
- .save(persister)
- .map_err(|e| {
- Error::Generic(format!("Failed to persist the Payjoin send after successfully sending original proposal: {e}"))
- })?;
- self.get_proposed_payjoin_proposal(sender, relay, persister, blockchain_client)
+ .save(persister)?;
+ self.get_proposed_payjoin_proposal(sender, persister, blockchain_client, relay)
.await
}
async fn get_proposed_payjoin_proposal(
&self,
sender: Sender<PollingForProposal>,
- relay: impl payjoin::IntoUrl,
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
blockchain_client: &BlockchainClient,
+ relay: impl payjoin::IntoUrl,
) -> Result<Txid, Error> {
let mut sender = sender.clone();
loop {
.send()
.await
}
+
+ /// Resume pending payjoin sessions from the database
+ pub async fn resume_payjoins(
+ &mut self,
+ directory: String,
+ ohttp_relays: Vec<String>,
+ session_id: Option<i64>,
+ blockchain_client: &BlockchainClient,
+ ) -> Result<String, Error> {
+ let db = self.db.clone();
+ let mut recv_session_ids = db.get_recv_session_ids()?;
+ let mut send_session_ids = db.get_send_session_ids()?;
+
+ if let Some(session_id) = session_id {
+ recv_session_ids.retain(|id| id.as_i64() == session_id);
+ send_session_ids.retain(|id| id.as_i64() == session_id);
+
+ if recv_session_ids.is_empty() && send_session_ids.is_empty() {
+ return Ok(serde_json::to_string_pretty(&json!({
+ "message": format!("No active session found for session_id {}.", session_id)
+ }))?);
+ }
+ }
+
+ if recv_session_ids.is_empty() && send_session_ids.is_empty() {
+ return Ok(serde_json::to_string_pretty(&json!({
+ "message": "No sessions to resume."
+ }))?);
+ }
+
+ let ohttp_relays: Vec<url::Url> = ohttp_relays
+ .into_iter()
+ .map(|s| url::Url::parse(&s))
+ .collect::<Result<_, _>>()
+ .map_err(|e| Error::Generic(format!("Failed to parse OHTTP URLs: {e}")))?;
+
+ let relay = self
+ .unwrap_relay_or_else_fetch(ohttp_relays, Some(&directory))
+ .await?;
+
+ let max_fee_rate = FeeRate::BROADCAST_MIN;
+ let total_sessions = recv_session_ids.len() + send_session_ids.len();
+ let mut completed = 0usize;
+ let mut timed_out = 0usize;
+ let mut failed = 0usize;
+
+ println!("Resuming {} payjoin session(s)...\n", total_sessions);
+
+ // Resume receiver sessions
+ for session_id in recv_session_ids {
+ let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
+ match replay_receiver_event_log(&persister) {
+ Ok((receiver_state, _)) => {
+ println!("Resuming receiver session {}", session_id);
+ match tokio::time::timeout(
+ std::time::Duration::from_secs(30),
+ self.proceed_receiver_session(
+ receiver_state,
+ &persister,
+ relay.as_str(),
+ max_fee_rate,
+ blockchain_client,
+ ),
+ )
+ .await
+ {
+ Ok(Ok(_)) => {
+ completed += 1;
+ }
+ Ok(Err(e)) => {
+ failed += 1;
+ println!("Receiver session {} failed: {}", session_id, e);
+ }
+ Err(_) => {
+ timed_out += 1;
+ println!("Receiver session {} timed out", session_id);
+ }
+ }
+ }
+ Err(e) => {
+ failed += 1;
+ println!("Failed to replay receiver session {}: {:?}", session_id, e);
+ }
+ }
+ }
+
+ // Resume sender sessions
+ for session_id in send_session_ids {
+ let persister = SenderPersister::from_id(db.clone(), session_id.clone());
+ match replay_sender_event_log(&persister) {
+ Ok((sender_state, _)) => {
+ println!("Resuming sender session {}", session_id);
+ match tokio::time::timeout(
+ std::time::Duration::from_secs(30),
+ self.proceed_sender_session(
+ sender_state,
+ &persister,
+ vec![relay.clone()],
+ blockchain_client,
+ ),
+ )
+ .await
+ {
+ Ok(Ok(_)) => {
+ completed += 1;
+ }
+ Ok(Err(e)) => {
+ failed += 1;
+ println!("Sender session {} failed: {}", session_id, e);
+ }
+ Err(_) => {
+ timed_out += 1;
+ println!("Sender session {} timed out", session_id);
+ }
+ }
+ }
+ Err(e) => {
+ failed += 1;
+ println!("Failed to replay sender session {}: {:?}", session_id, e);
+ }
+ }
+ }
+
+ Ok(serde_json::to_string_pretty(&json!({
+ "message": format!("Resumed polling for {} session(s).", total_sessions),
+ "outcome": format!(
+ "Completed: {}, timed out: {}, failed: {}.",
+ completed, timed_out, failed
+ )
+ }))?)
+ }
+
+ /// Show payjoin session history
+ pub fn history(
+ datadir: Option<std::path::PathBuf>,
+ wallet_name: &str,
+ ) -> Result<String, Error> {
+ let db = open_payjoin_db(datadir, wallet_name)?;
+ let mut send_rows: Vec<SessionHistoryRow> = Vec::new();
+ let mut recv_rows: Vec<SessionHistoryRow> = Vec::new();
+
+ // Active send sessions
+ for session_id in db
+ .get_send_session_ids()
+ .map_err(|e| Error::Generic(format!("{e}")))?
+ {
+ let persister = SenderPersister::from_id(db.clone(), session_id.clone());
+ let status = match replay_sender_event_log(&persister) {
+ Ok((state, _)) => state.status_text().to_string(),
+ Err(e) => e.to_string(),
+ };
+ send_rows.push(SessionHistoryRow {
+ id: session_id.to_string(),
+ role: "Sender",
+ status,
+ completed_at: None,
+ });
+ }
+
+ // Active receive sessions
+ for session_id in db
+ .get_recv_session_ids()
+ .map_err(|e| Error::Generic(format!("{e}")))?
+ {
+ let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
+ let status = match replay_receiver_event_log(&persister) {
+ Ok((state, _)) => state.status_text().to_string(),
+ Err(e) => e.to_string(),
+ };
+ recv_rows.push(SessionHistoryRow {
+ id: session_id.to_string(),
+ role: "Receiver",
+ status,
+ completed_at: None,
+ });
+ }
+
+ // Completed send sessions
+ for (session_id, completed_at) in db
+ .get_inactive_send_session_ids()
+ .map_err(|e| Error::Generic(format!("{e}")))?
+ {
+ let persister = SenderPersister::from_id(db.clone(), session_id.clone());
+ let status = match replay_sender_event_log(&persister) {
+ Ok((state, _)) => state.status_text().to_string(),
+ Err(e) => e.to_string(),
+ };
+ let completed_at = db
+ .format_unix_timestamp(completed_at)
+ .map_err(|e| Error::Generic(format!("{e}")))?;
+ send_rows.push(SessionHistoryRow {
+ id: session_id.to_string(),
+ role: "Sender",
+ status,
+ completed_at: Some(completed_at),
+ });
+ }
+
+ // Completed receive sessions
+ for (session_id, completed_at) in db
+ .get_inactive_recv_session_ids()
+ .map_err(|e| Error::Generic(format!("{e}")))?
+ {
+ let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
+ let status = match replay_receiver_event_log(&persister) {
+ Ok((state, _)) => state.status_text().to_string(),
+ Err(e) => e.to_string(),
+ };
+ let completed_at = db
+ .format_unix_timestamp(completed_at)
+ .map_err(|e| Error::Generic(format!("{e}")))?;
+ recv_rows.push(SessionHistoryRow {
+ id: session_id.to_string(),
+ role: "Receiver",
+ status,
+ completed_at: Some(completed_at),
+ });
+ }
+
+ let rows: Vec<Vec<CellStruct>> = send_rows
+ .iter()
+ .chain(recv_rows.iter())
+ .map(|row| {
+ vec![
+ row.id.as_str().cell(),
+ row.role.cell(),
+ row.completed_at
+ .clone()
+ .unwrap_or_else(|| "Not Completed".to_string())
+ .cell(),
+ row.status.as_str().cell(),
+ ]
+ })
+ .collect();
+
+ let table = rows
+ .table()
+ .title(vec![
+ "Session ID".cell().bold(true),
+ "Sender/Receiver".cell().bold(true),
+ "Completed At".cell().bold(true),
+ "Status".cell().bold(true),
+ ])
+ .display()
+ .map_err(|e| Error::Generic(e.to_string()))?;
+
+ Ok(format!("{table}"))
+ }
}