//! Helper types for spk-based blockchain clients.
-
use crate::{
- collections::BTreeMap, local_chain::CheckPoint, ConfirmationBlockTime, Indexed, TxGraph,
+ alloc::{boxed::Box, collections::VecDeque, vec::Vec},
+ collections::BTreeMap,
+ local_chain::CheckPoint,
+ ConfirmationBlockTime, Indexed, TxGraph,
};
-use alloc::boxed::Box;
use bitcoin::{OutPoint, Script, ScriptBuf, Txid};
-use core::marker::PhantomData;
-/// Data required to perform a spk-based blockchain client sync.
-///
-/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and
-/// outpoints. The sync process also updates the chain from the given [`CheckPoint`].
-pub struct SyncRequest {
- /// A checkpoint for the current chain [`LocalChain::tip`].
- /// The sync process will return a new chain update that extends this tip.
- ///
- /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
- pub chain_tip: CheckPoint,
- /// Transactions that spend from or to these indexed script pubkeys.
- pub spks: Box<dyn ExactSizeIterator<Item = ScriptBuf> + Send>,
- /// Transactions with these txids.
- pub txids: Box<dyn ExactSizeIterator<Item = Txid> + Send>,
- /// Transactions with these outpoints or spent from these outpoints.
- pub outpoints: Box<dyn ExactSizeIterator<Item = OutPoint> + Send>,
-}
-
-impl SyncRequest {
- /// Construct a new [`SyncRequest`] from a given `cp` tip.
- pub fn from_chain_tip(cp: CheckPoint) -> Self {
+type InspectSync<I> = dyn FnMut(SyncItem<I>, SyncProgress) + Send + 'static;
+
+type InspectFullScan<K> = dyn FnMut(K, u32, &Script) + Send + 'static;
+
+/// An item reported to the [`inspect`](SyncRequestBuilder::inspect) closure of [`SyncRequest`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum SyncItem<'i, I> {
+ /// Script pubkey sync item.
+ Spk(I, &'i Script),
+ /// Txid sync item.
+ Txid(Txid),
+ /// Outpoint sync item.
+ OutPoint(OutPoint),
+}
+
+impl<'i, I: core::fmt::Debug + core::any::Any> core::fmt::Display for SyncItem<'i, I> {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ match self {
+ SyncItem::Spk(i, spk) => {
+ if (i as &dyn core::any::Any).is::<()>() {
+ write!(f, "script '{}'", spk)
+ } else {
+ write!(f, "script {:?} '{}'", i, spk)
+ }
+ }
+ SyncItem::Txid(txid) => write!(f, "txid '{}'", txid),
+ SyncItem::OutPoint(op) => write!(f, "outpoint '{}'", op),
+ }
+ }
+}
+
+/// The progress of [`SyncRequest`].
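+///
+/// A sketch of how the [`inspect`](SyncRequestBuilder::inspect) closure might report this as a
+/// percentage (`percent_done` is a hypothetical helper, not part of this crate):
+///
+/// ```rust
+/// # use bdk_chain::spk_client::SyncProgress;
+/// fn percent_done(progress: &SyncProgress) -> f32 {
+/// // `max(1)` avoids dividing by zero when the request is empty.
+/// progress.consumed() as f32 * 100.0 / progress.total().max(1) as f32
+/// }
+/// ```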
+#[derive(Debug, Clone)]
+pub struct SyncProgress {
+ /// Script pubkeys consumed by the request.
+ pub spks_consumed: usize,
+ /// Script pubkeys remaining in the request.
+ pub spks_remaining: usize,
+ /// Txids consumed by the request.
+ pub txids_consumed: usize,
+ /// Txids remaining in the request.
+ pub txids_remaining: usize,
+ /// Outpoints consumed by the request.
+ pub outpoints_consumed: usize,
+ /// Outpoints remaining in the request.
+ pub outpoints_remaining: usize,
+}
+
+impl SyncProgress {
+ /// Total items, consumed and remaining, of the request.
+ pub fn total(&self) -> usize {
+ self.total_spks() + self.total_txids() + self.total_outpoints()
+ }
+
+ /// Total script pubkeys, consumed and remaining, of the request.
+ pub fn total_spks(&self) -> usize {
+ self.spks_consumed + self.spks_remaining
+ }
+
+ /// Total txids, consumed and remaining, of the request.
+ pub fn total_txids(&self) -> usize {
+ self.txids_consumed + self.txids_remaining
+ }
+
+ /// Total outpoints, consumed and remaining, of the request.
+ pub fn total_outpoints(&self) -> usize {
+ self.outpoints_consumed + self.outpoints_remaining
+ }
+
+ /// Total consumed items of the request.
+ pub fn consumed(&self) -> usize {
+ self.spks_consumed + self.txids_consumed + self.outpoints_consumed
+ }
+
+ /// Total remaining items of the request.
+ pub fn remaining(&self) -> usize {
+ self.spks_remaining + self.txids_remaining + self.outpoints_remaining
+ }
+}
+
+/// Builds a [`SyncRequest`].
+#[must_use]
+pub struct SyncRequestBuilder<SpkLabel = ()> {
+ inner: SyncRequest<SpkLabel>,
+}
+
+impl<SpkLabel> Default for SyncRequestBuilder<SpkLabel> {
+ fn default() -> Self {
Self {
- chain_tip: cp,
- spks: Box::new(core::iter::empty()),
- txids: Box::new(core::iter::empty()),
- outpoints: Box::new(core::iter::empty()),
+ inner: Default::default(),
}
}
+}
- /// Set the [`Script`]s that will be synced against.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn set_spks(
- mut self,
- spks: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static>,
+#[cfg(feature = "miniscript")]
+impl<K: Clone + Ord + core::fmt::Debug + Send + Sync> SyncRequestBuilder<(K, u32)> {
+ /// Add [`Script`]s revealed by the `indexer` within the given `spk_range`. These will be
+ /// synced against.
+ pub fn revealed_spks_from_indexer(
+ self,
+ indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
+ spk_range: impl core::ops::RangeBounds<K>,
) -> Self {
- self.spks = Box::new(spks.into_iter());
- self
+ use crate::alloc::borrow::ToOwned;
+ use crate::alloc::vec::Vec;
+ self.spks_with_labels(
+ indexer
+ .revealed_spks(spk_range)
+ .map(|(i, spk)| (i, spk.to_owned()))
+ .collect::<Vec<_>>(),
+ )
}
- /// Set the [`Txid`]s that will be synced against.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn set_txids(
- mut self,
- txids: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static>,
+ /// Add [`Script`]s that are revealed by the `indexer` but currently unused.
+ pub fn unused_spks_from_indexer(
+ self,
+ indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
) -> Self {
- self.txids = Box::new(txids.into_iter());
- self
+ use crate::alloc::borrow::ToOwned;
+ use crate::alloc::vec::Vec;
+ self.spks_with_labels(
+ indexer
+ .unused_spks()
+ .map(|(i, spk)| (i, spk.to_owned()))
+ .collect::<Vec<_>>(),
+ )
}
+}
+
+impl SyncRequestBuilder<()> {
+ /// Add [`Script`]s that will be synced against.
+ pub fn spks(self, spks: impl IntoIterator<Item = ScriptBuf>) -> Self {
+ self.spks_with_labels(spks.into_iter().map(|spk| ((), spk)))
+ }
+}
- /// Set the [`OutPoint`]s that will be synced against.
+impl<SpkLabel> SyncRequestBuilder<SpkLabel> {
+ /// Set the initial chain tip for the sync request.
///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn set_outpoints(
- mut self,
- outpoints: impl IntoIterator<
- IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
- >,
- ) -> Self {
- self.outpoints = Box::new(outpoints.into_iter());
+ /// This is used to update [`LocalChain`](crate::local_chain::LocalChain).
+ pub fn chain_tip(mut self, cp: CheckPoint) -> Self {
+ self.inner.chain_tip = Some(cp);
self
}
- /// Chain on additional [`Script`]s that will be synced against.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn chain_spks(
+ /// Add [`Script`]s, each coupled with an associated label, that will be synced against.
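+ ///
+ /// The label is reported back through the [`inspect`](SyncRequestBuilder::inspect) closure via
+ /// [`SyncItem::Spk`]. A minimal sketch (the string labels here are arbitrary placeholders):
+ ///
+ /// ```rust
+ /// # use bdk_chain::{bitcoin::ScriptBuf, spk_client::SyncRequest};
+ /// let request = SyncRequest::builder()
+ /// .spks_with_labels([("external", ScriptBuf::default()), ("internal", ScriptBuf::default())])
+ /// .inspect(|item, _progress| println!("{}", item))
+ /// .build();
+ /// ```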
+ pub fn spks_with_labels(
mut self,
- spks: impl IntoIterator<
- IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static,
- Item = ScriptBuf,
- >,
+ spks: impl IntoIterator<Item = (SpkLabel, ScriptBuf)>,
) -> Self {
- self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter()));
+ self.inner.spks.extend(spks);
self
}
- /// Chain on additional [`Txid`]s that will be synced against.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn chain_txids(
- mut self,
- txids: impl IntoIterator<
- IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static,
- Item = Txid,
- >,
- ) -> Self {
- self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter()));
+ /// Add [`Txid`]s that will be synced against.
+ pub fn txids(mut self, txids: impl IntoIterator<Item = Txid>) -> Self {
+ self.inner.txids.extend(txids);
self
}
- /// Chain on additional [`OutPoint`]s that will be synced against.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn chain_outpoints(
- mut self,
- outpoints: impl IntoIterator<
- IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
- Item = OutPoint,
- >,
- ) -> Self {
- self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter()));
+ /// Add [`OutPoint`]s that will be synced against.
+ pub fn outpoints(mut self, outpoints: impl IntoIterator<Item = OutPoint>) -> Self {
+ self.inner.outpoints.extend(outpoints);
self
}
- /// Add a closure that will be called for [`Script`]s previously added to this request.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn inspect_spks(
- mut self,
- mut inspect: impl FnMut(&Script) + Send + Sync + 'static,
- ) -> Self {
- self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk)));
+ /// Set the closure that will inspect every sync item visited.
+ pub fn inspect<F>(mut self, inspect: F) -> Self
+ where
+ F: FnMut(SyncItem<SpkLabel>, SyncProgress) + Send + 'static,
+ {
+ self.inner.inspect = Box::new(inspect);
self
}
- /// Add a closure that will be called for [`Txid`]s previously added to this request.
+ /// Build the [`SyncRequest`].
+ pub fn build(self) -> SyncRequest<SpkLabel> {
+ self.inner
+ }
+}
+
+/// Data required to perform a spk-based blockchain client sync.
+///
+/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and
+/// outpoints. The sync process also updates the chain from the given
+/// [`chain_tip`](SyncRequestBuilder::chain_tip) (if provided).
+///
+/// ```rust
+/// # use bdk_chain::{bitcoin::{hashes::Hash, ScriptBuf}, local_chain::LocalChain};
+/// # let (local_chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros());
+/// # let scripts = [ScriptBuf::default(), ScriptBuf::default()];
+/// # use bdk_chain::spk_client::SyncRequest;
+/// // Construct a sync request.
+/// let sync_request = SyncRequest::builder()
+/// // Provide chain tip of the local wallet.
+/// .chain_tip(local_chain.tip())
+/// // Provide list of scripts to scan for transactions against.
+/// .spks(scripts)
+/// // This is called for every synced item.
+/// .inspect(|item, progress| println!("{} (remaining: {})", item, progress.remaining()))
+/// // Finish constructing the sync request.
+/// .build();
+/// ```
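+///
+/// A chain source drains the request. A minimal sketch (the loop body is a placeholder for
+/// whatever lookup the backend performs):
+///
+/// ```rust
+/// # use bdk_chain::spk_client::SyncRequest;
+/// # let mut sync_request = SyncRequest::<()>::builder().build();
+/// // Drain the script pubkeys one by one, e.g. to query a backend for their histories.
+/// while let Some(spk) = sync_request.next_spk() {
+/// // ... fetch the history of `spk` from the chain source here ...
+/// let _ = spk;
+/// }
+/// // Txids and outpoints are drained the same way with `next_txid`/`next_outpoint`, or in bulk
+/// // with `iter_txids`/`iter_outpoints`.
+/// ```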
+#[must_use]
+pub struct SyncRequest<I = ()> {
+ chain_tip: Option<CheckPoint>,
+ spks: VecDeque<(I, ScriptBuf)>,
+ spks_consumed: usize,
+ txids: VecDeque<Txid>,
+ txids_consumed: usize,
+ outpoints: VecDeque<OutPoint>,
+ outpoints_consumed: usize,
+ inspect: Box<InspectSync<I>>,
+}
+
+impl<I> Default for SyncRequest<I> {
+ fn default() -> Self {
+ Self {
+ chain_tip: None,
+ spks: VecDeque::new(),
+ spks_consumed: 0,
+ txids: VecDeque::new(),
+ txids_consumed: 0,
+ outpoints: VecDeque::new(),
+ outpoints_consumed: 0,
+ inspect: Box::new(|_, _| {}),
+ }
+ }
+}
+
+impl<I> From<SyncRequestBuilder<I>> for SyncRequest<I> {
+ fn from(builder: SyncRequestBuilder<I>) -> Self {
+ builder.inner
+ }
+}
+
+impl<I> SyncRequest<I> {
+ /// Start building a [`SyncRequest`].
+ pub fn builder() -> SyncRequestBuilder<I> {
+ SyncRequestBuilder {
+ inner: Default::default(),
+ }
+ }
+
+ /// Get the [`SyncProgress`] of this request.
+ pub fn progress(&self) -> SyncProgress {
+ SyncProgress {
+ spks_consumed: self.spks_consumed,
+ spks_remaining: self.spks.len(),
+ txids_consumed: self.txids_consumed,
+ txids_remaining: self.txids.len(),
+ outpoints_consumed: self.outpoints_consumed,
+ outpoints_remaining: self.outpoints.len(),
+ }
+ }
+
+ /// Get the chain tip [`CheckPoint`] of this request (if any).
+ pub fn chain_tip(&self) -> Option<CheckPoint> {
+ self.chain_tip.clone()
+ }
+
+ /// Advances the sync request and returns the next [`ScriptBuf`].
///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self {
- self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid)));
- self
+ /// Returns [`None`] when there are no more scripts remaining in the request.
+ pub fn next_spk(&mut self) -> Option<ScriptBuf> {
+ let (i, spk) = self.spks.pop_front()?;
+ self.spks_consumed += 1;
+ self._call_inspect(SyncItem::Spk(i, spk.as_script()));
+ Some(spk)
}
- /// Add a closure that will be called for [`OutPoint`]s previously added to this request.
+ /// Advances the sync request and returns the next [`Txid`].
///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn inspect_outpoints(
- mut self,
- mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static,
- ) -> Self {
- self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op)));
- self
+ /// Returns [`None`] when there are no more txids remaining in the request.
+ pub fn next_txid(&mut self) -> Option<Txid> {
+ let txid = self.txids.pop_front()?;
+ self.txids_consumed += 1;
+ self._call_inspect(SyncItem::Txid(txid));
+ Some(txid)
}
- /// Populate the request with revealed script pubkeys from `index` with the given `spk_range`.
+ /// Advances the sync request and returns the next [`OutPoint`].
///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[cfg(feature = "miniscript")]
- #[must_use]
- pub fn populate_with_revealed_spks<K: Clone + Ord + core::fmt::Debug + Send + Sync>(
- self,
- index: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
- spk_range: impl core::ops::RangeBounds<K>,
- ) -> Self {
- use alloc::borrow::ToOwned;
- use alloc::vec::Vec;
- self.chain_spks(
- index
- .revealed_spks(spk_range)
- .map(|(_, spk)| spk.to_owned())
- .collect::<Vec<_>>(),
- )
+ /// Returns [`None`] when there are no more outpoints in the request.
+ pub fn next_outpoint(&mut self) -> Option<OutPoint> {
+ let outpoint = self.outpoints.pop_front()?;
+ self.outpoints_consumed += 1;
+ self._call_inspect(SyncItem::OutPoint(outpoint));
+ Some(outpoint)
+ }
+
+ /// Iterate over [`ScriptBuf`]s contained in this request.
+ pub fn iter_spks(&mut self) -> impl ExactSizeIterator<Item = ScriptBuf> + '_ {
+ SyncIter::<I, ScriptBuf>::new(self)
+ }
+
+ /// Iterate over [`Txid`]s contained in this request.
+ pub fn iter_txids(&mut self) -> impl ExactSizeIterator<Item = Txid> + '_ {
+ SyncIter::<I, Txid>::new(self)
+ }
+
+ /// Iterate over [`OutPoint`]s contained in this request.
+ pub fn iter_outpoints(&mut self) -> impl ExactSizeIterator<Item = OutPoint> + '_ {
+ SyncIter::<I, OutPoint>::new(self)
+ }
+
+ fn _call_inspect(&mut self, item: SyncItem<I>) {
+ let progress = self.progress();
+ (*self.inspect)(item, progress);
}
}
/// Data returned from a spk-based blockchain client sync.
///
/// See also [`SyncRequest`].
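+///
+/// A minimal sketch of applying the result to a wallet's chain and graph (here both start out
+/// empty, and the result is a default/empty one):
+///
+/// ```rust
+/// # use bdk_chain::{bitcoin::hashes::Hash, local_chain::LocalChain, spk_client::SyncResult};
+/// # use bdk_chain::{ConfirmationBlockTime, TxGraph};
+/// # let (mut chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros());
+/// # let mut graph = TxGraph::<ConfirmationBlockTime>::default();
+/// # let sync_result = SyncResult::<ConfirmationBlockTime>::default();
+/// // The chain update is only present if the request was built with a `chain_tip`.
+/// if let Some(chain_update) = sync_result.chain_update {
+/// let _ = chain
+/// .apply_update(chain_update)
+/// .expect("chain update must connect to the local chain");
+/// }
+/// let _ = graph.apply_update(sync_result.graph_update);
+/// ```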
+#[must_use]
+#[derive(Debug)]
pub struct SyncResult<A = ConfirmationBlockTime> {
/// The update to apply to the receiving [`TxGraph`].
pub graph_update: TxGraph<A>,
/// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
- pub chain_update: CheckPoint,
-}
-
-/// Data required to perform a spk-based blockchain client full scan.
-///
-/// A client full scan iterates through all the scripts for the given keychains, fetching relevant
-/// data until some stop gap number of scripts is found that have no data. This operation is
-/// generally only used when importing or restoring previously used keychains in which the list of
-/// used scripts is not known. The full scan process also updates the chain from the given [`CheckPoint`].
-pub struct FullScanRequest<K> {
- /// A checkpoint for the current [`LocalChain::tip`].
- /// The full scan process will return a new chain update that extends this tip.
- ///
- /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
- pub chain_tip: CheckPoint,
- /// Iterators of script pubkeys indexed by the keychain index.
- pub spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
+ pub chain_update: Option<CheckPoint>,
}
-impl<K: Ord + Clone> FullScanRequest<K> {
- /// Construct a new [`FullScanRequest`] from a given `chain_tip`.
- #[must_use]
- pub fn from_chain_tip(chain_tip: CheckPoint) -> Self {
+impl<A> Default for SyncResult<A> {
+ fn default() -> Self {
Self {
- chain_tip,
- spks_by_keychain: BTreeMap::new(),
+ graph_update: Default::default(),
+ chain_update: Default::default(),
}
}
+}
- /// Construct a new [`FullScanRequest`] from a given `chain_tip` and `index`.
- ///
- /// Unbounded script pubkey iterators for each keychain (`K`) are extracted using
- /// [`KeychainTxOutIndex::all_unbounded_spk_iters`] and is used to populate the
- /// [`FullScanRequest`].
- ///
- /// [`KeychainTxOutIndex::all_unbounded_spk_iters`]: crate::indexer::keychain_txout::KeychainTxOutIndex::all_unbounded_spk_iters
- #[cfg(feature = "miniscript")]
- #[must_use]
- pub fn from_keychain_txout_index(
- chain_tip: CheckPoint,
- index: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
- ) -> Self
- where
- K: core::fmt::Debug,
- {
- let mut req = Self::from_chain_tip(chain_tip);
- for (keychain, spks) in index.all_unbounded_spk_iters() {
- req = req.set_spks_for_keychain(keychain, spks);
+/// Builds a [`FullScanRequest`].
+#[must_use]
+pub struct FullScanRequestBuilder<K> {
+ inner: FullScanRequest<K>,
+}
+
+impl<K> Default for FullScanRequestBuilder<K> {
+ fn default() -> Self {
+ Self {
+ inner: Default::default(),
}
- req
}
+}
- /// Set the [`Script`]s for a given `keychain`.
- ///
- /// This consumes the [`FullScanRequest`] and returns the updated one.
- #[must_use]
- pub fn set_spks_for_keychain(
+#[cfg(feature = "miniscript")]
+impl<K: Ord + Clone + core::fmt::Debug> FullScanRequestBuilder<K> {
+ /// Add spk iterators for each keychain tracked in `indexer`.
+ pub fn spks_from_indexer(
mut self,
- keychain: K,
- spks: impl IntoIterator<IntoIter = impl Iterator<Item = Indexed<ScriptBuf>> + Send + 'static>,
+ indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
) -> Self {
- self.spks_by_keychain
- .insert(keychain, Box::new(spks.into_iter()));
+ for (keychain, spks) in indexer.all_unbounded_spk_iters() {
+ self = self.spks_for_keychain(keychain, spks);
+ }
self
}
+}
- /// Chain on additional [`Script`]s that will be synced against.
+impl<K: Ord> FullScanRequestBuilder<K> {
+ /// Set the initial chain tip for the full scan request.
///
- /// This consumes the [`FullScanRequest`] and returns the updated one.
- #[must_use]
- pub fn chain_spks_for_keychain(
+ /// This is used to update [`LocalChain`](crate::local_chain::LocalChain).
+ pub fn chain_tip(mut self, tip: CheckPoint) -> Self {
+ self.inner.chain_tip = Some(tip);
+ self
+ }
+
+ /// Set the spk iterator for a given `keychain`.
+ pub fn spks_for_keychain(
mut self,
keychain: K,
spks: impl IntoIterator<IntoIter = impl Iterator<Item = Indexed<ScriptBuf>> + Send + 'static>,
) -> Self {
- match self.spks_by_keychain.remove(&keychain) {
- // clippy here suggests to remove `into_iter` from `spks.into_iter()`, but doing so
- // results in a compilation error
- #[allow(clippy::useless_conversion)]
- Some(keychain_spks) => self
- .spks_by_keychain
- .insert(keychain, Box::new(keychain_spks.chain(spks.into_iter()))),
- None => self
- .spks_by_keychain
- .insert(keychain, Box::new(spks.into_iter())),
- };
+ self.inner
+ .spks_by_keychain
+ .insert(keychain, Box::new(spks.into_iter()));
self
}
- /// Add a closure that will be called for every [`Script`] previously added to any keychain in
- /// this request.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn inspect_spks_for_all_keychains(
- mut self,
- inspect: impl FnMut(K, u32, &Script) + Send + Sync + Clone + 'static,
- ) -> Self
+ /// Set the closure that will inspect every script pubkey visited during the scan.
+ pub fn inspect<F>(mut self, inspect: F) -> Self
where
- K: Send + 'static,
+ F: FnMut(K, u32, &Script) + Send + 'static,
{
- for (keychain, spks) in core::mem::take(&mut self.spks_by_keychain) {
- let mut inspect = inspect.clone();
- self.spks_by_keychain.insert(
- keychain.clone(),
- Box::new(spks.inspect(move |(i, spk)| inspect(keychain.clone(), *i, spk))),
- );
- }
+ self.inner.inspect = Box::new(inspect);
self
}
- /// Add a closure that will be called for every [`Script`] previously added to a given
- /// `keychain` in this request.
- ///
- /// This consumes the [`SyncRequest`] and returns the updated one.
- #[must_use]
- pub fn inspect_spks_for_keychain(
- mut self,
- keychain: K,
- mut inspect: impl FnMut(u32, &Script) + Send + Sync + 'static,
- ) -> Self
- where
- K: Send + 'static,
- {
- if let Some(spks) = self.spks_by_keychain.remove(&keychain) {
- self.spks_by_keychain.insert(
- keychain,
- Box::new(spks.inspect(move |(i, spk)| inspect(*i, spk))),
- );
+ /// Build the [`FullScanRequest`].
+ pub fn build(self) -> FullScanRequest<K> {
+ self.inner
+ }
+}
+
+/// Data required to perform a spk-based blockchain client full scan.
+///
+/// A client full scan iterates through all the scripts for the given keychains, fetching relevant
+/// data until some stop gap number of scripts is found that have no data. This operation is
+/// generally only used when importing or restoring previously used keychains in which the list of
+/// used scripts is not known. The full scan process also updates the chain from the given
+/// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided).
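+///
+/// A minimal sketch of constructing a full scan request (here with a hypothetical string
+/// keychain and placeholder scripts):
+///
+/// ```rust
+/// # use bdk_chain::{bitcoin::{hashes::Hash, ScriptBuf}, local_chain::LocalChain};
+/// # let (local_chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros());
+/// # let spks: Vec<(u32, ScriptBuf)> = vec![(0, ScriptBuf::default()), (1, ScriptBuf::default())];
+/// # use bdk_chain::spk_client::FullScanRequest;
+/// // Construct a full scan request.
+/// let full_scan_request = FullScanRequest::builder()
+/// // Provide chain tip of the local wallet.
+/// .chain_tip(local_chain.tip())
+/// // Provide an indexed script pubkey iterator for the keychain.
+/// .spks_for_keychain("keychain_0", spks)
+/// // This is called for every script pubkey visited.
+/// .inspect(|keychain, index, script| println!("{}/{}: '{}'", keychain, index, script))
+/// // Finish constructing the full scan request.
+/// .build();
+/// ```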
+#[must_use]
+pub struct FullScanRequest<K> {
+ chain_tip: Option<CheckPoint>,
+ spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
+ inspect: Box<InspectFullScan<K>>,
+}
+
+impl<K> From<FullScanRequestBuilder<K>> for FullScanRequest<K> {
+ fn from(builder: FullScanRequestBuilder<K>) -> Self {
+ builder.inner
+ }
+}
+
+impl<K> Default for FullScanRequest<K> {
+ fn default() -> Self {
+ Self {
+ chain_tip: None,
+ spks_by_keychain: Default::default(),
+ inspect: Box::new(|_, _, _| {}),
+ }
+ }
+}
+
+impl<K: Ord + Clone> FullScanRequest<K> {
+ /// Start building a [`FullScanRequest`].
+ pub fn builder() -> FullScanRequestBuilder<K> {
+ FullScanRequestBuilder {
+ inner: Self::default(),
+ }
+ }
+
+ /// Get the chain tip [`CheckPoint`] of this request (if any).
+ pub fn chain_tip(&self) -> Option<CheckPoint> {
+ self.chain_tip.clone()
+ }
+
+ /// List all keychains contained in this request.
+ pub fn keychains(&self) -> Vec<K> {
+ self.spks_by_keychain.keys().cloned().collect()
+ }
+
+ /// Advances the full scan request and returns the next indexed [`ScriptBuf`] of the given
+ /// `keychain`.
+ pub fn next_spk(&mut self, keychain: K) -> Option<Indexed<ScriptBuf>> {
+ self.iter_spks(keychain).next()
+ }
+
+ /// Iterate over indexed [`ScriptBuf`]s contained in this request of the given `keychain`.
+ pub fn iter_spks(&mut self, keychain: K) -> impl Iterator<Item = Indexed<ScriptBuf>> + '_ {
+ let spks = self.spks_by_keychain.get_mut(&keychain);
+ let inspect = &mut self.inspect;
+ KeychainSpkIter {
+ keychain,
+ spks,
+ inspect,
}
- self
}
}
/// Data returned from a spk-based blockchain client full scan.
///
/// See also [`FullScanRequest`].
+#[must_use]
+#[derive(Debug)]
pub struct FullScanResult<K, A = ConfirmationBlockTime> {
/// The update to apply to the receiving [`TxGraph`].
pub graph_update: TxGraph<A>,
/// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
- pub chain_update: CheckPoint,
+ pub chain_update: Option<CheckPoint>,
/// Last active indices for the corresponding keychains (`K`).
pub last_active_indices: BTreeMap<K, u32>,
}
-/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new
-/// [`ExactSizeIterator`].
-///
-/// The danger of this is explained in [the `ExactSizeIterator` docs]
-/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator).
-/// This does not apply here since it would be impossible to scan an item count that overflows
-/// `usize` anyway.
-struct ExactSizeChain<A, B, I> {
- a: Option<A>,
- b: Option<B>,
- i: PhantomData<I>,
-}
-
-impl<A, B, I> ExactSizeChain<A, B, I> {
- fn new(a: A, b: B) -> Self {
- ExactSizeChain {
- a: Some(a),
- b: Some(b),
- i: PhantomData,
+impl<K, A> Default for FullScanResult<K, A> {
+ fn default() -> Self {
+ Self {
+ graph_update: Default::default(),
+ chain_update: Default::default(),
+ last_active_indices: Default::default(),
}
}
}
-impl<A, B, I> Iterator for ExactSizeChain<A, B, I>
-where
- A: Iterator<Item = I>,
- B: Iterator<Item = I>,
-{
- type Item = I;
+struct KeychainSpkIter<'r, K> {
+ keychain: K,
+ spks: Option<&'r mut Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
+ inspect: &'r mut Box<InspectFullScan<K>>,
+}
+
+impl<'r, K: Ord + Clone> Iterator for KeychainSpkIter<'r, K> {
+ type Item = Indexed<ScriptBuf>;
fn next(&mut self) -> Option<Self::Item> {
- if let Some(a) = &mut self.a {
- let item = a.next();
- if item.is_some() {
- return item;
- }
- self.a = None;
- }
- if let Some(b) = &mut self.b {
- let item = b.next();
- if item.is_some() {
- return item;
- }
- self.b = None;
+ let (i, spk) = self.spks.as_mut()?.next()?;
+ (*self.inspect)(self.keychain.clone(), i, &spk);
+ Some((i, spk))
+ }
+}
+
+struct SyncIter<'r, I, Item> {
+ request: &'r mut SyncRequest<I>,
+ marker: core::marker::PhantomData<Item>,
+}
+
+impl<'r, I, Item> SyncIter<'r, I, Item> {
+ fn new(request: &'r mut SyncRequest<I>) -> Self {
+ Self {
+ request,
+ marker: core::marker::PhantomData,
}
- None
}
}
-impl<A, B, I> ExactSizeIterator for ExactSizeChain<A, B, I>
-where
- A: ExactSizeIterator<Item = I>,
- B: ExactSizeIterator<Item = I>,
-{
- fn len(&self) -> usize {
- let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0);
- let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0);
- a_len + b_len
+impl<'r, I, Item> ExactSizeIterator for SyncIter<'r, I, Item> where SyncIter<'r, I, Item>: Iterator {}
+
+impl<'r, I> Iterator for SyncIter<'r, I, ScriptBuf> {
+ type Item = ScriptBuf;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.request.next_spk()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let remaining = self.request.spks.len();
+ (remaining, Some(remaining))
+ }
+}
+
+impl<'r, I> Iterator for SyncIter<'r, I, Txid> {
+ type Item = Txid;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.request.next_txid()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let remaining = self.request.txids.len();
+ (remaining, Some(remaining))
+ }
+}
+
+impl<'r, I> Iterator for SyncIter<'r, I, OutPoint> {
+ type Item = OutPoint;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.request.next_outpoint()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let remaining = self.request.outpoints.len();
+ (remaining, Some(remaining))
}
}
/// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate
pub fn full_scan<K: Ord + Clone>(
&self,
- request: FullScanRequest<K>,
+ request: impl Into<FullScanRequest<K>>,
stop_gap: usize,
batch_size: usize,
fetch_prev_txouts: bool,
) -> Result<FullScanResult<K>, Error> {
- let (tip, latest_blocks) =
- fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
- let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
- let mut last_active_indices = BTreeMap::<K, u32>::new();
+ let mut request: FullScanRequest<K> = request.into();
+
+ let tip_and_latest_blocks = match request.chain_tip() {
+ Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
+ None => None,
+ };
- for (keychain, spks) in request.spks_by_keychain {
+ let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
+ let mut last_active_indices = BTreeMap::<K, u32>::default();
+ for keychain in request.keychains() {
+ let spks = request.iter_spks(keychain.clone());
if let Some(last_active_index) =
self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)?
{
}
}
- let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?;
-
// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
self.fetch_prev_txout(&mut graph_update)?;
}
+ let chain_update = match tip_and_latest_blocks {
+ Some((chain_tip, latest_blocks)) => Some(chain_update(
+ chain_tip,
+ &latest_blocks,
+ graph_update.all_anchors(),
+ )?),
+ _ => None,
+ };
+
Ok(FullScanResult {
graph_update,
chain_update,
/// [`CalculateFeeError::MissingTxOut`]: bdk_chain::tx_graph::CalculateFeeError::MissingTxOut
/// [`Wallet.calculate_fee`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee
/// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate
- pub fn sync(
+ pub fn sync<I: 'static>(
&self,
- request: SyncRequest,
+ request: impl Into<SyncRequest<I>>,
batch_size: usize,
fetch_prev_txouts: bool,
) -> Result<SyncResult, Error> {
- let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone())
- .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk)));
- let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?;
- let (tip, latest_blocks) =
- fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
-
- self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?;
- self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?;
-
- let chain_update = chain_update(
- tip,
- &latest_blocks,
- full_scan_res.graph_update.all_anchors(),
- )?;
+ let mut request: SyncRequest<I> = request.into();
+
+ let tip_and_latest_blocks = match request.chain_tip() {
+ Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
+ None => None,
+ };
+
+ let full_scan_request = FullScanRequest::builder()
+ .spks_for_keychain(
+ (),
+ request
+ .iter_spks()
+ .enumerate()
+ .map(|(i, spk)| (i as u32, spk))
+ .collect::<Vec<_>>(),
+ )
+ .build();
+ let mut graph_update = self
+ .full_scan(full_scan_request, usize::MAX, batch_size, false)?
+ .graph_update;
+ self.populate_with_txids(&mut graph_update, request.iter_txids())?;
+ self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?;
// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
- self.fetch_prev_txout(&mut full_scan_res.graph_update)?;
+ self.fetch_prev_txout(&mut graph_update)?;
}
+ let chain_update = match tip_and_latest_blocks {
+ Some((chain_tip, latest_blocks)) => Some(chain_update(
+ chain_tip,
+ &latest_blocks,
+ graph_update.all_anchors(),
+ )?),
+ None => None,
+ };
+
Ok(SyncResult {
+ graph_update,
chain_update,
- graph_update: full_scan_res.graph_update,
})
}
Spks::IntoIter: ExactSizeIterator + Send + 'static,
{
let mut update = client.sync(
- SyncRequest::from_chain_tip(chain.tip()).chain_spks(spks),
+ SyncRequest::builder().chain_tip(chain.tip()).spks(spks),
BATCH_SIZE,
true,
)?;
.as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);
- let _ = chain
- .apply_update(update.chain_update.clone())
- .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?;
+ if let Some(chain_update) = update.chain_update.clone() {
+ let _ = chain
+ .apply_update(chain_update)
+ .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?;
+ }
let _ = graph.apply_update(update.graph_update.clone());
Ok(update)
let cp_tip = env.make_checkpoint_tip();
let sync_update = {
- let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
+ let request = SyncRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks(misc_spks);
client.sync(request, 1, true)?
};
// A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4
// will.
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 3, 1, false)?
};
assert!(full_scan_update.graph_update.full_txs().next().is_none());
assert!(full_scan_update.last_active_indices.is_empty());
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 4, 1, false)?
};
assert_eq!(
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active index won't be updated in the first case but will in the second one.
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 5, 1, false)?
};
let txs: HashSet<_> = full_scan_update
assert!(txs.contains(&txid_4th_addr));
assert_eq!(full_scan_update.last_active_indices[&0], 3);
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 6, 1, false)?
};
let txs: HashSet<_> = full_scan_update
let txid = env.send(&addr_to_track, SEND_AMOUNT)?;
env.wait_until_electrum_sees_txid(txid, Duration::from_secs(6))?;
- sync_with_electrum(
+ let _ = sync_with_electrum(
&client,
[spk_to_track.clone()],
&mut recv_chain,
env.mine_blocks(1, None)?;
env.wait_until_electrum_sees_block(Duration::from_secs(6))?;
- sync_with_electrum(
+ let _ = sync_with_electrum(
&client,
[spk_to_track.clone()],
&mut recv_chain,
env.reorg_empty_blocks(1)?;
env.wait_until_electrum_sees_block(Duration::from_secs(6))?;
- sync_with_electrum(
+ let _ = sync_with_electrum(
&client,
[spk_to_track.clone()],
&mut recv_chain,
env.mine_blocks(1, None)?;
env.wait_until_electrum_sees_block(Duration::from_secs(6))?;
- sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?;
+ let _ = sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?;
// Check if balance is correct once transaction is confirmed again.
assert_eq!(
use std::collections::BTreeSet;
-use std::usize;
use async_trait::async_trait;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
/// the maximum number of HTTP requests to make in parallel.
///
/// Refer to [crate-level docs](crate) for more.
- async fn full_scan<K: Ord + Clone + Send>(
+ async fn full_scan<K: Ord + Clone + Send, R: Into<FullScanRequest<K>> + Send>(
&self,
- request: FullScanRequest<K>,
+ request: R,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error>;
/// in parallel.
///
/// Refer to [crate-level docs](crate) for more.
- async fn sync(
+ async fn sync<I: Send, R: Into<SyncRequest<I>> + Send>(
&self,
- request: SyncRequest,
+ request: R,
parallel_requests: usize,
) -> Result<SyncResult, Error>;
-
- /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
- /// `keychain_spks` against Esplora.
- ///
- /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
- /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
- /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
- /// number of HTTP requests to make in parallel.
- ///
- /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active
- /// keychain index (if any) is returned. The last active keychain index is the keychain's last
- /// script pubkey that contains a non-empty transaction history.
- ///
- /// Refer to [crate-level docs](crate) for more.
- async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
- &self,
- keychain_spks: I,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error>;
-
- /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
- /// against Esplora.
- ///
- /// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as
- /// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of
- /// HTTP requests to make in parallel.
- ///
- /// Refer to [crate-level docs](crate) for more.
- async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
- &self,
- spks: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator + Send;
-
- /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
- /// against Esplora.
- ///
- /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
- ///
- /// Refer to [crate-level docs](crate) for more.
- async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
- &self,
- txids: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator + Send;
-
- /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
- /// `outpoints`.
- ///
- /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
- ///
- /// Refer to [crate-level docs](crate) for more.
- async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
- &self,
- outpoints: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator + Send;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EsploraAsyncExt for esplora_client::AsyncClient {
- async fn full_scan<K: Ord + Clone + Send>(
+ async fn full_scan<K: Ord + Clone + Send, R: Into<FullScanRequest<K>> + Send>(
&self,
- request: FullScanRequest<K>,
+ request: R,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error> {
- let latest_blocks = fetch_latest_blocks(self).await?;
+ let mut request = request.into();
+ let keychains = request.keychains();
+
+ let chain_tip = request.chain_tip();
+ let latest_blocks = if chain_tip.is_some() {
+ Some(fetch_latest_blocks(self).await?)
+ } else {
+ None
+ };
+
let mut graph_update = TxGraph::default();
let mut last_active_indices = BTreeMap::<K, u32>::new();
- for (keychain, keychain_spks) in request.spks_by_keychain {
- let (tx_graph, last_active_index) = self
- .fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)
- .await?;
+ for keychain in keychains {
+ let keychain_spks = request.iter_spks(keychain.clone());
+ let (tx_graph, last_active_index) =
+ fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests)
+ .await?;
let _ = graph_update.apply_update(tx_graph);
if let Some(last_active_index) = last_active_index {
last_active_indices.insert(keychain, last_active_index);
}
}
- let chain_update = chain_update(
- self,
- &latest_blocks,
- &request.chain_tip,
- graph_update.all_anchors(),
- )
- .await?;
+
+ let chain_update = match (chain_tip, latest_blocks) {
+ (Some(chain_tip), Some(latest_blocks)) => Some(
+ chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?,
+ ),
+ _ => None,
+ };
+
Ok(FullScanResult {
chain_update,
graph_update,
})
}
- async fn sync(
+ async fn sync<I: Send, R: Into<SyncRequest<I>> + Send>(
&self,
- request: SyncRequest,
+ request: R,
parallel_requests: usize,
) -> Result<SyncResult, Error> {
- let latest_blocks = fetch_latest_blocks(self).await?;
- let mut graph_update = TxGraph::default();
- let _ = graph_update.apply_update(
- self.fetch_txs_with_spks(request.spks, parallel_requests)
- .await?,
- );
+ let mut request = request.into();
+
+ let chain_tip = request.chain_tip();
+ let latest_blocks = if chain_tip.is_some() {
+ Some(fetch_latest_blocks(self).await?)
+ } else {
+ None
+ };
+
+ let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
+ let _ = graph_update
+ .apply_update(fetch_txs_with_spks(self, request.iter_spks(), parallel_requests).await?);
let _ = graph_update.apply_update(
- self.fetch_txs_with_txids(request.txids, parallel_requests)
- .await?,
+ fetch_txs_with_txids(self, request.iter_txids(), parallel_requests).await?,
);
let _ = graph_update.apply_update(
- self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)
- .await?,
+ fetch_txs_with_outpoints(self, request.iter_outpoints(), parallel_requests).await?,
);
- let chain_update = chain_update(
- self,
- &latest_blocks,
- &request.chain_tip,
- graph_update.all_anchors(),
- )
- .await?;
+
+ let chain_update = match (chain_tip, latest_blocks) {
+ (Some(chain_tip), Some(latest_blocks)) => Some(
+ chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?,
+ ),
+ _ => None,
+ };
+
Ok(SyncResult {
chain_update,
graph_update,
})
}
-
- async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
- &self,
- mut keychain_spks: I,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
- type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
-
- let mut tx_graph = TxGraph::default();
- let mut last_index = Option::<u32>::None;
- let mut last_active_index = Option::<u32>::None;
-
- loop {
- let handles = keychain_spks
- .by_ref()
- .take(parallel_requests)
- .map(|(spk_index, spk)| {
- let client = self.clone();
- async move {
- let mut last_seen = None;
- let mut spk_txs = Vec::new();
- loop {
- let txs = client.scripthash_txs(&spk, last_seen).await?;
- let tx_count = txs.len();
- last_seen = txs.last().map(|tx| tx.txid);
- spk_txs.extend(txs);
- if tx_count < 25 {
- break Result::<_, Error>::Ok((spk_index, spk_txs));
- }
- }
- }
- })
- .collect::<FuturesOrdered<_>>();
-
- if handles.is_empty() {
- break;
- }
-
- for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
- last_index = Some(index);
- if !txs.is_empty() {
- last_active_index = Some(index);
- }
- for tx in txs {
- let _ = tx_graph.insert_tx(tx.to_tx());
- insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
- insert_prevouts(&mut tx_graph, tx.vin);
- }
- }
-
- let last_index = last_index.expect("Must be set since handles wasn't empty.");
- let gap_limit_reached = if let Some(i) = last_active_index {
- last_index >= i.saturating_add(stop_gap as u32)
- } else {
- last_index + 1 >= stop_gap as u32
- };
- if gap_limit_reached {
- break;
- }
- }
-
- Ok((tx_graph, last_active_index))
- }
-
- async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
- &self,
- spks: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator + Send,
- {
- self.fetch_txs_with_keychain_spks(
- spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
- usize::MAX,
- parallel_requests,
- )
- .await
- .map(|(tx_graph, _)| tx_graph)
- }
-
- async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
- &self,
- txids: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator + Send,
- {
- enum EsploraResp {
- TxStatus(TxStatus),
- Tx(Option<Tx>),
- }
-
- let mut tx_graph = TxGraph::default();
- let mut txids = txids.into_iter();
- loop {
- let handles = txids
- .by_ref()
- .take(parallel_requests)
- .map(|txid| {
- let client = self.clone();
- let tx_already_exists = tx_graph.get_tx(txid).is_some();
- async move {
- if tx_already_exists {
- client
- .get_tx_status(&txid)
- .await
- .map(|s| (txid, EsploraResp::TxStatus(s)))
- } else {
- client
- .get_tx_info(&txid)
- .await
- .map(|t| (txid, EsploraResp::Tx(t)))
- }
- }
- })
- .collect::<FuturesOrdered<_>>();
-
- if handles.is_empty() {
- break;
- }
-
- for (txid, resp) in handles.try_collect::<Vec<_>>().await? {
- match resp {
- EsploraResp::TxStatus(status) => {
- insert_anchor_from_status(&mut tx_graph, txid, status);
- }
- EsploraResp::Tx(Some(tx_info)) => {
- let _ = tx_graph.insert_tx(tx_info.to_tx());
- insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
- insert_prevouts(&mut tx_graph, tx_info.vin);
- }
- _ => continue,
- }
- }
- }
- Ok(tx_graph)
- }
-
- async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
- &self,
- outpoints: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator + Send,
- {
- let outpoints = outpoints.into_iter().collect::<Vec<_>>();
-
- // make sure txs exists in graph and tx statuses are updated
- // TODO: We should maintain a tx cache (like we do with Electrum).
- let mut tx_graph = self
- .fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)
- .await?;
-
- // get outpoint spend-statuses
- let mut outpoints = outpoints.into_iter();
- let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
- loop {
- let handles = outpoints
- .by_ref()
- .take(parallel_requests)
- .map(|op| {
- let client = self.clone();
- async move { client.get_output_status(&op.txid, op.vout as _).await }
- })
- .collect::<FuturesOrdered<_>>();
-
- if handles.is_empty() {
- break;
- }
-
- for op_status in handles.try_collect::<Vec<_>>().await?.into_iter().flatten() {
- let spend_txid = match op_status.txid {
- Some(txid) => txid,
- None => continue,
- };
- if tx_graph.get_tx(spend_txid).is_none() {
- missing_txs.push(spend_txid);
- }
- if let Some(spend_status) = op_status.status {
- insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
- }
- }
- }
-
- let _ = tx_graph.apply_update(
- self.fetch_txs_with_txids(missing_txs, parallel_requests)
- .await?,
- );
- Ok(tx_graph)
- }
}
/// Fetch latest blocks from Esplora in an atomic call.
Ok(tip)
}
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
+/// `keychain_spks` against Esplora.
+///
+/// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
+/// derived from a keychain. The scanning logic stops after `stop_gap` consecutive scripts with
+/// no transaction history are found. `parallel_requests` specifies the maximum number of HTTP
+/// requests to make in parallel.
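+///
+/// For example (hypothetical numbers): with `stop_gap = 3`, if the last script with history is
+/// at index 7, scanning stops once index 10 has been checked without finding any history. This
+/// mirrors the stop condition used in the loop below:
+///
+/// ```rust
+/// let stop_gap: u32 = 3;
+/// let last_active_index: u32 = 7; // last derivation index with transaction history
+/// let last_index: u32 = 10; // last derivation index checked so far
+/// assert!(last_index >= last_active_index.saturating_add(stop_gap));
+/// ```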
+///
+/// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active
+/// keychain index (if any) are returned. The last active keychain index is the index of the
+/// keychain's last script pubkey with a non-empty transaction history.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
+ client: &esplora_client::AsyncClient,
+ mut keychain_spks: I,
+ stop_gap: usize,
+ parallel_requests: usize,
+) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+
+ let mut tx_graph = TxGraph::default();
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
+
+ loop {
+ let handles = keychain_spks
+ .by_ref()
+ .take(parallel_requests)
+ .map(|(spk_index, spk)| {
+ let client = client.clone();
+ async move {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen).await?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Result::<_, Error>::Ok((spk_index, spk_txs));
+ }
+ }
+ }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
+ last_index = Some(index);
+ if !txs.is_empty() {
+ last_active_index = Some(index);
+ }
+ for tx in txs {
+ let _ = tx_graph.insert_tx(tx.to_tx());
+ insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
+ insert_prevouts(&mut tx_graph, tx.vin);
+ }
+ }
+
+ let last_index = last_index.expect("Must be set since handles wasn't empty.");
+ let gap_limit_reached = if let Some(i) = last_active_index {
+ last_index >= i.saturating_add(stop_gap as u32)
+ } else {
+ last_index + 1 >= stop_gap as u32
+ };
+ if gap_limit_reached {
+ break;
+ }
+ }
+
+ Ok((tx_graph, last_active_index))
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
+/// against Esplora.
+///
+/// Unlike with [`fetch_txs_with_keychain_spks`], `spks` must be *bounded* as
+/// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of
+/// HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
+ client: &esplora_client::AsyncClient,
+ spks: I,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+where
+ I::IntoIter: Send,
+{
+ fetch_txs_with_keychain_spks(
+ client,
+ spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
+ usize::MAX,
+ parallel_requests,
+ )
+ .await
+ .map(|(tx_graph, _)| tx_graph)
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
+/// against Esplora.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
+ client: &esplora_client::AsyncClient,
+ txids: I,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+where
+ I::IntoIter: Send,
+{
+ enum EsploraResp {
+ TxStatus(TxStatus),
+ Tx(Option<Tx>),
+ }
+
+ let mut tx_graph = TxGraph::default();
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .map(|txid| {
+ let client = client.clone();
+ let tx_already_exists = tx_graph.get_tx(txid).is_some();
+ async move {
+ if tx_already_exists {
+ client
+ .get_tx_status(&txid)
+ .await
+ .map(|s| (txid, EsploraResp::TxStatus(s)))
+ } else {
+ client
+ .get_tx_info(&txid)
+ .await
+ .map(|t| (txid, EsploraResp::Tx(t)))
+ }
+ }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for (txid, resp) in handles.try_collect::<Vec<_>>().await? {
+ match resp {
+ EsploraResp::TxStatus(status) => {
+ insert_anchor_from_status(&mut tx_graph, txid, status);
+ }
+ EsploraResp::Tx(Some(tx_info)) => {
+ let _ = tx_graph.insert_tx(tx_info.to_tx());
+ insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
+ insert_prevouts(&mut tx_graph, tx_info.vin);
+ }
+ _ => continue,
+ }
+ }
+ }
+ Ok(tx_graph)
+}
+
+/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
+/// `outpoints`.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
+ client: &esplora_client::AsyncClient,
+ outpoints: I,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+where
+ I::IntoIter: Send,
+{
+ let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+ // make sure txs exists in graph and tx statuses are updated
+ // TODO: We should maintain a tx cache (like we do with Electrum).
+ let mut tx_graph = fetch_txs_with_txids(
+ client,
+ outpoints.iter().copied().map(|op| op.txid),
+ parallel_requests,
+ )
+ .await?;
+
+ // get outpoint spend-statuses
+ let mut outpoints = outpoints.into_iter();
+ let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
+ loop {
+ let handles = outpoints
+ .by_ref()
+ .take(parallel_requests)
+ .map(|op| {
+ let client = client.clone();
+ async move { client.get_output_status(&op.txid, op.vout as _).await }
+ })
+ .collect::<FuturesOrdered<_>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for op_status in handles.try_collect::<Vec<_>>().await?.into_iter().flatten() {
+ let spend_txid = match op_status.txid {
+ Some(txid) => txid,
+ None => continue,
+ };
+ if tx_graph.get_tx(spend_txid).is_none() {
+ missing_txs.push(spend_txid);
+ }
+ if let Some(spend_status) = op_status.status {
+ insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
+ }
+ }
+ }
+
+ let _ =
+ tx_graph.apply_update(fetch_txs_with_txids(client, missing_txs, parallel_requests).await?);
+ Ok(tx_graph)
+}
+
#[cfg(test)]
mod test {
use std::{collections::BTreeSet, time::Duration};
/// the maximum number of HTTP requests to make in parallel.
///
/// Refer to [crate-level docs](crate) for more.
- fn full_scan<K: Ord + Clone>(
+ fn full_scan<K: Ord + Clone, R: Into<FullScanRequest<K>>>(
&self,
- request: FullScanRequest<K>,
+ request: R,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error>;
/// in parallel.
///
/// Refer to [crate-level docs](crate) for more.
- fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error>;
-
- /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
- /// `keychain_spks` against Esplora.
- ///
- /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
- /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
- /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
- /// number of HTTP requests to make in parallel.
- ///
- /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active keychain
- /// index (if any) is returned. The last active keychain index is the keychain's last script
- /// pubkey that contains a non-empty transaction history.
- ///
- /// Refer to [crate-level docs](crate) for more.
- fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
- &self,
- keychain_spks: I,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error>;
-
- /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
- /// against Esplora.
- ///
- /// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all
- /// contained scripts will be scanned. `parallel_requests` specifies the maximum number of HTTP
- /// requests to make in parallel.
- ///
- /// Refer to [crate-level docs](crate) for more.
- fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
+ fn sync<I: 'static, R: Into<SyncRequest<I>>>(
&self,
- spks: I,
+ request: R,
parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator;
-
- /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
- /// against Esplora.
- ///
- /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
- ///
- /// Refer to [crate-level docs](crate) for more.
- fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
- &self,
- txids: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator;
-
- /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
- /// `outpoints`.
- ///
- /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
- ///
- /// Refer to [crate-level docs](crate) for more.
- fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
- &self,
- outpoints: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator;
+ ) -> Result<SyncResult, Error>;
}
impl EsploraExt for esplora_client::BlockingClient {
- fn full_scan<K: Ord + Clone>(
+ fn full_scan<K: Ord + Clone, R: Into<FullScanRequest<K>>>(
&self,
- request: FullScanRequest<K>,
+ request: R,
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanResult<K>, Error> {
- let latest_blocks = fetch_latest_blocks(self)?;
+ let mut request = request.into();
+
+ let chain_tip = request.chain_tip();
+ let latest_blocks = if chain_tip.is_some() {
+ Some(fetch_latest_blocks(self)?)
+ } else {
+ None
+ };
+
let mut graph_update = TxGraph::default();
let mut last_active_indices = BTreeMap::<K, u32>::new();
- for (keychain, keychain_spks) in request.spks_by_keychain {
+ for keychain in request.keychains() {
+ let keychain_spks = request.iter_spks(keychain.clone());
let (tx_graph, last_active_index) =
- self.fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)?;
+ fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests)?;
let _ = graph_update.apply_update(tx_graph);
if let Some(last_active_index) = last_active_index {
last_active_indices.insert(keychain, last_active_index);
}
}
- let chain_update = chain_update(
- self,
- &latest_blocks,
- &request.chain_tip,
- graph_update.all_anchors(),
- )?;
+
+ let chain_update = match (chain_tip, latest_blocks) {
+ (Some(chain_tip), Some(latest_blocks)) => Some(chain_update(
+ self,
+ &latest_blocks,
+ &chain_tip,
+ graph_update.all_anchors(),
+ )?),
+ _ => None,
+ };
+
Ok(FullScanResult {
chain_update,
graph_update,
})
}
- fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error> {
- let latest_blocks = fetch_latest_blocks(self)?;
- let mut graph_update = TxGraph::default();
- let _ =
- graph_update.apply_update(self.fetch_txs_with_spks(request.spks, parallel_requests)?);
- let _ =
- graph_update.apply_update(self.fetch_txs_with_txids(request.txids, parallel_requests)?);
- let _ = graph_update
- .apply_update(self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)?);
- let chain_update = chain_update(
- self,
- &latest_blocks,
- &request.chain_tip,
- graph_update.all_anchors(),
- )?;
- Ok(SyncResult {
- chain_update,
- graph_update,
- })
- }
-
- fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
+ fn sync<I: 'static, R: Into<SyncRequest<I>>>(
&self,
- mut keychain_spks: I,
- stop_gap: usize,
+ request: R,
parallel_requests: usize,
- ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
- type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
-
- let mut tx_graph = TxGraph::default();
- let mut last_index = Option::<u32>::None;
- let mut last_active_index = Option::<u32>::None;
-
- loop {
- let handles = keychain_spks
- .by_ref()
- .take(parallel_requests)
- .map(|(spk_index, spk)| {
- std::thread::spawn({
- let client = self.clone();
- move || -> Result<TxsOfSpkIndex, Error> {
- let mut last_seen = None;
- let mut spk_txs = Vec::new();
- loop {
- let txs = client.scripthash_txs(&spk, last_seen)?;
- let tx_count = txs.len();
- last_seen = txs.last().map(|tx| tx.txid);
- spk_txs.extend(txs);
- if tx_count < 25 {
- break Ok((spk_index, spk_txs));
- }
- }
- }
- })
- })
- .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
-
- if handles.is_empty() {
- break;
- }
+ ) -> Result<SyncResult, Error> {
+ let mut request: SyncRequest<I> = request.into();
- for handle in handles {
- let (index, txs) = handle.join().expect("thread must not panic")?;
- last_index = Some(index);
- if !txs.is_empty() {
- last_active_index = Some(index);
- }
- for tx in txs {
- let _ = tx_graph.insert_tx(tx.to_tx());
- insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
- insert_prevouts(&mut tx_graph, tx.vin);
- }
- }
-
- let last_index = last_index.expect("Must be set since handles wasn't empty.");
- let gap_limit_reached = if let Some(i) = last_active_index {
- last_index >= i.saturating_add(stop_gap as u32)
- } else {
- last_index + 1 >= stop_gap as u32
- };
- if gap_limit_reached {
- break;
- }
- }
-
- Ok((tx_graph, last_active_index))
- }
+ let chain_tip = request.chain_tip();
+ let latest_blocks = if chain_tip.is_some() {
+ Some(fetch_latest_blocks(self)?)
+ } else {
+ None
+ };
- fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
- &self,
- spks: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator,
- {
- self.fetch_txs_with_keychain_spks(
- spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
- usize::MAX,
+ let mut graph_update = TxGraph::default();
+ let _ = graph_update.apply_update(fetch_txs_with_spks(
+ self,
+ request.iter_spks(),
parallel_requests,
- )
- .map(|(tx_graph, _)| tx_graph)
- }
-
- fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
- &self,
- txids: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator,
- {
- enum EsploraResp {
- TxStatus(TxStatus),
- Tx(Option<Tx>),
- }
-
- let mut tx_graph = TxGraph::default();
- let mut txids = txids.into_iter();
- loop {
- let handles = txids
- .by_ref()
- .take(parallel_requests)
- .map(|txid| {
- let client = self.clone();
- let tx_already_exists = tx_graph.get_tx(txid).is_some();
- std::thread::spawn(move || {
- if tx_already_exists {
- client
- .get_tx_status(&txid)
- .map_err(Box::new)
- .map(|s| (txid, EsploraResp::TxStatus(s)))
- } else {
- client
- .get_tx_info(&txid)
- .map_err(Box::new)
- .map(|t| (txid, EsploraResp::Tx(t)))
- }
- })
- })
- .collect::<Vec<JoinHandle<Result<(Txid, EsploraResp), Error>>>>();
-
- if handles.is_empty() {
- break;
- }
-
- for handle in handles {
- let (txid, resp) = handle.join().expect("thread must not panic")?;
- match resp {
- EsploraResp::TxStatus(status) => {
- insert_anchor_from_status(&mut tx_graph, txid, status);
- }
- EsploraResp::Tx(Some(tx_info)) => {
- let _ = tx_graph.insert_tx(tx_info.to_tx());
- insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
- insert_prevouts(&mut tx_graph, tx_info.vin);
- }
- _ => continue,
- }
- }
- }
- Ok(tx_graph)
- }
-
- fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
- &self,
- outpoints: I,
- parallel_requests: usize,
- ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
- where
- I::IntoIter: ExactSizeIterator,
- {
- let outpoints = outpoints.into_iter().collect::<Vec<_>>();
-
- // make sure txs exists in graph and tx statuses are updated
- // TODO: We should maintain a tx cache (like we do with Electrum).
- let mut tx_graph =
- self.fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)?;
-
- // get outpoint spend-statuses
- let mut outpoints = outpoints.into_iter();
- let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
- loop {
- let handles = outpoints
- .by_ref()
- .take(parallel_requests)
- .map(|op| {
- let client = self.clone();
- std::thread::spawn(move || {
- client
- .get_output_status(&op.txid, op.vout as _)
- .map_err(Box::new)
- })
- })
- .collect::<Vec<JoinHandle<Result<Option<OutputStatus>, Error>>>>();
-
- if handles.is_empty() {
- break;
- }
-
- for handle in handles {
- if let Some(op_status) = handle.join().expect("thread must not panic")? {
- let spend_txid = match op_status.txid {
- Some(txid) => txid,
- None => continue,
- };
- if tx_graph.get_tx(spend_txid).is_none() {
- missing_txs.push(spend_txid);
- }
- if let Some(spend_status) = op_status.status {
- insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
- }
- }
- }
- }
+ )?);
+ let _ = graph_update.apply_update(fetch_txs_with_txids(
+ self,
+ request.iter_txids(),
+ parallel_requests,
+ )?);
+ let _ = graph_update.apply_update(fetch_txs_with_outpoints(
+ self,
+ request.iter_outpoints(),
+ parallel_requests,
+ )?);
+
+ let chain_update = match (chain_tip, latest_blocks) {
+ (Some(chain_tip), Some(latest_blocks)) => Some(chain_update(
+ self,
+ &latest_blocks,
+ &chain_tip,
+ graph_update.all_anchors(),
+ )?),
+ _ => None,
+ };
- let _ = tx_graph.apply_update(self.fetch_txs_with_txids(missing_txs, parallel_requests)?);
- Ok(tx_graph)
+ Ok(SyncResult {
+ chain_update,
+ graph_update,
+ })
}
}
Ok(tip)
}
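+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `keychain_spks`
+/// against Esplora.
+///
+/// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
+/// derived from a keychain. Scanning stops once `stop_gap` consecutive scripts with no
+/// transaction history have been checked. `parallel_requests` specifies the maximum number of
+/// HTTP requests to make in parallel.
+///
+/// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active keychain
+/// index (if any) are returned. The last active keychain index is the index of the keychain's
+/// last script pubkey with a non-empty transaction history.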
+fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
+ client: &esplora_client::BlockingClient,
+ mut keychain_spks: I,
+ stop_gap: usize,
+ parallel_requests: usize,
+) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+
+ let mut tx_graph = TxGraph::default();
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
+
+ loop {
+ let handles = keychain_spks
+ .by_ref()
+ .take(parallel_requests)
+ .map(|(spk_index, spk)| {
+ std::thread::spawn({
+ let client = client.clone();
+ move || -> Result<TxsOfSpkIndex, Error> {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen)?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
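+ // `scripthash_txs` pages the history; a page shorter than the page size (25)
+ // means there is no more history to fetch for this spk.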
+ if tx_count < 25 {
+ break Ok((spk_index, spk_txs));
+ }
+ }
+ }
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ let (index, txs) = handle.join().expect("thread must not panic")?;
+ last_index = Some(index);
+ if !txs.is_empty() {
+ last_active_index = Some(index);
+ }
+ for tx in txs {
+ let _ = tx_graph.insert_tx(tx.to_tx());
+ insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
+ insert_prevouts(&mut tx_graph, tx.vin);
+ }
+ }
+
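+ // Determine whether the stop gap is exceeded. For example, with `stop_gap = 3` and the
+ // last active index at 5, scanning stops once index 8 or beyond has been checked, i.e.
+ // after indices 6, 7 and 8 turned up no history.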
+ let last_index = last_index.expect("Must be set since handles wasn't empty.");
+ let gap_limit_reached = if let Some(i) = last_active_index {
+ last_index >= i.saturating_add(stop_gap as u32)
+ } else {
+ last_index + 1 >= stop_gap as u32
+ };
+ if gap_limit_reached {
+ break;
+ }
+ }
+
+ Ok((tx_graph, last_active_index))
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
+/// against Esplora.
+///
+/// Unlike with [`fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all
+/// contained scripts will be scanned. `parallel_requests` specifies the maximum number of HTTP
+/// requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
+ client: &esplora_client::BlockingClient,
+ spks: I,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
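+ // Delegate to the keychain-spk scanner with a `stop_gap` of `usize::MAX`; the bounded
+ // `spks` iterator, not the gap limit, ends the scan.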
+ fetch_txs_with_keychain_spks(
+ client,
+ spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
+ usize::MAX,
+ parallel_requests,
+ )
+ .map(|(tx_graph, _)| tx_graph)
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
+/// against Esplora.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
+ client: &esplora_client::BlockingClient,
+ txids: I,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
+ enum EsploraResp {
+ TxStatus(TxStatus),
+ Tx(Option<Tx>),
+ }
+
+ let mut tx_graph = TxGraph::default();
+ let mut txids = txids.into_iter();
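+ // If a txid is already present in `tx_graph`, only its confirmation status is re-fetched;
+ // otherwise the full transaction (including prevouts) is downloaded.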
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .map(|txid| {
+ let client = client.clone();
+ let tx_already_exists = tx_graph.get_tx(txid).is_some();
+ std::thread::spawn(move || {
+ if tx_already_exists {
+ client
+ .get_tx_status(&txid)
+ .map_err(Box::new)
+ .map(|s| (txid, EsploraResp::TxStatus(s)))
+ } else {
+ client
+ .get_tx_info(&txid)
+ .map_err(Box::new)
+ .map(|t| (txid, EsploraResp::Tx(t)))
+ }
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<(Txid, EsploraResp), Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ let (txid, resp) = handle.join().expect("thread must not panic")?;
+ match resp {
+ EsploraResp::TxStatus(status) => {
+ insert_anchor_from_status(&mut tx_graph, txid, status);
+ }
+ EsploraResp::Tx(Some(tx_info)) => {
+ let _ = tx_graph.insert_tx(tx_info.to_tx());
+ insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
+ insert_prevouts(&mut tx_graph, tx_info.vin);
+ }
+ _ => continue,
+ }
+ }
+ }
+ Ok(tx_graph)
+}
+
+/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
+/// `outpoints`.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
+ client: &esplora_client::BlockingClient,
+ outpoints: I,
+ parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
+ let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+ // make sure txs exists in graph and tx statuses are updated
+ // TODO: We should maintain a tx cache (like we do with Electrum).
+ let mut tx_graph = fetch_txs_with_txids(
+ client,
+ outpoints.iter().map(|op| op.txid),
+ parallel_requests,
+ )?;
+
+ // get outpoint spend-statuses
+ let mut outpoints = outpoints.into_iter();
+ let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
+ loop {
+ let handles = outpoints
+ .by_ref()
+ .take(parallel_requests)
+ .map(|op| {
+ let client = client.clone();
+ std::thread::spawn(move || {
+ client
+ .get_output_status(&op.txid, op.vout as _)
+ .map_err(Box::new)
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<Option<OutputStatus>, Error>>>>();
+
+ if handles.is_empty() {
+ break;
+ }
+
+ for handle in handles {
+ if let Some(op_status) = handle.join().expect("thread must not panic")? {
+ let spend_txid = match op_status.txid {
+ Some(txid) => txid,
+ None => continue,
+ };
+ if tx_graph.get_tx(spend_txid).is_none() {
+ missing_txs.push(spend_txid);
+ }
+ if let Some(spend_status) = op_status.status {
+ insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
+ }
+ }
+ }
+ }
+
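+ // Fetch any spending transactions discovered above that are not already in the graph.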
+ let _ = tx_graph.apply_update(fetch_txs_with_txids(
+ client,
+ missing_txs,
+ parallel_requests,
+ )?);
+ Ok(tx_graph)
+}
+
#[cfg(test)]
mod test {
use crate::blocking_ext::{chain_update, fetch_latest_blocks};
#![doc = include_str!("../README.md")]
-//! # Low-Level Methods
-//!
-//! [`EsploraExt::sync`] and [`EsploraExt::full_scan`] returns updates which are *complete* and can
-//! be used directly to determine confirmation statuses of each transaction. This is because a
-//! [`LocalChain`] update is contained in the returned update structures. However, sometimes the
-//! caller wishes to use a custom [`ChainOracle`] implementation (something other than
-//! [`LocalChain`]). The following methods ONLY returns an update [`TxGraph`]:
-//!
-//! * [`EsploraExt::fetch_txs_with_keychain_spks`]
-//! * [`EsploraExt::fetch_txs_with_spks`]
-//! * [`EsploraExt::fetch_txs_with_txids`]
-//! * [`EsploraExt::fetch_txs_with_outpoints`]
-//!
//! # Stop Gap
//!
-//! Methods [`EsploraExt::full_scan`] and [`EsploraExt::fetch_txs_with_keychain_spks`] takes in a
-//! `stop_gap` input which is defined as the maximum number of consecutive unused script pubkeys to
-//! scan transactions for before stopping.
+//! [`EsploraExt::full_scan`] takes a `stop_gap` parameter, defined as the maximum number of
+//! consecutive unused script pubkeys to scan for transactions before stopping.
//!
//! For example, with a `stop_gap` of 3, `full_scan` will keep scanning until it encounters 3
//! consecutive script pubkeys with no associated transactions.
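As a rough sketch of how `stop_gap` interacts with the new builder API (illustrative only;
`client` is assumed to be an `esplora_client::BlockingClient`, and `cp_tip` and
`keychain_spks` are assumed to be in scope):

    let request = FullScanRequest::builder()
        .chain_tip(cp_tip.clone())
        .spks_for_keychain(0, keychain_spks);
    // With a stop_gap of 3, scanning halts after 3 consecutive script pubkeys
    // with no transaction history.
    let update = client.full_scan(request, 3, /* parallel_requests */ 1)?;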
let cp_tip = env.make_checkpoint_tip();
let sync_update = {
- let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
+ let request = SyncRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks(misc_spks);
client.sync(request, 1).await?
};
// A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4
// will.
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 3, 1).await?
};
assert!(full_scan_update.graph_update.full_txs().next().is_none());
assert!(full_scan_update.last_active_indices.is_empty());
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 4, 1).await?
};
assert_eq!(
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active index won't be updated in the first case but will in the second one.
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 5, 1).await?
};
let txs: HashSet<_> = full_scan_update
assert!(txs.contains(&txid_4th_addr));
assert_eq!(full_scan_update.last_active_indices[&0], 3);
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 6, 1).await?
};
let txs: HashSet<_> = full_scan_update
let cp_tip = env.make_checkpoint_tip();
let sync_update = {
- let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
+ let request = SyncRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks(misc_spks);
client.sync(request, 1)?
};
// A scan with a stop_gap of 3 won't find the transaction, but a scan with a stop_gap of 4
// will.
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 3, 1)?
};
assert!(full_scan_update.graph_update.full_txs().next().is_none());
assert!(full_scan_update.last_active_indices.is_empty());
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 4, 1)?
};
assert_eq!(
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active index won't be updated in the first case but will in the second one.
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 5, 1)?
};
let txs: HashSet<_> = full_scan_update
assert!(txs.contains(&txid_4th_addr));
assert_eq!(full_scan_update.last_active_indices[&0], 3);
let full_scan_update = {
- let request =
- FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+ let request = FullScanRequest::builder()
+ .chain_tip(cp_tip.clone())
+ .spks_for_keychain(0, spks.clone());
client.full_scan(request, 6, 1)?
};
let txs: HashSet<_> = full_scan_update
local_chain::{
self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain,
},
- spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult},
+ spk_client::{
+ FullScanRequest, FullScanRequestBuilder, FullScanResult, SyncRequest, SyncRequestBuilder,
+ SyncResult,
+ },
tx_graph::{CanonicalTx, TxGraph, TxNode},
BlockId, ChainPosition, ConfirmationBlockTime, ConfirmationTime, DescriptorExt, FullTxOut,
Indexed, IndexedTxGraph, Merge,
Self {
last_active_indices: value.last_active_indices,
graph: value.graph_update,
- chain: Some(value.chain_update),
+ chain: value.chain_update,
}
}
}
Self {
last_active_indices: BTreeMap::new(),
graph: value.graph_update,
- chain: Some(value.chain_update),
+ chain: value.chain_update,
}
}
}
/// This is the first step when performing a spk-based wallet partial sync; the returned
/// [`SyncRequest`] collects all revealed script pubkeys from the wallet keychain needed to
/// start a blockchain sync with a spk-based blockchain client.
- pub fn start_sync_with_revealed_spks(&self) -> SyncRequest {
- SyncRequest::from_chain_tip(self.chain.tip())
- .populate_with_revealed_spks(&self.indexed_graph.index, ..)
+ pub fn start_sync_with_revealed_spks(&self) -> SyncRequestBuilder<(KeychainKind, u32)> {
+ SyncRequest::builder()
+ .chain_tip(self.chain.tip())
+ .revealed_spks_from_indexer(&self.indexed_graph.index, ..)
}
/// Create a [`FullScanRequest`] for this wallet.
///
/// This operation is generally only used when importing or restoring a previously used wallet
/// in which the list of used scripts is not known.
- pub fn start_full_scan(&self) -> FullScanRequest<KeychainKind> {
- FullScanRequest::from_keychain_txout_index(self.chain.tip(), &self.indexed_graph.index)
+ pub fn start_full_scan(&self) -> FullScanRequestBuilder<KeychainKind> {
+ FullScanRequest::builder()
+ .chain_tip(self.chain.tip())
+ .spks_from_indexer(&self.indexed_graph.index)
}
}
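Both wallet methods above now return request *builders*. A minimal sketch of the resulting
call sites (illustrative only; a blocking Esplora client bound to `client` and caller-defined
`STOP_GAP`/`PARALLEL_REQUESTS` constants are assumed):

    // Full scan: walk each keychain's script pubkeys until STOP_GAP consecutive
    // unused ones are seen.
    let full_scan_result = client.full_scan(wallet.start_full_scan(), STOP_GAP, PARALLEL_REQUESTS)?;

    // Partial sync: only query the script pubkeys the wallet has already revealed.
    let sync_result = client.sync(wallet.start_sync_with_revealed_spks(), PARALLEL_REQUESTS)?;

Since these wallet methods always set a chain tip on the builder, the resulting `chain_update`
will be `Some` and can be applied as before.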
use std::io::{self, Write};
use bdk_chain::{
- bitcoin::{Address, Network, Txid},
+ bitcoin::Network,
collections::BTreeSet,
indexed_tx_graph,
spk_client::{FullScanRequest, SyncRequest},
let graph = &*graph.lock().unwrap();
let chain = &*chain.lock().unwrap();
- FullScanRequest::from_chain_tip(chain.tip())
- .set_spks_for_keychain(
+ FullScanRequest::builder()
+ .chain_tip(chain.tip())
+ .spks_for_keychain(
Keychain::External,
graph
.index
.into_iter()
.flatten(),
)
- .set_spks_for_keychain(
+ .spks_for_keychain(
Keychain::Internal,
graph
.index
.into_iter()
.flatten(),
)
- .inspect_spks_for_all_keychains({
+ .inspect({
let mut once = BTreeSet::new();
move |k, spk_i, _| {
if once.insert(k) {
}
let chain_tip = chain.tip();
- let mut request = SyncRequest::from_chain_tip(chain_tip.clone());
+ let mut request =
+ SyncRequest::builder()
+ .chain_tip(chain_tip.clone())
+ .inspect(|item, progress| {
+ let pc = (100 * progress.consumed()) as f32 / progress.total() as f32;
+ eprintln!("[ SCANNING {:03.0}% ] {}", pc, item);
+ });
if all_spks {
- let all_spks = graph
- .index
- .revealed_spks(..)
- .map(|(index, spk)| (index, spk.to_owned()))
- .collect::<Vec<_>>();
- request = request.chain_spks(all_spks.into_iter().map(|((k, spk_i), spk)| {
- eprint!("Scanning {}: {}", k, spk_i);
- spk
- }));
+ request = request.spks_with_labels(
+ graph
+ .index
+ .revealed_spks(..)
+ .map(|(index, spk)| (index, spk.to_owned())),
+ );
}
if unused_spks {
- let unused_spks = graph
- .index
- .unused_spks()
- .map(|(index, spk)| (index, spk.to_owned()))
- .collect::<Vec<_>>();
- request =
- request.chain_spks(unused_spks.into_iter().map(move |((k, spk_i), spk)| {
- eprint!(
- "Checking if address {} {}:{} has been used",
- Address::from_script(&spk, network).unwrap(),
- k,
- spk_i,
- );
- spk
- }));
+ request = request.spks_with_labels(
+ graph
+ .index
+ .unused_spks()
+ .map(|(index, spk)| (index, spk.to_owned())),
+ );
}
if utxos {
let init_outpoints = graph.index.outpoints();
-
- let utxos = graph
- .graph()
- .filter_chain_unspents(
- &*chain,
- chain_tip.block_id(),
- init_outpoints.iter().cloned(),
- )
- .map(|(_, utxo)| utxo)
- .collect::<Vec<_>>();
- request = request.chain_outpoints(utxos.into_iter().map(|utxo| {
- eprint!(
- "Checking if outpoint {} (value: {}) has been spent",
- utxo.outpoint, utxo.txout.value
- );
- utxo.outpoint
- }));
+ request = request.outpoints(
+ graph
+ .graph()
+ .filter_chain_unspents(
+ &*chain,
+ chain_tip.block_id(),
+ init_outpoints.iter().cloned(),
+ )
+ .map(|(_, utxo)| utxo.outpoint),
+ );
};
if unconfirmed {
- let unconfirmed_txids = graph
- .graph()
- .list_canonical_txs(&*chain, chain_tip.block_id())
- .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
- .map(|canonical_tx| canonical_tx.tx_node.txid)
- .collect::<Vec<Txid>>();
-
- request = request.chain_txids(
- unconfirmed_txids
- .into_iter()
- .inspect(|txid| eprint!("Checking if {} is confirmed yet", txid)),
+ request = request.txids(
+ graph
+ .graph()
+ .list_canonical_txs(&*chain, chain_tip.block_id())
+ .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
+ .map(|canonical_tx| canonical_tx.tx_node.txid),
);
}
- let total_spks = request.spks.len();
- let total_txids = request.txids.len();
- let total_ops = request.outpoints.len();
- request = request
- .inspect_spks({
- let mut visited = 0;
- move |_| {
- visited += 1;
- eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32)
- }
- })
- .inspect_txids({
- let mut visited = 0;
- move |_| {
- visited += 1;
- eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32)
- }
- })
- .inspect_outpoints({
- let mut visited = 0;
- move |_| {
- visited += 1;
- eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32)
- }
- });
-
let res = client
.sync(request, scan_options.batch_size, false)
.context("scanning the blockchain")?;
let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();
- let chain_changeset = chain.apply_update(chain_update)?;
+ let chain_changeset = chain.apply_update(chain_update.expect("request has chain tip"))?;
let mut indexed_tx_graph_changeset =
indexed_tx_graph::ChangeSet::<ConfirmationBlockTime, _>::default();
+use core::f32;
use std::{
collections::BTreeSet,
io::{self, Write},
};
use bdk_chain::{
- bitcoin::{Address, Network, Txid},
+ bitcoin::Network,
spk_client::{FullScanRequest, SyncRequest},
Merge,
};
let request = {
let chain_tip = chain.lock().expect("mutex must not be poisoned").tip();
let indexed_graph = &*graph.lock().expect("mutex must not be poisoned");
- FullScanRequest::from_keychain_txout_index(chain_tip, &indexed_graph.index)
- .inspect_spks_for_all_keychains({
+ FullScanRequest::builder()
+ .chain_tip(chain_tip)
+ .spks_from_indexer(&indexed_graph.index)
+ .inspect({
let mut once = BTreeSet::<Keychain>::new();
move |keychain, spk_i, _| {
if once.insert(keychain) {
let _ = io::stderr().flush();
}
})
+ .build()
};
// The client scans keychain spks for transaction histories, stopping after `stop_gap`
// derivation indices. Usually before a scan you are on a fresh wallet with no
// addresses derived, so we need to derive up to the last active addresses the scan found
// before adding the transactions.
- (chain.apply_update(update.chain_update)?, {
- let index_changeset = graph
- .index
- .reveal_to_target_multi(&update.last_active_indices);
- let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update);
- indexed_tx_graph_changeset.merge(index_changeset.into());
- indexed_tx_graph_changeset
- })
+ (
+ chain.apply_update(update.chain_update.expect("request included chain tip"))?,
+ {
+ let index_changeset = graph
+ .index
+ .reveal_to_target_multi(&update.last_active_indices);
+ let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update);
+ indexed_tx_graph_changeset.merge(index_changeset.into());
+ indexed_tx_graph_changeset
+ },
+ )
}
EsploraCommands::Sync {
mut unused_spks,
let local_tip = chain.lock().expect("mutex must not be poisoned").tip();
// Spks, outpoints and txids we want updates on will be accumulated here.
- let mut request = SyncRequest::from_chain_tip(local_tip.clone());
+ let mut request =
+ SyncRequest::builder()
+ .chain_tip(local_tip.clone())
+ .inspect(|item, progress| {
+ let pc = (100 * progress.consumed()) as f32 / progress.total() as f32;
+ eprintln!("[ SCANNING {:03.0}% ] {}", pc, item);
+ // Flush early to ensure we print at every iteration.
+ let _ = io::stderr().flush();
+ });
// Get a short lock on the structures to get spks, utxos, and txs that we are interested
// in.
let chain = chain.lock().unwrap();
if *all_spks {
- let all_spks = graph
- .index
- .revealed_spks(..)
- .map(|((k, i), spk)| (k, i, spk.to_owned()))
- .collect::<Vec<_>>();
- request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| {
- eprint!("scanning {}:{}", k, i);
- // Flush early to ensure we print at every iteration.
- let _ = io::stderr().flush();
- spk
- }));
+ request = request.spks_with_labels(
+ graph
+ .index
+ .revealed_spks(..)
+ .map(|(i, spk)| (i, spk.to_owned())),
+ );
}
if unused_spks {
- let unused_spks = graph
- .index
- .unused_spks()
- .map(|(index, spk)| (index, spk.to_owned()))
- .collect::<Vec<_>>();
- request =
- request.chain_spks(unused_spks.into_iter().map(move |((k, i), spk)| {
- eprint!(
- "Checking if address {} {}:{} has been used",
- Address::from_script(&spk, network).unwrap(),
- k,
- i,
- );
- // Flush early to ensure we print at every iteration.
- let _ = io::stderr().flush();
- spk
- }));
+ request = request.spks_with_labels(
+ graph
+ .index
+ .unused_spks()
+ .map(|(index, spk)| (index, spk.to_owned())),
+ );
}
if utxos {
// We want to search for whether the UTXO is spent, and spent by which
// transaction. We provide the outpoint of the UTXO to
// `EsploraExt::sync`.
let init_outpoints = graph.index.outpoints();
- let utxos = graph
- .graph()
- .filter_chain_unspents(
- &*chain,
- local_tip.block_id(),
- init_outpoints.iter().cloned(),
- )
- .map(|(_, utxo)| utxo)
- .collect::<Vec<_>>();
- request = request.chain_outpoints(
- utxos
- .into_iter()
- .inspect(|utxo| {
- eprint!(
- "Checking if outpoint {} (value: {}) has been spent",
- utxo.outpoint, utxo.txout.value
- );
- // Flush early to ensure we print at every iteration.
- let _ = io::stderr().flush();
- })
- .map(|utxo| utxo.outpoint),
+ request = request.outpoints(
+ graph
+ .graph()
+ .filter_chain_unspents(
+ &*chain,
+ local_tip.block_id(),
+ init_outpoints.iter().cloned(),
+ )
+ .map(|(_, utxo)| utxo.outpoint),
);
};
if unconfirmed {
// We want to search for whether the unconfirmed transaction is now confirmed.
// We provide the unconfirmed txids to
// `EsploraExt::sync`.
- let unconfirmed_txids = graph
- .graph()
- .list_canonical_txs(&*chain, local_tip.block_id())
- .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
- .map(|canonical_tx| canonical_tx.tx_node.txid)
- .collect::<Vec<Txid>>();
- request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| {
- eprint!("Checking if {} is confirmed yet", txid);
- // Flush early to ensure we print at every iteration.
- let _ = io::stderr().flush();
- }));
+ request = request.txids(
+ graph
+ .graph()
+ .list_canonical_txs(&*chain, local_tip.block_id())
+ .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
+ .map(|canonical_tx| canonical_tx.tx_node.txid),
+ );
}
}
- let total_spks = request.spks.len();
- let total_txids = request.txids.len();
- let total_ops = request.outpoints.len();
- request = request
- .inspect_spks({
- let mut visited = 0;
- move |_| {
- visited += 1;
- eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32)
- }
- })
- .inspect_txids({
- let mut visited = 0;
- move |_| {
- visited += 1;
- eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32)
- }
- })
- .inspect_outpoints({
- let mut visited = 0;
- move |_| {
- visited += 1;
- eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32)
- }
- });
let mut update = client.sync(request, scan_options.parallel_requests)?;
// Update last seen unconfirmed
let _ = update.graph_update.update_last_seen_unconfirmed(now);
(
- chain.lock().unwrap().apply_update(update.chain_update)?,
+ chain
+ .lock()
+ .unwrap()
+ .apply_update(update.chain_update.expect("request has chain tip"))?,
graph.lock().unwrap().apply_update(update.graph_update),
)
}
// already have.
client.populate_tx_cache(wallet.tx_graph());
- let request = wallet
- .start_full_scan()
- .inspect_spks_for_all_keychains({
- let mut once = HashSet::<KeychainKind>::new();
- move |k, spk_i, _| {
- if once.insert(k) {
- print!("\nScanning keychain [{:?}]", k)
- } else {
- print!(" {:<3}", spk_i)
- }
+ let request = wallet.start_full_scan().inspect({
+ let mut stdout = std::io::stdout();
+ let mut once = HashSet::<KeychainKind>::new();
+ move |k, spk_i, _| {
+ if once.insert(k) {
+ print!("\nScanning keychain [{:?}]", k)
+ } else {
+ print!(" {:<3}", spk_i)
}
- })
- .inspect_spks_for_all_keychains(|_, _, _| std::io::stdout().flush().expect("must flush"));
+ stdout.flush().expect("must flush");
+ }
+ });
let mut update = client.full_scan(request, STOP_GAP, BATCH_SIZE, false)?;
print!("Syncing...");
let client = esplora_client::Builder::new(ESPLORA_URL).build_async()?;
- let request = wallet.start_full_scan().inspect_spks_for_all_keychains({
+ let request = wallet.start_full_scan().inspect({
+ let mut stdout = std::io::stdout();
let mut once = BTreeSet::<KeychainKind>::new();
move |keychain, spk_i, _| {
if once.insert(keychain) {
- print!("\nScanning keychain [{:?}] ", keychain);
+ print!("\nScanning keychain [{:?}]", keychain);
}
print!(" {:<3}", spk_i);
- std::io::stdout().flush().expect("must flush")
+ stdout.flush().expect("must flush")
}
});
print!("Syncing...");
let client = esplora_client::Builder::new(ESPLORA_URL).build_blocking();
- let request = wallet.start_full_scan().inspect_spks_for_all_keychains({
+ let request = wallet.start_full_scan().inspect({
+ let mut stdout = std::io::stdout();
let mut once = BTreeSet::<KeychainKind>::new();
move |keychain, spk_i, _| {
if once.insert(keychain) {
print!("\nScanning keychain [{:?}] ", keychain);
}
print!(" {:<3}", spk_i);
- std::io::stdout().flush().expect("must flush")
+ stdout.flush().expect("must flush")
}
});