+use std::collections::BTreeSet;
+
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
+use bdk_chain::Anchor;
use bdk_chain::{
- bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
+ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
collections::BTreeMap,
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
};
-use esplora_client::TxStatus;
+use esplora_client::{Amount, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};
-use crate::anchor_from_status;
+use crate::{anchor_from_status, FullScanUpdate, SyncUpdate};
/// [`esplora_client::Error`]
type Error = Box<esplora_client::Error>;
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait EsploraAsyncExt {
- /// Prepare a [`LocalChain`] update with blocks fetched from Esplora.
- ///
- /// * `local_tip` is the previous tip of [`LocalChain::tip`].
- /// * `request_heights` is the block heights that we are interested in fetching from Esplora.
- ///
- /// The result of this method can be applied to [`LocalChain::apply_update`].
- ///
- /// ## Consistency
- ///
- /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org
- /// during the call. The size of re-org we can tollerate is server dependent but will be at
- /// least 10.
- ///
- /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
- /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
- /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
- async fn update_local_chain(
- &self,
- local_tip: CheckPoint,
- request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
- ) -> Result<local_chain::Update, Error>;
-
- /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
- /// returns a [`TxGraph`] and a map of last active indices.
+ /// Scan keychain scripts for transactions against Esplora, returning an update that can be
+ /// applied to the receiving structures.
///
+ /// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `keychain_spks`: keychains that we want to scan transactions for
///
- /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
- /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
- /// parallel.
+ /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
+ /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
+ /// make in parallel.
///
/// ## Note
///
/// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
///
/// A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
+ ///
+ /// [`LocalChain::tip`]: local_chain::LocalChain::tip
async fn full_scan<K: Ord + Clone + Send>(
&self,
+ local_tip: CheckPoint,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error>;
+ ) -> Result<FullScanUpdate<K>, Error>;
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`].
///
+ /// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `misc_spks`: scripts that we want to sync transactions for
/// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
/// If the scripts to sync are unknown, such as when restoring or importing a keychain that
/// may include scripts that have been used, use [`full_scan`] with the keychain.
///
+ /// [`LocalChain::tip`]: local_chain::LocalChain::tip
/// [`full_scan`]: EsploraAsyncExt::full_scan
async fn sync(
&self,
+ local_tip: CheckPoint,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
+ ) -> Result<SyncUpdate, Error>;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EsploraAsyncExt for esplora_client::AsyncClient {
- async fn update_local_chain(
- &self,
- local_tip: CheckPoint,
- request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
- ) -> Result<local_chain::Update, Error> {
- // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
- // consistent.
- let mut fetched_blocks = self
- .get_blocks(None)
- .await?
- .into_iter()
- .map(|b| (b.time.height, b.id))
- .collect::<BTreeMap<u32, BlockHash>>();
- let new_tip_height = fetched_blocks
- .keys()
- .last()
- .copied()
- .expect("must have atleast one block");
-
- // Fetch blocks of heights that the caller is interested in, skipping blocks that are
- // already fetched when constructing `fetched_blocks`.
- for height in request_heights {
- // do not fetch blocks higher than remote tip
- if height > new_tip_height {
- continue;
- }
- // only fetch what is missing
- if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
- // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent
- // with the chain at the time of `get_blocks` above (there could have been a deep
- // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's
- // not possible to have a re-org deeper than that.
- entry.insert(self.get_block_hash(height).await?);
- }
- }
-
- // Ensure `fetched_blocks` can create an update that connects with the original chain by
- // finding a "Point of Agreement".
- for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
- if height > new_tip_height {
- continue;
- }
-
- let fetched_hash = match fetched_blocks.entry(height) {
- btree_map::Entry::Occupied(entry) => *entry.get(),
- btree_map::Entry::Vacant(entry) => {
- *entry.insert(self.get_block_hash(height).await?)
- }
- };
-
- // We have found point of agreement so the update will connect!
- if fetched_hash == local_hash {
- break;
- }
- }
-
- Ok(local_chain::Update {
- tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
- .expect("must be in height order"),
- introduce_older_blocks: true,
- })
- }
-
async fn full_scan<K: Ord + Clone + Send>(
&self,
+ local_tip: CheckPoint,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
- type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
- let parallel_requests = Ord::max(parallel_requests, 1);
- let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
- let mut last_active_indexes = BTreeMap::<K, u32>::new();
- let stop_gap = Ord::max(stop_gap, 1);
-
- for (keychain, spks) in keychain_spks {
- let mut spks = spks.into_iter();
- let mut last_index = Option::<u32>::None;
- let mut last_active_index = Option::<u32>::None;
-
- loop {
- let handles = spks
- .by_ref()
- .take(parallel_requests)
- .map(|(spk_index, spk)| {
- let client = self.clone();
- async move {
- let mut last_seen = None;
- let mut spk_txs = Vec::new();
- loop {
- let txs = client.scripthash_txs(&spk, last_seen).await?;
- let tx_count = txs.len();
- last_seen = txs.last().map(|tx| tx.txid);
- spk_txs.extend(txs);
- if tx_count < 25 {
- break Result::<_, Error>::Ok((spk_index, spk_txs));
- }
- }
- }
- })
- .collect::<FuturesOrdered<_>>();
+ ) -> Result<FullScanUpdate<K>, Error> {
+ let update_blocks = init_chain_update(self, &local_tip).await?;
+ let (tx_graph, last_active_indices) =
+ full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?;
+ let local_chain =
+ finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
+ Ok(FullScanUpdate {
+ local_chain,
+ tx_graph,
+ last_active_indices,
+ })
+ }
- if handles.is_empty() {
- break;
- }
+ async fn sync(
+ &self,
+ local_tip: CheckPoint,
+ misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
+ txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
+ outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
+ parallel_requests: usize,
+ ) -> Result<SyncUpdate, Error> {
+ let update_blocks = init_chain_update(self, &local_tip).await?;
+ let tx_graph =
+ sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?;
+ let local_chain =
+ finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
+ Ok(SyncUpdate {
+ tx_graph,
+ local_chain,
+ })
+ }
+}
- for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
- last_index = Some(index);
- if !txs.is_empty() {
- last_active_index = Some(index);
- }
- for tx in txs {
- let _ = graph.insert_tx(tx.to_tx());
- if let Some(anchor) = anchor_from_status(&tx.status) {
- let _ = graph.insert_anchor(tx.txid, anchor);
- }
+/// Create the initial chain update.
+///
+/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
+/// update can connect to the `start_tip`.
+///
+/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and
+/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for
+/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use
+/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when
+/// alternating between chain-sources.
+#[doc(hidden)]
+pub async fn init_chain_update(
+ client: &esplora_client::AsyncClient,
+ local_tip: &CheckPoint,
+) -> Result<BTreeMap<u32, BlockHash>, Error> {
+ // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
+ // consistent.
+ let mut fetched_blocks = client
+ .get_blocks(None)
+ .await?
+ .into_iter()
+ .map(|b| (b.time.height, b.id))
+ .collect::<BTreeMap<u32, BlockHash>>();
+ let new_tip_height = fetched_blocks
+ .keys()
+ .last()
+ .copied()
+ .expect("must atleast have one block");
- let previous_outputs = tx.vin.iter().filter_map(|vin| {
- let prevout = vin.prevout.as_ref()?;
- Some((
- OutPoint {
- txid: vin.txid,
- vout: vin.vout,
- },
- TxOut {
- script_pubkey: prevout.scriptpubkey.clone(),
- value: Amount::from_sat(prevout.value),
- },
- ))
- });
-
- for (outpoint, txout) in previous_outputs {
- let _ = graph.insert_txout(outpoint, txout);
- }
- }
- }
+ // Ensure `fetched_blocks` can create an update that connects with the original chain by
+ // finding a "Point of Agreement".
+ for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
+ if height > new_tip_height {
+ continue;
+ }
- let last_index = last_index.expect("Must be set since handles wasn't empty.");
- let gap_limit_reached = if let Some(i) = last_active_index {
- last_index >= i.saturating_add(stop_gap as u32)
- } else {
- last_index + 1 >= stop_gap as u32
- };
- if gap_limit_reached {
- break;
- }
+ let fetched_hash = match fetched_blocks.entry(height) {
+ btree_map::Entry::Occupied(entry) => *entry.get(),
+ btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?),
+ };
+
+ // We have found point of agreement so the update will connect!
+ if fetched_hash == local_hash {
+ break;
+ }
+ }
+
+ Ok(fetched_blocks)
+}
+
+/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
+///
+/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
+/// existing checkpoint/block under `local_tip` or `update_blocks`.
+#[doc(hidden)]
+pub async fn finalize_chain_update<A: Anchor>(
+ client: &esplora_client::AsyncClient,
+ local_tip: &CheckPoint,
+ anchors: &BTreeSet<(A, Txid)>,
+ mut update_blocks: BTreeMap<u32, BlockHash>,
+) -> Result<local_chain::Update, Error> {
+ let update_tip_height = update_blocks
+ .keys()
+ .last()
+ .copied()
+ .expect("must atleast have one block");
+
+ // We want to have a corresponding checkpoint per height. We iterate the heights of anchors
+ // backwards, comparing it against our `local_tip`'s chain and our current set of
+ // `update_blocks` to see if a corresponding checkpoint already exists.
+ let anchor_heights = anchors
+ .iter()
+ .rev()
+ .map(|(a, _)| a.anchor_block().height)
+ // filter out heights that surpass the update tip
+ .filter(|h| *h <= update_tip_height)
+ // filter out duplicate heights
+ .filter({
+ let mut prev_height = Option::<u32>::None;
+ move |h| match prev_height.replace(*h) {
+ None => true,
+ Some(prev_h) => prev_h != *h,
}
+ });
- if let Some(last_active_index) = last_active_index {
- last_active_indexes.insert(keychain, last_active_index);
+ // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
+ // checkpoints more efficient.
+ let mut curr_cp = local_tip.clone();
+
+ for h in anchor_heights {
+ if let Some(cp) = curr_cp.range(h..).last() {
+ curr_cp = cp.clone();
+ if cp.height() == h {
+ continue;
}
}
-
- Ok((graph, last_active_indexes))
+ if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
+ entry.insert(client.get_block_hash(h).await?);
+ }
}
- async fn sync(
- &self,
- misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
- txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
- outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
- let mut graph = self
- .full_scan(
- [(
- (),
- misc_spks
- .into_iter()
- .enumerate()
- .map(|(i, spk)| (i as u32, spk)),
- )]
- .into(),
- usize::MAX,
- parallel_requests,
- )
- .await
- .map(|(g, _)| g)?;
-
- let mut txids = txids.into_iter();
+ Ok(local_chain::Update {
+ tip: CheckPoint::from_block_ids(
+ update_blocks
+ .into_iter()
+ .map(|(height, hash)| BlockId { height, hash }),
+ )
+ .expect("must be in order"),
+ introduce_older_blocks: true,
+ })
+}
+
+/// This performs a full scan to get an update for the [`TxGraph`] and
+/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
+#[doc(hidden)]
+pub async fn full_scan_for_index_and_graph<K: Ord + Clone + Send>(
+ client: &esplora_client::AsyncClient,
+ keychain_spks: BTreeMap<
+ K,
+ impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
+ >,
+ stop_gap: usize,
+ parallel_requests: usize,
+) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+ let parallel_requests = Ord::max(parallel_requests, 1);
+ let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
+ let mut last_active_indexes = BTreeMap::<K, u32>::new();
+
+ for (keychain, spks) in keychain_spks {
+ let mut spks = spks.into_iter();
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
+
loop {
- let handles = txids
+ let handles = spks
.by_ref()
.take(parallel_requests)
- .filter(|&txid| graph.get_tx(txid).is_none())
- .map(|txid| {
- let client = self.clone();
- async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
+ .map(|(spk_index, spk)| {
+ let client = client.clone();
+ async move {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen).await?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Result::<_, Error>::Ok((spk_index, spk_txs));
+ }
+ }
+ }
})
.collect::<FuturesOrdered<_>>();
break;
}
- for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(txid, anchor);
+ for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
+ last_index = Some(index);
+ if !txs.is_empty() {
+ last_active_index = Some(index);
+ }
+ for tx in txs {
+ let _ = graph.insert_tx(tx.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx.status) {
+ let _ = graph.insert_anchor(tx.txid, anchor);
+ }
+
+ let previous_outputs = tx.vin.iter().filter_map(|vin| {
+ let prevout = vin.prevout.as_ref()?;
+ Some((
+ OutPoint {
+ txid: vin.txid,
+ vout: vin.vout,
+ },
+ TxOut {
+ script_pubkey: prevout.scriptpubkey.clone(),
+ value: Amount::from_sat(prevout.value),
+ },
+ ))
+ });
+
+ for (outpoint, txout) in previous_outputs {
+ let _ = graph.insert_txout(outpoint, txout);
+ }
}
}
+
+ let last_index = last_index.expect("Must be set since handles wasn't empty.");
+ let gap_limit_reached = if let Some(i) = last_active_index {
+ last_index >= i.saturating_add(stop_gap as u32)
+ } else {
+ last_index + 1 >= stop_gap as u32
+ };
+ if gap_limit_reached {
+ break;
+ }
}
- for op in outpoints.into_iter() {
- if graph.get_tx(op.txid).is_none() {
- if let Some(tx) = self.get_tx(&op.txid).await? {
- let _ = graph.insert_tx(tx);
- }
- let status = self.get_tx_status(&op.txid).await?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(op.txid, anchor);
- }
+ if let Some(last_active_index) = last_active_index {
+ last_active_indexes.insert(keychain, last_active_index);
+ }
+ }
+
+ Ok((graph, last_active_indexes))
+}
+
+#[doc(hidden)]
+pub async fn sync_for_index_and_graph(
+ client: &esplora_client::AsyncClient,
+ misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
+ txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
+ outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
+ let mut graph = full_scan_for_index_and_graph(
+ client,
+ [(
+ (),
+ misc_spks
+ .into_iter()
+ .enumerate()
+ .map(|(i, spk)| (i as u32, spk)),
+ )]
+ .into(),
+ usize::MAX,
+ parallel_requests,
+ )
+ .await
+ .map(|(g, _)| g)?;
+
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .filter(|&txid| graph.get_tx(txid).is_none())
+ .map(|txid| {
+ let client = client.clone();
+ async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(txid, anchor);
}
+ }
+ }
- if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
- if let Some(txid) = op_status.txid {
- if graph.get_tx(txid).is_none() {
- if let Some(tx) = self.get_tx(&txid).await? {
- let _ = graph.insert_tx(tx);
- }
- let status = self.get_tx_status(&txid).await?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(txid, anchor);
- }
+ for op in outpoints.into_iter() {
+ if graph.get_tx(op.txid).is_none() {
+ if let Some(tx) = client.get_tx(&op.txid).await? {
+ let _ = graph.insert_tx(tx);
+ }
+ let status = client.get_tx_status(&op.txid).await?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(op.txid, anchor);
+ }
+ }
+
+ if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _).await? {
+ if let Some(txid) = op_status.txid {
+ if graph.get_tx(txid).is_none() {
+ if let Some(tx) = client.get_tx(&txid).await? {
+ let _ = graph.insert_tx(tx);
+ }
+ let status = client.get_tx_status(&txid).await?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(txid, anchor);
}
}
}
}
- Ok(graph)
}
+
+ Ok(graph)
}
+use std::collections::BTreeSet;
use std::thread::JoinHandle;
+use std::usize;
use bdk_chain::collections::btree_map;
use bdk_chain::collections::BTreeMap;
+use bdk_chain::Anchor;
use bdk_chain::{
bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
local_chain::{self, CheckPoint},
use esplora_client::TxStatus;
use crate::anchor_from_status;
+use crate::FullScanUpdate;
+use crate::SyncUpdate;
/// [`esplora_client::Error`]
-type Error = Box<esplora_client::Error>;
+pub type Error = Box<esplora_client::Error>;
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
///
///
/// [crate-level documentation]: crate
pub trait EsploraExt {
- /// Prepare a [`LocalChain`] update with blocks fetched from Esplora.
- ///
- /// * `local_tip` is the previous tip of [`LocalChain::tip`].
- /// * `request_heights` is the block heights that we are interested in fetching from Esplora.
- ///
- /// The result of this method can be applied to [`LocalChain::apply_update`].
- ///
- /// ## Consistency
- ///
- /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org
- /// during the call. The size of re-org we can tollerate is server dependent but will be at
- /// least 10.
- ///
- /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
- /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
- /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
- fn update_local_chain(
- &self,
- local_tip: CheckPoint,
- request_heights: impl IntoIterator<Item = u32>,
- ) -> Result<local_chain::Update, Error>;
-
- /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
- /// returns a [`TxGraph`] and a map of last active indices.
+ /// Scan keychain scripts for transactions against Esplora, returning an update that can be
+ /// applied to the receiving structures.
///
+ /// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `keychain_spks`: keychains that we want to scan transactions for
///
- /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
- /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
- /// parallel.
+ /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
+ /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
+ /// make in parallel.
///
/// ## Note
///
/// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
///
/// A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
+ ///
+ /// [`LocalChain::tip`]: local_chain::LocalChain::tip
fn full_scan<K: Ord + Clone>(
&self,
+ local_tip: CheckPoint,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error>;
+ ) -> Result<FullScanUpdate<K>, Error>;
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`].
///
+ /// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
/// * `misc_spks`: scripts that we want to sync transactions for
/// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
/// If the scripts to sync are unknown, such as when restoring or importing a keychain that
/// may include scripts that have been used, use [`full_scan`] with the keychain.
///
+ /// [`LocalChain::tip`]: local_chain::LocalChain::tip
/// [`full_scan`]: EsploraExt::full_scan
fn sync(
&self,
+ local_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
+ ) -> Result<SyncUpdate, Error>;
}
impl EsploraExt for esplora_client::BlockingClient {
- fn update_local_chain(
+ fn full_scan<K: Ord + Clone>(
&self,
local_tip: CheckPoint,
- request_heights: impl IntoIterator<Item = u32>,
- ) -> Result<local_chain::Update, Error> {
- // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
- // consistent.
- let mut fetched_blocks = self
- .get_blocks(None)?
- .into_iter()
- .map(|b| (b.time.height, b.id))
- .collect::<BTreeMap<u32, BlockHash>>();
- let new_tip_height = fetched_blocks
- .keys()
- .last()
- .copied()
- .expect("must atleast have one block");
-
- // Fetch blocks of heights that the caller is interested in, skipping blocks that are
- // already fetched when constructing `fetched_blocks`.
- for height in request_heights {
- // do not fetch blocks higher than remote tip
- if height > new_tip_height {
- continue;
- }
- // only fetch what is missing
- if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
- // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent
- // with the chain at the time of `get_blocks` above (there could have been a deep
- // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's
- // not possible to have a re-org deeper than that.
- entry.insert(self.get_block_hash(height)?);
- }
- }
-
- // Ensure `fetched_blocks` can create an update that connects with the original chain by
- // finding a "Point of Agreement".
- for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
- if height > new_tip_height {
- continue;
- }
-
- let fetched_hash = match fetched_blocks.entry(height) {
- btree_map::Entry::Occupied(entry) => *entry.get(),
- btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?),
- };
-
- // We have found point of agreement so the update will connect!
- if fetched_hash == local_hash {
- break;
- }
- }
-
- Ok(local_chain::Update {
- tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
- .expect("must be in height order"),
- introduce_older_blocks: true,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<FullScanUpdate<K>, Error> {
+ let update_blocks = init_chain_update_blocking(self, &local_tip)?;
+ let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking(
+ self,
+ keychain_spks,
+ stop_gap,
+ parallel_requests,
+ )?;
+ let local_chain = finalize_chain_update_blocking(
+ self,
+ &local_tip,
+ tx_graph.all_anchors(),
+ update_blocks,
+ )?;
+ Ok(FullScanUpdate {
+ local_chain,
+ tx_graph,
+ last_active_indices,
})
}
- fn full_scan<K: Ord + Clone>(
+ fn sync(
&self,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
- stop_gap: usize,
+ local_tip: CheckPoint,
+ misc_spks: impl IntoIterator<Item = ScriptBuf>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
- type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
- let parallel_requests = Ord::max(parallel_requests, 1);
- let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
- let mut last_active_indexes = BTreeMap::<K, u32>::new();
- let stop_gap = Ord::max(stop_gap, 1);
-
- for (keychain, spks) in keychain_spks {
- let mut spks = spks.into_iter();
- let mut last_index = Option::<u32>::None;
- let mut last_active_index = Option::<u32>::None;
-
- loop {
- let handles = spks
- .by_ref()
- .take(parallel_requests)
- .map(|(spk_index, spk)| {
- std::thread::spawn({
- let client = self.clone();
- move || -> Result<TxsOfSpkIndex, Error> {
- let mut last_seen = None;
- let mut spk_txs = Vec::new();
- loop {
- let txs = client.scripthash_txs(&spk, last_seen)?;
- let tx_count = txs.len();
- last_seen = txs.last().map(|tx| tx.txid);
- spk_txs.extend(txs);
- if tx_count < 25 {
- break Ok((spk_index, spk_txs));
- }
- }
- }
- })
- })
- .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
+ ) -> Result<SyncUpdate, Error> {
+ let update_blocks = init_chain_update_blocking(self, &local_tip)?;
+ let tx_graph = sync_for_index_and_graph_blocking(
+ self,
+ misc_spks,
+ txids,
+ outpoints,
+ parallel_requests,
+ )?;
+ let local_chain = finalize_chain_update_blocking(
+ self,
+ &local_tip,
+ tx_graph.all_anchors(),
+ update_blocks,
+ )?;
+ Ok(SyncUpdate {
+ local_chain,
+ tx_graph,
+ })
+ }
+}
- if handles.is_empty() {
- break;
- }
+/// Create the initial chain update.
+///
+/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
+/// update can connect to the `start_tip`.
+///
+/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and
+/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for
+/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use
+/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when
+/// alternating between chain-sources.
+#[doc(hidden)]
+pub fn init_chain_update_blocking(
+ client: &esplora_client::BlockingClient,
+ local_tip: &CheckPoint,
+) -> Result<BTreeMap<u32, BlockHash>, Error> {
+ // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
+ // consistent.
+ let mut fetched_blocks = client
+ .get_blocks(None)?
+ .into_iter()
+ .map(|b| (b.time.height, b.id))
+ .collect::<BTreeMap<u32, BlockHash>>();
+ let new_tip_height = fetched_blocks
+ .keys()
+ .last()
+ .copied()
+ .expect("must atleast have one block");
- for handle in handles {
- let (index, txs) = handle.join().expect("thread must not panic")?;
- last_index = Some(index);
- if !txs.is_empty() {
- last_active_index = Some(index);
- }
- for tx in txs {
- let _ = graph.insert_tx(tx.to_tx());
- if let Some(anchor) = anchor_from_status(&tx.status) {
- let _ = graph.insert_anchor(tx.txid, anchor);
- }
+ // Ensure `fetched_blocks` can create an update that connects with the original chain by
+ // finding a "Point of Agreement".
+ for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
+ if height > new_tip_height {
+ continue;
+ }
- let previous_outputs = tx.vin.iter().filter_map(|vin| {
- let prevout = vin.prevout.as_ref()?;
- Some((
- OutPoint {
- txid: vin.txid,
- vout: vin.vout,
- },
- TxOut {
- script_pubkey: prevout.scriptpubkey.clone(),
- value: Amount::from_sat(prevout.value),
- },
- ))
- });
-
- for (outpoint, txout) in previous_outputs {
- let _ = graph.insert_txout(outpoint, txout);
- }
- }
- }
+ let fetched_hash = match fetched_blocks.entry(height) {
+ btree_map::Entry::Occupied(entry) => *entry.get(),
+ btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?),
+ };
- let last_index = last_index.expect("Must be set since handles wasn't empty.");
- let gap_limit_reached = if let Some(i) = last_active_index {
- last_index >= i.saturating_add(stop_gap as u32)
- } else {
- last_index + 1 >= stop_gap as u32
- };
- if gap_limit_reached {
- break;
- }
+ // We have found point of agreement so the update will connect!
+ if fetched_hash == local_hash {
+ break;
+ }
+ }
+
+ Ok(fetched_blocks)
+}
+
+/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
+///
+/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
+/// existing checkpoint/block under `local_tip` or `update_blocks`.
+#[doc(hidden)]
+pub fn finalize_chain_update_blocking<A: Anchor>(
+ client: &esplora_client::BlockingClient,
+ local_tip: &CheckPoint,
+ anchors: &BTreeSet<(A, Txid)>,
+ mut update_blocks: BTreeMap<u32, BlockHash>,
+) -> Result<local_chain::Update, Error> {
+ let update_tip_height = update_blocks
+ .keys()
+ .last()
+ .copied()
+ .expect("must atleast have one block");
+
+ // We want to have a corresponding checkpoint per height. We iterate the heights of anchors
+ // backwards, comparing it against our `local_tip`'s chain and our current set of
+ // `update_blocks` to see if a corresponding checkpoint already exists.
+ let anchor_heights = anchors
+ .iter()
+ .rev()
+ .map(|(a, _)| a.anchor_block().height)
+ // filter out heights that surpass the update tip
+ .filter(|h| *h <= update_tip_height)
+ // filter out duplicate heights
+ .filter({
+ let mut prev_height = Option::<u32>::None;
+ move |h| match prev_height.replace(*h) {
+ None => true,
+ Some(prev_h) => prev_h != *h,
}
+ });
+
+ // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
+ // checkpoints more efficient.
+ let mut curr_cp = local_tip.clone();
- if let Some(last_active_index) = last_active_index {
- last_active_indexes.insert(keychain, last_active_index);
+ for h in anchor_heights {
+ if let Some(cp) = curr_cp.range(h..).last() {
+ curr_cp = cp.clone();
+ if cp.height() == h {
+ continue;
}
}
-
- Ok((graph, last_active_indexes))
+ if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
+ entry.insert(client.get_block_hash(h)?);
+ }
}
- fn sync(
- &self,
- misc_spks: impl IntoIterator<Item = ScriptBuf>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
- let mut graph = self
- .full_scan(
- [(
- (),
- misc_spks
- .into_iter()
- .enumerate()
- .map(|(i, spk)| (i as u32, spk)),
- )]
- .into(),
- usize::MAX,
- parallel_requests,
- )
- .map(|(g, _)| g)?;
-
- let mut txids = txids.into_iter();
+ Ok(local_chain::Update {
+ tip: CheckPoint::from_block_ids(
+ update_blocks
+ .into_iter()
+ .map(|(height, hash)| BlockId { height, hash }),
+ )
+ .expect("must be in order"),
+ introduce_older_blocks: true,
+ })
+}
+
+/// This performs a full scan to get an update for the [`TxGraph`] and
+/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
+#[doc(hidden)]
+pub fn full_scan_for_index_and_graph_blocking<K: Ord + Clone>(
+ client: &esplora_client::BlockingClient,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
+ stop_gap: usize,
+ parallel_requests: usize,
+) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+ let parallel_requests = Ord::max(parallel_requests, 1);
+ let mut tx_graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
+ let mut last_active_indices = BTreeMap::<K, u32>::new();
+
+ for (keychain, spks) in keychain_spks {
+ let mut spks = spks.into_iter();
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
+
loop {
- let handles = txids
+ let handles = spks
.by_ref()
.take(parallel_requests)
- .filter(|&txid| graph.get_tx(txid).is_none())
- .map(|txid| {
+ .map(|(spk_index, spk)| {
std::thread::spawn({
- let client = self.clone();
- move || {
- client
- .get_tx_status(&txid)
- .map_err(Box::new)
- .map(|s| (txid, s))
+ let client = client.clone();
+ move || -> Result<TxsOfSpkIndex, Error> {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen)?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Ok((spk_index, spk_txs));
+ }
+ }
}
})
})
- .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
+ .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
if handles.is_empty() {
break;
}
for handle in handles {
- let (txid, status) = handle.join().expect("thread must not panic")?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(txid, anchor);
+ let (index, txs) = handle.join().expect("thread must not panic")?;
+ last_index = Some(index);
+ if !txs.is_empty() {
+ last_active_index = Some(index);
+ }
+ for tx in txs {
+ let _ = tx_graph.insert_tx(tx.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx.status) {
+ let _ = tx_graph.insert_anchor(tx.txid, anchor);
+ }
+
+ let previous_outputs = tx.vin.iter().filter_map(|vin| {
+ let prevout = vin.prevout.as_ref()?;
+ Some((
+ OutPoint {
+ txid: vin.txid,
+ vout: vin.vout,
+ },
+ TxOut {
+ script_pubkey: prevout.scriptpubkey.clone(),
+ value: Amount::from_sat(prevout.value),
+ },
+ ))
+ });
+
+ for (outpoint, txout) in previous_outputs {
+ let _ = tx_graph.insert_txout(outpoint, txout);
+ }
}
}
+
+ let last_index = last_index.expect("Must be set since handles wasn't empty.");
+ let gap_limit_reached = if let Some(i) = last_active_index {
+ last_index >= i.saturating_add(stop_gap as u32)
+ } else {
+ last_index + 1 >= stop_gap as u32
+ };
+ if gap_limit_reached {
+ break;
+ }
}
- for op in outpoints {
- if graph.get_tx(op.txid).is_none() {
- if let Some(tx) = self.get_tx(&op.txid)? {
- let _ = graph.insert_tx(tx);
- }
- let status = self.get_tx_status(&op.txid)?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(op.txid, anchor);
- }
+ if let Some(last_active_index) = last_active_index {
+ last_active_indices.insert(keychain, last_active_index);
+ }
+ }
+
+ Ok((tx_graph, last_active_indices))
+}
+
+#[doc(hidden)]
+pub fn sync_for_index_and_graph_blocking(
+ client: &esplora_client::BlockingClient,
+ misc_spks: impl IntoIterator<Item = ScriptBuf>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
+ let (mut tx_graph, _) = full_scan_for_index_and_graph_blocking(
+ client,
+ {
+ let mut keychains = BTreeMap::new();
+ keychains.insert(
+ (),
+ misc_spks
+ .into_iter()
+ .enumerate()
+ .map(|(i, spk)| (i as u32, spk)),
+ );
+ keychains
+ },
+ usize::MAX,
+ parallel_requests,
+ )?;
+
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .filter(|&txid| tx_graph.get_tx(txid).is_none())
+ .map(|txid| {
+ std::thread::spawn({
+ let client = client.clone();
+ move || {
+ client
+ .get_tx_status(&txid)
+ .map_err(Box::new)
+ .map(|s| (txid, s))
+ }
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ let (txid, status) = handle.join().expect("thread must not panic")?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = tx_graph.insert_anchor(txid, anchor);
}
+ }
+ }
- if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
- if let Some(txid) = op_status.txid {
- if graph.get_tx(txid).is_none() {
- if let Some(tx) = self.get_tx(&txid)? {
- let _ = graph.insert_tx(tx);
- }
- let status = self.get_tx_status(&txid)?;
- if let Some(anchor) = anchor_from_status(&status) {
- let _ = graph.insert_anchor(txid, anchor);
- }
+ for op in outpoints {
+ if tx_graph.get_tx(op.txid).is_none() {
+ if let Some(tx) = client.get_tx(&op.txid)? {
+ let _ = tx_graph.insert_tx(tx);
+ }
+ let status = client.get_tx_status(&op.txid)?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = tx_graph.insert_anchor(op.txid, anchor);
+ }
+ }
+
+ if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _)? {
+ if let Some(txid) = op_status.txid {
+ if tx_graph.get_tx(txid).is_none() {
+ if let Some(tx) = client.get_tx(&txid)? {
+ let _ = tx_graph.insert_tx(tx);
+ }
+ let status = client.get_tx_status(&txid)?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = tx_graph.insert_anchor(txid, anchor);
}
}
}
}
- Ok(graph)
}
+
+ Ok(tx_graph)
}
//! [`TxGraph`]: bdk_chain::tx_graph::TxGraph
//! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora
-use bdk_chain::{BlockId, ConfirmationTimeHeightAnchor};
+use std::collections::BTreeMap;
+
+use bdk_chain::{local_chain, BlockId, ConfirmationTimeHeightAnchor, TxGraph};
use esplora_client::TxStatus;
pub use esplora_client;
None
}
}
+
+/// Update returns from a full scan.
+pub struct FullScanUpdate<K> {
+ /// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain).
+ pub local_chain: local_chain::Update,
+ /// The update to apply to the receiving [`TxGraph`].
+ pub tx_graph: TxGraph<ConfirmationTimeHeightAnchor>,
+ /// Last active indices for the corresponding keychains (`K`).
+ pub last_active_indices: BTreeMap<K, u32>,
+}
+
+/// Update returned from a sync.
+pub struct SyncUpdate {
+ /// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain).
+ pub local_chain: local_chain::Update,
+ /// The update to apply to the receiving [`TxGraph`].
+ pub tx_graph: TxGraph<ConfirmationTimeHeightAnchor>,
+}
use electrsd::bitcoind::anyhow;
use electrsd::bitcoind::bitcoincore_rpc::RpcApi;
use esplora_client::{self, Builder};
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
sleep(Duration::from_millis(10))
}
- let graph_update = client
+ // use a full checkpoint linked list (since this is not what we are testing)
+ let cp_tip = env.make_checkpoint_tip();
+
+ let sync_update = client
.sync(
+ cp_tip.clone(),
misc_spks.into_iter(),
vec![].into_iter(),
vec![].into_iter(),
)
.await?;
+ assert!(
+ {
+ let update_cps = sync_update
+ .local_chain
+ .tip
+ .iter()
+ .map(|cp| cp.block_id())
+ .collect::<BTreeSet<_>>();
+ let superset_cps = cp_tip
+ .iter()
+ .map(|cp| cp.block_id())
+ .collect::<BTreeSet<_>>();
+ superset_cps.is_superset(&update_cps)
+ },
+ "update should not alter original checkpoint tip since we already started with all checkpoints",
+ );
+
+ let graph_update = sync_update.tx_graph;
// Check to see if we have the floating txouts available from our two created transactions'
// previous outputs in order to calculate transaction fees.
for tx in graph_update.full_txs() {
sleep(Duration::from_millis(10))
}
+ // use a full checkpoint linked list (since this is not what we are testing)
+ let cp_tip = env.make_checkpoint_tip();
+
// A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4
// will.
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 3, 1).await?;
- assert!(graph_update.full_txs().next().is_none());
- assert!(active_indices.is_empty());
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 4, 1).await?;
- assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr);
- assert_eq!(active_indices[&0], 3);
+ let full_scan_update = client
+ .full_scan(cp_tip.clone(), keychains.clone(), 3, 1)
+ .await?;
+ assert!(full_scan_update.tx_graph.full_txs().next().is_none());
+ assert!(full_scan_update.last_active_indices.is_empty());
+ let full_scan_update = client
+ .full_scan(cp_tip.clone(), keychains.clone(), 4, 1)
+ .await?;
+ assert_eq!(
+ full_scan_update.tx_graph.full_txs().next().unwrap().txid,
+ txid_4th_addr
+ );
+ assert_eq!(full_scan_update.last_active_indices[&0], 3);
// Now receive a coin on the last address.
let txid_last_addr = env.bitcoind.client.send_to_address(
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active indice won't be updated in the first case but will in the second one.
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 5, 1).await?;
- let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect();
+ let full_scan_update = client
+ .full_scan(cp_tip.clone(), keychains.clone(), 5, 1)
+ .await?;
+ let txs: HashSet<_> = full_scan_update
+ .tx_graph
+ .full_txs()
+ .map(|tx| tx.txid)
+ .collect();
assert_eq!(txs.len(), 1);
assert!(txs.contains(&txid_4th_addr));
- assert_eq!(active_indices[&0], 3);
- let (graph_update, active_indices) = client.full_scan(keychains, 6, 1).await?;
- let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect();
+ assert_eq!(full_scan_update.last_active_indices[&0], 3);
+ let full_scan_update = client.full_scan(cp_tip, keychains, 6, 1).await?;
+ let txs: HashSet<_> = full_scan_update
+ .tx_graph
+ .full_txs()
+ .map(|tx| tx.txid)
+ .collect();
assert_eq!(txs.len(), 2);
assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr));
- assert_eq!(active_indices[&0], 9);
+ assert_eq!(full_scan_update.last_active_indices[&0], 9);
Ok(())
}
use bdk_esplora::EsploraExt;
use electrsd::bitcoind::anyhow;
use electrsd::bitcoind::bitcoincore_rpc::RpcApi;
-use esplora_client::{self, Builder};
+use esplora_client::{self, BlockHash, Builder};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::str::FromStr;
use std::thread::sleep;
sleep(Duration::from_millis(10))
}
- let graph_update = client.sync(
+ // use a full checkpoint linked list (since this is not what we are testing)
+ let cp_tip = env.make_checkpoint_tip();
+
+ let sync_update = client.sync(
+ cp_tip.clone(),
misc_spks.into_iter(),
vec![].into_iter(),
vec![].into_iter(),
1,
)?;
+ assert!(
+ {
+ let update_cps = sync_update
+ .local_chain
+ .tip
+ .iter()
+ .map(|cp| cp.block_id())
+ .collect::<BTreeSet<_>>();
+ let superset_cps = cp_tip
+ .iter()
+ .map(|cp| cp.block_id())
+ .collect::<BTreeSet<_>>();
+ superset_cps.is_superset(&update_cps)
+ },
+ "update should not alter original checkpoint tip since we already started with all checkpoints",
+ );
+
+ let graph_update = sync_update.tx_graph;
// Check to see if we have the floating txouts available from our two created transactions'
// previous outputs in order to calculate transaction fees.
for tx in graph_update.full_txs() {
sleep(Duration::from_millis(10))
}
+ // use a full checkpoint linked list (since this is not what we are testing)
+ let cp_tip = env.make_checkpoint_tip();
+
// A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4
// will.
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 3, 1)?;
- assert!(graph_update.full_txs().next().is_none());
- assert!(active_indices.is_empty());
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 4, 1)?;
- assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr);
- assert_eq!(active_indices[&0], 3);
+ let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 3, 1)?;
+ assert!(full_scan_update.tx_graph.full_txs().next().is_none());
+ assert!(full_scan_update.last_active_indices.is_empty());
+ let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 4, 1)?;
+ assert_eq!(
+ full_scan_update.tx_graph.full_txs().next().unwrap().txid,
+ txid_4th_addr
+ );
+ assert_eq!(full_scan_update.last_active_indices[&0], 3);
// Now receive a coin on the last address.
let txid_last_addr = env.bitcoind.client.send_to_address(
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active indice won't be updated in the first case but will in the second one.
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 5, 1)?;
- let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect();
+ let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 5, 1)?;
+ let txs: HashSet<_> = full_scan_update
+ .tx_graph
+ .full_txs()
+ .map(|tx| tx.txid)
+ .collect();
assert_eq!(txs.len(), 1);
assert!(txs.contains(&txid_4th_addr));
- assert_eq!(active_indices[&0], 3);
- let (graph_update, active_indices) = client.full_scan(keychains, 6, 1)?;
- let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect();
+ assert_eq!(full_scan_update.last_active_indices[&0], 3);
+ let full_scan_update = client.full_scan(cp_tip.clone(), keychains, 6, 1)?;
+ let txs: HashSet<_> = full_scan_update
+ .tx_graph
+ .full_txs()
+ .map(|tx| tx.txid)
+ .collect();
assert_eq!(txs.len(), 2);
assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr));
- assert_eq!(active_indices[&0], 9);
+ assert_eq!(full_scan_update.last_active_indices[&0], 9);
Ok(())
}
for (i, t) in test_cases.into_iter().enumerate() {
println!("Case {}: {}", i, t.name);
let mut chain = t.chain;
+ let cp_tip = chain.tip();
- let update = client
- .update_local_chain(chain.tip(), t.request_heights.iter().copied())
- .map_err(|err| {
- anyhow::format_err!("[{}:{}] `update_local_chain` failed: {}", i, t.name, err)
+ let new_blocks =
+ bdk_esplora::init_chain_update_blocking(&client, &cp_tip).map_err(|err| {
+ anyhow::format_err!("[{}:{}] `init_chain_update` failed: {}", i, t.name, err)
})?;
- let update_blocks = update
+ let mock_anchors = t
+ .request_heights
+ .iter()
+ .map(|&h| {
+ let anchor_blockhash: BlockHash = bdk_chain::bitcoin::hashes::Hash::hash(
+ &format!("hash_at_height_{}", h).into_bytes(),
+ );
+ let txid: Txid = bdk_chain::bitcoin::hashes::Hash::hash(
+ &format!("txid_at_height_{}", h).into_bytes(),
+ );
+ let anchor = BlockId {
+ height: h,
+ hash: anchor_blockhash,
+ };
+ (anchor, txid)
+ })
+ .collect::<BTreeSet<_>>();
+
+ let chain_update = bdk_esplora::finalize_chain_update_blocking(
+ &client,
+ &cp_tip,
+ &mock_anchors,
+ new_blocks,
+ )?;
+ let update_blocks = chain_update
.tip
.iter()
.map(|cp| cp.block_id())
)
.collect::<BTreeSet<_>>();
- assert_eq!(
- update_blocks, exp_update_blocks,
+ assert!(
+ update_blocks.is_superset(&exp_update_blocks),
"[{}:{}] unexpected update",
- i, t.name
+ i,
+ t.name
);
let _ = chain
- .apply_update(update)
+ .apply_update(chain_update)
.unwrap_or_else(|err| panic!("[{}:{}] update failed to apply: {}", i, t.name, err));
// all requested heights must exist in the final chain
use bdk_chain::{
bitcoin::{
- address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, secp256k1::rand::random, transaction, Address, Amount, Block, BlockHash, CompactTarget, ScriptBuf, ScriptHash, Transaction, TxIn, TxOut, Txid
+ address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash,
+ secp256k1::rand::random, transaction, Address, Amount, Block, BlockHash, CompactTarget,
+ ScriptBuf, ScriptHash, Transaction, TxIn, TxOut, Txid,
},
local_chain::CheckPoint,
BlockId,
use std::{
- collections::{BTreeMap, BTreeSet},
+ collections::BTreeMap,
io::{self, Write},
sync::Mutex,
};
esplora_args: EsploraArgs,
},
}
+
impl EsploraCommands {
fn esplora_args(&self) -> EsploraArgs {
match self {
};
let client = esplora_cmd.esplora_args().client(args.network)?;
- // Prepare the `IndexedTxGraph` update based on whether we are scanning or syncing.
+ // Prepare the `IndexedTxGraph` and `LocalChain` updates based on whether we are scanning or
+ // syncing.
+ //
// Scanning: We are iterating through spks of all keychains and scanning for transactions for
// each spk. We start with the lowest derivation index spk and stop scanning after `stop_gap`
// number of consecutive spks have no transaction history. A Scan is done in situations of
// wallet restoration. It is a special case. Applications should use "sync" style updates
// after an initial scan.
+ //
// Syncing: We only check for specified spks, utxos and txids to update their confirmation
// status or fetch missing transactions.
- let indexed_tx_graph_changeset = match &esplora_cmd {
+ let (local_chain_changeset, indexed_tx_graph_changeset) = match &esplora_cmd {
EsploraCommands::Scan {
stop_gap,
scan_options,
..
} => {
+ let local_tip = chain.lock().expect("mutex must not be poisoned").tip();
let keychain_spks = graph
.lock()
.expect("mutex must not be poisoned")
// is reached. It returns a `TxGraph` update (`graph_update`) and a structure that
// represents the last active spk derivation indices of keychains
// (`keychain_indices_update`).
- let (mut graph_update, last_active_indices) = client
- .full_scan(keychain_spks, *stop_gap, scan_options.parallel_requests)
+ let mut update = client
+ .full_scan(
+ local_tip,
+ keychain_spks,
+ *stop_gap,
+ scan_options.parallel_requests,
+ )
.context("scanning for transactions")?;
// We want to keep track of the latest time a transaction was seen unconfirmed.
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
- let _ = graph_update.update_last_seen_unconfirmed(now);
+ let _ = update.tx_graph.update_last_seen_unconfirmed(now);
let mut graph = graph.lock().expect("mutex must not be poisoned");
+ let mut chain = chain.lock().expect("mutex must not be poisoned");
// Because we did a stop gap based scan we are likely to have some updates to our
// deriviation indices. Usually before a scan you are on a fresh wallet with no
// addresses derived so we need to derive up to last active addresses the scan found
// before adding the transactions.
- let (_, index_changeset) = graph.index.reveal_to_target_multi(&last_active_indices);
- let mut indexed_tx_graph_changeset = graph.apply_update(graph_update);
- indexed_tx_graph_changeset.append(index_changeset.into());
- indexed_tx_graph_changeset
+ (chain.apply_update(update.local_chain)?, {
+ let (_, index_changeset) = graph
+ .index
+ .reveal_to_target_multi(&update.last_active_indices);
+ let mut indexed_tx_graph_changeset = graph.apply_update(update.tx_graph);
+ indexed_tx_graph_changeset.append(index_changeset.into());
+ indexed_tx_graph_changeset
+ })
}
EsploraCommands::Sync {
mut unused_spks,
let mut outpoints: Box<dyn Iterator<Item = OutPoint>> = Box::new(core::iter::empty());
let mut txids: Box<dyn Iterator<Item = Txid>> = Box::new(core::iter::empty());
+ let local_tip = chain.lock().expect("mutex must not be poisoned").tip();
+
// Get a short lock on the structures to get spks, utxos, and txs that we are interested
// in.
{
let graph = graph.lock().unwrap();
let chain = chain.lock().unwrap();
- let chain_tip = chain.tip().block_id();
if *all_spks {
let all_spks = graph
let init_outpoints = graph.index.outpoints().iter().cloned();
let utxos = graph
.graph()
- .filter_chain_unspents(&*chain, chain_tip, init_outpoints)
+ .filter_chain_unspents(&*chain, local_tip.block_id(), init_outpoints)
.map(|(_, utxo)| utxo)
.collect::<Vec<_>>();
outpoints = Box::new(
// `EsploraExt::update_tx_graph_without_keychain`.
let unconfirmed_txids = graph
.graph()
- .list_chain_txs(&*chain, chain_tip)
+ .list_chain_txs(&*chain, local_tip.block_id())
.filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
.map(|canonical_tx| canonical_tx.tx_node.txid)
.collect::<Vec<Txid>>();
}
}
- let mut graph_update =
- client.sync(spks, txids, outpoints, scan_options.parallel_requests)?;
+ let mut update = client.sync(
+ local_tip,
+ spks,
+ txids,
+ outpoints,
+ scan_options.parallel_requests,
+ )?;
// Update last seen unconfirmed
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
- let _ = graph_update.update_last_seen_unconfirmed(now);
+ let _ = update.tx_graph.update_last_seen_unconfirmed(now);
- graph.lock().unwrap().apply_update(graph_update)
+ (
+ chain.lock().unwrap().apply_update(update.local_chain)?,
+ graph.lock().unwrap().apply_update(update.tx_graph),
+ )
}
};
println!();
- // Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We
- // want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason,
- // we want retrieve the blocks at the heights of the newly added anchors that are missing from
- // our view of the chain.
- let (missing_block_heights, tip) = {
- let chain = &*chain.lock().unwrap();
- let missing_block_heights = indexed_tx_graph_changeset
- .graph
- .missing_heights_from(chain)
- .collect::<BTreeSet<_>>();
- let tip = chain.tip();
- (missing_block_heights, tip)
- };
-
- println!("prev tip: {}", tip.height());
- println!("missing block heights: {:?}", missing_block_heights);
-
- // Here, we actually fetch the missing blocks and create a `local_chain::Update`.
- let chain_changeset = {
- let chain_update = client
- .update_local_chain(tip, missing_block_heights)
- .context("scanning for blocks")?;
- println!("new tip: {}", chain_update.tip.height());
- chain.lock().unwrap().apply_update(chain_update)?
- };
-
// We persist the changes
let mut db = db.lock().unwrap();
- db.stage((chain_changeset, indexed_tx_graph_changeset));
+ db.stage((local_chain_changeset, indexed_tx_graph_changeset));
db.commit()?;
Ok(())
}
(k, k_spks)
})
.collect();
- let (mut update_graph, last_active_indices) = client
- .full_scan(keychain_spks, STOP_GAP, PARALLEL_REQUESTS)
- .await?;
+ let mut update = client
+ .full_scan(prev_tip, keychain_spks, STOP_GAP, PARALLEL_REQUESTS)
+ .await?;
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
- let _ = update_graph.update_last_seen_unconfirmed(now);
- let missing_heights = update_graph.missing_heights(wallet.local_chain());
- let chain_update = client.update_local_chain(prev_tip, missing_heights).await?;
+ let _ = update.tx_graph.update_last_seen_unconfirmed(now);
+
let update = Update {
- last_active_indices,
- graph: update_graph,
- chain: Some(chain_update),
+ last_active_indices: update.last_active_indices,
+ graph: update.tx_graph,
+ chain: Some(update.local_chain),
};
wallet.apply_update(update)?;
wallet.commit()?;
let client =
esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking();
- let prev_tip = wallet.latest_checkpoint();
let keychain_spks = wallet
.all_unbounded_spk_iters()
.into_iter()
})
.collect();
- let (mut update_graph, last_active_indices) =
- client.full_scan(keychain_spks, STOP_GAP, PARALLEL_REQUESTS)?;
-
+ let mut update = client.full_scan(
+ wallet.latest_checkpoint(),
+ keychain_spks,
+ STOP_GAP,
+ PARALLEL_REQUESTS,
+ )?;
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
- let _ = update_graph.update_last_seen_unconfirmed(now);
- let missing_heights = update_graph.missing_heights(wallet.local_chain());
- let chain_update = client.update_local_chain(prev_tip, missing_heights)?;
- let update = Update {
- last_active_indices,
- graph: update_graph,
- chain: Some(chain_update),
- };
-
- wallet.apply_update(update)?;
+ let _ = update.tx_graph.update_last_seen_unconfirmed(now);
+
+ wallet.apply_update(Update {
+ last_active_indices: update.last_active_indices,
+ graph: update.tx_graph,
+ chain: Some(update.local_chain),
+ })?;
wallet.commit()?;
println!();