From: Mehmet Efe Umit Date: Wed, 19 Nov 2025 06:23:10 +0000 (-0800) Subject: feat: add payjoin receive support X-Git-Url: http://internal-gitweb-vhost/script/%22https:/enum.SignError.html?a=commitdiff_plain;h=4cab2fa8f4d4b7fe95453bf3391019cf2dd4d7aa;p=bdk-cli feat: add payjoin receive support --- diff --git a/src/commands.rs b/src/commands.rs index e730c0f..7e65af2 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -430,6 +430,23 @@ pub enum OnlineWalletSubCommand { )] tx: Option, }, + // Generates a Payjoin receive URI and processes the sender's Payjoin proposal. + ReceivePayjoin { + /// Amount to be received in sats. + #[arg(env = "PAYJOIN_AMOUNT", long = "amount", required = true)] + amount: u64, + /// Payjoin directory which will be used to store the PSBTs which are pending action + /// from one of the parties. + #[arg(env = "PAYJOIN_DIRECTORY", long = "directory", required = true)] + directory: String, + /// URL of the Payjoin OHTTP relay. Can be repeated multiple times to attempt the + /// operation with multiple relays for redundancy. + #[arg(env = "PAYJOIN_OHTTP_RELAY", long = "ohttp_relay", required = true)] + ohttp_relay: Vec, + /// Maximum effective fee rate the receiver is willing to pay for their own input/output contributions. + #[arg(env = "PAYJOIN_RECEIVER_MAX_FEE_RATE", long = "max_fee_rate")] + max_fee_rate: Option, + }, /// Sends an original PSBT to a BIP 21 URI and broadcasts the returned Payjoin PSBT. SendPayjoin { /// BIP 21 URI for the Payjoin. diff --git a/src/handlers.rs b/src/handlers.rs index d9b214c..4631186 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -9,7 +9,6 @@ //! Command Handlers //! //! This module describes all the command handling logic used by bdk-cli. - use crate::commands::OfflineWalletSubCommand::*; use crate::commands::*; use crate::error::BDKCliError as Error; @@ -715,6 +714,18 @@ pub(crate) async fn handle_online_wallet_subcommand( let txid = broadcast_transaction(client, tx).await?; Ok(serde_json::to_string_pretty(&json!({ "txid": txid }))?) } + ReceivePayjoin { + amount, + directory, + ohttp_relay, + max_fee_rate, + } => { + let relay_manager = Arc::new(Mutex::new(RelayManager::new())); + let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager); + return payjoin_manager + .receive_payjoin(amount, directory, max_fee_rate, ohttp_relay, client) + .await; + } SendPayjoin { uri, ohttp_relay, diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index 2f36224..f5e1274 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -45,6 +45,74 @@ impl<'a> PayjoinManager<'a> { } } + pub async fn receive_payjoin( + &mut self, + amount: u64, + directory: String, + max_fee_rate: Option, + ohttp_relays: Vec, + blockchain_client: BlockchainClient, + ) -> Result { + let address = self + .wallet + .next_unused_address(bdk_wallet::KeychainKind::External); + + let ohttp_relays: Vec = ohttp_relays + .into_iter() + .map(|s| url::Url::parse(&s)) + .collect::>() + .map_err(|e| Error::Generic(format!("Failed to parse one or more OHTTP URLs: {e}")))?; + + if ohttp_relays.is_empty() { + return Err(Error::Generic( + "At least one valid OHTTP relay must be provided.".into(), + )); + } + + let ohttp_keys = + fetch_ohttp_keys(ohttp_relays, &directory, self.relay_manager.clone()).await?; + // TODO: Implement proper persister. + let persister = payjoin::persist::NoopSessionPersister::::default(); + + let checked_max_fee_rate = max_fee_rate + .map(|rate| FeeRate::from_sat_per_kwu(rate)) + .unwrap_or(FeeRate::BROADCAST_MIN); + + let receiver = payjoin::receive::v2::ReceiverBuilder::new( + address.address, + directory, + ohttp_keys.ohttp_keys, + ) + .map_err(|e| { + Error::Generic(format!( + "Failed to initialize a Payjoin ReceiverBuilder: {e}" + )) + })? + .with_amount(payjoin::bitcoin::Amount::from_sat(amount)) + .with_max_fee_rate(checked_max_fee_rate) + .build() + .save(&persister) + .map_err(|e| { + Error::Generic(format!( + "Failed to persister the receiver after initialization: {e}" + )) + })?; + + let pj_uri = receiver.pj_uri(); + println!("Request Payjoin by sharing this Payjoin Uri:"); + println!("{pj_uri}"); + + self.proceed_receiver_session( + ReceiveSession::Initialized(receiver.clone()), + &persister, + ohttp_keys.relay_url.to_string(), + checked_max_fee_rate, + blockchain_client, + ) + .await?; + + Ok(to_string_pretty(&json!({}))?) + } pub async fn send_payjoin( &mut self, @@ -162,6 +230,505 @@ impl<'a> PayjoinManager<'a> { Ok(to_string_pretty(&json!({ "txid": txid }))?) } + + async fn proceed_receiver_session( + &mut self, + session: ReceiveSession, + persister: &impl SessionPersister, + relay: impl payjoin::IntoUrl, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + match session { + ReceiveSession::Initialized(proposal) => { + self.read_from_directory( + proposal, + persister, + relay, + max_fee_rate, + blockchain_client, + ) + .await + } + ReceiveSession::UncheckedOriginalPayload(proposal) => { + self.check_proposal(proposal, persister, max_fee_rate, blockchain_client) + .await + } + ReceiveSession::MaybeInputsOwned(proposal) => { + self.check_inputs_not_owned(proposal, persister, max_fee_rate, blockchain_client) + .await + } + ReceiveSession::MaybeInputsSeen(proposal) => { + self.check_no_inputs_seen_before( + proposal, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + ReceiveSession::OutputsUnknown(proposal) => { + self.identify_receiver_outputs(proposal, persister, max_fee_rate, blockchain_client) + .await + } + ReceiveSession::WantsOutputs(proposal) => { + self.commit_outputs(proposal, persister, max_fee_rate, blockchain_client) + .await + } + ReceiveSession::WantsInputs(proposal) => { + self.contribute_inputs(proposal, persister, max_fee_rate, blockchain_client) + .await + } + ReceiveSession::WantsFeeRange(proposal) => { + self.apply_fee_range(proposal, persister, max_fee_rate, blockchain_client) + .await + } + ReceiveSession::ProvisionalProposal(proposal) => { + self.finalize_proposal(proposal, persister, blockchain_client) + .await + } + ReceiveSession::PayjoinProposal(proposal) => { + self.send_payjoin_proposal(proposal, persister, blockchain_client) + .await + } + ReceiveSession::Monitor(proposal) => { + self.monitor_payjoin_proposal(proposal, persister, blockchain_client) + .await + } + ReceiveSession::HasReplyableError(error) => self.handle_error(error, persister).await, + ReceiveSession::Closed(_) => return Err(Error::Generic("Session closed".to_string())), + } + } + + async fn read_from_directory( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + relay: impl payjoin::IntoUrl, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let mut current_receiver_typestate = receiver; + let next_receiver_typestate = loop { + let (req, context) = current_receiver_typestate + .create_poll_request(relay.as_str()) + .map_err(|e| { + Error::Generic(format!( + "Failed to create a poll request to read from the Payjoin directory: {e}" + )) + })?; + println!("Polling receive request..."); + let response = self.send_payjoin_post_request(req).await?; + let state_transition = current_receiver_typestate + .process_response(response.bytes().await?.to_vec().as_slice(), context) + .save(persister); + match state_transition { + Ok(OptionalTransitionOutcome::Progress(next_state)) => { + println!("Got a request from the sender. Responding with a Payjoin proposal."); + break next_state; + } + Ok(OptionalTransitionOutcome::Stasis(current_state)) => { + current_receiver_typestate = current_state; + continue; + } + Err(e) => { + return Err(Error::Generic(format!( + "Error occurred when polling for Payjoin proposal from the directory: {}", + e.to_string() + ))); + } + } + }; + self.check_proposal( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn check_proposal( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let next_receiver_typestate = receiver + .assume_interactive_receiver() + .save(persister) + .map_err(|e| { + Error::Generic(format!( + "Error occurred when saving after assuming interactive receiver and not checking proposal broadcastability: {e}" + )) + })?; + + println!( + "Checking whether the original proposal can be broadcasted itself is not supported. If the Payjoin fails, manually fall back to the transaction below." + ); + println!( + "{}", + serialize_hex(&next_receiver_typestate.extract_tx_to_schedule_broadcast()) + ); + + self.check_inputs_not_owned( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn check_inputs_not_owned( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let next_receiver_typestate = receiver + .check_inputs_not_owned(&mut |input| { + Ok(self.wallet.is_mine(input.to_owned())) + }) + .save(persister) + .map_err(|e| { + Error::Generic(format!("Error occurred when saving after checking if inputs in the original proposal are not owned: {e}")) + })?; + + self.check_no_inputs_seen_before( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn check_no_inputs_seen_before( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI + // yet. If there is support either in the BDK persister or Payjoin persister, this can be + // implemented, but it is not a concern as the use cases of the CLI does not warrant + // protection against probing attacks. + println!( + "Checking whether the inputs in the proposal were seen before to protect from probing attacks is not supported. Skipping the check..." + ); + let next_receiver_typestate = receiver.check_no_inputs_seen_before(&mut |_| Ok(false)).save(persister).map_err(|e| { + Error::Generic(format!("Error occurred when saving after checking if the inputs in the proposal were seen before: {e}")) + })?; + self.identify_receiver_outputs( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn identify_receiver_outputs( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| { + Ok(self.wallet.is_mine(output_script.to_owned())) + }).save(persister).map_err(|e| { + Error::Generic(format!("Error occurred when saving after checking if the outputs in the original proposal are owned by the receiver: {e}")) + })?; + + self.commit_outputs( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn commit_outputs( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + // This is a typestate to modify existing receiver-owned outputs in case the receiver wants + // to do that. This is a very simple implementation of Payjoin so we are just going + // to commit to the existing outputs which the sender included in the original proposal. + let next_receiver_typestate = receiver.commit_outputs().save(persister).map_err(|e| { + Error::Generic(format!( + "Error occurred when saving after committing to the outputs in the proposal: {e}" + )) + })?; + self.contribute_inputs( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn contribute_inputs( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let candidate_inputs: Vec = self + .wallet + .list_unspent() + .map(|output| { + let psbtin = self + .wallet + .get_psbt_input(output.clone(), None, false) + .expect( + "Failed to get the PSBT Input using the output of the unspent transaction", + ); + let txin = TxIn { + previous_output: output.outpoint, + ..Default::default() + }; + InputPair::new(txin, psbtin, None) + .expect("Failed to create InputPair when contributing outputs to the proposal") + }) + .collect(); + let selected_input = receiver + .try_preserving_privacy(candidate_inputs) + .map_err(|e| { + Error::Generic(format!( + "Error occurred when trying to pick an unspent UTXO for input contribution: {e}" + )) + })?; + + let next_receiver_typestate = receiver.contribute_inputs(vec![selected_input]) + .map_err(|e| { + Error::Generic(format!("Error occurred when contributing the selected input to the proposal: {e}")) + })?.commit_inputs().save(persister) + .map_err(|e| { + Error::Generic(format!("Error occurred when saving after committing to the inputs after receiver contribution: {e}")) + })?; + + self.apply_fee_range( + next_receiver_typestate, + persister, + max_fee_rate, + blockchain_client, + ) + .await + } + + async fn apply_fee_range( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + max_fee_rate: FeeRate, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| { + Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}")) + })?; + self.finalize_proposal(next_receiver_typestate, persister, blockchain_client) + .await + } + + async fn finalize_proposal( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let next_receiver_typestate = receiver + .finalize_proposal(|psbt| { + let mut psbt_clone = psbt.clone(); + + // We cannot finalize the transaction for broadcasting as it does not have the + // sender signatures yet. Hence, we only care about whether this returns an Err. + let _ = !self + .wallet + .sign(&mut psbt_clone, SignOptions::default()) + .map_err(|e| { + ImplementationError::from( + format!("Error occurred when signing the Payjoin PSBT: {e}").as_str(), + ) + })?; + + Ok(psbt_clone) + }) + .save(persister) + .map_err(|e| { + Error::Generic(format!( + "Error occurred when saving after signing the Payjoin proposal: {e}" + )) + })?; + + self.send_payjoin_proposal(next_receiver_typestate, persister, blockchain_client) + .await + } + + async fn send_payjoin_proposal( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let (req, ctx) = receiver.create_post_request( + self.relay_manager + .lock() + .expect("Lock should not be poisoned") + .get_selected_relay() + .expect("A relay should already be selected") + .as_str(), + ).map_err(|e| { + Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}")) + })?; + + let res = self.send_payjoin_post_request(req).await?; + let payjoin_psbt = receiver.psbt().clone(); + let next_receiver_typestate = receiver.process_response(&res.bytes().await?, ctx).save(persister).map_err(|e| { + Error::Generic(format!("Error occurred when saving after processing the response to the Payjoin proposal send: {e}")) + })?; + println!( + "Response successful. TXID: {}", + payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid() + ); + return self + .monitor_payjoin_proposal(next_receiver_typestate, persister, blockchain_client) + .await; + } + + /// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the + /// sender. + /// + /// The currenty implementation does not support checking for the Payjoin broadcast in a loop + /// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`] + /// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and + /// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running + /// tasks. + async fn monitor_payjoin_proposal( + &mut self, + receiver: Receiver, + persister: &impl SessionPersister, + blockchain_client: BlockchainClient, + ) -> Result<(), Error> { + let wait_time_for_sync = 3; + let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync); + + println!( + "Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..." + ); + tokio::time::sleep(poll_internal).await; + sync_wallet(blockchain_client, self.wallet).await?; + + let check_result = receiver + .check_payment( + |txid| { + let Some(tx_details) = self.wallet.tx_details(txid) else { + return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); + }; + + let is_seen = match tx_details.chain_position { + bdk_wallet::chain::ChainPosition::Confirmed { .. } => true, + bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true, + _ => false + }; + + if is_seen { + return Ok(Some(tx_details.tx.as_ref().clone())); + } + return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); + }, + |outpoint| { + let utxo = self.wallet.get_utxo(outpoint); + match utxo { + Some(_) => Ok(false), + None => Ok(true), + } + } + ) + .save(persister) + .map_err(|e| { + Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}")) + }); + + match check_result { + Ok(_) => { + println!("Payjoin transaction detected in the mempool!"); + } + Err(_) => { + println!( + "Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command." + ); + } + } + + Ok(()) + } + + async fn handle_error( + &self, + receiver: Receiver, + persister: &impl SessionPersister, + ) -> Result<(), Error> { + let (err_req, err_ctx) = receiver + .create_error_request( + self.relay_manager + .lock() + .expect("Lock should not be poisoned") + .get_selected_relay() + .expect("A relay should already be selected") + .as_str(), + ) + .map_err(|e| { + Error::Generic(format!( + "Error occurred when creating a receiver error request: {}", + e + )) + })?; + + let err_response = match self.send_payjoin_post_request(err_req).await { + Ok(response) => response, + Err(e) => { + return Err(Error::Generic(format!( + "Failed to post error request: {}", + e + ))); + } + }; + + let err_bytes = match err_response.bytes().await { + Ok(bytes) => bytes, + Err(e) => { + return Err(Error::Generic(format!( + "Failed to get error response bytes: {}", + e + ))); + } + }; + + if let Err(e) = receiver + .process_error_response(&err_bytes, err_ctx) + .save(persister) + { + return Err(Error::Generic(format!( + "Failed to process error response: {}", + e + ))); + } + + Ok(()) + } + async fn proceed_sender_session( &self, session: SendSession,