[dependencies]
bdk_chain = { path = "../chain", version = "0.17.0", default-features = false }
-esplora-client = { version = "0.8.0", default-features = false }
+esplora-client = { version = "0.9.0", default-features = false }
async-trait = { version = "0.1.66", optional = true }
futures = { version = "0.3.26", optional = true }
use std::collections::BTreeSet;
+use std::usize;
use async_trait::async_trait;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
BlockId, ConfirmationBlockTime, TxGraph,
};
use bdk_chain::{Anchor, Indexed};
-use esplora_client::{Amount, TxStatus};
+use esplora_client::{Amount, Tx, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};
use crate::anchor_from_status;
/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
///
-/// Refer to [crate-level documentation] for more.
-///
-/// [crate-level documentation]: crate
+/// Refer to [crate-level documentation](crate) for more.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait EsploraAsyncExt {
/// Scan keychain scripts for transactions against Esplora, returning an update that can be
/// applied to the receiving structures.
///
- /// - `request`: struct with data required to perform a spk-based blockchain client full scan,
- /// see [`FullScanRequest`]
- ///
- /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
- /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
- /// make in parallel.
- ///
- /// ## Note
- ///
- /// `stop_gap` is defined as "the maximum number of consecutive unused addresses".
- /// For example, with a `stop_gap` of 3, `full_scan` will keep scanning
- /// until it encounters 3 consecutive script pubkeys with no associated transactions.
+ /// `request` provides the data required to perform a script-pubkey-based full scan
+ /// (see [`FullScanRequest`]). The full scan for each keychain (`K`) stops after a gap of
+ /// `stop_gap` script pubkeys with no associated transactions. `parallel_requests` specifies
+ /// the maximum number of HTTP requests to make in parallel.
///
- /// This follows the same approach as other Bitcoin-related software,
- /// such as [Electrum](https://electrum.readthedocs.io/en/latest/faq.html#what-is-the-gap-limit),
- /// [BTCPay Server](https://docs.btcpayserver.org/FAQ/Wallet/#the-gap-limit-problem),
- /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
- ///
- /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
+ /// Refer to [crate-level docs](crate) for more.
async fn full_scan<K: Ord + Clone + Send>(
&self,
request: FullScanRequest<K>,
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error>;
- /// Sync a set of scripts with the blockchain (via an Esplora client) for the data
- /// specified and return a [`TxGraph`].
- ///
- /// - `request`: struct with data required to perform a spk-based blockchain client sync, see
- /// [`SyncRequest`]
+ /// Sync a set of scripts, txids, and/or outpoints against Esplora.
///
- /// If the scripts to sync are unknown, such as when restoring or importing a keychain that
- /// may include scripts that have been used, use [`full_scan`] with the keychain.
+ /// `request` provides the data required to perform a script-pubkey-based sync (see
+ /// [`SyncRequest`]). `parallel_requests` specifies the maximum number of HTTP requests to make
+ /// in parallel.
///
- /// [`full_scan`]: EsploraAsyncExt::full_scan
+ /// Refer to [crate-level docs](crate) for more.
async fn sync(
&self,
request: SyncRequest,
parallel_requests: usize,
) -> Result<SyncResult, Error>;
+
+ /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
+ /// `keychain_spks` against Esplora.
+ ///
+ /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
+ /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
+ /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
+ /// number of HTTP requests to make in parallel.
+ ///
+ /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active
+ /// keychain index (if any) is returned. The last active keychain index is the keychain's last
+ /// script pubkey that contains a non-empty transaction history.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
+ &self,
+ keychain_spks: I,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error>;
+
+ /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
+ /// against Esplora.
+ ///
+ /// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as
+ /// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of
+ /// HTTP requests to make in parallel.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
+ &self,
+ spks: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator + Send;
+
+ /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
+ /// against Esplora.
+ ///
+ /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
+ &self,
+ txids: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator + Send;
+
+ /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
+ /// `outpoints`.
+ ///
+ /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
+ &self,
+ outpoints: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator + Send;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error> {
let latest_blocks = fetch_latest_blocks(self).await?;
- let (graph_update, last_active_indices) = full_scan_for_index_and_graph(
- self,
- request.spks_by_keychain,
- stop_gap,
- parallel_requests,
- )
- .await?;
+ let mut graph_update = TxGraph::default();
+ let mut last_active_indices = BTreeMap::<K, u32>::new();
+ for (keychain, keychain_spks) in request.spks_by_keychain {
+ let (tx_graph, last_active_index) = self
+ .fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)
+ .await?;
+ let _ = graph_update.apply_update(tx_graph);
+ if let Some(last_active_index) = last_active_index {
+ last_active_indices.insert(keychain, last_active_index);
+ }
+ }
let chain_update = chain_update(
self,
&latest_blocks,
parallel_requests: usize,
) -> Result<SyncResult, Error> {
let latest_blocks = fetch_latest_blocks(self).await?;
- let graph_update = sync_for_index_and_graph(
- self,
- request.spks,
- request.txids,
- request.outpoints,
- parallel_requests,
- )
- .await?;
+ let mut graph_update = TxGraph::default();
+ let _ = graph_update.apply_update(
+ self.fetch_txs_with_spks(request.spks, parallel_requests)
+ .await?,
+ );
+ let _ = graph_update.apply_update(
+ self.fetch_txs_with_txids(request.txids, parallel_requests)
+ .await?,
+ );
+ let _ = graph_update.apply_update(
+ self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)
+ .await?,
+ );
let chain_update = chain_update(
self,
&latest_blocks,
graph_update,
})
}
+
+ async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
+ &self,
+ mut keychain_spks: I,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+
+ let mut tx_graph = TxGraph::default();
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
+
+ loop {
+ let handles = keychain_spks
+ .by_ref()
+ .take(parallel_requests)
+ .map(|(spk_index, spk)| {
+ let client = self.clone();
+ async move {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen).await?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Result::<_, Error>::Ok((spk_index, spk_txs));
+ }
+ }
+ }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
+ last_index = Some(index);
+ if !txs.is_empty() {
+ last_active_index = Some(index);
+ }
+ for tx in txs {
+ let _ = tx_graph.insert_tx(tx.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx.status) {
+ let _ = tx_graph.insert_anchor(tx.txid, anchor);
+ }
+
+ let previous_outputs = tx.vin.iter().filter_map(|vin| {
+ let prevout = vin.prevout.as_ref()?;
+ Some((
+ OutPoint {
+ txid: vin.txid,
+ vout: vin.vout,
+ },
+ TxOut {
+ script_pubkey: prevout.scriptpubkey.clone(),
+ value: Amount::from_sat(prevout.value),
+ },
+ ))
+ });
+
+ for (outpoint, txout) in previous_outputs {
+ let _ = tx_graph.insert_txout(outpoint, txout);
+ }
+ }
+ }
+
+ let last_index = last_index.expect("Must be set since handles wasn't empty.");
+ let gap_limit_reached = if let Some(i) = last_active_index {
+ last_index >= i.saturating_add(stop_gap as u32)
+ } else {
+ last_index + 1 >= stop_gap as u32
+ };
+ if gap_limit_reached {
+ break;
+ }
+ }
+
+ Ok((tx_graph, last_active_index))
+ }
+
+ async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
+ &self,
+ spks: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator + Send,
+ {
+ self.fetch_txs_with_keychain_spks(
+ spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
+ usize::MAX,
+ parallel_requests,
+ )
+ .await
+ .map(|(tx_graph, _)| tx_graph)
+ }
+
+ async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
+ &self,
+ txids: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator + Send,
+ {
+ enum EsploraResp {
+ TxStatus(TxStatus),
+ Tx(Option<Tx>),
+ }
+
+ let mut tx_graph = TxGraph::default();
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .map(|txid| {
+ let client = self.clone();
+ let tx_already_exists = tx_graph.get_tx(txid).is_some();
+ async move {
+ if tx_already_exists {
+ client
+ .get_tx_status(&txid)
+ .await
+ .map(|s| (txid, EsploraResp::TxStatus(s)))
+ } else {
+ client
+ .get_tx_info(&txid)
+ .await
+ .map(|t| (txid, EsploraResp::Tx(t)))
+ }
+ }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for (txid, resp) in handles.try_collect::<Vec<_>>().await? {
+ match resp {
+ EsploraResp::TxStatus(status) => {
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = tx_graph.insert_anchor(txid, anchor);
+ }
+ }
+ EsploraResp::Tx(Some(tx_info)) => {
+ let _ = tx_graph.insert_tx(tx_info.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx_info.status) {
+ let _ = tx_graph.insert_anchor(txid, anchor);
+ }
+ }
+ _ => continue,
+ }
+ }
+ }
+ Ok(tx_graph)
+ }
+
+ async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
+ &self,
+ outpoints: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator + Send,
+ {
+ let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+ // make sure txs exists in graph and tx statuses are updated
+ // TODO: We should maintain a tx cache (like we do with Electrum).
+ let mut tx_graph = self
+ .fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)
+ .await?;
+
+ // get outpoint spend-statuses
+ let mut outpoints = outpoints.into_iter();
+ let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
+ loop {
+ let handles = outpoints
+ .by_ref()
+ .take(parallel_requests)
+ .map(|op| {
+ let client = self.clone();
+ async move { client.get_output_status(&op.txid, op.vout as _).await }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for op_status in handles.try_collect::<Vec<_>>().await?.into_iter().flatten() {
+ let spend_txid = match op_status.txid {
+ Some(txid) => txid,
+ None => continue,
+ };
+ if tx_graph.get_tx(spend_txid).is_none() {
+ missing_txs.push(spend_txid);
+ }
+ if let Some(spend_status) = op_status.status {
+ if let Some(spend_anchor) = anchor_from_status(&spend_status) {
+ let _ = tx_graph.insert_anchor(spend_txid, spend_anchor);
+ }
+ }
+ }
+ }
+
+ let _ = tx_graph.apply_update(
+ self.fetch_txs_with_txids(missing_txs, parallel_requests)
+ .await?,
+ );
+ Ok(tx_graph)
+ }
}
/// Fetch latest blocks from Esplora in an atomic call.
Ok(tip)
}
-/// This performs a full scan to get an update for the [`TxGraph`] and
-/// [`KeychainTxOutIndex`](bdk_chain::indexer::keychain_txout::KeychainTxOutIndex).
-async fn full_scan_for_index_and_graph<K: Ord + Clone + Send>(
- client: &esplora_client::AsyncClient,
- keychain_spks: BTreeMap<
- K,
- impl IntoIterator<IntoIter = impl Iterator<Item = Indexed<ScriptBuf>> + Send> + Send,
- >,
- stop_gap: usize,
- parallel_requests: usize,
-) -> Result<(TxGraph<ConfirmationBlockTime>, BTreeMap<K, u32>), Error> {
- type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
- let parallel_requests = Ord::max(parallel_requests, 1);
- let mut graph = TxGraph::<ConfirmationBlockTime>::default();
- let mut last_active_indexes = BTreeMap::<K, u32>::new();
-
- for (keychain, spks) in keychain_spks {
- let mut spks = spks.into_iter();
- let mut last_index = Option::<u32>::None;
- let mut last_active_index = Option::<u32>::None;
-
- loop {
- let handles = spks
- .by_ref()
- .take(parallel_requests)
- .map(|(spk_index, spk)| {
- let client = client.clone();
- async move {
- let mut last_seen = None;
- let mut spk_txs = Vec::new();
- loop {
- let txs = client.scripthash_txs(&spk, last_seen).await?;
- let tx_count = txs.len();
- last_seen = txs.last().map(|tx| tx.txid);
- spk_txs.extend(txs);
- if tx_count < 25 {
- break Result::<_, Error>::Ok((spk_index, spk_txs));
- }
- }
- }
- })
- .collect::<FuturesOrdered<_>>();
-
- if handles.is_empty() {
- break;
- }
-
- for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
- last_index = Some(index);
- if !txs.is_empty() {
- last_active_index = Some(index);
- }
- for tx in txs {
- let _ = graph.insert_tx(tx.to_tx());
- if let Some(anchor) = anchor_from_status(&tx.status) {
- let _ = graph.insert_anchor(tx.txid, anchor);
- }
-
- let previous_outputs = tx.vin.iter().filter_map(|vin| {
- let prevout = vin.prevout.as_ref()?;
- Some((
- OutPoint {
- txid: vin.txid,
- vout: vin.vout,
- },
- TxOut {
- script_pubkey: prevout.scriptpubkey.clone(),
- value: Amount::from_sat(prevout.value),
- },
- ))
- });
-
- for (outpoint, txout) in previous_outputs {
- let _ = graph.insert_txout(outpoint, txout);
- }
- }
- }
-
- let last_index = last_index.expect("Must be set since handles wasn't empty.");
- let gap_limit_reached = if let Some(i) = last_active_index {
- last_index >= i.saturating_add(stop_gap as u32)
- } else {
- last_index + 1 >= stop_gap as u32
- };
- if gap_limit_reached {
- break;
- }
- }
-
- if let Some(last_active_index) = last_active_index {
- last_active_indexes.insert(keychain, last_active_index);
- }
- }
-
- Ok((graph, last_active_indexes))
-}
-
-async fn sync_for_index_and_graph(
- client: &esplora_client::AsyncClient,
- misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
- txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
- outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
- parallel_requests: usize,
-) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
- let mut graph = full_scan_for_index_and_graph(
- client,
- [(
- (),
- misc_spks
- .into_iter()
- .enumerate()
- .map(|(i, spk)| (i as u32, spk)),
- )]
- .into(),
- usize::MAX,
- parallel_requests,
- )
- .await
- .map(|(g, _)| g)?;
-
- let mut txids = txids.into_iter();
- loop {
- let handles = txids
- .by_ref()
- .take(parallel_requests)
- .filter(|&txid| graph.get_tx(txid).is_none())
- .map(|txid| {
- let client = client.clone();
- async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
- })
- .collect::<FuturesOrdered<_>>();
-
- if handles.is_empty() {
- break;
- }
-
- for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(txid, anchor);
- }
- }
- }
-
- for op in outpoints.into_iter() {
- if graph.get_tx(op.txid).is_none() {
- if let Some(tx) = client.get_tx(&op.txid).await? {
- let _ = graph.insert_tx(tx);
- }
- let status = client.get_tx_status(&op.txid).await?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(op.txid, anchor);
- }
- }
-
- if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _).await? {
- if let Some(txid) = op_status.txid {
- if graph.get_tx(txid).is_none() {
- if let Some(tx) = client.get_tx(&txid).await? {
- let _ = graph.insert_tx(tx);
- }
- let status = client.get_tx_status(&txid).await?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(txid, anchor);
- }
- }
- }
- }
- }
-
- Ok(graph)
-}
-
#[cfg(test)]
mod test {
use std::{collections::BTreeSet, time::Duration};
BlockId, ConfirmationBlockTime, TxGraph,
};
use bdk_chain::{Anchor, Indexed};
-use esplora_client::TxStatus;
+use esplora_client::{OutputStatus, Tx, TxStatus};
use crate::anchor_from_status;
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
///
-/// Refer to [crate-level documentation] for more.
-///
-/// [crate-level documentation]: crate
+/// Refer to [crate-level documentation](crate) for more.
pub trait EsploraExt {
/// Scan keychain scripts for transactions against Esplora, returning an update that can be
/// applied to the receiving structures.
///
- /// - `request`: struct with data required to perform a spk-based blockchain client full scan,
- /// see [`FullScanRequest`]
+ /// `request` provides the data required to perform a script-pubkey-based full scan
+ /// (see [`FullScanRequest`]). The full scan for each keychain (`K`) stops after a gap of
+ /// `stop_gap` script pubkeys with no associated transactions. `parallel_requests` specifies
+ /// the maximum number of HTTP requests to make in parallel.
///
- /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
- /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
- /// make in parallel.
+ /// Refer to [crate-level docs](crate) for more.
+ fn full_scan<K: Ord + Clone>(
+ &self,
+ request: FullScanRequest<K>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<FullScanResult<K>, Error>;
+
+ /// Sync a set of scripts, txids, and/or outpoints against Esplora.
///
- /// ## Note
+ /// `request` provides the data required to perform a script-pubkey-based sync (see
+ /// [`SyncRequest`]). `parallel_requests` specifies the maximum number of HTTP requests to make
+ /// in parallel.
///
- /// `stop_gap` is defined as "the maximum number of consecutive unused addresses".
- /// For example, with a `stop_gap` of 3, `full_scan` will keep scanning
- /// until it encounters 3 consecutive script pubkeys with no associated transactions.
+ /// Refer to [crate-level docs](crate) for more.
+ fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error>;
+
+ /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
+ /// `keychain_spks` against Esplora.
///
- /// This follows the same approach as other Bitcoin-related software,
- /// such as [Electrum](https://electrum.readthedocs.io/en/latest/faq.html#what-is-the-gap-limit),
- /// [BTCPay Server](https://docs.btcpayserver.org/FAQ/Wallet/#the-gap-limit-problem),
- /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
+ /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
+ /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
+ /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
+ /// number of HTTP requests to make in parallel.
///
- /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
- fn full_scan<K: Ord + Clone>(
+ /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active keychain
+ /// index (if any) is returned. The last active keychain index is the keychain's last script
+ /// pubkey that contains a non-empty transaction history.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
&self,
- request: FullScanRequest<K>,
+ keychain_spks: I,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<FullScanResult<K>, Error>;
+ ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error>;
+
+ /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
+ /// against Esplora.
+ ///
+ /// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all
+ /// contained scripts will be scanned. `parallel_requests` specifies the maximum number of HTTP
+ /// requests to make in parallel.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
+ &self,
+ spks: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator;
- /// Sync a set of scripts with the blockchain (via an Esplora client) for the data
- /// specified and return a [`TxGraph`].
+ /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
+ /// against Esplora.
///
- /// - `request`: struct with data required to perform a spk-based blockchain client sync, see
- /// [`SyncRequest`]
+ /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
///
- /// If the scripts to sync are unknown, such as when restoring or importing a keychain that
- /// may include scripts that have been used, use [`full_scan`] with the keychain.
+ /// Refer to [crate-level docs](crate) for more.
+ fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
+ &self,
+ txids: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator;
+
+ /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
+ /// `outpoints`.
///
- /// [`full_scan`]: EsploraExt::full_scan
- fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error>;
+ /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+ ///
+ /// Refer to [crate-level docs](crate) for more.
+ fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
+ &self,
+ outpoints: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator;
}
impl EsploraExt for esplora_client::BlockingClient {
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error> {
let latest_blocks = fetch_latest_blocks(self)?;
- let (graph_update, last_active_indices) = full_scan_for_index_and_graph_blocking(
- self,
- request.spks_by_keychain,
- stop_gap,
- parallel_requests,
- )?;
+ let mut graph_update = TxGraph::default();
+ let mut last_active_indices = BTreeMap::<K, u32>::new();
+ for (keychain, keychain_spks) in request.spks_by_keychain {
+ let (tx_graph, last_active_index) =
+ self.fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)?;
+ let _ = graph_update.apply_update(tx_graph);
+ if let Some(last_active_index) = last_active_index {
+ last_active_indices.insert(keychain, last_active_index);
+ }
+ }
let chain_update = chain_update(
self,
&latest_blocks,
fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error> {
let latest_blocks = fetch_latest_blocks(self)?;
- let graph_update = sync_for_index_and_graph_blocking(
- self,
- request.spks,
- request.txids,
- request.outpoints,
- parallel_requests,
- )?;
+ let mut graph_update = TxGraph::default();
+ let _ =
+ graph_update.apply_update(self.fetch_txs_with_spks(request.spks, parallel_requests)?);
+ let _ =
+ graph_update.apply_update(self.fetch_txs_with_txids(request.txids, parallel_requests)?);
+ let _ = graph_update
+ .apply_update(self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)?);
let chain_update = chain_update(
self,
&latest_blocks,
graph_update,
})
}
+
+ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
+ &self,
+ mut keychain_spks: I,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+
+ let mut tx_graph = TxGraph::default();
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
+
+ loop {
+ let handles = keychain_spks
+ .by_ref()
+ .take(parallel_requests)
+ .map(|(spk_index, spk)| {
+ std::thread::spawn({
+ let client = self.clone();
+ move || -> Result<TxsOfSpkIndex, Error> {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen)?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Ok((spk_index, spk_txs));
+ }
+ }
+ }
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ let (index, txs) = handle.join().expect("thread must not panic")?;
+ last_index = Some(index);
+ if !txs.is_empty() {
+ last_active_index = Some(index);
+ }
+ for tx in txs {
+ let _ = tx_graph.insert_tx(tx.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx.status) {
+ let _ = tx_graph.insert_anchor(tx.txid, anchor);
+ }
+
+ let previous_outputs = tx.vin.iter().filter_map(|vin| {
+ let prevout = vin.prevout.as_ref()?;
+ Some((
+ OutPoint {
+ txid: vin.txid,
+ vout: vin.vout,
+ },
+ TxOut {
+ script_pubkey: prevout.scriptpubkey.clone(),
+ value: Amount::from_sat(prevout.value),
+ },
+ ))
+ });
+
+ for (outpoint, txout) in previous_outputs {
+ let _ = tx_graph.insert_txout(outpoint, txout);
+ }
+ }
+ }
+
+ let last_index = last_index.expect("Must be set since handles wasn't empty.");
+ let gap_limit_reached = if let Some(i) = last_active_index {
+ last_index >= i.saturating_add(stop_gap as u32)
+ } else {
+ last_index + 1 >= stop_gap as u32
+ };
+ if gap_limit_reached {
+ break;
+ }
+ }
+
+ Ok((tx_graph, last_active_index))
+ }
+
+ fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
+ &self,
+ spks: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator,
+ {
+ self.fetch_txs_with_keychain_spks(
+ spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
+ usize::MAX,
+ parallel_requests,
+ )
+ .map(|(tx_graph, _)| tx_graph)
+ }
+
+ fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
+ &self,
+ txids: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator,
+ {
+ enum EsploraResp {
+ TxStatus(TxStatus),
+ Tx(Option<Tx>),
+ }
+
+ let mut tx_graph = TxGraph::default();
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .map(|txid| {
+ let client = self.clone();
+ let tx_already_exists = tx_graph.get_tx(txid).is_some();
+ std::thread::spawn(move || {
+ if tx_already_exists {
+ client
+ .get_tx_status(&txid)
+ .map_err(Box::new)
+ .map(|s| (txid, EsploraResp::TxStatus(s)))
+ } else {
+ client
+ .get_tx_info(&txid)
+ .map_err(Box::new)
+ .map(|t| (txid, EsploraResp::Tx(t)))
+ }
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<(Txid, EsploraResp), Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ let (txid, resp) = handle.join().expect("thread must not panic")?;
+ match resp {
+ EsploraResp::TxStatus(status) => {
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = tx_graph.insert_anchor(txid, anchor);
+ }
+ }
+ EsploraResp::Tx(Some(tx_info)) => {
+ let _ = tx_graph.insert_tx(tx_info.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx_info.status) {
+ let _ = tx_graph.insert_anchor(txid, anchor);
+ }
+ }
+ _ => continue,
+ }
+ }
+ }
+ Ok(tx_graph)
+ }
+
+ fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
+ &self,
+ outpoints: I,
+ parallel_requests: usize,
+ ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+ where
+ I::IntoIter: ExactSizeIterator,
+ {
+ let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+ // make sure txs exists in graph and tx statuses are updated
+ // TODO: We should maintain a tx cache (like we do with Electrum).
+ let mut tx_graph =
+ self.fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)?;
+
+ // get outpoint spend-statuses
+ let mut outpoints = outpoints.into_iter();
+ let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
+ loop {
+ let handles = outpoints
+ .by_ref()
+ .take(parallel_requests)
+ .map(|op| {
+ let client = self.clone();
+ std::thread::spawn(move || {
+ client
+ .get_output_status(&op.txid, op.vout as _)
+ .map_err(Box::new)
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<Option<OutputStatus>, Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ if let Some(op_status) = handle.join().expect("thread must not panic")? {
+ let spend_txid = match op_status.txid {
+ Some(txid) => txid,
+ None => continue,
+ };
+ if tx_graph.get_tx(spend_txid).is_none() {
+ missing_txs.push(spend_txid);
+ }
+ if let Some(spend_status) = op_status.status {
+ if let Some(spend_anchor) = anchor_from_status(&spend_status) {
+ let _ = tx_graph.insert_anchor(spend_txid, spend_anchor);
+ }
+ }
+ }
+ }
+ }
+
+ let _ = tx_graph.apply_update(self.fetch_txs_with_txids(missing_txs, parallel_requests)?);
+ Ok(tx_graph)
+ }
}
/// Fetch latest blocks from Esplora in an atomic call.
Ok(tip)
}
-/// This performs a full scan to get an update for the [`TxGraph`] and
-/// [`KeychainTxOutIndex`](bdk_chain::indexer::keychain_txout::KeychainTxOutIndex).
-fn full_scan_for_index_and_graph_blocking<K: Ord + Clone>(
- client: &esplora_client::BlockingClient,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = Indexed<ScriptBuf>>>,
- stop_gap: usize,
- parallel_requests: usize,
-) -> Result<(TxGraph<ConfirmationBlockTime>, BTreeMap<K, u32>), Error> {
- type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
- let parallel_requests = Ord::max(parallel_requests, 1);
- let mut tx_graph = TxGraph::<ConfirmationBlockTime>::default();
- let mut last_active_indices = BTreeMap::<K, u32>::new();
-
- for (keychain, spks) in keychain_spks {
- let mut spks = spks.into_iter();
- let mut last_index = Option::<u32>::None;
- let mut last_active_index = Option::<u32>::None;
-
- loop {
- let handles = spks
- .by_ref()
- .take(parallel_requests)
- .map(|(spk_index, spk)| {
- std::thread::spawn({
- let client = client.clone();
- move || -> Result<TxsOfSpkIndex, Error> {
- let mut last_seen = None;
- let mut spk_txs = Vec::new();
- loop {
- let txs = client.scripthash_txs(&spk, last_seen)?;
- let tx_count = txs.len();
- last_seen = txs.last().map(|tx| tx.txid);
- spk_txs.extend(txs);
- if tx_count < 25 {
- break Ok((spk_index, spk_txs));
- }
- }
- }
- })
- })
- .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
-
- if handles.is_empty() {
- break;
- }
-
- for handle in handles {
- let (index, txs) = handle.join().expect("thread must not panic")?;
- last_index = Some(index);
- if !txs.is_empty() {
- last_active_index = Some(index);
- }
- for tx in txs {
- let _ = tx_graph.insert_tx(tx.to_tx());
- if let Some(anchor) = anchor_from_status(&tx.status) {
- let _ = tx_graph.insert_anchor(tx.txid, anchor);
- }
-
- let previous_outputs = tx.vin.iter().filter_map(|vin| {
- let prevout = vin.prevout.as_ref()?;
- Some((
- OutPoint {
- txid: vin.txid,
- vout: vin.vout,
- },
- TxOut {
- script_pubkey: prevout.scriptpubkey.clone(),
- value: Amount::from_sat(prevout.value),
- },
- ))
- });
-
- for (outpoint, txout) in previous_outputs {
- let _ = tx_graph.insert_txout(outpoint, txout);
- }
- }
- }
-
- let last_index = last_index.expect("Must be set since handles wasn't empty.");
- let gap_limit_reached = if let Some(i) = last_active_index {
- last_index >= i.saturating_add(stop_gap as u32)
- } else {
- last_index + 1 >= stop_gap as u32
- };
- if gap_limit_reached {
- break;
- }
- }
-
- if let Some(last_active_index) = last_active_index {
- last_active_indices.insert(keychain, last_active_index);
- }
- }
-
- Ok((tx_graph, last_active_indices))
-}
-
-fn sync_for_index_and_graph_blocking(
- client: &esplora_client::BlockingClient,
- misc_spks: impl IntoIterator<Item = ScriptBuf>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- parallel_requests: usize,
-) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
- let (mut tx_graph, _) = full_scan_for_index_and_graph_blocking(
- client,
- {
- let mut keychains = BTreeMap::new();
- keychains.insert(
- (),
- misc_spks
- .into_iter()
- .enumerate()
- .map(|(i, spk)| (i as u32, spk)),
- );
- keychains
- },
- usize::MAX,
- parallel_requests,
- )?;
-
- let mut txids = txids.into_iter();
- loop {
- let handles = txids
- .by_ref()
- .take(parallel_requests)
- .filter(|&txid| tx_graph.get_tx(txid).is_none())
- .map(|txid| {
- std::thread::spawn({
- let client = client.clone();
- move || {
- client
- .get_tx_status(&txid)
- .map_err(Box::new)
- .map(|s| (txid, s))
- }
- })
- })
- .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
-
- if handles.is_empty() {
- break;
- }
-
- for handle in handles {
- let (txid, status) = handle.join().expect("thread must not panic")?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = tx_graph.insert_anchor(txid, anchor);
- }
- }
- }
-
- for op in outpoints {
- if tx_graph.get_tx(op.txid).is_none() {
- if let Some(tx) = client.get_tx(&op.txid)? {
- let _ = tx_graph.insert_tx(tx);
- }
- let status = client.get_tx_status(&op.txid)?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = tx_graph.insert_anchor(op.txid, anchor);
- }
- }
-
- if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _)? {
- if let Some(txid) = op_status.txid {
- if tx_graph.get_tx(txid).is_none() {
- if let Some(tx) = client.get_tx(&txid)? {
- let _ = tx_graph.insert_tx(tx);
- }
- let status = client.get_tx_status(&txid)?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = tx_graph.insert_anchor(txid, anchor);
- }
- }
- }
- }
- }
-
- Ok(tx_graph)
-}
-
#[cfg(test)]
mod test {
use crate::blocking_ext::{chain_update, fetch_latest_blocks};
#![doc = include_str!("../README.md")]
-//! This crate is used for updating structures of [`bdk_chain`] with data from an Esplora server.
+//! # Primary Methods
//!
-//! The two primary methods are [`EsploraExt::sync`] and [`EsploraExt::full_scan`]. In most cases
-//! [`EsploraExt::sync`] is used to sync the transaction histories of scripts that the application
-//! cares about, for example the scripts for all the receive addresses of a Wallet's keychain that it
-//! has shown a user. [`EsploraExt::full_scan`] is meant to be used when importing or restoring a
-//! keychain where the range of possibly used scripts is not known. In this case it is necessary to
-//! scan all keychain scripts until a number (the "stop gap") of unused scripts is discovered. For a
-//! sync or full scan the user receives relevant blockchain data and output updates for [`bdk_chain`]
-//! via a new [`TxGraph`] to be appended to any existing [`TxGraph`] data.
+//! The two primary methods are [`EsploraExt::sync`] and [`EsploraExt::full_scan`].
//!
-//! Refer to [`example_esplora`] for a complete example.
+//! [`EsploraExt::sync`] is used to sync against a subset of wallet data. For example, transaction
+//! histories of revealed and unused script from the external (public) keychain and/or statuses of
+//! wallet-owned UTXOs and spending transactions from them. The policy of what to sync against can
+//! be customized.
+//!
+//! [`EsploraExt::full_scan`] is designed to be used when importing or restoring a keychain where
+//! the range of possibly used scripts is not known. In this case it is necessary to scan all
+//! keychain scripts until a number (the `stop_gap`) of unused scripts is discovered.
+//!
+//! For a sync or full scan, the user receives relevant blockchain data and output updates for
+//! [`bdk_chain`] .
+//!
+//! # Low-Level Methods
+//!
+//! [`EsploraExt::sync`] and [`EsploraExt::full_scan`] returns updates which are *complete* and can
+//! be used directly to determine confirmation statuses of each transaction. This is because a
+//! [`LocalChain`] update is contained in the returned update structures. However, sometimes the
+//! caller wishes to use a custom [`ChainOracle`] implementation (something other than
+//! [`LocalChain`]). The following methods ONLY returns an update [`TxGraph`]:
+//!
+//! * [`EsploraExt::fetch_txs_with_keychain_spks`]
+//! * [`EsploraExt::fetch_txs_with_spks`]
+//! * [`EsploraExt::fetch_txs_with_txids`]
+//! * [`EsploraExt::fetch_txs_with_outpoints`]
+//!
+//! # Stop Gap
+//!
+//! Methods [`EsploraExt::full_scan`] and [`EsploraExt::fetch_txs_with_keychain_spks`] takes in a
+//! `stop_gap` input which is defined as the maximum number of consecutive unused script pubkeys to
+//! scan transactions for before stopping.
+//!
+//! For example, with a `stop_gap` of 3, `full_scan` will keep scanning until it encounters 3
+//! consecutive script pubkeys with no associated transactions.
+//!
+//! This follows the same approach as other Bitcoin-related software,
+//! such as [Electrum](https://electrum.readthedocs.io/en/latest/faq.html#what-is-the-gap-limit),
+//! [BTCPay Server](https://docs.btcpayserver.org/FAQ/Wallet/#the-gap-limit-problem),
+//! and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
+//!
+//! A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
+//!
+//! # Async
+//!
+//! Just like how [`EsploraExt`] extends the functionality of an
+//! [`esplora_client::BlockingClient`], [`EsploraExt`] is the async version which extends
+//! [`esplora_client::AsyncClient`].
//!
//! [`TxGraph`]: bdk_chain::tx_graph::TxGraph
+//! [`LocalChain`]: bdk_chain::local_chain::LocalChain
+//! [`ChainOracle`]: bdk_chain::ChainOracle
//! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora
use bdk_chain::{BlockId, ConfirmationBlockTime};