"example-crates/keychain_tracker_example_cli",
"example-crates/wallet_electrum",
"example-crates/wallet_esplora",
+ "example-crates/wallet_esplora_async",
"nursery/tmp_plan",
"nursery/coin_select"
]
[dependencies]
bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] }
esplora-client = { version = "0.3", default-features = false }
-async-trait = "0.1.66"
-futures = "0.3.26"
+async-trait = { version = "0.1.66", optional = true }
+futures = { version = "0.3.26", optional = true }
[features]
-default = ["async", "blocking"]
-async = ["esplora-client/async"]
+default = ["async-https", "blocking"]
+async = ["async-trait", "futures", "esplora-client/async"]
+async-https = ["async", "esplora-client/async-https"]
blocking = ["esplora-client/blocking"]
--- /dev/null
+use std::collections::BTreeMap;
+
+use async_trait::async_trait;
+use bdk_chain::{
+ bitcoin::{BlockHash, OutPoint, Script, Txid},
+ chain_graph::ChainGraph,
+ keychain::KeychainScan,
+ sparse_chain, BlockId, ConfirmationTime,
+};
+use esplora_client::{Error, OutputStatus};
+use futures::stream::{FuturesOrdered, TryStreamExt};
+
+use crate::map_confirmation_time;
+
+#[async_trait(?Send)]
+pub trait EsploraAsyncExt {
+ /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+ ///
+ /// - `local_chain`: the most recent block hashes present locally
+ /// - `keychain_spks`: keychains that we want to scan transactions for
+ /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+ /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+ /// want to included in the update
+ ///
+ /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+ /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+ /// parallel.
+ ///
+ /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+ #[allow(clippy::result_large_err)] // FIXME
+ async fn scan<K: Ord + Clone>(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+ /// Convenience method to call [`scan`] without requiring a keychain.
+ ///
+ /// [`scan`]: EsploraAsyncExt::scan
+ #[allow(clippy::result_large_err)] // FIXME
+ async fn scan_without_keychain(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ misc_spks: impl IntoIterator<Item = Script>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ parallel_requests: usize,
+ ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+ let wallet_scan = self
+ .scan(
+ local_chain,
+ [(
+ (),
+ misc_spks
+ .into_iter()
+ .enumerate()
+ .map(|(i, spk)| (i as u32, spk)),
+ )]
+ .into(),
+ txids,
+ outpoints,
+ usize::MAX,
+ parallel_requests,
+ )
+ .await?;
+
+ Ok(wallet_scan.update)
+ }
+}
+
+#[async_trait(?Send)]
+impl EsploraAsyncExt for esplora_client::AsyncClient {
+ async fn scan<K: Ord + Clone>(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
+ let parallel_requests = parallel_requests.max(1);
+ let mut scan = KeychainScan::default();
+ let update = &mut scan.update;
+ let last_active_indices = &mut scan.last_active_indices;
+
+ for (&height, &original_hash) in local_chain.iter().rev() {
+ let update_block_id = BlockId {
+ height,
+ hash: self.get_block_hash(height).await?,
+ };
+ let _ = update
+ .insert_checkpoint(update_block_id)
+ .expect("cannot repeat height here");
+ if update_block_id.hash == original_hash {
+ break;
+ }
+ }
+ let tip_at_start = BlockId {
+ height: self.get_height().await?,
+ hash: self.get_tip_hash().await?,
+ };
+ if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+ match failure {
+ sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+ // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+ return EsploraAsyncExt::scan(
+ self,
+ local_chain,
+ keychain_spks,
+ txids,
+ outpoints,
+ stop_gap,
+ parallel_requests,
+ )
+ .await;
+ }
+ }
+ }
+
+ for (keychain, spks) in keychain_spks {
+ let mut spks = spks.into_iter();
+ let mut last_active_index = None;
+ let mut empty_scripts = 0;
+ type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+ loop {
+ let futures: FuturesOrdered<_> = (0..parallel_requests)
+ .filter_map(|_| {
+ let (index, script) = spks.next()?;
+ let client = self.clone();
+ Some(async move {
+ let mut related_txs = client.scripthash_txs(&script, None).await?;
+
+ let n_confirmed =
+ related_txs.iter().filter(|tx| tx.status.confirmed).count();
+ // esplora pages on 25 confirmed transactions. If there's 25 or more we
+ // keep requesting to see if there's more.
+ if n_confirmed >= 25 {
+ loop {
+ let new_related_txs = client
+ .scripthash_txs(
+ &script,
+ Some(related_txs.last().unwrap().txid),
+ )
+ .await?;
+ let n = new_related_txs.len();
+ related_txs.extend(new_related_txs);
+ // we've reached the end
+ if n < 25 {
+ break;
+ }
+ }
+ }
+
+ Result::<_, esplora_client::Error>::Ok((index, related_txs))
+ })
+ })
+ .collect();
+
+ let n_futures = futures.len();
+
+ let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
+
+ for (index, related_txs) in idx_with_tx {
+ if related_txs.is_empty() {
+ empty_scripts += 1;
+ } else {
+ last_active_index = Some(index);
+ empty_scripts = 0;
+ }
+ for tx in related_txs {
+ let confirmation_time =
+ map_confirmation_time(&tx.status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+ use bdk_chain::{
+ chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+ };
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+ }
+
+ if n_futures == 0 || empty_scripts >= stop_gap {
+ break;
+ }
+ }
+
+ if let Some(last_active_index) = last_active_index {
+ last_active_indices.insert(keychain, last_active_index);
+ }
+ }
+
+ for txid in txids.into_iter() {
+ let (tx, tx_status) =
+ match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
+ (Some(tx), Some(tx_status)) => (tx, tx_status),
+ _ => continue,
+ };
+
+ let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+ use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+
+ for op in outpoints.into_iter() {
+ let mut op_txs = Vec::with_capacity(2);
+ if let (Some(tx), Some(tx_status)) = (
+ self.get_tx(&op.txid).await?,
+ self.get_tx_status(&op.txid).await?,
+ ) {
+ op_txs.push((tx, tx_status));
+ if let Some(OutputStatus {
+ txid: Some(txid),
+ status: Some(spend_status),
+ ..
+ }) = self.get_output_status(&op.txid, op.vout as _).await?
+ {
+ if let Some(spend_tx) = self.get_tx(&txid).await? {
+ op_txs.push((spend_tx, spend_status));
+ }
+ }
+ }
+
+ for (tx, status) in op_txs {
+ let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+ use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+ }
+
+ let reorg_occurred = {
+ if let Some(checkpoint) = update.chain().latest_checkpoint() {
+ self.get_block_hash(checkpoint.height).await? != checkpoint.hash
+ } else {
+ false
+ }
+ };
+
+ if reorg_occurred {
+ // A reorg occurred so lets find out where all the txids we found are in the chain now.
+ // XXX: collect required because of weird type naming issues
+ let txids_found = update
+ .chain()
+ .txids()
+ .map(|(_, txid)| *txid)
+ .collect::<Vec<_>>();
+ scan.update = EsploraAsyncExt::scan_without_keychain(
+ self,
+ local_chain,
+ [],
+ txids_found,
+ [],
+ parallel_requests,
+ )
+ .await?;
+ }
+
+ Ok(scan)
+ }
+}
--- /dev/null
+use std::collections::BTreeMap;
+
+use bdk_chain::{
+ bitcoin::{BlockHash, OutPoint, Script, Txid},
+ chain_graph::ChainGraph,
+ keychain::KeychainScan,
+ sparse_chain, BlockId, ConfirmationTime,
+};
+use esplora_client::{Error, OutputStatus};
+
+use crate::map_confirmation_time;
+
+/// Trait to extend [`esplora_client::BlockingClient`] functionality.
+///
+/// Refer to [crate-level documentation] for more.
+///
+/// [crate-level documentation]: crate
+pub trait EsploraExt {
+ /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+ ///
+ /// - `local_chain`: the most recent block hashes present locally
+ /// - `keychain_spks`: keychains that we want to scan transactions for
+ /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+ /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+ /// want to included in the update
+ ///
+ /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+ /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+ /// parallel.
+ ///
+ /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+ #[allow(clippy::result_large_err)] // FIXME
+ fn scan<K: Ord + Clone>(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+ /// Convenience method to call [`scan`] without requiring a keychain.
+ ///
+ /// [`scan`]: EsploraExt::scan
+ #[allow(clippy::result_large_err)] // FIXME
+ fn scan_without_keychain(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ misc_spks: impl IntoIterator<Item = Script>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ parallel_requests: usize,
+ ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+ let wallet_scan = self.scan(
+ local_chain,
+ [(
+ (),
+ misc_spks
+ .into_iter()
+ .enumerate()
+ .map(|(i, spk)| (i as u32, spk)),
+ )]
+ .into(),
+ txids,
+ outpoints,
+ usize::MAX,
+ parallel_requests,
+ )?;
+
+ Ok(wallet_scan.update)
+ }
+}
+
+impl EsploraExt for esplora_client::BlockingClient {
+ fn scan<K: Ord + Clone>(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
+ let parallel_requests = parallel_requests.max(1);
+ let mut scan = KeychainScan::default();
+ let update = &mut scan.update;
+ let last_active_indices = &mut scan.last_active_indices;
+
+ for (&height, &original_hash) in local_chain.iter().rev() {
+ let update_block_id = BlockId {
+ height,
+ hash: self.get_block_hash(height)?,
+ };
+ let _ = update
+ .insert_checkpoint(update_block_id)
+ .expect("cannot repeat height here");
+ if update_block_id.hash == original_hash {
+ break;
+ }
+ }
+ let tip_at_start = BlockId {
+ height: self.get_height()?,
+ hash: self.get_tip_hash()?,
+ };
+ if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+ match failure {
+ sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+ // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+ return EsploraExt::scan(
+ self,
+ local_chain,
+ keychain_spks,
+ txids,
+ outpoints,
+ stop_gap,
+ parallel_requests,
+ );
+ }
+ }
+ }
+
+ for (keychain, spks) in keychain_spks {
+ let mut spks = spks.into_iter();
+ let mut last_active_index = None;
+ let mut empty_scripts = 0;
+ type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+ loop {
+ let handles = (0..parallel_requests)
+ .filter_map(
+ |_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
+ let (index, script) = spks.next()?;
+ let client = self.clone();
+ Some(std::thread::spawn(move || {
+ let mut related_txs = client.scripthash_txs(&script, None)?;
+
+ let n_confirmed =
+ related_txs.iter().filter(|tx| tx.status.confirmed).count();
+ // esplora pages on 25 confirmed transactions. If there's 25 or more we
+ // keep requesting to see if there's more.
+ if n_confirmed >= 25 {
+ loop {
+ let new_related_txs = client.scripthash_txs(
+ &script,
+ Some(related_txs.last().unwrap().txid),
+ )?;
+ let n = new_related_txs.len();
+ related_txs.extend(new_related_txs);
+ // we've reached the end
+ if n < 25 {
+ break;
+ }
+ }
+ }
+
+ Result::<_, esplora_client::Error>::Ok((index, related_txs))
+ }))
+ },
+ )
+ .collect::<Vec<_>>();
+
+ let n_handles = handles.len();
+
+ for handle in handles {
+ let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
+ if related_txs.is_empty() {
+ empty_scripts += 1;
+ } else {
+ last_active_index = Some(index);
+ empty_scripts = 0;
+ }
+ for tx in related_txs {
+ let confirmation_time =
+ map_confirmation_time(&tx.status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+ use bdk_chain::{
+ chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+ };
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+ }
+
+ if n_handles == 0 || empty_scripts >= stop_gap {
+ break;
+ }
+ }
+
+ if let Some(last_active_index) = last_active_index {
+ last_active_indices.insert(keychain, last_active_index);
+ }
+ }
+
+ for txid in txids.into_iter() {
+ let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) {
+ (Some(tx), Some(tx_status)) => (tx, tx_status),
+ _ => continue,
+ };
+
+ let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+ use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+
+ for op in outpoints.into_iter() {
+ let mut op_txs = Vec::with_capacity(2);
+ if let (Some(tx), Some(tx_status)) =
+ (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
+ {
+ op_txs.push((tx, tx_status));
+ if let Some(OutputStatus {
+ txid: Some(txid),
+ status: Some(spend_status),
+ ..
+ }) = self.get_output_status(&op.txid, op.vout as _)?
+ {
+ if let Some(spend_tx) = self.get_tx(&txid)? {
+ op_txs.push((spend_tx, spend_status));
+ }
+ }
+ }
+
+ for (tx, status) in op_txs {
+ let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+ use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+ }
+
+ let reorg_occurred = {
+ if let Some(checkpoint) = update.chain().latest_checkpoint() {
+ self.get_block_hash(checkpoint.height)? != checkpoint.hash
+ } else {
+ false
+ }
+ };
+
+ if reorg_occurred {
+ // A reorg occurred so lets find out where all the txids we found are in the chain now.
+ // XXX: collect required because of weird type naming issues
+ let txids_found = update
+ .chain()
+ .txids()
+ .map(|(_, txid)| *txid)
+ .collect::<Vec<_>>();
+ scan.update = EsploraExt::scan_without_keychain(
+ self,
+ local_chain,
+ [],
+ txids_found,
+ [],
+ parallel_requests,
+ )?;
+ }
+
+ Ok(scan)
+ }
+}
//! This crate is used for updating structures of [`bdk_chain`] with data from an esplora server.
//!
//! The star of the show is the [`EsploraExt::scan`] method which scans for relevant
-//! blockchain data (via esplora) and outputs a [`KeychainScan`].
+//! blockchain data (via esplora) and outputs a [`KeychainScan`](bdk_chain::keychain::KeychainScan).
-use async_trait::async_trait;
-use bdk_chain::{
- bitcoin::{BlockHash, OutPoint, Script, Txid},
- chain_graph::ChainGraph,
- keychain::KeychainScan,
- sparse_chain, BlockId, ConfirmationTime,
-};
-use esplora_client::{OutputStatus, TxStatus};
-use futures::stream::{FuturesOrdered, TryStreamExt};
-use std::collections::BTreeMap;
+use bdk_chain::ConfirmationTime;
+use esplora_client::TxStatus;
pub use esplora_client;
-use esplora_client::Error;
-
-/// Trait to extend [`esplora_client::BlockingClient`] functionality.
-///
-/// Refer to [crate-level documentation] for more.
-///
-/// [crate-level documentation]: crate
#[cfg(feature = "blocking")]
-pub trait EsploraExt {
- /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
- ///
- /// - `local_chain`: the most recent block hashes present locally
- /// - `keychain_spks`: keychains that we want to scan transactions for
- /// - `txids`: transactions that we want updated [`ChainPosition`]s for
- /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
- /// want to included in the update
- ///
- /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
- /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
- /// parallel.
- ///
- /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
- #[allow(clippy::result_large_err)] // FIXME
- fn scan<K: Ord + Clone>(
- &self,
- local_chain: &BTreeMap<u32, BlockHash>,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
-
- /// Convenience method to call [`scan`] without requiring a keychain.
- ///
- /// [`scan`]: EsploraExt::scan
- #[allow(clippy::result_large_err)] // FIXME
- fn scan_without_keychain(
- &self,
- local_chain: &BTreeMap<u32, BlockHash>,
- misc_spks: impl IntoIterator<Item = Script>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- parallel_requests: usize,
- ) -> Result<ChainGraph<ConfirmationTime>, Error> {
- let wallet_scan = self.scan(
- local_chain,
- [(
- (),
- misc_spks
- .into_iter()
- .enumerate()
- .map(|(i, spk)| (i as u32, spk)),
- )]
- .into(),
- txids,
- outpoints,
- usize::MAX,
- parallel_requests,
- )?;
-
- Ok(wallet_scan.update)
- }
-}
-
+mod blocking_ext;
#[cfg(feature = "blocking")]
-impl EsploraExt for esplora_client::BlockingClient {
- fn scan<K: Ord + Clone>(
- &self,
- local_chain: &BTreeMap<u32, BlockHash>,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
- let parallel_requests = parallel_requests.max(1);
- let mut scan = KeychainScan::default();
- let update = &mut scan.update;
- let last_active_indices = &mut scan.last_active_indices;
-
- for (&height, &original_hash) in local_chain.iter().rev() {
- let update_block_id = BlockId {
- height,
- hash: self.get_block_hash(height)?,
- };
- let _ = update
- .insert_checkpoint(update_block_id)
- .expect("cannot repeat height here");
- if update_block_id.hash == original_hash {
- break;
- }
- }
- let tip_at_start = BlockId {
- height: self.get_height()?,
- hash: self.get_tip_hash()?,
- };
- if let Err(failure) = update.insert_checkpoint(tip_at_start) {
- match failure {
- sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
- // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
- return EsploraExt::scan(
- self,
- local_chain,
- keychain_spks,
- txids,
- outpoints,
- stop_gap,
- parallel_requests,
- );
- }
- }
- }
+pub use blocking_ext::*;
- for (keychain, spks) in keychain_spks {
- let mut spks = spks.into_iter();
- let mut last_active_index = None;
- let mut empty_scripts = 0;
- type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
-
- loop {
- let handles = (0..parallel_requests)
- .filter_map(
- |_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
- let (index, script) = spks.next()?;
- let client = self.clone();
- Some(std::thread::spawn(move || {
- let mut related_txs = client.scripthash_txs(&script, None)?;
-
- let n_confirmed =
- related_txs.iter().filter(|tx| tx.status.confirmed).count();
- // esplora pages on 25 confirmed transactions. If there's 25 or more we
- // keep requesting to see if there's more.
- if n_confirmed >= 25 {
- loop {
- let new_related_txs = client.scripthash_txs(
- &script,
- Some(related_txs.last().unwrap().txid),
- )?;
- let n = new_related_txs.len();
- related_txs.extend(new_related_txs);
- // we've reached the end
- if n < 25 {
- break;
- }
- }
- }
-
- Result::<_, esplora_client::Error>::Ok((index, related_txs))
- }))
- },
- )
- .collect::<Vec<_>>();
-
- let n_handles = handles.len();
-
- for handle in handles {
- let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
- if related_txs.is_empty() {
- empty_scripts += 1;
- } else {
- last_active_index = Some(index);
- empty_scripts = 0;
- }
- for tx in related_txs {
- let confirmation_time =
- map_confirmation_time(&tx.status, tip_at_start.height);
-
- if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
- use bdk_chain::{
- chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
- };
- match failure {
- InsertTxError::Chain(TxTooHigh { .. }) => {
- unreachable!("chain position already checked earlier")
- }
- InsertTxError::Chain(TxMovedUnexpectedly { .. })
- | InsertTxError::UnresolvableConflict(_) => {
- /* implies reorg during scan. We deal with that below */
- }
- }
- }
- }
- }
-
- if n_handles == 0 || empty_scripts >= stop_gap {
- break;
- }
- }
-
- if let Some(last_active_index) = last_active_index {
- last_active_indices.insert(keychain, last_active_index);
- }
- }
-
- for txid in txids.into_iter() {
- let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) {
- (Some(tx), Some(tx_status)) => (tx, tx_status),
- _ => continue,
- };
-
- let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
-
- if let Err(failure) = update.insert_tx(tx, confirmation_time) {
- use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
- match failure {
- InsertTxError::Chain(TxTooHigh { .. }) => {
- unreachable!("chain position already checked earlier")
- }
- InsertTxError::Chain(TxMovedUnexpectedly { .. })
- | InsertTxError::UnresolvableConflict(_) => {
- /* implies reorg during scan. We deal with that below */
- }
- }
- }
- }
-
- for op in outpoints.into_iter() {
- let mut op_txs = Vec::with_capacity(2);
- if let (Some(tx), Some(tx_status)) =
- (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
- {
- op_txs.push((tx, tx_status));
- if let Some(OutputStatus {
- txid: Some(txid),
- status: Some(spend_status),
- ..
- }) = self.get_output_status(&op.txid, op.vout as _)?
- {
- if let Some(spend_tx) = self.get_tx(&txid)? {
- op_txs.push((spend_tx, spend_status));
- }
- }
- }
-
- for (tx, status) in op_txs {
- let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
-
- if let Err(failure) = update.insert_tx(tx, confirmation_time) {
- use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
- match failure {
- InsertTxError::Chain(TxTooHigh { .. }) => {
- unreachable!("chain position already checked earlier")
- }
- InsertTxError::Chain(TxMovedUnexpectedly { .. })
- | InsertTxError::UnresolvableConflict(_) => {
- /* implies reorg during scan. We deal with that below */
- }
- }
- }
- }
- }
-
- let reorg_occurred = {
- if let Some(checkpoint) = update.chain().latest_checkpoint() {
- self.get_block_hash(checkpoint.height)? != checkpoint.hash
- } else {
- false
- }
- };
-
- if reorg_occurred {
- // A reorg occurred so lets find out where all the txids we found are in the chain now.
- // XXX: collect required because of weird type naming issues
- let txids_found = update
- .chain()
- .txids()
- .map(|(_, txid)| *txid)
- .collect::<Vec<_>>();
- scan.update = EsploraExt::scan_without_keychain(
- self,
- local_chain,
- [],
- txids_found,
- [],
- parallel_requests,
- )?;
- }
-
- Ok(scan)
- }
-}
+#[cfg(feature = "async")]
+mod async_ext;
+#[cfg(feature = "async")]
+pub use async_ext::*;
-fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> ConfirmationTime {
+pub(crate) fn map_confirmation_time(
+ tx_status: &TxStatus,
+ height_at_start: u32,
+) -> ConfirmationTime {
match (tx_status.block_time, tx_status.block_height) {
(Some(time), Some(height)) if height <= height_at_start => {
ConfirmationTime::Confirmed { height, time }
_ => ConfirmationTime::Unconfirmed,
}
}
-
-#[cfg(feature = "async")]
-#[async_trait(?Send)]
-pub trait EsploraAsyncExt {
- /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
- ///
- /// - `local_chain`: the most recent block hashes present locally
- /// - `keychain_spks`: keychains that we want to scan transactions for
- /// - `txids`: transactions that we want updated [`ChainPosition`]s for
- /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
- /// want to included in the update
- ///
- /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
- /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
- /// parallel.
- ///
- /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
- #[allow(clippy::result_large_err)] // FIXME
- async fn scan<K: Ord + Clone>(
- &self,
- local_chain: &BTreeMap<u32, BlockHash>,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
-
- /// Convenience method to call [`scan`] without requiring a keychain.
- ///
- /// [`scan`]: EsploraAsyncExt::scan
- #[allow(clippy::result_large_err)] // FIXME
- async fn scan_without_keychain(
- &self,
- local_chain: &BTreeMap<u32, BlockHash>,
- misc_spks: impl IntoIterator<Item = Script>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- parallel_requests: usize,
- ) -> Result<ChainGraph<ConfirmationTime>, Error> {
- let wallet_scan = self
- .scan(
- local_chain,
- [(
- (),
- misc_spks
- .into_iter()
- .enumerate()
- .map(|(i, spk)| (i as u32, spk)),
- )]
- .into(),
- txids,
- outpoints,
- usize::MAX,
- parallel_requests,
- )
- .await?;
-
- Ok(wallet_scan.update)
- }
-}
-
-#[cfg(feature = "async")]
-#[async_trait(?Send)]
-impl EsploraAsyncExt for esplora_client::AsyncClient {
- async fn scan<K: Ord + Clone>(
- &self,
- local_chain: &BTreeMap<u32, BlockHash>,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
- let parallel_requests = parallel_requests.max(1);
- let mut scan = KeychainScan::default();
- let update = &mut scan.update;
- let last_active_indices = &mut scan.last_active_indices;
-
- for (&height, &original_hash) in local_chain.iter().rev() {
- let update_block_id = BlockId {
- height,
- hash: self.get_block_hash(height).await?,
- };
- let _ = update
- .insert_checkpoint(update_block_id)
- .expect("cannot repeat height here");
- if update_block_id.hash == original_hash {
- break;
- }
- }
- let tip_at_start = BlockId {
- height: self.get_height().await?,
- hash: self.get_tip_hash().await?,
- };
- if let Err(failure) = update.insert_checkpoint(tip_at_start) {
- match failure {
- sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
- // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
- return EsploraAsyncExt::scan(
- self,
- local_chain,
- keychain_spks,
- txids,
- outpoints,
- stop_gap,
- parallel_requests,
- )
- .await;
- }
- }
- }
-
- for (keychain, spks) in keychain_spks {
- let mut spks = spks.into_iter();
- let mut last_active_index = None;
- let mut empty_scripts = 0;
- type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
-
- loop {
- let futures: FuturesOrdered<_> = (0..parallel_requests)
- .filter_map(|_| {
- let (index, script) = spks.next()?;
- let client = self.clone();
- Some(async move {
- let mut related_txs = client.scripthash_txs(&script, None).await?;
-
- let n_confirmed =
- related_txs.iter().filter(|tx| tx.status.confirmed).count();
- // esplora pages on 25 confirmed transactions. If there's 25 or more we
- // keep requesting to see if there's more.
- if n_confirmed >= 25 {
- loop {
- let new_related_txs = client
- .scripthash_txs(
- &script,
- Some(related_txs.last().unwrap().txid),
- )
- .await?;
- let n = new_related_txs.len();
- related_txs.extend(new_related_txs);
- // we've reached the end
- if n < 25 {
- break;
- }
- }
- }
-
- Result::<_, esplora_client::Error>::Ok((index, related_txs))
- })
- })
- .collect();
-
- let n_futures = futures.len();
-
- let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
-
- for (index, related_txs) in idx_with_tx {
- if related_txs.is_empty() {
- empty_scripts += 1;
- } else {
- last_active_index = Some(index);
- empty_scripts = 0;
- }
- for tx in related_txs {
- let confirmation_time =
- map_confirmation_time(&tx.status, tip_at_start.height);
-
- if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
- use bdk_chain::{
- chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
- };
- match failure {
- InsertTxError::Chain(TxTooHigh { .. }) => {
- unreachable!("chain position already checked earlier")
- }
- InsertTxError::Chain(TxMovedUnexpectedly { .. })
- | InsertTxError::UnresolvableConflict(_) => {
- /* implies reorg during scan. We deal with that below */
- }
- }
- }
- }
- }
-
- if n_futures == 0 || empty_scripts >= stop_gap {
- break;
- }
- }
-
- if let Some(last_active_index) = last_active_index {
- last_active_indices.insert(keychain, last_active_index);
- }
- }
-
- for txid in txids.into_iter() {
- let (tx, tx_status) =
- match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
- (Some(tx), Some(tx_status)) => (tx, tx_status),
- _ => continue,
- };
-
- let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
-
- if let Err(failure) = update.insert_tx(tx, confirmation_time) {
- use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
- match failure {
- InsertTxError::Chain(TxTooHigh { .. }) => {
- unreachable!("chain position already checked earlier")
- }
- InsertTxError::Chain(TxMovedUnexpectedly { .. })
- | InsertTxError::UnresolvableConflict(_) => {
- /* implies reorg during scan. We deal with that below */
- }
- }
- }
- }
-
- for op in outpoints.into_iter() {
- let mut op_txs = Vec::with_capacity(2);
- if let (Some(tx), Some(tx_status)) = (
- self.get_tx(&op.txid).await?,
- self.get_tx_status(&op.txid).await?,
- ) {
- op_txs.push((tx, tx_status));
- if let Some(OutputStatus {
- txid: Some(txid),
- status: Some(spend_status),
- ..
- }) = self.get_output_status(&op.txid, op.vout as _).await?
- {
- if let Some(spend_tx) = self.get_tx(&txid).await? {
- op_txs.push((spend_tx, spend_status));
- }
- }
- }
-
- for (tx, status) in op_txs {
- let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
-
- if let Err(failure) = update.insert_tx(tx, confirmation_time) {
- use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
- match failure {
- InsertTxError::Chain(TxTooHigh { .. }) => {
- unreachable!("chain position already checked earlier")
- }
- InsertTxError::Chain(TxMovedUnexpectedly { .. })
- | InsertTxError::UnresolvableConflict(_) => {
- /* implies reorg during scan. We deal with that below */
- }
- }
- }
- }
- }
-
- let reorg_occurred = {
- if let Some(checkpoint) = update.chain().latest_checkpoint() {
- self.get_block_hash(checkpoint.height).await? != checkpoint.hash
- } else {
- false
- }
- };
-
- if reorg_occurred {
- // A reorg occurred so lets find out where all the txids we found are in the chain now.
- // XXX: collect required because of weird type naming issues
- let txids_found = update
- .chain()
- .txids()
- .map(|(_, txid)| *txid)
- .collect::<Vec<_>>();
- scan.update = EsploraAsyncExt::scan_without_keychain(
- self,
- local_chain,
- [],
- txids_found,
- [],
- parallel_requests,
- )
- .await?;
- }
-
- Ok(scan)
- }
-}
[dependencies]
bdk = { path = "../../crates/bdk" }
-bdk_esplora = { path = "../../crates/esplora" }
+bdk_esplora = { path = "../../crates/esplora", features = ["blocking"] }
bdk_file_store = { path = "../../crates/file_store" }
--- /dev/null
+[package]
+name = "wallet_esplora_async"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+bdk = { path = "../../crates/bdk" }
+bdk_esplora = { path = "../../crates/esplora", features = ["async-https"] }
+bdk_file_store = { path = "../../crates/file_store" }
+tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
--- /dev/null
+use std::{io::Write, str::FromStr};
+
+use bdk::{
+ bitcoin::{Address, Network},
+ wallet::AddressIndex,
+ SignOptions, Wallet,
+};
+use bdk_esplora::{esplora_client, EsploraAsyncExt};
+use bdk_file_store::KeychainStore;
+
+const SEND_AMOUNT: u64 = 5000;
+const STOP_GAP: usize = 50;
+const PARALLEL_REQUESTS: usize = 5;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ let db_path = std::env::temp_dir().join("bdk-esplora-example");
+ let db = KeychainStore::new_from_path(db_path)?;
+ let external_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/0/*)";
+ let internal_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/1/*)";
+
+ let mut wallet = Wallet::new(
+ external_descriptor,
+ Some(internal_descriptor),
+ db,
+ Network::Testnet,
+ )?;
+
+ let address = wallet.get_address(AddressIndex::New);
+ println!("Generated Address: {}", address);
+
+ let balance = wallet.get_balance();
+ println!("Wallet balance before syncing: {} sats", balance.total());
+
+ print!("Syncing...");
+ // Scanning the blockchain
+ let esplora_url = "https://mempool.space/testnet/api";
+ let client = esplora_client::Builder::new(esplora_url).build_async()?;
+ let checkpoints = wallet.checkpoints();
+ let spks = wallet
+ .spks_of_all_keychains()
+ .into_iter()
+ .map(|(k, spks)| {
+ let mut first = true;
+ (
+ k,
+ spks.inspect(move |(spk_i, _)| {
+ if first {
+ first = false;
+ print!("\nScanning keychain [{:?}]:", k);
+ }
+ print!(" {}", spk_i);
+ let _ = std::io::stdout().flush();
+ }),
+ )
+ })
+ .collect();
+ let update = client
+ .scan(
+ checkpoints,
+ spks,
+ std::iter::empty(),
+ std::iter::empty(),
+ STOP_GAP,
+ PARALLEL_REQUESTS,
+ )
+ .await?;
+ println!();
+ wallet.apply_update(update)?;
+ wallet.commit()?;
+
+ let balance = wallet.get_balance();
+ println!("Wallet balance after syncing: {} sats", balance.total());
+
+ if balance.total() < SEND_AMOUNT {
+ println!(
+ "Please send at least {} sats to the receiving address",
+ SEND_AMOUNT
+ );
+ std::process::exit(0);
+ }
+
+ let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?;
+
+ let mut tx_builder = wallet.build_tx();
+ tx_builder
+ .add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
+ .enable_rbf();
+
+ let (mut psbt, _) = tx_builder.finish()?;
+ let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
+ assert!(finalized);
+
+ let tx = psbt.extract_tx();
+ client.broadcast(&tx).await?;
+ println!("Tx broadcasted! Txid: {}", tx.txid());
+
+ Ok(())
+}