]> Untitled Git - bdk/commitdiff
Implement EsploraExt for Async client
authorVladimir Fomene <vladimirfomene@gmail.com>
Tue, 7 Mar 2023 14:04:06 +0000 (17:04 +0300)
committer志宇 <hello@evanlinjin.me>
Wed, 8 Mar 2023 20:25:54 +0000 (09:25 +1300)
Creates a separate async EsploraAsyncExt trait for the
async client using async-trait crate. It has thesame
methods as the EsploraExt trait for the blocking client.
This trait is implemented on the AsyncClient of the
rust-esplora-client crate.

crates/esplora/Cargo.toml
crates/esplora/src/lib.rs

index 1cd0d1c0d9f5e73ae14e4bb3417f29cdbe554862..109a7399b0e3cd13b2028ce11f97c2a938a1eef1 100644 (file)
@@ -14,6 +14,8 @@ readme = "README.md"
 [dependencies]
 bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] }
 esplora-client = { version = "0.3", default-features = false }
+async-trait = "0.1.66"
+futures = "0.3.26"
 
 [features]
 default = ["async", "blocking"]
index 18ab22b607b8a29a4ad95c64f431e02a41d38646..9ef2b1bda0442e5bbc737466ff6853726391a6e2 100644 (file)
@@ -3,6 +3,7 @@
 //! The star of the show is the  [`EsploraExt::scan`] method which scans for relevant
 //! blockchain data (via esplora) and outputs a [`KeychainScan`].
 
+use async_trait::async_trait;
 use bdk_chain::{
     bitcoin::{BlockHash, OutPoint, Script, Txid},
     chain_graph::ChainGraph,
@@ -10,6 +11,7 @@ use bdk_chain::{
     sparse_chain, BlockId, ConfirmationTime,
 };
 use esplora_client::{OutputStatus, TxStatus};
+use futures::stream::{FuturesOrdered, TryStreamExt};
 use std::collections::BTreeMap;
 
 pub use esplora_client;
@@ -20,6 +22,8 @@ use esplora_client::Error;
 /// Refer to [crate-level documentation] for more.
 ///
 /// [crate-level documentation]: crate
+
+#[cfg(feature = "blocking")]
 pub trait EsploraExt {
     /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
     ///
@@ -305,16 +309,286 @@ fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> Confirma
 }
 
 #[cfg(feature = "async")]
-impl EsploraExt for esplora_client::AsyncClient {
-    fn scan<K: Ord + Clone>(
+#[async_trait(?Send)]
+pub trait EsploraAsyncExt {
+    /// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
+    ///
+    /// - `local_chain`: the most recent block hashes present locally
+    /// - `keychain_spks`: keychains that we want to scan transactions for
+    /// - `txids`: transactions that we want updated [`ChainPosition`]s for
+    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to included in the update
+    ///
+    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
+    /// parallel.
+    ///
+    /// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
+    #[allow(clippy::result_large_err)] // FIXME
+    async fn scan<K: Ord + Clone>(
         &self,
-        _local_chain: &BTreeMap<u32, BlockHash>,
-        _keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        _txids: impl IntoIterator<Item = Txid>,
-        _outpoints: impl IntoIterator<Item = OutPoint>,
-        _stop_gap: usize,
-        _parallel_requests: usize,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
+
+    /// Convenience method to call [`scan`] without requiring a keychain.
+    ///
+    /// [`scan`]: EsploraAsyncExt::scan
+    #[allow(clippy::result_large_err)] // FIXME
+    async fn scan_without_keychain(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        misc_spks: impl IntoIterator<Item = Script>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        parallel_requests: usize,
+    ) -> Result<ChainGraph<ConfirmationTime>, Error> {
+        let wallet_scan = self
+            .scan(
+                local_chain,
+                [(
+                    (),
+                    misc_spks
+                        .into_iter()
+                        .enumerate()
+                        .map(|(i, spk)| (i as u32, spk)),
+                )]
+                .into(),
+                txids,
+                outpoints,
+                usize::MAX,
+                parallel_requests,
+            )
+            .await?;
+
+        Ok(wallet_scan.update)
+    }
+}
+
+#[cfg(feature = "async")]
+#[async_trait(?Send)]
+impl EsploraAsyncExt for esplora_client::AsyncClient {
+    async fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
     ) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
-        todo!()
+        let parallel_requests = parallel_requests.max(1);
+        let mut scan = KeychainScan::default();
+        let update = &mut scan.update;
+        let last_active_indices = &mut scan.last_active_indices;
+
+        for (&height, &original_hash) in local_chain.iter().rev() {
+            let update_block_id = BlockId {
+                height,
+                hash: self.get_block_hash(height).await?,
+            };
+            let _ = update
+                .insert_checkpoint(update_block_id)
+                .expect("cannot repeat height here");
+            if update_block_id.hash == original_hash {
+                break;
+            }
+        }
+        let tip_at_start = BlockId {
+            height: self.get_height().await?,
+            hash: self.get_tip_hash().await?,
+        };
+        if let Err(failure) = update.insert_checkpoint(tip_at_start) {
+            match failure {
+                sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
+                    // there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
+                    return EsploraAsyncExt::scan(
+                        self,
+                        local_chain,
+                        keychain_spks,
+                        txids,
+                        outpoints,
+                        stop_gap,
+                        parallel_requests,
+                    )
+                    .await;
+                }
+            }
+        }
+
+        for (keychain, spks) in keychain_spks {
+            let mut spks = spks.into_iter();
+            let mut last_active_index = None;
+            let mut empty_scripts = 0;
+            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+
+            loop {
+                let futures: FuturesOrdered<_> = (0..parallel_requests)
+                    .filter_map(|_| {
+                        let (index, script) = spks.next()?;
+                        let client = self.clone();
+                        Some(async move {
+                            let mut related_txs = client.scripthash_txs(&script, None).await?;
+
+                            let n_confirmed =
+                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
+                            // esplora pages on 25 confirmed transactions. If there's 25 or more we
+                            // keep requesting to see if there's more.
+                            if n_confirmed >= 25 {
+                                loop {
+                                    let new_related_txs = client
+                                        .scripthash_txs(
+                                            &script,
+                                            Some(related_txs.last().unwrap().txid),
+                                        )
+                                        .await?;
+                                    let n = new_related_txs.len();
+                                    related_txs.extend(new_related_txs);
+                                    // we've reached the end
+                                    if n < 25 {
+                                        break;
+                                    }
+                                }
+                            }
+
+                            Result::<_, esplora_client::Error>::Ok((index, related_txs))
+                        })
+                    })
+                    .collect();
+
+                let n_futures = futures.len();
+
+                let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
+
+                for (index, related_txs) in idx_with_tx {
+                    if related_txs.is_empty() {
+                        empty_scripts += 1;
+                    } else {
+                        last_active_index = Some(index);
+                        empty_scripts = 0;
+                    }
+                    for tx in related_txs {
+                        let confirmation_time =
+                            map_confirmation_time(&tx.status, tip_at_start.height);
+
+                        if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
+                            use bdk_chain::{
+                                chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
+                            };
+                            match failure {
+                                InsertTxError::Chain(TxTooHigh { .. }) => {
+                                    unreachable!("chain position already checked earlier")
+                                }
+                                InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                                | InsertTxError::UnresolvableConflict(_) => {
+                                    /* implies reorg during scan. We deal with that below */
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if n_futures == 0 || empty_scripts >= stop_gap {
+                    break;
+                }
+            }
+
+            if let Some(last_active_index) = last_active_index {
+                last_active_indices.insert(keychain, last_active_index);
+            }
+        }
+
+        for txid in txids.into_iter() {
+            let (tx, tx_status) =
+                match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
+                    (Some(tx), Some(tx_status)) => (tx, tx_status),
+                    _ => continue,
+                };
+
+            let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
+
+            if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                match failure {
+                    InsertTxError::Chain(TxTooHigh { .. }) => {
+                        unreachable!("chain position already checked earlier")
+                    }
+                    InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                    | InsertTxError::UnresolvableConflict(_) => {
+                        /* implies reorg during scan. We deal with that below */
+                    }
+                }
+            }
+        }
+
+        for op in outpoints.into_iter() {
+            let mut op_txs = Vec::with_capacity(2);
+            if let (Some(tx), Some(tx_status)) = (
+                self.get_tx(&op.txid).await?,
+                self.get_tx_status(&op.txid).await?,
+            ) {
+                op_txs.push((tx, tx_status));
+                if let Some(OutputStatus {
+                    txid: Some(txid),
+                    status: Some(spend_status),
+                    ..
+                }) = self.get_output_status(&op.txid, op.vout as _).await?
+                {
+                    if let Some(spend_tx) = self.get_tx(&txid).await? {
+                        op_txs.push((spend_tx, spend_status));
+                    }
+                }
+            }
+
+            for (tx, status) in op_txs {
+                let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
+
+                if let Err(failure) = update.insert_tx(tx, confirmation_time) {
+                    use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
+                    match failure {
+                        InsertTxError::Chain(TxTooHigh { .. }) => {
+                            unreachable!("chain position already checked earlier")
+                        }
+                        InsertTxError::Chain(TxMovedUnexpectedly { .. })
+                        | InsertTxError::UnresolvableConflict(_) => {
+                            /* implies reorg during scan. We deal with that below */
+                        }
+                    }
+                }
+            }
+        }
+
+        let reorg_occurred = {
+            if let Some(checkpoint) = update.chain().latest_checkpoint() {
+                self.get_block_hash(checkpoint.height).await? != checkpoint.hash
+            } else {
+                false
+            }
+        };
+
+        if reorg_occurred {
+            // A reorg occurred so lets find out where all the txids we found are in the chain now.
+            // XXX: collect required because of weird type naming issues
+            let txids_found = update
+                .chain()
+                .txids()
+                .map(|(_, txid)| *txid)
+                .collect::<Vec<_>>();
+            scan.update = EsploraAsyncExt::scan_without_keychain(
+                self,
+                local_chain,
+                [],
+                txids_found,
+                [],
+                parallel_requests,
+            )
+            .await?;
+        }
+
+        Ok(scan)
     }
 }