//! The star of the show is the [`EsploraExt::scan`] method which scans for relevant
//! blockchain data (via esplora) and outputs a [`KeychainScan`].
+use async_trait::async_trait;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, Script, Txid},
chain_graph::ChainGraph,
sparse_chain, BlockId, ConfirmationTime,
};
use esplora_client::{OutputStatus, TxStatus};
+use futures::stream::{FuturesOrdered, TryStreamExt};
use std::collections::BTreeMap;
pub use esplora_client;
/// Refer to [crate-level documentation] for more.
///
/// [crate-level documentation]: crate
+
+#[cfg(feature = "blocking")]
pub trait EsploraExt {
/// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
///
}
#[cfg(feature = "async")]
-impl EsploraExt for esplora_client::AsyncClient {
- fn scan<K: Ord + Clone>(
+#[async_trait(?Send)]
+pub trait EsploraAsyncExt {
+ /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+ ///
+ /// - `local_chain`: the most recent block hashes present locally
+ /// - `keychain_spks`: keychains that we want to scan transactions for
+ /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+ /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+ /// want to included in the update
+ ///
+ /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+ /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+ /// parallel.
+ ///
+ /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+ #[allow(clippy::result_large_err)] // FIXME
+ async fn scan<K: Ord + Clone>(
&self,
- _local_chain: &BTreeMap<u32, BlockHash>,
- _keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
- _txids: impl IntoIterator<Item = Txid>,
- _outpoints: impl IntoIterator<Item = OutPoint>,
- _stop_gap: usize,
- _parallel_requests: usize,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+ /// Convenience method to call [`scan`] without requiring a keychain.
+ ///
+ /// [`scan`]: EsploraAsyncExt::scan
+ #[allow(clippy::result_large_err)] // FIXME
+ async fn scan_without_keychain(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ misc_spks: impl IntoIterator<Item = Script>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ parallel_requests: usize,
+ ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+ let wallet_scan = self
+ .scan(
+ local_chain,
+ [(
+ (),
+ misc_spks
+ .into_iter()
+ .enumerate()
+ .map(|(i, spk)| (i as u32, spk)),
+ )]
+ .into(),
+ txids,
+ outpoints,
+ usize::MAX,
+ parallel_requests,
+ )
+ .await?;
+
+ Ok(wallet_scan.update)
+ }
+}
+
+#[cfg(feature = "async")]
+#[async_trait(?Send)]
+impl EsploraAsyncExt for esplora_client::AsyncClient {
+ async fn scan<K: Ord + Clone>(
+ &self,
+ local_chain: &BTreeMap<u32, BlockHash>,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
- todo!()
+ let parallel_requests = parallel_requests.max(1);
+ let mut scan = KeychainScan::default();
+ let update = &mut scan.update;
+ let last_active_indices = &mut scan.last_active_indices;
+
+ for (&height, &original_hash) in local_chain.iter().rev() {
+ let update_block_id = BlockId {
+ height,
+ hash: self.get_block_hash(height).await?,
+ };
+ let _ = update
+ .insert_checkpoint(update_block_id)
+ .expect("cannot repeat height here");
+ if update_block_id.hash == original_hash {
+ break;
+ }
+ }
+ let tip_at_start = BlockId {
+ height: self.get_height().await?,
+ hash: self.get_tip_hash().await?,
+ };
+ if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+ match failure {
+ sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+ // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+ return EsploraAsyncExt::scan(
+ self,
+ local_chain,
+ keychain_spks,
+ txids,
+ outpoints,
+ stop_gap,
+ parallel_requests,
+ )
+ .await;
+ }
+ }
+ }
+
+ for (keychain, spks) in keychain_spks {
+ let mut spks = spks.into_iter();
+ let mut last_active_index = None;
+ let mut empty_scripts = 0;
+ type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+ loop {
+ let futures: FuturesOrdered<_> = (0..parallel_requests)
+ .filter_map(|_| {
+ let (index, script) = spks.next()?;
+ let client = self.clone();
+ Some(async move {
+ let mut related_txs = client.scripthash_txs(&script, None).await?;
+
+ let n_confirmed =
+ related_txs.iter().filter(|tx| tx.status.confirmed).count();
+ // esplora pages on 25 confirmed transactions. If there's 25 or more we
+ // keep requesting to see if there's more.
+ if n_confirmed >= 25 {
+ loop {
+ let new_related_txs = client
+ .scripthash_txs(
+ &script,
+ Some(related_txs.last().unwrap().txid),
+ )
+ .await?;
+ let n = new_related_txs.len();
+ related_txs.extend(new_related_txs);
+ // we've reached the end
+ if n < 25 {
+ break;
+ }
+ }
+ }
+
+ Result::<_, esplora_client::Error>::Ok((index, related_txs))
+ })
+ })
+ .collect();
+
+ let n_futures = futures.len();
+
+ let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
+
+ for (index, related_txs) in idx_with_tx {
+ if related_txs.is_empty() {
+ empty_scripts += 1;
+ } else {
+ last_active_index = Some(index);
+ empty_scripts = 0;
+ }
+ for tx in related_txs {
+ let confirmation_time =
+ map_confirmation_time(&tx.status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+ use bdk_chain::{
+ chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+ };
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+ }
+
+ if n_futures == 0 || empty_scripts >= stop_gap {
+ break;
+ }
+ }
+
+ if let Some(last_active_index) = last_active_index {
+ last_active_indices.insert(keychain, last_active_index);
+ }
+ }
+
+ for txid in txids.into_iter() {
+ let (tx, tx_status) =
+ match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
+ (Some(tx), Some(tx_status)) => (tx, tx_status),
+ _ => continue,
+ };
+
+ let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+ use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+
+ for op in outpoints.into_iter() {
+ let mut op_txs = Vec::with_capacity(2);
+ if let (Some(tx), Some(tx_status)) = (
+ self.get_tx(&op.txid).await?,
+ self.get_tx_status(&op.txid).await?,
+ ) {
+ op_txs.push((tx, tx_status));
+ if let Some(OutputStatus {
+ txid: Some(txid),
+ status: Some(spend_status),
+ ..
+ }) = self.get_output_status(&op.txid, op.vout as _).await?
+ {
+ if let Some(spend_tx) = self.get_tx(&txid).await? {
+ op_txs.push((spend_tx, spend_status));
+ }
+ }
+ }
+
+ for (tx, status) in op_txs {
+ let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+ if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+ use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+ match failure {
+ InsertTxError::Chain(TxTooHigh { .. }) => {
+ unreachable!("chain position already checked earlier")
+ }
+ InsertTxError::Chain(TxMovedUnexpectedly { .. })
+ | InsertTxError::UnresolvableConflict(_) => {
+ /* implies reorg during scan. We deal with that below */
+ }
+ }
+ }
+ }
+ }
+
+ let reorg_occurred = {
+ if let Some(checkpoint) = update.chain().latest_checkpoint() {
+ self.get_block_hash(checkpoint.height).await? != checkpoint.hash
+ } else {
+ false
+ }
+ };
+
+ if reorg_occurred {
+ // A reorg occurred so lets find out where all the txids we found are in the chain now.
+ // XXX: collect required because of weird type naming issues
+ let txids_found = update
+ .chain()
+ .txids()
+ .map(|(_, txid)| *txid)
+ .collect::<Vec<_>>();
+ scan.update = EsploraAsyncExt::scan_without_keychain(
+ self,
+ local_chain,
+ [],
+ txids_found,
+ [],
+ parallel_requests,
+ )
+ .await?;
+ }
+
+ Ok(scan)
}
}