]> Untitled Git - bdk/commitdiff
Add `wallet_esplora_async` example and various fixes
author志宇 <hello@evanlinjin.me>
Wed, 8 Mar 2023 21:59:18 +0000 (10:59 +1300)
committer志宇 <hello@evanlinjin.me>
Wed, 8 Mar 2023 22:07:45 +0000 (11:07 +1300)
Fixes include:
* Allow `bdk_esplora` to use async with tls
* Reorganize `bdk_esplora` crate to have separate files for
  async vs blocking
* Use optional dependencies for `bdk_esplora` async

Cargo.toml
crates/esplora/Cargo.toml
crates/esplora/src/async_ext.rs [new file with mode: 0644]
crates/esplora/src/blocking_ext.rs [new file with mode: 0644]
crates/esplora/src/lib.rs
example-crates/wallet_esplora/Cargo.toml
example-crates/wallet_esplora_async/Cargo.toml [new file with mode: 0644]
example-crates/wallet_esplora_async/src/main.rs [new file with mode: 0644]

index 78adeb4506359333d56cfd28473c1e60b74902a7..2104196be0a4713a8d6bc9d0f50b32158da573dd 100644 (file)
@@ -9,6 +9,7 @@ members = [
     "example-crates/keychain_tracker_example_cli",
     "example-crates/wallet_electrum",
     "example-crates/wallet_esplora",
+    "example-crates/wallet_esplora_async",
     "nursery/tmp_plan",
     "nursery/coin_select"
 ]
index 109a7399b0e3cd13b2028ce11f97c2a938a1eef1..bacb2aad35da57ca5ba322da2ada0631d9c14e98 100644 (file)
@@ -14,10 +14,11 @@ readme = "README.md"
 [dependencies]
 bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] }
 esplora-client = { version = "0.3", default-features = false }
-async-trait = "0.1.66"
-futures = "0.3.26"
+async-trait = { version = "0.1.66", optional = true }
+futures = { version = "0.3.26", optional = true }
 
 [features]
-default = ["async", "blocking"]
-async = ["esplora-client/async"]
+default = ["async-https", "blocking"]
+async = ["async-trait", "futures", "esplora-client/async"]
+async-https = ["async", "esplora-client/async-https"]
 blocking = ["esplora-client/blocking"]
diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs
new file mode 100644 (file)
index 0000000..7796050
--- /dev/null
@@ -0,0 +1,296 @@
+use std::collections::BTreeMap;
+
+use async_trait::async_trait;
+use bdk_chain::{
+    bitcoin::{BlockHash, OutPoint, Script, Txid},
+    chain_graph::ChainGraph,
+    keychain::KeychainScan,
+    sparse_chain, BlockId, ConfirmationTime,
+};
+use esplora_client::{Error, OutputStatus};
+use futures::stream::{FuturesOrdered, TryStreamExt};
+
+use crate::map_confirmation_time;
+
+#[async_trait(?Send)]
+pub trait EsploraAsyncExt {
+    /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+    ///
+    /// - `local_chain`: the most recent block hashes present locally
+    /// - `keychain_spks`: keychains that we want to scan transactions for
+    /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to included in the update
+    ///
+    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+    /// parallel.
+    ///
+    /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+    #[allow(clippy::result_large_err)] // FIXME
+    async fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+    /// Convenience method to call [`scan`] without requiring a keychain.
+    ///
+    /// [`scan`]: EsploraAsyncExt::scan
+    #[allow(clippy::result_large_err)] // FIXME
+    async fn scan_without_keychain(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        misc_spks: impl IntoIterator<Item = Script>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        parallel_requests: usize,
+    ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+        let wallet_scan = self
+            .scan(
+                local_chain,
+                [(
+                    (),
+                    misc_spks
+                        .into_iter()
+                        .enumerate()
+                        .map(|(i, spk)| (i as u32, spk)),
+                )]
+                .into(),
+                txids,
+                outpoints,
+                usize::MAX,
+                parallel_requests,
+            )
+            .await?;
+
+        Ok(wallet_scan.update)
+    }
+}
+
+#[async_trait(?Send)]
+impl EsploraAsyncExt for esplora_client::AsyncClient {
+    async fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
+        let parallel_requests = parallel_requests.max(1);
+        let mut scan = KeychainScan::default();
+        let update = &mut scan.update;
+        let last_active_indices = &mut scan.last_active_indices;
+
+        for (&height, &original_hash) in local_chain.iter().rev() {
+            let update_block_id = BlockId {
+                height,
+                hash: self.get_block_hash(height).await?,
+            };
+            let _ = update
+                .insert_checkpoint(update_block_id)
+                .expect("cannot repeat height here");
+            if update_block_id.hash == original_hash {
+                break;
+            }
+        }
+        let tip_at_start = BlockId {
+            height: self.get_height().await?,
+            hash: self.get_tip_hash().await?,
+        };
+        if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+            match failure {
+                sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+                    // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+                    return EsploraAsyncExt::scan(
+                        self,
+                        local_chain,
+                        keychain_spks,
+                        txids,
+                        outpoints,
+                        stop_gap,
+                        parallel_requests,
+                    )
+                    .await;
+                }
+            }
+        }
+
+        for (keychain, spks) in keychain_spks {
+            let mut spks = spks.into_iter();
+            let mut last_active_index = None;
+            let mut empty_scripts = 0;
+            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+            loop {
+                let futures: FuturesOrdered<_> = (0..parallel_requests)
+                    .filter_map(|_| {
+                        let (index, script) = spks.next()?;
+                        let client = self.clone();
+                        Some(async move {
+                            let mut related_txs = client.scripthash_txs(&script, None).await?;
+
+                            let n_confirmed =
+                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
+                            // esplora pages on 25 confirmed transactions. If there's 25 or more we
+                            // keep requesting to see if there's more.
+                            if n_confirmed >= 25 {
+                                loop {
+                                    let new_related_txs = client
+                                        .scripthash_txs(
+                                            &script,
+                                            Some(related_txs.last().unwrap().txid),
+                                        )
+                                        .await?;
+                                    let n = new_related_txs.len();
+                                    related_txs.extend(new_related_txs);
+                                    // we've reached the end
+                                    if n < 25 {
+                                        break;
+                                    }
+                                }
+                            }
+
+                            Result::<_, esplora_client::Error>::Ok((index, related_txs))
+                        })
+                    })
+                    .collect();
+
+                let n_futures = futures.len();
+
+                let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
+
+                for (index, related_txs) in idx_with_tx {
+                    if related_txs.is_empty() {
+                        empty_scripts += 1;
+                    } else {
+                        last_active_index = Some(index);
+                        empty_scripts = 0;
+                    }
+                    for tx in related_txs {
+                        let confirmation_time =
+                            map_confirmation_time(&tx.status, tip_at_start.height);
+
+                        if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+                            use bdk_chain::{
+                                chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+                            };
+                            match failure {
+                                InsertTxError::Chain(TxTooHigh { .. }) => {
+                                    unreachable!("chain position already checked earlier")
+                                }
+                                InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                                | InsertTxError::UnresolvableConflict(_) => {
+                                    /* implies reorg during scan. We deal with that below */
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if n_futures == 0 || empty_scripts >= stop_gap {
+                    break;
+                }
+            }
+
+            if let Some(last_active_index) = last_active_index {
+                last_active_indices.insert(keychain, last_active_index);
+            }
+        }
+
+        for txid in txids.into_iter() {
+            let (tx, tx_status) =
+                match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
+                    (Some(tx), Some(tx_status)) => (tx, tx_status),
+                    _ => continue,
+                };
+
+            let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+            if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                match failure {
+                    InsertTxError::Chain(TxTooHigh { .. }) => {
+                        unreachable!("chain position already checked earlier")
+                    }
+                    InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                    | InsertTxError::UnresolvableConflict(_) => {
+                        /* implies reorg during scan. We deal with that below */
+                    }
+                }
+            }
+        }
+
+        for op in outpoints.into_iter() {
+            let mut op_txs = Vec::with_capacity(2);
+            if let (Some(tx), Some(tx_status)) = (
+                self.get_tx(&op.txid).await?,
+                self.get_tx_status(&op.txid).await?,
+            ) {
+                op_txs.push((tx, tx_status));
+                if let Some(OutputStatus {
+                    txid: Some(txid),
+                    status: Some(spend_status),
+                    ..
+                }) = self.get_output_status(&op.txid, op.vout as _).await?
+                {
+                    if let Some(spend_tx) = self.get_tx(&txid).await? {
+                        op_txs.push((spend_tx, spend_status));
+                    }
+                }
+            }
+
+            for (tx, status) in op_txs {
+                let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+                if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                    use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                    match failure {
+                        InsertTxError::Chain(TxTooHigh { .. }) => {
+                            unreachable!("chain position already checked earlier")
+                        }
+                        InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                        | InsertTxError::UnresolvableConflict(_) => {
+                            /* implies reorg during scan. We deal with that below */
+                        }
+                    }
+                }
+            }
+        }
+
+        let reorg_occurred = {
+            if let Some(checkpoint) = update.chain().latest_checkpoint() {
+                self.get_block_hash(checkpoint.height).await? != checkpoint.hash
+            } else {
+                false
+            }
+        };
+
+        if reorg_occurred {
+            // A reorg occurred so lets find out where all the txids we found are in the chain now.
+            // XXX: collect required because of weird type naming issues
+            let txids_found = update
+                .chain()
+                .txids()
+                .map(|(_, txid)| *txid)
+                .collect::<Vec<_>>();
+            scan.update = EsploraAsyncExt::scan_without_keychain(
+                self,
+                local_chain,
+                [],
+                txids_found,
+                [],
+                parallel_requests,
+            )
+            .await?;
+        }
+
+        Ok(scan)
+    }
+}
diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs
new file mode 100644 (file)
index 0000000..3f461c0
--- /dev/null
@@ -0,0 +1,290 @@
+use std::collections::BTreeMap;
+
+use bdk_chain::{
+    bitcoin::{BlockHash, OutPoint, Script, Txid},
+    chain_graph::ChainGraph,
+    keychain::KeychainScan,
+    sparse_chain, BlockId, ConfirmationTime,
+};
+use esplora_client::{Error, OutputStatus};
+
+use crate::map_confirmation_time;
+
+/// Trait to extend [`esplora_client::BlockingClient`] functionality.
+///
+/// Refer to [crate-level documentation] for more.
+///
+/// [crate-level documentation]: crate
+pub trait EsploraExt {
+    /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+    ///
+    /// - `local_chain`: the most recent block hashes present locally
+    /// - `keychain_spks`: keychains that we want to scan transactions for
+    /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to included in the update
+    ///
+    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+    /// parallel.
+    ///
+    /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+    #[allow(clippy::result_large_err)] // FIXME
+    fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+    /// Convenience method to call [`scan`] without requiring a keychain.
+    ///
+    /// [`scan`]: EsploraExt::scan
+    #[allow(clippy::result_large_err)] // FIXME
+    fn scan_without_keychain(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        misc_spks: impl IntoIterator<Item = Script>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        parallel_requests: usize,
+    ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+        let wallet_scan = self.scan(
+            local_chain,
+            [(
+                (),
+                misc_spks
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, spk)| (i as u32, spk)),
+            )]
+            .into(),
+            txids,
+            outpoints,
+            usize::MAX,
+            parallel_requests,
+        )?;
+
+        Ok(wallet_scan.update)
+    }
+}
+
+impl EsploraExt for esplora_client::BlockingClient {
+    fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
+        let parallel_requests = parallel_requests.max(1);
+        let mut scan = KeychainScan::default();
+        let update = &mut scan.update;
+        let last_active_indices = &mut scan.last_active_indices;
+
+        for (&height, &original_hash) in local_chain.iter().rev() {
+            let update_block_id = BlockId {
+                height,
+                hash: self.get_block_hash(height)?,
+            };
+            let _ = update
+                .insert_checkpoint(update_block_id)
+                .expect("cannot repeat height here");
+            if update_block_id.hash == original_hash {
+                break;
+            }
+        }
+        let tip_at_start = BlockId {
+            height: self.get_height()?,
+            hash: self.get_tip_hash()?,
+        };
+        if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+            match failure {
+                sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+                    // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+                    return EsploraExt::scan(
+                        self,
+                        local_chain,
+                        keychain_spks,
+                        txids,
+                        outpoints,
+                        stop_gap,
+                        parallel_requests,
+                    );
+                }
+            }
+        }
+
+        for (keychain, spks) in keychain_spks {
+            let mut spks = spks.into_iter();
+            let mut last_active_index = None;
+            let mut empty_scripts = 0;
+            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+            loop {
+                let handles = (0..parallel_requests)
+                    .filter_map(
+                        |_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
+                            let (index, script) = spks.next()?;
+                            let client = self.clone();
+                            Some(std::thread::spawn(move || {
+                                let mut related_txs = client.scripthash_txs(&script, None)?;
+
+                                let n_confirmed =
+                                    related_txs.iter().filter(|tx| tx.status.confirmed).count();
+                                // esplora pages on 25 confirmed transactions. If there's 25 or more we
+                                // keep requesting to see if there's more.
+                                if n_confirmed >= 25 {
+                                    loop {
+                                        let new_related_txs = client.scripthash_txs(
+                                            &script,
+                                            Some(related_txs.last().unwrap().txid),
+                                        )?;
+                                        let n = new_related_txs.len();
+                                        related_txs.extend(new_related_txs);
+                                        // we've reached the end
+                                        if n < 25 {
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                Result::<_, esplora_client::Error>::Ok((index, related_txs))
+                            }))
+                        },
+                    )
+                    .collect::<Vec<_>>();
+
+                let n_handles = handles.len();
+
+                for handle in handles {
+                    let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
+                    if related_txs.is_empty() {
+                        empty_scripts += 1;
+                    } else {
+                        last_active_index = Some(index);
+                        empty_scripts = 0;
+                    }
+                    for tx in related_txs {
+                        let confirmation_time =
+                            map_confirmation_time(&tx.status, tip_at_start.height);
+
+                        if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+                            use bdk_chain::{
+                                chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+                            };
+                            match failure {
+                                InsertTxError::Chain(TxTooHigh { .. }) => {
+                                    unreachable!("chain position already checked earlier")
+                                }
+                                InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                                | InsertTxError::UnresolvableConflict(_) => {
+                                    /* implies reorg during scan. We deal with that below */
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if n_handles == 0 || empty_scripts >= stop_gap {
+                    break;
+                }
+            }
+
+            if let Some(last_active_index) = last_active_index {
+                last_active_indices.insert(keychain, last_active_index);
+            }
+        }
+
+        for txid in txids.into_iter() {
+            let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) {
+                (Some(tx), Some(tx_status)) => (tx, tx_status),
+                _ => continue,
+            };
+
+            let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+            if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                match failure {
+                    InsertTxError::Chain(TxTooHigh { .. }) => {
+                        unreachable!("chain position already checked earlier")
+                    }
+                    InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                    | InsertTxError::UnresolvableConflict(_) => {
+                        /* implies reorg during scan. We deal with that below */
+                    }
+                }
+            }
+        }
+
+        for op in outpoints.into_iter() {
+            let mut op_txs = Vec::with_capacity(2);
+            if let (Some(tx), Some(tx_status)) =
+                (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
+            {
+                op_txs.push((tx, tx_status));
+                if let Some(OutputStatus {
+                    txid: Some(txid),
+                    status: Some(spend_status),
+                    ..
+                }) = self.get_output_status(&op.txid, op.vout as _)?
+                {
+                    if let Some(spend_tx) = self.get_tx(&txid)? {
+                        op_txs.push((spend_tx, spend_status));
+                    }
+                }
+            }
+
+            for (tx, status) in op_txs {
+                let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+                if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                    use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                    match failure {
+                        InsertTxError::Chain(TxTooHigh { .. }) => {
+                            unreachable!("chain position already checked earlier")
+                        }
+                        InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                        | InsertTxError::UnresolvableConflict(_) => {
+                            /* implies reorg during scan. We deal with that below */
+                        }
+                    }
+                }
+            }
+        }
+
+        let reorg_occurred = {
+            if let Some(checkpoint) = update.chain().latest_checkpoint() {
+                self.get_block_hash(checkpoint.height)? != checkpoint.hash
+            } else {
+                false
+            }
+        };
+
+        if reorg_occurred {
+            // A reorg occurred so lets find out where all the txids we found are in the chain now.
+            // XXX: collect required because of weird type naming issues
+            let txids_found = update
+                .chain()
+                .txids()
+                .map(|(_, txid)| *txid)
+                .collect::<Vec<_>>();
+            scan.update = EsploraExt::scan_without_keychain(
+                self,
+                local_chain,
+                [],
+                txids_found,
+                [],
+                parallel_requests,
+            )?;
+        }
+
+        Ok(scan)
+    }
+}
index 9ef2b1bda0442e5bbc737466ff6853726391a6e2..5964dc5b30f07c16697cd28f648493f79f71ac0a 100644 (file)
 //! This crate is used for updating structures of [`bdk_chain`] with data from an esplora server.
 //!
 //! The star of the show is the  [`EsploraExt::scan`] method which scans for relevant
-//! blockchain data (via esplora) and outputs a [`KeychainScan`].
+//! blockchain data (via esplora) and outputs a [`KeychainScan`](bdk_chain::keychain::KeychainScan).
 
-use async_trait::async_trait;
-use bdk_chain::{
-    bitcoin::{BlockHash, OutPoint, Script, Txid},
-    chain_graph::ChainGraph,
-    keychain::KeychainScan,
-    sparse_chain, BlockId, ConfirmationTime,
-};
-use esplora_client::{OutputStatus, TxStatus};
-use futures::stream::{FuturesOrdered, TryStreamExt};
-use std::collections::BTreeMap;
+use bdk_chain::ConfirmationTime;
+use esplora_client::TxStatus;
 
 pub use esplora_client;
-use esplora_client::Error;
-
-/// Trait to extend [`esplora_client::BlockingClient`] functionality.
-///
-/// Refer to [crate-level documentation] for more.
-///
-/// [crate-level documentation]: crate
 
 #[cfg(feature = "blocking")]
-pub trait EsploraExt {
-    /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
-    ///
-    /// - `local_chain`: the most recent block hashes present locally
-    /// - `keychain_spks`: keychains that we want to scan transactions for
-    /// - `txids`: transactions that we want updated [`ChainPosition`]s for
-    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
-    ///     want to included in the update
-    ///
-    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
-    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
-    /// parallel.
-    ///
-    /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
-    #[allow(clippy::result_large_err)] // FIXME
-    fn scan<K: Ord + Clone>(
-        &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
-
-    /// Convenience method to call [`scan`] without requiring a keychain.
-    ///
-    /// [`scan`]: EsploraExt::scan
-    #[allow(clippy::result_large_err)] // FIXME
-    fn scan_without_keychain(
-        &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        misc_spks: impl IntoIterator<Item = Script>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        parallel_requests: usize,
-    ) -> Result<ChainGraph<ConfirmationTime>, Error> {
-        let wallet_scan = self.scan(
-            local_chain,
-            [(
-                (),
-                misc_spks
-                    .into_iter()
-                    .enumerate()
-                    .map(|(i, spk)| (i as u32, spk)),
-            )]
-            .into(),
-            txids,
-            outpoints,
-            usize::MAX,
-            parallel_requests,
-        )?;
-
-        Ok(wallet_scan.update)
-    }
-}
-
+mod blocking_ext;
 #[cfg(feature = "blocking")]
-impl EsploraExt for esplora_client::BlockingClient {
-    fn scan<K: Ord + Clone>(
-        &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
-        let parallel_requests = parallel_requests.max(1);
-        let mut scan = KeychainScan::default();
-        let update = &mut scan.update;
-        let last_active_indices = &mut scan.last_active_indices;
-
-        for (&height, &original_hash) in local_chain.iter().rev() {
-            let update_block_id = BlockId {
-                height,
-                hash: self.get_block_hash(height)?,
-            };
-            let _ = update
-                .insert_checkpoint(update_block_id)
-                .expect("cannot repeat height here");
-            if update_block_id.hash == original_hash {
-                break;
-            }
-        }
-        let tip_at_start = BlockId {
-            height: self.get_height()?,
-            hash: self.get_tip_hash()?,
-        };
-        if let Err(failure) = update.insert_checkpoint(tip_at_start) {
-            match failure {
-                sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
-                    // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
-                    return EsploraExt::scan(
-                        self,
-                        local_chain,
-                        keychain_spks,
-                        txids,
-                        outpoints,
-                        stop_gap,
-                        parallel_requests,
-                    );
-                }
-            }
-        }
+pub use blocking_ext::*;
 
-        for (keychain, spks) in keychain_spks {
-            let mut spks = spks.into_iter();
-            let mut last_active_index = None;
-            let mut empty_scripts = 0;
-            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
-
-            loop {
-                let handles = (0..parallel_requests)
-                    .filter_map(
-                        |_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
-                            let (index, script) = spks.next()?;
-                            let client = self.clone();
-                            Some(std::thread::spawn(move || {
-                                let mut related_txs = client.scripthash_txs(&script, None)?;
-
-                                let n_confirmed =
-                                    related_txs.iter().filter(|tx| tx.status.confirmed).count();
-                                // esplora pages on 25 confirmed transactions. If there's 25 or more we
-                                // keep requesting to see if there's more.
-                                if n_confirmed >= 25 {
-                                    loop {
-                                        let new_related_txs = client.scripthash_txs(
-                                            &script,
-                                            Some(related_txs.last().unwrap().txid),
-                                        )?;
-                                        let n = new_related_txs.len();
-                                        related_txs.extend(new_related_txs);
-                                        // we've reached the end
-                                        if n < 25 {
-                                            break;
-                                        }
-                                    }
-                                }
-
-                                Result::<_, esplora_client::Error>::Ok((index, related_txs))
-                            }))
-                        },
-                    )
-                    .collect::<Vec<_>>();
-
-                let n_handles = handles.len();
-
-                for handle in handles {
-                    let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
-                    if related_txs.is_empty() {
-                        empty_scripts += 1;
-                    } else {
-                        last_active_index = Some(index);
-                        empty_scripts = 0;
-                    }
-                    for tx in related_txs {
-                        let confirmation_time =
-                            map_confirmation_time(&tx.status, tip_at_start.height);
-
-                        if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
-                            use bdk_chain::{
-                                chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
-                            };
-                            match failure {
-                                InsertTxError::Chain(TxTooHigh { .. }) => {
-                                    unreachable!("chain position already checked earlier")
-                                }
-                                InsertTxError::Chain(TxMovedUnexpectedly { .. })
-                                | InsertTxError::UnresolvableConflict(_) => {
-                                    /* implies reorg during scan. We deal with that below */
-                                }
-                            }
-                        }
-                    }
-                }
-
-                if n_handles == 0 || empty_scripts >= stop_gap {
-                    break;
-                }
-            }
-
-            if let Some(last_active_index) = last_active_index {
-                last_active_indices.insert(keychain, last_active_index);
-            }
-        }
-
-        for txid in txids.into_iter() {
-            let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) {
-                (Some(tx), Some(tx_status)) => (tx, tx_status),
-                _ => continue,
-            };
-
-            let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
-
-            if let Err(failure) = update.insert_tx(tx, confirmation_time) {
-                use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
-                match failure {
-                    InsertTxError::Chain(TxTooHigh { .. }) => {
-                        unreachable!("chain position already checked earlier")
-                    }
-                    InsertTxError::Chain(TxMovedUnexpectedly { .. })
-                    | InsertTxError::UnresolvableConflict(_) => {
-                        /* implies reorg during scan. We deal with that below */
-                    }
-                }
-            }
-        }
-
-        for op in outpoints.into_iter() {
-            let mut op_txs = Vec::with_capacity(2);
-            if let (Some(tx), Some(tx_status)) =
-                (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
-            {
-                op_txs.push((tx, tx_status));
-                if let Some(OutputStatus {
-                    txid: Some(txid),
-                    status: Some(spend_status),
-                    ..
-                }) = self.get_output_status(&op.txid, op.vout as _)?
-                {
-                    if let Some(spend_tx) = self.get_tx(&txid)? {
-                        op_txs.push((spend_tx, spend_status));
-                    }
-                }
-            }
-
-            for (tx, status) in op_txs {
-                let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
-
-                if let Err(failure) = update.insert_tx(tx, confirmation_time) {
-                    use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
-                    match failure {
-                        InsertTxError::Chain(TxTooHigh { .. }) => {
-                            unreachable!("chain position already checked earlier")
-                        }
-                        InsertTxError::Chain(TxMovedUnexpectedly { .. })
-                        | InsertTxError::UnresolvableConflict(_) => {
-                            /* implies reorg during scan. We deal with that below */
-                        }
-                    }
-                }
-            }
-        }
-
-        let reorg_occurred = {
-            if let Some(checkpoint) = update.chain().latest_checkpoint() {
-                self.get_block_hash(checkpoint.height)? != checkpoint.hash
-            } else {
-                false
-            }
-        };
-
-        if reorg_occurred {
-            // A reorg occurred so lets find out where all the txids we found are in the chain now.
-            // XXX: collect required because of weird type naming issues
-            let txids_found = update
-                .chain()
-                .txids()
-                .map(|(_, txid)| *txid)
-                .collect::<Vec<_>>();
-            scan.update = EsploraExt::scan_without_keychain(
-                self,
-                local_chain,
-                [],
-                txids_found,
-                [],
-                parallel_requests,
-            )?;
-        }
-
-        Ok(scan)
-    }
-}
+#[cfg(feature = "async")]
+mod async_ext;
+#[cfg(feature = "async")]
+pub use async_ext::*;
 
-fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> ConfirmationTime {
+pub(crate) fn map_confirmation_time(
+    tx_status: &TxStatus,
+    height_at_start: u32,
+) -> ConfirmationTime {
     match (tx_status.block_time, tx_status.block_height) {
         (Some(time), Some(height)) if height <= height_at_start => {
             ConfirmationTime::Confirmed { height, time }
@@ -307,288 +29,3 @@ fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> Confirma
         _ => ConfirmationTime::Unconfirmed,
     }
 }
-
-#[cfg(feature = "async")]
-#[async_trait(?Send)]
-pub trait EsploraAsyncExt {
-    /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
-    ///
-    /// - `local_chain`: the most recent block hashes present locally
-    /// - `keychain_spks`: keychains that we want to scan transactions for
-    /// - `txids`: transactions that we want updated [`ChainPosition`]s for
-    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
-    ///     want to included in the update
-    ///
-    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
-    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
-    /// parallel.
-    ///
-    /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
-    #[allow(clippy::result_large_err)] // FIXME
-    async fn scan<K: Ord + Clone>(
-        &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
-
-    /// Convenience method to call [`scan`] without requiring a keychain.
-    ///
-    /// [`scan`]: EsploraAsyncExt::scan
-    #[allow(clippy::result_large_err)] // FIXME
-    async fn scan_without_keychain(
-        &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        misc_spks: impl IntoIterator<Item = Script>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        parallel_requests: usize,
-    ) -> Result<ChainGraph<ConfirmationTime>, Error> {
-        let wallet_scan = self
-            .scan(
-                local_chain,
-                [(
-                    (),
-                    misc_spks
-                        .into_iter()
-                        .enumerate()
-                        .map(|(i, spk)| (i as u32, spk)),
-                )]
-                .into(),
-                txids,
-                outpoints,
-                usize::MAX,
-                parallel_requests,
-            )
-            .await?;
-
-        Ok(wallet_scan.update)
-    }
-}
-
-#[cfg(feature = "async")]
-#[async_trait(?Send)]
-impl EsploraAsyncExt for esplora_client::AsyncClient {
-    async fn scan<K: Ord + Clone>(
-        &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
-        let parallel_requests = parallel_requests.max(1);
-        let mut scan = KeychainScan::default();
-        let update = &mut scan.update;
-        let last_active_indices = &mut scan.last_active_indices;
-
-        for (&height, &original_hash) in local_chain.iter().rev() {
-            let update_block_id = BlockId {
-                height,
-                hash: self.get_block_hash(height).await?,
-            };
-            let _ = update
-                .insert_checkpoint(update_block_id)
-                .expect("cannot repeat height here");
-            if update_block_id.hash == original_hash {
-                break;
-            }
-        }
-        let tip_at_start = BlockId {
-            height: self.get_height().await?,
-            hash: self.get_tip_hash().await?,
-        };
-        if let Err(failure) = update.insert_checkpoint(tip_at_start) {
-            match failure {
-                sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
-                    // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
-                    return EsploraAsyncExt::scan(
-                        self,
-                        local_chain,
-                        keychain_spks,
-                        txids,
-                        outpoints,
-                        stop_gap,
-                        parallel_requests,
-                    )
-                    .await;
-                }
-            }
-        }
-
-        for (keychain, spks) in keychain_spks {
-            let mut spks = spks.into_iter();
-            let mut last_active_index = None;
-            let mut empty_scripts = 0;
-            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
-
-            loop {
-                let futures: FuturesOrdered<_> = (0..parallel_requests)
-                    .filter_map(|_| {
-                        let (index, script) = spks.next()?;
-                        let client = self.clone();
-                        Some(async move {
-                            let mut related_txs = client.scripthash_txs(&script, None).await?;
-
-                            let n_confirmed =
-                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
-                            // esplora pages on 25 confirmed transactions. If there's 25 or more we
-                            // keep requesting to see if there's more.
-                            if n_confirmed >= 25 {
-                                loop {
-                                    let new_related_txs = client
-                                        .scripthash_txs(
-                                            &script,
-                                            Some(related_txs.last().unwrap().txid),
-                                        )
-                                        .await?;
-                                    let n = new_related_txs.len();
-                                    related_txs.extend(new_related_txs);
-                                    // we've reached the end
-                                    if n < 25 {
-                                        break;
-                                    }
-                                }
-                            }
-
-                            Result::<_, esplora_client::Error>::Ok((index, related_txs))
-                        })
-                    })
-                    .collect();
-
-                let n_futures = futures.len();
-
-                let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
-
-                for (index, related_txs) in idx_with_tx {
-                    if related_txs.is_empty() {
-                        empty_scripts += 1;
-                    } else {
-                        last_active_index = Some(index);
-                        empty_scripts = 0;
-                    }
-                    for tx in related_txs {
-                        let confirmation_time =
-                            map_confirmation_time(&tx.status, tip_at_start.height);
-
-                        if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
-                            use bdk_chain::{
-                                chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
-                            };
-                            match failure {
-                                InsertTxError::Chain(TxTooHigh { .. }) => {
-                                    unreachable!("chain position already checked earlier")
-                                }
-                                InsertTxError::Chain(TxMovedUnexpectedly { .. })
-                                | InsertTxError::UnresolvableConflict(_) => {
-                                    /* implies reorg during scan. We deal with that below */
-                                }
-                            }
-                        }
-                    }
-                }
-
-                if n_futures == 0 || empty_scripts >= stop_gap {
-                    break;
-                }
-            }
-
-            if let Some(last_active_index) = last_active_index {
-                last_active_indices.insert(keychain, last_active_index);
-            }
-        }
-
-        for txid in txids.into_iter() {
-            let (tx, tx_status) =
-                match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
-                    (Some(tx), Some(tx_status)) => (tx, tx_status),
-                    _ => continue,
-                };
-
-            let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
-
-            if let Err(failure) = update.insert_tx(tx, confirmation_time) {
-                use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
-                match failure {
-                    InsertTxError::Chain(TxTooHigh { .. }) => {
-                        unreachable!("chain position already checked earlier")
-                    }
-                    InsertTxError::Chain(TxMovedUnexpectedly { .. })
-                    | InsertTxError::UnresolvableConflict(_) => {
-                        /* implies reorg during scan. We deal with that below */
-                    }
-                }
-            }
-        }
-
-        for op in outpoints.into_iter() {
-            let mut op_txs = Vec::with_capacity(2);
-            if let (Some(tx), Some(tx_status)) = (
-                self.get_tx(&op.txid).await?,
-                self.get_tx_status(&op.txid).await?,
-            ) {
-                op_txs.push((tx, tx_status));
-                if let Some(OutputStatus {
-                    txid: Some(txid),
-                    status: Some(spend_status),
-                    ..
-                }) = self.get_output_status(&op.txid, op.vout as _).await?
-                {
-                    if let Some(spend_tx) = self.get_tx(&txid).await? {
-                        op_txs.push((spend_tx, spend_status));
-                    }
-                }
-            }
-
-            for (tx, status) in op_txs {
-                let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
-
-                if let Err(failure) = update.insert_tx(tx, confirmation_time) {
-                    use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
-                    match failure {
-                        InsertTxError::Chain(TxTooHigh { .. }) => {
-                            unreachable!("chain position already checked earlier")
-                        }
-                        InsertTxError::Chain(TxMovedUnexpectedly { .. })
-                        | InsertTxError::UnresolvableConflict(_) => {
-                            /* implies reorg during scan. We deal with that below */
-                        }
-                    }
-                }
-            }
-        }
-
-        let reorg_occurred = {
-            if let Some(checkpoint) = update.chain().latest_checkpoint() {
-                self.get_block_hash(checkpoint.height).await? != checkpoint.hash
-            } else {
-                false
-            }
-        };
-
-        if reorg_occurred {
-            // A reorg occurred so lets find out where all the txids we found are in the chain now.
-            // XXX: collect required because of weird type naming issues
-            let txids_found = update
-                .chain()
-                .txids()
-                .map(|(_, txid)| *txid)
-                .collect::<Vec<_>>();
-            scan.update = EsploraAsyncExt::scan_without_keychain(
-                self,
-                local_chain,
-                [],
-                txids_found,
-                [],
-                parallel_requests,
-            )
-            .await?;
-        }
-
-        Ok(scan)
-    }
-}
index 917bea8f578a4b543e3e5d830f66ff2190b8659e..8e19cb7bd8ae242b679bbc76de25f9084b8dacc3 100644 (file)
@@ -8,5 +8,5 @@ publish = false
 
 [dependencies]
 bdk = { path = "../../crates/bdk" }
-bdk_esplora = { path = "../../crates/esplora" }
+bdk_esplora = { path = "../../crates/esplora", features = ["blocking"] }
 bdk_file_store = { path = "../../crates/file_store" }
diff --git a/example-crates/wallet_esplora_async/Cargo.toml b/example-crates/wallet_esplora_async/Cargo.toml
new file mode 100644 (file)
index 0000000..af368fc
--- /dev/null
@@ -0,0 +1,12 @@
+[package]
+name = "wallet_esplora_async"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+bdk = { path = "../../crates/bdk" }
+bdk_esplora = { path = "../../crates/esplora", features = ["async-https"] }
+bdk_file_store = { path = "../../crates/file_store" }
+tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs
new file mode 100644 (file)
index 0000000..b78b09d
--- /dev/null
@@ -0,0 +1,99 @@
+use std::{io::Write, str::FromStr};
+
+use bdk::{
+    bitcoin::{Address, Network},
+    wallet::AddressIndex,
+    SignOptions, Wallet,
+};
+use bdk_esplora::{esplora_client, EsploraAsyncExt};
+use bdk_file_store::KeychainStore;
+
+const SEND_AMOUNT: u64 = 5000;
+const STOP_GAP: usize = 50;
+const PARALLEL_REQUESTS: usize = 5;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let db_path = std::env::temp_dir().join("bdk-esplora-example");
+    let db = KeychainStore::new_from_path(db_path)?;
+    let external_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/0/*)";
+    let internal_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/1/*)";
+
+    let mut wallet = Wallet::new(
+        external_descriptor,
+        Some(internal_descriptor),
+        db,
+        Network::Testnet,
+    )?;
+
+    let address = wallet.get_address(AddressIndex::New);
+    println!("Generated Address: {}", address);
+
+    let balance = wallet.get_balance();
+    println!("Wallet balance before syncing: {} sats", balance.total());
+
+    print!("Syncing...");
+    // Scanning the blockchain
+    let esplora_url = "https://mempool.space/testnet/api";
+    let client = esplora_client::Builder::new(esplora_url).build_async()?;
+    let checkpoints = wallet.checkpoints();
+    let spks = wallet
+        .spks_of_all_keychains()
+        .into_iter()
+        .map(|(k, spks)| {
+            let mut first = true;
+            (
+                k,
+                spks.inspect(move |(spk_i, _)| {
+                    if first {
+                        first = false;
+                        print!("\nScanning keychain [{:?}]:", k);
+                    }
+                    print!(" {}", spk_i);
+                    let _ = std::io::stdout().flush();
+                }),
+            )
+        })
+        .collect();
+    let update = client
+        .scan(
+            checkpoints,
+            spks,
+            std::iter::empty(),
+            std::iter::empty(),
+            STOP_GAP,
+            PARALLEL_REQUESTS,
+        )
+        .await?;
+    println!();
+    wallet.apply_update(update)?;
+    wallet.commit()?;
+
+    let balance = wallet.get_balance();
+    println!("Wallet balance after syncing: {} sats", balance.total());
+
+    if balance.total() < SEND_AMOUNT {
+        println!(
+            "Please send at least {} sats to the receiving address",
+            SEND_AMOUNT
+        );
+        std::process::exit(0);
+    }
+
+    let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?;
+
+    let mut tx_builder = wallet.build_tx();
+    tx_builder
+        .add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
+        .enable_rbf();
+
+    let (mut psbt, _) = tx_builder.finish()?;
+    let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
+    assert!(finalized);
+
+    let tx = psbt.extract_tx();
+    client.broadcast(&tx).await?;
+    println!("Tx broadcasted! Txid: {}", tx.txid());
+
+    Ok(())
+}