]> Untitled Git - bdk-cli/commitdiff
feat: add payjoin receive support
authorMehmet Efe Umit <mehmetefeumit@protonmail.com>
Wed, 19 Nov 2025 06:23:10 +0000 (22:23 -0800)
committerMehmet Efe Umit <mehmetefeumit@protonmail.com>
Thu, 18 Dec 2025 16:11:17 +0000 (08:11 -0800)
src/commands.rs
src/handlers.rs
src/payjoin/mod.rs

index e730c0f9532adebcd8e3b29a68c173097e2f26ab..7e65af2a20cfa96b5b3a1380b8eda74038d6c31a 100644 (file)
@@ -430,6 +430,23 @@ pub enum OnlineWalletSubCommand {
         )]
         tx: Option<String>,
     },
+    // Generates a Payjoin receive URI and processes the sender's Payjoin proposal.
+    ReceivePayjoin {
+        /// Amount to be received in sats.
+        #[arg(env = "PAYJOIN_AMOUNT", long = "amount", required = true)]
+        amount: u64,
+        /// Payjoin directory which will be used to store the PSBTs which are pending action
+        /// from one of the parties.
+        #[arg(env = "PAYJOIN_DIRECTORY", long = "directory", required = true)]
+        directory: String,
+        /// URL of the Payjoin OHTTP relay. Can be repeated multiple times to attempt the
+        /// operation with multiple relays for redundancy.
+        #[arg(env = "PAYJOIN_OHTTP_RELAY", long = "ohttp_relay", required = true)]
+        ohttp_relay: Vec<String>,
+        /// Maximum effective fee rate the receiver is willing to pay for their own input/output contributions.
+        #[arg(env = "PAYJOIN_RECEIVER_MAX_FEE_RATE", long = "max_fee_rate")]
+        max_fee_rate: Option<u64>,
+    },
     /// Sends an original PSBT to a BIP 21 URI and broadcasts the returned Payjoin PSBT.
     SendPayjoin {
         /// BIP 21 URI for the Payjoin.
index d9b214ccc8abd28d96f921b05fdd6ca8c34642fa..4631186c666ac5e5fa510e7673a3598b937e1582 100644 (file)
@@ -9,7 +9,6 @@
 //! Command Handlers
 //!
 //! This module describes all the command handling logic used by bdk-cli.
-
 use crate::commands::OfflineWalletSubCommand::*;
 use crate::commands::*;
 use crate::error::BDKCliError as Error;
@@ -715,6 +714,18 @@ pub(crate) async fn handle_online_wallet_subcommand(
             let txid = broadcast_transaction(client, tx).await?;
             Ok(serde_json::to_string_pretty(&json!({ "txid": txid }))?)
         }
+        ReceivePayjoin {
+            amount,
+            directory,
+            ohttp_relay,
+            max_fee_rate,
+        } => {
+            let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
+            let mut payjoin_manager = PayjoinManager::new(wallet, relay_manager);
+            return payjoin_manager
+                .receive_payjoin(amount, directory, max_fee_rate, ohttp_relay, client)
+                .await;
+        }
         SendPayjoin {
             uri,
             ohttp_relay,
index 2f36224b0902de50133f7a137aad375472a2017c..f5e1274ca1acfa0a5085da1e0296fadcbc281798 100644 (file)
@@ -45,6 +45,74 @@ impl<'a> PayjoinManager<'a> {
         }
     }
 
+    pub async fn receive_payjoin(
+        &mut self,
+        amount: u64,
+        directory: String,
+        max_fee_rate: Option<u64>,
+        ohttp_relays: Vec<String>,
+        blockchain_client: BlockchainClient,
+    ) -> Result<String, Error> {
+        let address = self
+            .wallet
+            .next_unused_address(bdk_wallet::KeychainKind::External);
+
+        let ohttp_relays: Vec<url::Url> = ohttp_relays
+            .into_iter()
+            .map(|s| url::Url::parse(&s))
+            .collect::<Result<_, _>>()
+            .map_err(|e| Error::Generic(format!("Failed to parse one or more OHTTP URLs: {e}")))?;
+
+        if ohttp_relays.is_empty() {
+            return Err(Error::Generic(
+                "At least one valid OHTTP relay must be provided.".into(),
+            ));
+        }
+
+        let ohttp_keys =
+            fetch_ohttp_keys(ohttp_relays, &directory, self.relay_manager.clone()).await?;
+        // TODO: Implement proper persister.
+        let persister = payjoin::persist::NoopSessionPersister::<ReceiverSessionEvent>::default();
+
+        let checked_max_fee_rate = max_fee_rate
+            .map(|rate| FeeRate::from_sat_per_kwu(rate))
+            .unwrap_or(FeeRate::BROADCAST_MIN);
+
+        let receiver = payjoin::receive::v2::ReceiverBuilder::new(
+            address.address,
+            directory,
+            ohttp_keys.ohttp_keys,
+        )
+        .map_err(|e| {
+            Error::Generic(format!(
+                "Failed to initialize a Payjoin ReceiverBuilder: {e}"
+            ))
+        })?
+        .with_amount(payjoin::bitcoin::Amount::from_sat(amount))
+        .with_max_fee_rate(checked_max_fee_rate)
+        .build()
+        .save(&persister)
+        .map_err(|e| {
+            Error::Generic(format!(
+                "Failed to persister the receiver after initialization: {e}"
+            ))
+        })?;
+
+        let pj_uri = receiver.pj_uri();
+        println!("Request Payjoin by sharing this Payjoin Uri:");
+        println!("{pj_uri}");
+
+        self.proceed_receiver_session(
+            ReceiveSession::Initialized(receiver.clone()),
+            &persister,
+            ohttp_keys.relay_url.to_string(),
+            checked_max_fee_rate,
+            blockchain_client,
+        )
+        .await?;
+
+        Ok(to_string_pretty(&json!({}))?)
+    }
 
     pub async fn send_payjoin(
         &mut self,
@@ -162,6 +230,505 @@ impl<'a> PayjoinManager<'a> {
 
         Ok(to_string_pretty(&json!({ "txid": txid }))?)
     }
+
+    async fn proceed_receiver_session(
+        &mut self,
+        session: ReceiveSession,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        relay: impl payjoin::IntoUrl,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        match session {
+            ReceiveSession::Initialized(proposal) => {
+                self.read_from_directory(
+                    proposal,
+                    persister,
+                    relay,
+                    max_fee_rate,
+                    blockchain_client,
+                )
+                .await
+            }
+            ReceiveSession::UncheckedOriginalPayload(proposal) => {
+                self.check_proposal(proposal, persister, max_fee_rate, blockchain_client)
+                    .await
+            }
+            ReceiveSession::MaybeInputsOwned(proposal) => {
+                self.check_inputs_not_owned(proposal, persister, max_fee_rate, blockchain_client)
+                    .await
+            }
+            ReceiveSession::MaybeInputsSeen(proposal) => {
+                self.check_no_inputs_seen_before(
+                    proposal,
+                    persister,
+                    max_fee_rate,
+                    blockchain_client,
+                )
+                .await
+            }
+            ReceiveSession::OutputsUnknown(proposal) => {
+                self.identify_receiver_outputs(proposal, persister, max_fee_rate, blockchain_client)
+                    .await
+            }
+            ReceiveSession::WantsOutputs(proposal) => {
+                self.commit_outputs(proposal, persister, max_fee_rate, blockchain_client)
+                    .await
+            }
+            ReceiveSession::WantsInputs(proposal) => {
+                self.contribute_inputs(proposal, persister, max_fee_rate, blockchain_client)
+                    .await
+            }
+            ReceiveSession::WantsFeeRange(proposal) => {
+                self.apply_fee_range(proposal, persister, max_fee_rate, blockchain_client)
+                    .await
+            }
+            ReceiveSession::ProvisionalProposal(proposal) => {
+                self.finalize_proposal(proposal, persister, blockchain_client)
+                    .await
+            }
+            ReceiveSession::PayjoinProposal(proposal) => {
+                self.send_payjoin_proposal(proposal, persister, blockchain_client)
+                    .await
+            }
+            ReceiveSession::Monitor(proposal) => {
+                self.monitor_payjoin_proposal(proposal, persister, blockchain_client)
+                    .await
+            }
+            ReceiveSession::HasReplyableError(error) => self.handle_error(error, persister).await,
+            ReceiveSession::Closed(_) => return Err(Error::Generic("Session closed".to_string())),
+        }
+    }
+
+    async fn read_from_directory(
+        &mut self,
+        receiver: Receiver<Initialized>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        relay: impl payjoin::IntoUrl,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let mut current_receiver_typestate = receiver;
+        let next_receiver_typestate = loop {
+            let (req, context) = current_receiver_typestate
+                .create_poll_request(relay.as_str())
+                .map_err(|e| {
+                    Error::Generic(format!(
+                        "Failed to create a poll request to read from the Payjoin directory: {e}"
+                    ))
+                })?;
+            println!("Polling receive request...");
+            let response = self.send_payjoin_post_request(req).await?;
+            let state_transition = current_receiver_typestate
+                .process_response(response.bytes().await?.to_vec().as_slice(), context)
+                .save(persister);
+            match state_transition {
+                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
+                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
+                    break next_state;
+                }
+                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
+                    current_receiver_typestate = current_state;
+                    continue;
+                }
+                Err(e) => {
+                    return Err(Error::Generic(format!(
+                        "Error occurred when polling for Payjoin proposal from the directory: {}",
+                        e.to_string()
+                    )));
+                }
+            }
+        };
+        self.check_proposal(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn check_proposal(
+        &mut self,
+        receiver: Receiver<UncheckedOriginalPayload>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let next_receiver_typestate = receiver
+            .assume_interactive_receiver()
+            .save(persister)
+            .map_err(|e| {
+                Error::Generic(format!(
+                    "Error occurred when saving after assuming interactive receiver and not checking proposal broadcastability: {e}"
+                ))
+            })?;
+
+        println!(
+            "Checking whether the original proposal can be broadcasted itself is not supported. If the Payjoin fails, manually fall back to the transaction below."
+        );
+        println!(
+            "{}",
+            serialize_hex(&next_receiver_typestate.extract_tx_to_schedule_broadcast())
+        );
+
+        self.check_inputs_not_owned(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn check_inputs_not_owned(
+        &mut self,
+        receiver: Receiver<MaybeInputsOwned>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let next_receiver_typestate = receiver
+            .check_inputs_not_owned(&mut |input| {
+                Ok(self.wallet.is_mine(input.to_owned()))
+            })
+            .save(persister)
+            .map_err(|e| {
+                Error::Generic(format!("Error occurred when saving after checking if inputs in the original proposal are not owned: {e}"))
+            })?;
+
+        self.check_no_inputs_seen_before(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn check_no_inputs_seen_before(
+        &mut self,
+        receiver: Receiver<MaybeInputsSeen>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI
+        // yet. If there is support either in the BDK persister or Payjoin persister, this can be
+        // implemented, but it is not a concern as the use cases of the CLI does not warrant
+        // protection against probing attacks.
+        println!(
+            "Checking whether the inputs in the proposal were seen before to protect from probing attacks is not supported. Skipping the check..."
+        );
+        let next_receiver_typestate = receiver.check_no_inputs_seen_before(&mut |_| Ok(false)).save(persister).map_err(|e| {
+            Error::Generic(format!("Error occurred when saving after checking if the inputs in the proposal were seen before: {e}"))
+        })?;
+        self.identify_receiver_outputs(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn identify_receiver_outputs(
+        &mut self,
+        receiver: Receiver<OutputsUnknown>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| {
+            Ok(self.wallet.is_mine(output_script.to_owned()))
+        }).save(persister).map_err(|e| {
+            Error::Generic(format!("Error occurred when saving after checking if the outputs in the original proposal are owned by the receiver: {e}"))
+        })?;
+
+        self.commit_outputs(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn commit_outputs(
+        &mut self,
+        receiver: Receiver<WantsOutputs>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        // This is a typestate to modify existing receiver-owned outputs in case the receiver wants
+        // to do that. This is a very simple implementation of Payjoin so we are just going
+        // to commit to the existing outputs which the sender included in the original proposal.
+        let next_receiver_typestate = receiver.commit_outputs().save(persister).map_err(|e| {
+            Error::Generic(format!(
+                "Error occurred when saving after committing to the outputs in the proposal: {e}"
+            ))
+        })?;
+        self.contribute_inputs(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn contribute_inputs(
+        &mut self,
+        receiver: Receiver<WantsInputs>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let candidate_inputs: Vec<InputPair> = self
+            .wallet
+            .list_unspent()
+            .map(|output| {
+                let psbtin = self
+                    .wallet
+                    .get_psbt_input(output.clone(), None, false)
+                    .expect(
+                        "Failed to get the PSBT Input using the output of the unspent transaction",
+                    );
+                let txin = TxIn {
+                    previous_output: output.outpoint,
+                    ..Default::default()
+                };
+                InputPair::new(txin, psbtin, None)
+                    .expect("Failed to create InputPair when contributing outputs to the proposal")
+            })
+            .collect();
+        let selected_input = receiver
+            .try_preserving_privacy(candidate_inputs)
+            .map_err(|e| {
+                Error::Generic(format!(
+                    "Error occurred when trying to pick an unspent UTXO for input contribution: {e}"
+                ))
+            })?;
+
+        let next_receiver_typestate = receiver.contribute_inputs(vec![selected_input])
+            .map_err(|e| {
+                Error::Generic(format!("Error occurred when contributing the selected input to the proposal: {e}"))
+            })?.commit_inputs().save(persister)
+            .map_err(|e| {
+                Error::Generic(format!("Error occurred when saving after committing to the inputs after receiver contribution: {e}"))
+            })?;
+
+        self.apply_fee_range(
+            next_receiver_typestate,
+            persister,
+            max_fee_rate,
+            blockchain_client,
+        )
+        .await
+    }
+
+    async fn apply_fee_range(
+        &mut self,
+        receiver: Receiver<WantsFeeRange>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        max_fee_rate: FeeRate,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| {
+            Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
+        })?;
+        self.finalize_proposal(next_receiver_typestate, persister, blockchain_client)
+            .await
+    }
+
+    async fn finalize_proposal(
+        &mut self,
+        receiver: Receiver<ProvisionalProposal>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let next_receiver_typestate = receiver
+            .finalize_proposal(|psbt| {
+                let mut psbt_clone = psbt.clone();
+
+                // We cannot finalize the transaction for broadcasting as it does not have the
+                // sender signatures yet. Hence, we only care about whether this returns an Err.
+                let _ = !self
+                    .wallet
+                    .sign(&mut psbt_clone, SignOptions::default())
+                    .map_err(|e| {
+                        ImplementationError::from(
+                            format!("Error occurred when signing the Payjoin PSBT: {e}").as_str(),
+                        )
+                    })?;
+
+                Ok(psbt_clone)
+            })
+            .save(persister)
+            .map_err(|e| {
+                Error::Generic(format!(
+                    "Error occurred when saving after signing the Payjoin proposal: {e}"
+                ))
+            })?;
+
+        self.send_payjoin_proposal(next_receiver_typestate, persister, blockchain_client)
+            .await
+    }
+
+    async fn send_payjoin_proposal(
+        &mut self,
+        receiver: Receiver<PayjoinProposal>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let (req, ctx) = receiver.create_post_request(
+            self.relay_manager
+                .lock()
+                .expect("Lock should not be poisoned")
+                .get_selected_relay()
+                .expect("A relay should already be selected")
+                .as_str(),
+        ).map_err(|e| {
+                Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}"))
+            })?;
+
+        let res = self.send_payjoin_post_request(req).await?;
+        let payjoin_psbt = receiver.psbt().clone();
+        let next_receiver_typestate = receiver.process_response(&res.bytes().await?, ctx).save(persister).map_err(|e| {
+            Error::Generic(format!("Error occurred when saving after processing the response to the Payjoin proposal send: {e}"))
+        })?;
+        println!(
+            "Response successful. TXID: {}",
+            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
+        );
+        return self
+            .monitor_payjoin_proposal(next_receiver_typestate, persister, blockchain_client)
+            .await;
+    }
+
+    /// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the
+    /// sender.
+    ///
+    /// The currenty implementation does not support checking for the Payjoin broadcast in a loop
+    /// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`]
+    /// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and
+    /// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running
+    /// tasks.
+    async fn monitor_payjoin_proposal(
+        &mut self,
+        receiver: Receiver<Monitor>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+        blockchain_client: BlockchainClient,
+    ) -> Result<(), Error> {
+        let wait_time_for_sync = 3;
+        let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
+
+        println!(
+            "Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..."
+        );
+        tokio::time::sleep(poll_internal).await;
+        sync_wallet(blockchain_client, self.wallet).await?;
+
+        let check_result = receiver
+            .check_payment(
+                |txid| {
+                    let Some(tx_details) = self.wallet.tx_details(txid) else {
+                        return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
+                    };
+
+                    let is_seen = match tx_details.chain_position {
+                        bdk_wallet::chain::ChainPosition::Confirmed { .. } => true,
+                        bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true,
+                        _ => false
+                    };
+
+                    if is_seen {
+                        return Ok(Some(tx_details.tx.as_ref().clone()));
+                    }
+                    return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
+                },
+                |outpoint| {
+                    let utxo = self.wallet.get_utxo(outpoint);
+                    match utxo {
+                        Some(_) => Ok(false),
+                        None => Ok(true),
+                    }
+                }
+            )
+            .save(persister)
+            .map_err(|e| {
+                Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
+            });
+
+        match check_result {
+            Ok(_) => {
+                println!("Payjoin transaction detected in the mempool!");
+            }
+            Err(_) => {
+                println!(
+                    "Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command."
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn handle_error(
+        &self,
+        receiver: Receiver<HasReplyableError>,
+        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
+    ) -> Result<(), Error> {
+        let (err_req, err_ctx) = receiver
+            .create_error_request(
+                self.relay_manager
+                    .lock()
+                    .expect("Lock should not be poisoned")
+                    .get_selected_relay()
+                    .expect("A relay should already be selected")
+                    .as_str(),
+            )
+            .map_err(|e| {
+                Error::Generic(format!(
+                    "Error occurred when creating a receiver error request: {}",
+                    e
+                ))
+            })?;
+
+        let err_response = match self.send_payjoin_post_request(err_req).await {
+            Ok(response) => response,
+            Err(e) => {
+                return Err(Error::Generic(format!(
+                    "Failed to post error request: {}",
+                    e
+                )));
+            }
+        };
+
+        let err_bytes = match err_response.bytes().await {
+            Ok(bytes) => bytes,
+            Err(e) => {
+                return Err(Error::Generic(format!(
+                    "Failed to get error response bytes: {}",
+                    e
+                )));
+            }
+        };
+
+        if let Err(e) = receiver
+            .process_error_response(&err_bytes, err_ctx)
+            .save(persister)
+        {
+            return Err(Error::Generic(format!(
+                "Failed to process error response: {}",
+                e
+            )));
+        }
+
+        Ok(())
+    }
+
     async fn proceed_sender_session(
         &self,
         session: SendSession,