]> Untitled Git - bdk/commitdiff
feat!: rework `FullScanRequest` and `SyncRequest`
author志宇 <hello@evanlinjin.me>
Thu, 1 Aug 2024 09:17:49 +0000 (09:17 +0000)
committer志宇 <hello@evanlinjin.me>
Wed, 14 Aug 2024 08:16:57 +0000 (08:16 +0000)
Change `FullScanRequest` and `SyncRequest` take in a `chain_tip` as an
option. In turn, `FullScanResult` and `SyncResult` are also changed to
return the update `chain_tip` as an option. This allows the caller to
opt-out of getting a `LocalChain` update.

Rework `FullScanRequest` and `SyncRequest` to have better ergonomics
when inspecting the progress of items of a sync request. Richer progress
data is provided to the inspect closure.

Introduce `FullScanRequestBuilder` and `SyncRequestBuilder`. Separating
out request-construction and request-consumption in different structs
simplifies the API and method names.

Simplify `EsploraExt` and `EsploraAsyncExt` back to having two methods
(`full_scan` and `sync`). The caller can still opt out of fetching a
`LocalChain` update with the new `FullScanRequest` and `SyncRequest`.

14 files changed:
crates/chain/src/spk_client.rs
crates/electrum/src/bdk_electrum_client.rs
crates/electrum/tests/test_electrum.rs
crates/esplora/src/async_ext.rs
crates/esplora/src/blocking_ext.rs
crates/esplora/src/lib.rs
crates/esplora/tests/async_ext.rs
crates/esplora/tests/blocking_ext.rs
crates/wallet/src/wallet/mod.rs
example-crates/example_electrum/src/main.rs
example-crates/example_esplora/src/main.rs
example-crates/wallet_electrum/src/main.rs
example-crates/wallet_esplora_async/src/main.rs
example-crates/wallet_esplora_blocking/src/main.rs

index 3457dfef770ba3c255661434c0d8592442619c48..215273729086a773de1cc2e27172949a7c995dbc 100644 (file)
 //! Helper types for spk-based blockchain clients.
-
 use crate::{
-    collections::BTreeMap, local_chain::CheckPoint, ConfirmationBlockTime, Indexed, TxGraph,
+    alloc::{boxed::Box, collections::VecDeque, vec::Vec},
+    collections::BTreeMap,
+    local_chain::CheckPoint,
+    ConfirmationBlockTime, Indexed, TxGraph,
 };
-use alloc::boxed::Box;
 use bitcoin::{OutPoint, Script, ScriptBuf, Txid};
-use core::marker::PhantomData;
 
-/// Data required to perform a spk-based blockchain client sync.
-///
-/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and
-/// outpoints. The sync process also updates the chain from the given [`CheckPoint`].
-pub struct SyncRequest {
-    /// A checkpoint for the current chain [`LocalChain::tip`].
-    /// The sync process will return a new chain update that extends this tip.
-    ///
-    /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
-    pub chain_tip: CheckPoint,
-    /// Transactions that spend from or to these indexed script pubkeys.
-    pub spks: Box<dyn ExactSizeIterator<Item = ScriptBuf> + Send>,
-    /// Transactions with these txids.
-    pub txids: Box<dyn ExactSizeIterator<Item = Txid> + Send>,
-    /// Transactions with these outpoints or spent from these outpoints.
-    pub outpoints: Box<dyn ExactSizeIterator<Item = OutPoint> + Send>,
-}
-
-impl SyncRequest {
-    /// Construct a new [`SyncRequest`] from a given `cp` tip.
-    pub fn from_chain_tip(cp: CheckPoint) -> Self {
+type InspectSync<I> = dyn FnMut(SyncItem<I>, SyncProgress) + Send + 'static;
+
+type InspectFullScan<K> = dyn FnMut(K, u32, &Script) + Send + 'static;
+
+/// An item reported to the [`inspect`](SyncRequestBuilder::inspect) closure of [`SyncRequest`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum SyncItem<'i, I> {
+    /// Script pubkey sync item.
+    Spk(I, &'i Script),
+    /// Txid sync item.
+    Txid(Txid),
+    /// Outpoint sync item.
+    OutPoint(OutPoint),
+}
+
+impl<'i, I: core::fmt::Debug + core::any::Any> core::fmt::Display for SyncItem<'i, I> {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            SyncItem::Spk(i, spk) => {
+                if (i as &dyn core::any::Any).is::<()>() {
+                    write!(f, "script '{}'", spk)
+                } else {
+                    write!(f, "script {:?} '{}'", i, spk)
+                }
+            }
+            SyncItem::Txid(txid) => write!(f, "txid '{}'", txid),
+            SyncItem::OutPoint(op) => write!(f, "outpoint '{}'", op),
+        }
+    }
+}
+
+/// The progress of [`SyncRequest`].
+#[derive(Debug, Clone)]
+pub struct SyncProgress {
+    /// Script pubkeys consumed by the request.
+    pub spks_consumed: usize,
+    /// Script pubkeys remaining in the request.
+    pub spks_remaining: usize,
+    /// Txids consumed by the request.
+    pub txids_consumed: usize,
+    /// Txids remaining in the request.
+    pub txids_remaining: usize,
+    /// Outpoints consumed by the request.
+    pub outpoints_consumed: usize,
+    /// Outpoints remaining in the request.
+    pub outpoints_remaining: usize,
+}
+
+impl SyncProgress {
+    /// Total items, consumed and remaining, of the request.
+    pub fn total(&self) -> usize {
+        self.total_spks() + self.total_txids() + self.total_outpoints()
+    }
+
+    /// Total script pubkeys, consumed and remaining, of the request.
+    pub fn total_spks(&self) -> usize {
+        self.spks_consumed + self.spks_remaining
+    }
+
+    /// Total txids, consumed and remaining, of the request.
+    pub fn total_txids(&self) -> usize {
+        self.txids_consumed + self.txids_remaining
+    }
+
+    /// Total outpoints, consumed and remaining, of the request.
+    pub fn total_outpoints(&self) -> usize {
+        self.outpoints_consumed + self.outpoints_remaining
+    }
+
+    /// Total consumed items of the request.
+    pub fn consumed(&self) -> usize {
+        self.spks_consumed + self.txids_consumed + self.outpoints_consumed
+    }
+
+    /// Total remaining items of the request.
+    pub fn remaining(&self) -> usize {
+        self.spks_remaining + self.txids_remaining + self.outpoints_remaining
+    }
+}
+
+/// Builds a [`SyncRequest`].
+#[must_use]
+pub struct SyncRequestBuilder<SpkLabel = ()> {
+    inner: SyncRequest<SpkLabel>,
+}
+
+impl<SpkLabel> Default for SyncRequestBuilder<SpkLabel> {
+    fn default() -> Self {
         Self {
-            chain_tip: cp,
-            spks: Box::new(core::iter::empty()),
-            txids: Box::new(core::iter::empty()),
-            outpoints: Box::new(core::iter::empty()),
+            inner: Default::default(),
         }
     }
+}
 
-    /// Set the [`Script`]s that will be synced against.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn set_spks(
-        mut self,
-        spks: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static>,
+#[cfg(feature = "miniscript")]
+impl<K: Clone + Ord + core::fmt::Debug + Send + Sync> SyncRequestBuilder<(K, u32)> {
+    /// Add [`Script`]s that are revealed by the `indexer` of the given `spk_range` that will be
+    /// synced against.
+    pub fn revealed_spks_from_indexer(
+        self,
+        indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
+        spk_range: impl core::ops::RangeBounds<K>,
     ) -> Self {
-        self.spks = Box::new(spks.into_iter());
-        self
+        use crate::alloc::borrow::ToOwned;
+        use crate::alloc::vec::Vec;
+        self.spks_with_labels(
+            indexer
+                .revealed_spks(spk_range)
+                .map(|(i, spk)| (i, spk.to_owned()))
+                .collect::<Vec<_>>(),
+        )
     }
 
-    /// Set the [`Txid`]s that will be synced against.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn set_txids(
-        mut self,
-        txids: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static>,
+    /// Add [`Script`]s that are revealed by the `indexer` but currently unused.
+    pub fn unused_spks_from_indexer(
+        self,
+        indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
     ) -> Self {
-        self.txids = Box::new(txids.into_iter());
-        self
+        use crate::alloc::borrow::ToOwned;
+        use crate::alloc::vec::Vec;
+        self.spks_with_labels(
+            indexer
+                .unused_spks()
+                .map(|(i, spk)| (i, spk.to_owned()))
+                .collect::<Vec<_>>(),
+        )
     }
+}
+
+impl SyncRequestBuilder<()> {
+    /// Add [`Script`]s that will be synced against.
+    pub fn spks(self, spks: impl IntoIterator<Item = ScriptBuf>) -> Self {
+        self.spks_with_labels(spks.into_iter().map(|spk| ((), spk)))
+    }
+}
 
-    /// Set the [`OutPoint`]s that will be synced against.
+impl<SpkLabel> SyncRequestBuilder<SpkLabel> {
+    /// Set the initial chain tip for the sync request.
     ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn set_outpoints(
-        mut self,
-        outpoints: impl IntoIterator<
-            IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
-        >,
-    ) -> Self {
-        self.outpoints = Box::new(outpoints.into_iter());
+    /// This is used to update [`LocalChain`](crate::local_chain::LocalChain).
+    pub fn chain_tip(mut self, cp: CheckPoint) -> Self {
+        self.inner.chain_tip = Some(cp);
         self
     }
 
-    /// Chain on additional [`Script`]s that will be synced against.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn chain_spks(
+    /// Add [`Script`]s coupled with an associated label that will be synced against.
+    pub fn spks_with_labels(
         mut self,
-        spks: impl IntoIterator<
-            IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static,
-            Item = ScriptBuf,
-        >,
+        spks: impl IntoIterator<Item = (SpkLabel, ScriptBuf)>,
     ) -> Self {
-        self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter()));
+        self.inner.spks.extend(spks);
         self
     }
 
-    /// Chain on additional [`Txid`]s that will be synced against.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn chain_txids(
-        mut self,
-        txids: impl IntoIterator<
-            IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static,
-            Item = Txid,
-        >,
-    ) -> Self {
-        self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter()));
+    /// Add [`Txid`]s that will be synced against.
+    pub fn txids(mut self, txids: impl IntoIterator<Item = Txid>) -> Self {
+        self.inner.txids.extend(txids);
         self
     }
 
-    /// Chain on additional [`OutPoint`]s that will be synced against.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn chain_outpoints(
-        mut self,
-        outpoints: impl IntoIterator<
-            IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
-            Item = OutPoint,
-        >,
-    ) -> Self {
-        self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter()));
+    /// Add [`OutPoint`]s that will be synced against.
+    pub fn outpoints(mut self, outpoints: impl IntoIterator<Item = OutPoint>) -> Self {
+        self.inner.outpoints.extend(outpoints);
         self
     }
 
-    /// Add a closure that will be called for [`Script`]s previously added to this request.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn inspect_spks(
-        mut self,
-        mut inspect: impl FnMut(&Script) + Send + Sync + 'static,
-    ) -> Self {
-        self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk)));
+    /// Set the closure that will inspect every sync item visited.
+    pub fn inspect<F>(mut self, inspect: F) -> Self
+    where
+        F: FnMut(SyncItem<SpkLabel>, SyncProgress) + Send + 'static,
+    {
+        self.inner.inspect = Box::new(inspect);
         self
     }
 
-    /// Add a closure that will be called for [`Txid`]s previously added to this request.
+    /// Build the [`SyncRequest`].
+    pub fn build(self) -> SyncRequest<SpkLabel> {
+        self.inner
+    }
+}
+
+/// Data required to perform a spk-based blockchain client sync.
+///
+/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and
+/// outpoints. The sync process also updates the chain from the given
+/// [`chain_tip`](SyncRequestBuilder::chain_tip) (if provided).
+///
+/// ```rust
+/// # use bdk_chain::{bitcoin::{hashes::Hash, ScriptBuf}, local_chain::LocalChain};
+/// # let (local_chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros());
+/// # let scripts = [ScriptBuf::default(), ScriptBuf::default()];
+/// # use bdk_chain::spk_client::SyncRequest;
+/// // Construct a sync request.
+/// let sync_request = SyncRequest::builder()
+///     // Provide chain tip of the local wallet.
+///     .chain_tip(local_chain.tip())
+///     // Provide list of scripts to scan for transactions against.
+///     .spks(scripts)
+///     // This is called for every synced item.
+///     .inspect(|item, progress| println!("{} (remaining: {})", item, progress.remaining()))
+///     // Finish constructing the sync request.
+///     .build();
+/// ```
+#[must_use]
+pub struct SyncRequest<I = ()> {
+    chain_tip: Option<CheckPoint>,
+    spks: VecDeque<(I, ScriptBuf)>,
+    spks_consumed: usize,
+    txids: VecDeque<Txid>,
+    txids_consumed: usize,
+    outpoints: VecDeque<OutPoint>,
+    outpoints_consumed: usize,
+    inspect: Box<InspectSync<I>>,
+}
+
+impl<I> Default for SyncRequest<I> {
+    fn default() -> Self {
+        Self {
+            chain_tip: None,
+            spks: VecDeque::new(),
+            spks_consumed: 0,
+            txids: VecDeque::new(),
+            txids_consumed: 0,
+            outpoints: VecDeque::new(),
+            outpoints_consumed: 0,
+            inspect: Box::new(|_, _| {}),
+        }
+    }
+}
+
+impl<I> From<SyncRequestBuilder<I>> for SyncRequest<I> {
+    fn from(builder: SyncRequestBuilder<I>) -> Self {
+        builder.inner
+    }
+}
+
+impl<I> SyncRequest<I> {
+    /// Start building a [`SyncRequest`].
+    pub fn builder() -> SyncRequestBuilder<I> {
+        SyncRequestBuilder {
+            inner: Default::default(),
+        }
+    }
+
+    /// Get the [`SyncProgress`] of this request.
+    pub fn progress(&self) -> SyncProgress {
+        SyncProgress {
+            spks_consumed: self.spks_consumed,
+            spks_remaining: self.spks.len(),
+            txids_consumed: self.txids_consumed,
+            txids_remaining: self.txids.len(),
+            outpoints_consumed: self.outpoints_consumed,
+            outpoints_remaining: self.outpoints.len(),
+        }
+    }
+
+    /// Get the chain tip [`CheckPoint`] of this request (if any).
+    pub fn chain_tip(&self) -> Option<CheckPoint> {
+        self.chain_tip.clone()
+    }
+
+    /// Advances the sync request and returns the next [`ScriptBuf`].
     ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self {
-        self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid)));
-        self
+    /// Returns [`None`] when there are no more scripts remaining in the request.
+    pub fn next_spk(&mut self) -> Option<ScriptBuf> {
+        let (i, spk) = self.spks.pop_front()?;
+        self.spks_consumed += 1;
+        self._call_inspect(SyncItem::Spk(i, spk.as_script()));
+        Some(spk)
     }
 
-    /// Add a closure that will be called for [`OutPoint`]s previously added to this request.
+    /// Advances the sync request and returns the next [`Txid`].
     ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn inspect_outpoints(
-        mut self,
-        mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static,
-    ) -> Self {
-        self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op)));
-        self
+    /// Returns [`None`] when there are no more txids remaining in the request.
+    pub fn next_txid(&mut self) -> Option<Txid> {
+        let txid = self.txids.pop_front()?;
+        self.txids_consumed += 1;
+        self._call_inspect(SyncItem::Txid(txid));
+        Some(txid)
     }
 
-    /// Populate the request with revealed script pubkeys from `index` with the given `spk_range`.
+    /// Advances the sync request and returns the next [`OutPoint`].
     ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[cfg(feature = "miniscript")]
-    #[must_use]
-    pub fn populate_with_revealed_spks<K: Clone + Ord + core::fmt::Debug + Send + Sync>(
-        self,
-        index: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
-        spk_range: impl core::ops::RangeBounds<K>,
-    ) -> Self {
-        use alloc::borrow::ToOwned;
-        use alloc::vec::Vec;
-        self.chain_spks(
-            index
-                .revealed_spks(spk_range)
-                .map(|(_, spk)| spk.to_owned())
-                .collect::<Vec<_>>(),
-        )
+    /// Returns [`None`] when there are no more outpoints in the request.
+    pub fn next_outpoint(&mut self) -> Option<OutPoint> {
+        let outpoint = self.outpoints.pop_front()?;
+        self.outpoints_consumed += 1;
+        self._call_inspect(SyncItem::OutPoint(outpoint));
+        Some(outpoint)
+    }
+
+    /// Iterate over [`ScriptBuf`]s contained in this request.
+    pub fn iter_spks(&mut self) -> impl ExactSizeIterator<Item = ScriptBuf> + '_ {
+        SyncIter::<I, ScriptBuf>::new(self)
+    }
+
+    /// Iterate over [`Txid`]s contained in this request.
+    pub fn iter_txids(&mut self) -> impl ExactSizeIterator<Item = Txid> + '_ {
+        SyncIter::<I, Txid>::new(self)
+    }
+
+    /// Iterate over [`OutPoint`]s contained in this request.
+    pub fn iter_outpoints(&mut self) -> impl ExactSizeIterator<Item = OutPoint> + '_ {
+        SyncIter::<I, OutPoint>::new(self)
+    }
+
+    fn _call_inspect(&mut self, item: SyncItem<I>) {
+        let progress = self.progress();
+        (*self.inspect)(item, progress);
     }
 }
 
 /// Data returned from a spk-based blockchain client sync.
 ///
 /// See also [`SyncRequest`].
+#[must_use]
+#[derive(Debug)]
 pub struct SyncResult<A = ConfirmationBlockTime> {
     /// The update to apply to the receiving [`TxGraph`].
     pub graph_update: TxGraph<A>,
     /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
-    pub chain_update: CheckPoint,
-}
-
-/// Data required to perform a spk-based blockchain client full scan.
-///
-/// A client full scan iterates through all the scripts for the given keychains, fetching relevant
-/// data until some stop gap number of scripts is found that have no data. This operation is
-/// generally only used when importing or restoring previously used keychains in which the list of
-/// used scripts is not known. The full scan process also updates the chain from the given [`CheckPoint`].
-pub struct FullScanRequest<K> {
-    /// A checkpoint for the current [`LocalChain::tip`].
-    /// The full scan process will return a new chain update that extends this tip.
-    ///
-    /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
-    pub chain_tip: CheckPoint,
-    /// Iterators of script pubkeys indexed by the keychain index.
-    pub spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
+    pub chain_update: Option<CheckPoint>,
 }
 
-impl<K: Ord + Clone> FullScanRequest<K> {
-    /// Construct a new [`FullScanRequest`] from a given `chain_tip`.
-    #[must_use]
-    pub fn from_chain_tip(chain_tip: CheckPoint) -> Self {
+impl<A> Default for SyncResult<A> {
+    fn default() -> Self {
         Self {
-            chain_tip,
-            spks_by_keychain: BTreeMap::new(),
+            graph_update: Default::default(),
+            chain_update: Default::default(),
         }
     }
+}
 
-    /// Construct a new [`FullScanRequest`] from a given `chain_tip` and `index`.
-    ///
-    /// Unbounded script pubkey iterators for each keychain (`K`) are extracted using
-    /// [`KeychainTxOutIndex::all_unbounded_spk_iters`] and is used to populate the
-    /// [`FullScanRequest`].
-    ///
-    /// [`KeychainTxOutIndex::all_unbounded_spk_iters`]: crate::indexer::keychain_txout::KeychainTxOutIndex::all_unbounded_spk_iters
-    #[cfg(feature = "miniscript")]
-    #[must_use]
-    pub fn from_keychain_txout_index(
-        chain_tip: CheckPoint,
-        index: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
-    ) -> Self
-    where
-        K: core::fmt::Debug,
-    {
-        let mut req = Self::from_chain_tip(chain_tip);
-        for (keychain, spks) in index.all_unbounded_spk_iters() {
-            req = req.set_spks_for_keychain(keychain, spks);
+/// Builds a [`FullScanRequest`].
+#[must_use]
+pub struct FullScanRequestBuilder<K> {
+    inner: FullScanRequest<K>,
+}
+
+impl<K> Default for FullScanRequestBuilder<K> {
+    fn default() -> Self {
+        Self {
+            inner: Default::default(),
         }
-        req
     }
+}
 
-    /// Set the [`Script`]s for a given `keychain`.
-    ///
-    /// This consumes the [`FullScanRequest`] and returns the updated one.
-    #[must_use]
-    pub fn set_spks_for_keychain(
+#[cfg(feature = "miniscript")]
+impl<K: Ord + Clone + core::fmt::Debug> FullScanRequestBuilder<K> {
+    /// Add spk iterators for each keychain tracked in `indexer`.
+    pub fn spks_from_indexer(
         mut self,
-        keychain: K,
-        spks: impl IntoIterator<IntoIter = impl Iterator<Item = Indexed<ScriptBuf>> + Send + 'static>,
+        indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex<K>,
     ) -> Self {
-        self.spks_by_keychain
-            .insert(keychain, Box::new(spks.into_iter()));
+        for (keychain, spks) in indexer.all_unbounded_spk_iters() {
+            self = self.spks_for_keychain(keychain, spks);
+        }
         self
     }
+}
 
-    /// Chain on additional [`Script`]s that will be synced against.
+impl<K: Ord> FullScanRequestBuilder<K> {
+    /// Set the initial chain tip for the full scan request.
     ///
-    /// This consumes the [`FullScanRequest`] and returns the updated one.
-    #[must_use]
-    pub fn chain_spks_for_keychain(
+    /// This is used to update [`LocalChain`](crate::local_chain::LocalChain).
+    pub fn chain_tip(mut self, tip: CheckPoint) -> Self {
+        self.inner.chain_tip = Some(tip);
+        self
+    }
+
+    /// Set the spk iterator for a given `keychain`.
+    pub fn spks_for_keychain(
         mut self,
         keychain: K,
         spks: impl IntoIterator<IntoIter = impl Iterator<Item = Indexed<ScriptBuf>> + Send + 'static>,
     ) -> Self {
-        match self.spks_by_keychain.remove(&keychain) {
-            // clippy here suggests to remove `into_iter` from `spks.into_iter()`, but doing so
-            // results in a compilation error
-            #[allow(clippy::useless_conversion)]
-            Some(keychain_spks) => self
-                .spks_by_keychain
-                .insert(keychain, Box::new(keychain_spks.chain(spks.into_iter()))),
-            None => self
-                .spks_by_keychain
-                .insert(keychain, Box::new(spks.into_iter())),
-        };
+        self.inner
+            .spks_by_keychain
+            .insert(keychain, Box::new(spks.into_iter()));
         self
     }
 
-    /// Add a closure that will be called for every [`Script`] previously added to any keychain in
-    /// this request.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn inspect_spks_for_all_keychains(
-        mut self,
-        inspect: impl FnMut(K, u32, &Script) + Send + Sync + Clone + 'static,
-    ) -> Self
+    /// Set the closure that will inspect every sync item visited.
+    pub fn inspect<F>(mut self, inspect: F) -> Self
     where
-        K: Send + 'static,
+        F: FnMut(K, u32, &Script) + Send + 'static,
     {
-        for (keychain, spks) in core::mem::take(&mut self.spks_by_keychain) {
-            let mut inspect = inspect.clone();
-            self.spks_by_keychain.insert(
-                keychain.clone(),
-                Box::new(spks.inspect(move |(i, spk)| inspect(keychain.clone(), *i, spk))),
-            );
-        }
+        self.inner.inspect = Box::new(inspect);
         self
     }
 
-    /// Add a closure that will be called for every [`Script`] previously added to a given
-    /// `keychain` in this request.
-    ///
-    /// This consumes the [`SyncRequest`] and returns the updated one.
-    #[must_use]
-    pub fn inspect_spks_for_keychain(
-        mut self,
-        keychain: K,
-        mut inspect: impl FnMut(u32, &Script) + Send + Sync + 'static,
-    ) -> Self
-    where
-        K: Send + 'static,
-    {
-        if let Some(spks) = self.spks_by_keychain.remove(&keychain) {
-            self.spks_by_keychain.insert(
-                keychain,
-                Box::new(spks.inspect(move |(i, spk)| inspect(*i, spk))),
-            );
+    /// Build the [`FullScanRequest`].
+    pub fn build(self) -> FullScanRequest<K> {
+        self.inner
+    }
+}
+
+/// Data required to perform a spk-based blockchain client full scan.
+///
+/// A client full scan iterates through all the scripts for the given keychains, fetching relevant
+/// data until some stop gap number of scripts is found that have no data. This operation is
+/// generally only used when importing or restoring previously used keychains in which the list of
+/// used scripts is not known. The full scan process also updates the chain from the given
+/// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided).
+#[must_use]
+pub struct FullScanRequest<K> {
+    chain_tip: Option<CheckPoint>,
+    spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
+    inspect: Box<InspectFullScan<K>>,
+}
+
+impl<K> From<FullScanRequestBuilder<K>> for FullScanRequest<K> {
+    fn from(builder: FullScanRequestBuilder<K>) -> Self {
+        builder.inner
+    }
+}
+
+impl<K> Default for FullScanRequest<K> {
+    fn default() -> Self {
+        Self {
+            chain_tip: None,
+            spks_by_keychain: Default::default(),
+            inspect: Box::new(|_, _, _| {}),
+        }
+    }
+}
+
+impl<K: Ord + Clone> FullScanRequest<K> {
+    /// Start building a [`FullScanRequest`].
+    pub fn builder() -> FullScanRequestBuilder<K> {
+        FullScanRequestBuilder {
+            inner: Self::default(),
+        }
+    }
+
+    /// Get the chain tip [`CheckPoint`] of this request (if any).
+    pub fn chain_tip(&self) -> Option<CheckPoint> {
+        self.chain_tip.clone()
+    }
+
+    /// List all keychains contained in this request.
+    pub fn keychains(&self) -> Vec<K> {
+        self.spks_by_keychain.keys().cloned().collect()
+    }
+
+    /// Advances the full scan request and returns the next indexed [`ScriptBuf`] of the given
+    /// `keychain`.
+    pub fn next_spk(&mut self, keychain: K) -> Option<Indexed<ScriptBuf>> {
+        self.iter_spks(keychain).next()
+    }
+
+    /// Iterate over indexed [`ScriptBuf`]s contained in this request of the given `keychain`.
+    pub fn iter_spks(&mut self, keychain: K) -> impl Iterator<Item = Indexed<ScriptBuf>> + '_ {
+        let spks = self.spks_by_keychain.get_mut(&keychain);
+        let inspect = &mut self.inspect;
+        KeychainSpkIter {
+            keychain,
+            spks,
+            inspect,
         }
-        self
     }
 }
 
 /// Data returned from a spk-based blockchain client full scan.
 ///
 /// See also [`FullScanRequest`].
+#[must_use]
+#[derive(Debug)]
 pub struct FullScanResult<K, A = ConfirmationBlockTime> {
     /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
     pub graph_update: TxGraph<A>,
     /// The update to apply to the receiving [`TxGraph`].
-    pub chain_update: CheckPoint,
+    pub chain_update: Option<CheckPoint>,
     /// Last active indices for the corresponding keychains (`K`).
     pub last_active_indices: BTreeMap<K, u32>,
 }
 
-/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new
-/// [`ExactSizeIterator`].
-///
-/// The danger of this is explained in [the `ExactSizeIterator` docs]
-/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator).
-/// This does not apply here since it would be impossible to scan an item count that overflows
-/// `usize` anyway.
-struct ExactSizeChain<A, B, I> {
-    a: Option<A>,
-    b: Option<B>,
-    i: PhantomData<I>,
-}
-
-impl<A, B, I> ExactSizeChain<A, B, I> {
-    fn new(a: A, b: B) -> Self {
-        ExactSizeChain {
-            a: Some(a),
-            b: Some(b),
-            i: PhantomData,
+impl<K, A> Default for FullScanResult<K, A> {
+    fn default() -> Self {
+        Self {
+            graph_update: Default::default(),
+            chain_update: Default::default(),
+            last_active_indices: Default::default(),
         }
     }
 }
 
-impl<A, B, I> Iterator for ExactSizeChain<A, B, I>
-where
-    A: Iterator<Item = I>,
-    B: Iterator<Item = I>,
-{
-    type Item = I;
+struct KeychainSpkIter<'r, K> {
+    keychain: K,
+    spks: Option<&'r mut Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
+    inspect: &'r mut Box<InspectFullScan<K>>,
+}
+
+impl<'r, K: Ord + Clone> Iterator for KeychainSpkIter<'r, K> {
+    type Item = Indexed<ScriptBuf>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if let Some(a) = &mut self.a {
-            let item = a.next();
-            if item.is_some() {
-                return item;
-            }
-            self.a = None;
-        }
-        if let Some(b) = &mut self.b {
-            let item = b.next();
-            if item.is_some() {
-                return item;
-            }
-            self.b = None;
+        let (i, spk) = self.spks.as_mut()?.next()?;
+        (*self.inspect)(self.keychain.clone(), i, &spk);
+        Some((i, spk))
+    }
+}
+
+struct SyncIter<'r, I, Item> {
+    request: &'r mut SyncRequest<I>,
+    marker: core::marker::PhantomData<Item>,
+}
+
+impl<'r, I, Item> SyncIter<'r, I, Item> {
+    fn new(request: &'r mut SyncRequest<I>) -> Self {
+        Self {
+            request,
+            marker: core::marker::PhantomData,
         }
-        None
     }
 }
 
-impl<A, B, I> ExactSizeIterator for ExactSizeChain<A, B, I>
-where
-    A: ExactSizeIterator<Item = I>,
-    B: ExactSizeIterator<Item = I>,
-{
-    fn len(&self) -> usize {
-        let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0);
-        let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0);
-        a_len + b_len
+impl<'r, I, Item> ExactSizeIterator for SyncIter<'r, I, Item> where SyncIter<'r, I, Item>: Iterator {}
+
+impl<'r, I> Iterator for SyncIter<'r, I, ScriptBuf> {
+    type Item = ScriptBuf;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.request.next_spk()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let consumed = self.request.spks_consumed;
+        (consumed, Some(consumed))
+    }
+}
+
+impl<'r, I> Iterator for SyncIter<'r, I, Txid> {
+    type Item = Txid;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.request.next_txid()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let consumed = self.request.txids_consumed;
+        (consumed, Some(consumed))
+    }
+}
+
+impl<'r, I> Iterator for SyncIter<'r, I, OutPoint> {
+    type Item = OutPoint;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.request.next_outpoint()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let consumed = self.request.outpoints_consumed;
+        (consumed, Some(consumed))
     }
 }
index 9dfbdab7330e353dc329ebbcfa260b8b8d1c3534..3584e8bb623535769f7ac6d4b004dcf8e8093114 100644 (file)
@@ -126,17 +126,22 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
     /// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate
     pub fn full_scan<K: Ord + Clone>(
         &self,
-        request: FullScanRequest<K>,
+        request: impl Into<FullScanRequest<K>>,
         stop_gap: usize,
         batch_size: usize,
         fetch_prev_txouts: bool,
     ) -> Result<FullScanResult<K>, Error> {
-        let (tip, latest_blocks) =
-            fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
-        let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
-        let mut last_active_indices = BTreeMap::<K, u32>::new();
+        let mut request: FullScanRequest<K> = request.into();
+
+        let tip_and_latest_blocks = match request.chain_tip() {
+            Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
+            None => None,
+        };
 
-        for (keychain, spks) in request.spks_by_keychain {
+        let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
+        let mut last_active_indices = BTreeMap::<K, u32>::default();
+        for keychain in request.keychains() {
+            let spks = request.iter_spks(keychain.clone());
             if let Some(last_active_index) =
                 self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)?
             {
@@ -144,13 +149,20 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
             }
         }
 
-        let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?;
-
         // Fetch previous `TxOut`s for fee calculation if flag is enabled.
         if fetch_prev_txouts {
             self.fetch_prev_txout(&mut graph_update)?;
         }
 
+        let chain_update = match tip_and_latest_blocks {
+            Some((chain_tip, latest_blocks)) => Some(chain_update(
+                chain_tip,
+                &latest_blocks,
+                graph_update.all_anchors(),
+            )?),
+            _ => None,
+        };
+
         Ok(FullScanResult {
             graph_update,
             chain_update,
@@ -180,35 +192,52 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
     /// [`CalculateFeeError::MissingTxOut`]: bdk_chain::tx_graph::CalculateFeeError::MissingTxOut
     /// [`Wallet.calculate_fee`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee
     /// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate
-    pub fn sync(
+    pub fn sync<I: 'static>(
         &self,
-        request: SyncRequest,
+        request: impl Into<SyncRequest<I>>,
         batch_size: usize,
         fetch_prev_txouts: bool,
     ) -> Result<SyncResult, Error> {
-        let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone())
-            .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk)));
-        let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?;
-        let (tip, latest_blocks) =
-            fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
-
-        self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?;
-        self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?;
-
-        let chain_update = chain_update(
-            tip,
-            &latest_blocks,
-            full_scan_res.graph_update.all_anchors(),
-        )?;
+        let mut request: SyncRequest<I> = request.into();
+
+        let tip_and_latest_blocks = match request.chain_tip() {
+            Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
+            None => None,
+        };
+
+        let full_scan_request = FullScanRequest::builder()
+            .spks_for_keychain(
+                (),
+                request
+                    .iter_spks()
+                    .enumerate()
+                    .map(|(i, spk)| (i as u32, spk))
+                    .collect::<Vec<_>>(),
+            )
+            .build();
+        let mut graph_update = self
+            .full_scan(full_scan_request, usize::MAX, batch_size, false)?
+            .graph_update;
+        self.populate_with_txids(&mut graph_update, request.iter_txids())?;
+        self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?;
 
         // Fetch previous `TxOut`s for fee calculation if flag is enabled.
         if fetch_prev_txouts {
-            self.fetch_prev_txout(&mut full_scan_res.graph_update)?;
+            self.fetch_prev_txout(&mut graph_update)?;
         }
 
+        let chain_update = match tip_and_latest_blocks {
+            Some((chain_tip, latest_blocks)) => Some(chain_update(
+                chain_tip,
+                &latest_blocks,
+                graph_update.all_anchors(),
+            )?),
+            None => None,
+        };
+
         Ok(SyncResult {
+            graph_update,
             chain_update,
-            graph_update: full_scan_res.graph_update,
         })
     }
 
index afe50be0a1db37b7a6382832fc11dd22b9a42733..63e91081b61c1e711b25d7578d9f15e0e2d7d1e4 100644 (file)
@@ -39,7 +39,7 @@ where
     Spks::IntoIter: ExactSizeIterator + Send + 'static,
 {
     let mut update = client.sync(
-        SyncRequest::from_chain_tip(chain.tip()).chain_spks(spks),
+        SyncRequest::builder().chain_tip(chain.tip()).spks(spks),
         BATCH_SIZE,
         true,
     )?;
@@ -51,9 +51,11 @@ where
         .as_secs();
     let _ = update.graph_update.update_last_seen_unconfirmed(now);
 
-    let _ = chain
-        .apply_update(update.chain_update.clone())
-        .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?;
+    if let Some(chain_update) = update.chain_update.clone() {
+        let _ = chain
+            .apply_update(chain_update)
+            .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?;
+    }
     let _ = graph.apply_update(update.graph_update.clone());
 
     Ok(update)
@@ -103,7 +105,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
     let cp_tip = env.make_checkpoint_tip();
 
     let sync_update = {
-        let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
+        let request = SyncRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks(misc_spks);
         client.sync(request, 1, true)?
     };
 
@@ -207,15 +211,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4
     // will.
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 3, 1, false)?
     };
     assert!(full_scan_update.graph_update.full_txs().next().is_none());
     assert!(full_scan_update.last_active_indices.is_empty());
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 4, 1, false)?
     };
     assert_eq!(
@@ -246,8 +252,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
     // The last active indice won't be updated in the first case but will in the second one.
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 5, 1, false)?
     };
     let txs: HashSet<_> = full_scan_update
@@ -259,8 +266,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     assert!(txs.contains(&txid_4th_addr));
     assert_eq!(full_scan_update.last_active_indices[&0], 3);
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 6, 1, false)?
     };
     let txs: HashSet<_> = full_scan_update
@@ -311,7 +319,7 @@ fn test_sync() -> anyhow::Result<()> {
     let txid = env.send(&addr_to_track, SEND_AMOUNT)?;
     env.wait_until_electrum_sees_txid(txid, Duration::from_secs(6))?;
 
-    sync_with_electrum(
+    let _ = sync_with_electrum(
         &client,
         [spk_to_track.clone()],
         &mut recv_chain,
@@ -332,7 +340,7 @@ fn test_sync() -> anyhow::Result<()> {
     env.mine_blocks(1, None)?;
     env.wait_until_electrum_sees_block(Duration::from_secs(6))?;
 
-    sync_with_electrum(
+    let _ = sync_with_electrum(
         &client,
         [spk_to_track.clone()],
         &mut recv_chain,
@@ -353,7 +361,7 @@ fn test_sync() -> anyhow::Result<()> {
     env.reorg_empty_blocks(1)?;
     env.wait_until_electrum_sees_block(Duration::from_secs(6))?;
 
-    sync_with_electrum(
+    let _ = sync_with_electrum(
         &client,
         [spk_to_track.clone()],
         &mut recv_chain,
@@ -373,7 +381,7 @@ fn test_sync() -> anyhow::Result<()> {
     env.mine_blocks(1, None)?;
     env.wait_until_electrum_sees_block(Duration::from_secs(6))?;
 
-    sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?;
+    let _ = sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?;
 
     // Check if balance is correct once transaction is confirmed again.
     assert_eq!(
index fed6dd526a89a786d808bb9888e0f61193479489..066b91e170466496b1c3900286f2f2bb0a77c5fe 100644 (file)
@@ -1,5 +1,4 @@
 use std::collections::BTreeSet;
-use std::usize;
 
 use async_trait::async_trait;
 use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
@@ -33,9 +32,9 @@ pub trait EsploraAsyncExt {
     /// the maximum number of HTTP requests to make in parallel.
     ///
     /// Refer to [crate-level docs](crate) for more.
-    async fn full_scan<K: Ord + Clone + Send>(
+    async fn full_scan<K: Ord + Clone + Send, R: Into<FullScanRequest<K>> + Send>(
         &self,
-        request: FullScanRequest<K>,
+        request: R,
         stop_gap: usize,
         parallel_requests: usize,
     ) -> Result<FullScanResult<K>, Error>;
@@ -47,105 +46,52 @@ pub trait EsploraAsyncExt {
     /// in parallel.
     ///
     /// Refer to [crate-level docs](crate) for more.
-    async fn sync(
+    async fn sync<I: Send, R: Into<SyncRequest<I>> + Send>(
         &self,
-        request: SyncRequest,
+        request: R,
         parallel_requests: usize,
     ) -> Result<SyncResult, Error>;
-
-    /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
-    /// `keychain_spks` against Esplora.
-    ///
-    /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
-    /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
-    /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
-    /// number of HTTP requests to make in parallel.
-    ///
-    /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active
-    /// keychain index (if any) is returned. The last active keychain index is the keychain's last
-    /// script pubkey that contains a non-empty transaction history.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
-        &self,
-        keychain_spks: I,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error>;
-
-    /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
-    /// against Esplora.
-    ///
-    /// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as
-    /// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of
-    /// HTTP requests to make in parallel.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
-        &self,
-        spks: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator + Send;
-
-    /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
-    /// against Esplora.
-    ///
-    /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
-        &self,
-        txids: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator + Send;
-
-    /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
-    /// `outpoints`.
-    ///
-    /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
-        &self,
-        outpoints: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator + Send;
 }
 
 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
 impl EsploraAsyncExt for esplora_client::AsyncClient {
-    async fn full_scan<K: Ord + Clone + Send>(
+    async fn full_scan<K: Ord + Clone + Send, R: Into<FullScanRequest<K>> + Send>(
         &self,
-        request: FullScanRequest<K>,
+        request: R,
         stop_gap: usize,
         parallel_requests: usize,
     ) -> Result<FullScanResult<K>, Error> {
-        let latest_blocks = fetch_latest_blocks(self).await?;
+        let mut request = request.into();
+        let keychains = request.keychains();
+
+        let chain_tip = request.chain_tip();
+        let latest_blocks = if chain_tip.is_some() {
+            Some(fetch_latest_blocks(self).await?)
+        } else {
+            None
+        };
+
         let mut graph_update = TxGraph::default();
         let mut last_active_indices = BTreeMap::<K, u32>::new();
-        for (keychain, keychain_spks) in request.spks_by_keychain {
-            let (tx_graph, last_active_index) = self
-                .fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)
-                .await?;
+        for keychain in keychains {
+            let keychain_spks = request.iter_spks(keychain.clone());
+            let (tx_graph, last_active_index) =
+                fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests)
+                    .await?;
             let _ = graph_update.apply_update(tx_graph);
             if let Some(last_active_index) = last_active_index {
                 last_active_indices.insert(keychain, last_active_index);
             }
         }
-        let chain_update = chain_update(
-            self,
-            &latest_blocks,
-            &request.chain_tip,
-            graph_update.all_anchors(),
-        )
-        .await?;
+
+        let chain_update = match (chain_tip, latest_blocks) {
+            (Some(chain_tip), Some(latest_blocks)) => Some(
+                chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?,
+            ),
+            _ => None,
+        };
+
         Ok(FullScanResult {
             chain_update,
             graph_update,
@@ -153,231 +99,42 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
         })
     }
 
-    async fn sync(
+    async fn sync<I: Send, R: Into<SyncRequest<I>> + Send>(
         &self,
-        request: SyncRequest,
+        request: R,
         parallel_requests: usize,
     ) -> Result<SyncResult, Error> {
-        let latest_blocks = fetch_latest_blocks(self).await?;
-        let mut graph_update = TxGraph::default();
-        let _ = graph_update.apply_update(
-            self.fetch_txs_with_spks(request.spks, parallel_requests)
-                .await?,
-        );
+        let mut request = request.into();
+
+        let chain_tip = request.chain_tip();
+        let latest_blocks = if chain_tip.is_some() {
+            Some(fetch_latest_blocks(self).await?)
+        } else {
+            None
+        };
+
+        let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
+        let _ = graph_update
+            .apply_update(fetch_txs_with_spks(self, request.iter_spks(), parallel_requests).await?);
         let _ = graph_update.apply_update(
-            self.fetch_txs_with_txids(request.txids, parallel_requests)
-                .await?,
+            fetch_txs_with_txids(self, request.iter_txids(), parallel_requests).await?,
         );
         let _ = graph_update.apply_update(
-            self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)
-                .await?,
+            fetch_txs_with_outpoints(self, request.iter_outpoints(), parallel_requests).await?,
         );
-        let chain_update = chain_update(
-            self,
-            &latest_blocks,
-            &request.chain_tip,
-            graph_update.all_anchors(),
-        )
-        .await?;
+
+        let chain_update = match (chain_tip, latest_blocks) {
+            (Some(chain_tip), Some(latest_blocks)) => Some(
+                chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?,
+            ),
+            _ => None,
+        };
+
         Ok(SyncResult {
             chain_update,
             graph_update,
         })
     }
-
-    async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
-        &self,
-        mut keychain_spks: I,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
-        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
-
-        let mut tx_graph = TxGraph::default();
-        let mut last_index = Option::<u32>::None;
-        let mut last_active_index = Option::<u32>::None;
-
-        loop {
-            let handles = keychain_spks
-                .by_ref()
-                .take(parallel_requests)
-                .map(|(spk_index, spk)| {
-                    let client = self.clone();
-                    async move {
-                        let mut last_seen = None;
-                        let mut spk_txs = Vec::new();
-                        loop {
-                            let txs = client.scripthash_txs(&spk, last_seen).await?;
-                            let tx_count = txs.len();
-                            last_seen = txs.last().map(|tx| tx.txid);
-                            spk_txs.extend(txs);
-                            if tx_count < 25 {
-                                break Result::<_, Error>::Ok((spk_index, spk_txs));
-                            }
-                        }
-                    }
-                })
-                .collect::<FuturesOrdered<_>>();
-
-            if handles.is_empty() {
-                break;
-            }
-
-            for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
-                last_index = Some(index);
-                if !txs.is_empty() {
-                    last_active_index = Some(index);
-                }
-                for tx in txs {
-                    let _ = tx_graph.insert_tx(tx.to_tx());
-                    insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
-                    insert_prevouts(&mut tx_graph, tx.vin);
-                }
-            }
-
-            let last_index = last_index.expect("Must be set since handles wasn't empty.");
-            let gap_limit_reached = if let Some(i) = last_active_index {
-                last_index >= i.saturating_add(stop_gap as u32)
-            } else {
-                last_index + 1 >= stop_gap as u32
-            };
-            if gap_limit_reached {
-                break;
-            }
-        }
-
-        Ok((tx_graph, last_active_index))
-    }
-
-    async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
-        &self,
-        spks: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator + Send,
-    {
-        self.fetch_txs_with_keychain_spks(
-            spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
-            usize::MAX,
-            parallel_requests,
-        )
-        .await
-        .map(|(tx_graph, _)| tx_graph)
-    }
-
-    async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
-        &self,
-        txids: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator + Send,
-    {
-        enum EsploraResp {
-            TxStatus(TxStatus),
-            Tx(Option<Tx>),
-        }
-
-        let mut tx_graph = TxGraph::default();
-        let mut txids = txids.into_iter();
-        loop {
-            let handles = txids
-                .by_ref()
-                .take(parallel_requests)
-                .map(|txid| {
-                    let client = self.clone();
-                    let tx_already_exists = tx_graph.get_tx(txid).is_some();
-                    async move {
-                        if tx_already_exists {
-                            client
-                                .get_tx_status(&txid)
-                                .await
-                                .map(|s| (txid, EsploraResp::TxStatus(s)))
-                        } else {
-                            client
-                                .get_tx_info(&txid)
-                                .await
-                                .map(|t| (txid, EsploraResp::Tx(t)))
-                        }
-                    }
-                })
-                .collect::<FuturesOrdered<_>>();
-
-            if handles.is_empty() {
-                break;
-            }
-
-            for (txid, resp) in handles.try_collect::<Vec<_>>().await? {
-                match resp {
-                    EsploraResp::TxStatus(status) => {
-                        insert_anchor_from_status(&mut tx_graph, txid, status);
-                    }
-                    EsploraResp::Tx(Some(tx_info)) => {
-                        let _ = tx_graph.insert_tx(tx_info.to_tx());
-                        insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
-                        insert_prevouts(&mut tx_graph, tx_info.vin);
-                    }
-                    _ => continue,
-                }
-            }
-        }
-        Ok(tx_graph)
-    }
-
-    async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
-        &self,
-        outpoints: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator + Send,
-    {
-        let outpoints = outpoints.into_iter().collect::<Vec<_>>();
-
-        // make sure txs exists in graph and tx statuses are updated
-        // TODO: We should maintain a tx cache (like we do with Electrum).
-        let mut tx_graph = self
-            .fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)
-            .await?;
-
-        // get outpoint spend-statuses
-        let mut outpoints = outpoints.into_iter();
-        let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
-        loop {
-            let handles = outpoints
-                .by_ref()
-                .take(parallel_requests)
-                .map(|op| {
-                    let client = self.clone();
-                    async move { client.get_output_status(&op.txid, op.vout as _).await }
-                })
-                .collect::<FuturesOrdered<_>>();
-
-            if handles.is_empty() {
-                break;
-            }
-
-            for op_status in handles.try_collect::<Vec<_>>().await?.into_iter().flatten() {
-                let spend_txid = match op_status.txid {
-                    Some(txid) => txid,
-                    None => continue,
-                };
-                if tx_graph.get_tx(spend_txid).is_none() {
-                    missing_txs.push(spend_txid);
-                }
-                if let Some(spend_status) = op_status.status {
-                    insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
-                }
-            }
-        }
-
-        let _ = tx_graph.apply_update(
-            self.fetch_txs_with_txids(missing_txs, parallel_requests)
-                .await?,
-        );
-        Ok(tx_graph)
-    }
 }
 
 /// Fetch latest blocks from Esplora in an atomic call.
@@ -480,6 +237,235 @@ async fn chain_update<A: Anchor>(
     Ok(tip)
 }
 
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
+/// `keychain_spks` against Esplora.
+///
+/// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
+/// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
+/// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
+/// number of HTTP requests to make in parallel.
+///
+/// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active
+/// keychain index (if any) is returned. The last active keychain index is the keychain's last
+/// script pubkey that contains a non-empty transaction history.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>> + Send>(
+    client: &esplora_client::AsyncClient,
+    mut keychain_spks: I,
+    stop_gap: usize,
+    parallel_requests: usize,
+) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
+    type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+
+    let mut tx_graph = TxGraph::default();
+    let mut last_index = Option::<u32>::None;
+    let mut last_active_index = Option::<u32>::None;
+
+    loop {
+        let handles = keychain_spks
+            .by_ref()
+            .take(parallel_requests)
+            .map(|(spk_index, spk)| {
+                let client = client.clone();
+                async move {
+                    let mut last_seen = None;
+                    let mut spk_txs = Vec::new();
+                    loop {
+                        let txs = client.scripthash_txs(&spk, last_seen).await?;
+                        let tx_count = txs.len();
+                        last_seen = txs.last().map(|tx| tx.txid);
+                        spk_txs.extend(txs);
+                        if tx_count < 25 {
+                            break Result::<_, Error>::Ok((spk_index, spk_txs));
+                        }
+                    }
+                }
+            })
+            .collect::<FuturesOrdered<_>>();
+
+        if handles.is_empty() {
+            break;
+        }
+
+        for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
+            last_index = Some(index);
+            if !txs.is_empty() {
+                last_active_index = Some(index);
+            }
+            for tx in txs {
+                let _ = tx_graph.insert_tx(tx.to_tx());
+                insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
+                insert_prevouts(&mut tx_graph, tx.vin);
+            }
+        }
+
+        let last_index = last_index.expect("Must be set since handles wasn't empty.");
+        let gap_limit_reached = if let Some(i) = last_active_index {
+            last_index >= i.saturating_add(stop_gap as u32)
+        } else {
+            last_index + 1 >= stop_gap as u32
+        };
+        if gap_limit_reached {
+            break;
+        }
+    }
+
+    Ok((tx_graph, last_active_index))
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
+/// against Esplora.
+///
+/// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as
+/// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of
+/// HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf> + Send>(
+    client: &esplora_client::AsyncClient,
+    spks: I,
+    parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+where
+    I::IntoIter: Send,
+{
+    fetch_txs_with_keychain_spks(
+        client,
+        spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
+        usize::MAX,
+        parallel_requests,
+    )
+    .await
+    .map(|(tx_graph, _)| tx_graph)
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
+/// against Esplora.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_txids<I: IntoIterator<Item = Txid> + Send>(
+    client: &esplora_client::AsyncClient,
+    txids: I,
+    parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+where
+    I::IntoIter: Send,
+{
+    enum EsploraResp {
+        TxStatus(TxStatus),
+        Tx(Option<Tx>),
+    }
+
+    let mut tx_graph = TxGraph::default();
+    let mut txids = txids.into_iter();
+    loop {
+        let handles = txids
+            .by_ref()
+            .take(parallel_requests)
+            .map(|txid| {
+                let client = client.clone();
+                let tx_already_exists = tx_graph.get_tx(txid).is_some();
+                async move {
+                    if tx_already_exists {
+                        client
+                            .get_tx_status(&txid)
+                            .await
+                            .map(|s| (txid, EsploraResp::TxStatus(s)))
+                    } else {
+                        client
+                            .get_tx_info(&txid)
+                            .await
+                            .map(|t| (txid, EsploraResp::Tx(t)))
+                    }
+                }
+            })
+            .collect::<FuturesOrdered<_>>();
+
+        if handles.is_empty() {
+            break;
+        }
+
+        for (txid, resp) in handles.try_collect::<Vec<_>>().await? {
+            match resp {
+                EsploraResp::TxStatus(status) => {
+                    insert_anchor_from_status(&mut tx_graph, txid, status);
+                }
+                EsploraResp::Tx(Some(tx_info)) => {
+                    let _ = tx_graph.insert_tx(tx_info.to_tx());
+                    insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
+                    insert_prevouts(&mut tx_graph, tx_info.vin);
+                }
+                _ => continue,
+            }
+        }
+    }
+    Ok(tx_graph)
+}
+
+/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
+/// `outpoints`.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+async fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint> + Send>(
+    client: &esplora_client::AsyncClient,
+    outpoints: I,
+    parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error>
+where
+    I::IntoIter: Send,
+{
+    let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+    // make sure txs exists in graph and tx statuses are updated
+    // TODO: We should maintain a tx cache (like we do with Electrum).
+    let mut tx_graph = fetch_txs_with_txids(
+        client,
+        outpoints.iter().copied().map(|op| op.txid),
+        parallel_requests,
+    )
+    .await?;
+
+    // get outpoint spend-statuses
+    let mut outpoints = outpoints.into_iter();
+    let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
+    loop {
+        let handles = outpoints
+            .by_ref()
+            .take(parallel_requests)
+            .map(|op| {
+                let client = client.clone();
+                async move { client.get_output_status(&op.txid, op.vout as _).await }
+            })
+            .collect::<FuturesOrdered<_>>();
+
+        if handles.is_empty() {
+            break;
+        }
+
+        for op_status in handles.try_collect::<Vec<_>>().await?.into_iter().flatten() {
+            let spend_txid = match op_status.txid {
+                Some(txid) => txid,
+                None => continue,
+            };
+            if tx_graph.get_tx(spend_txid).is_none() {
+                missing_txs.push(spend_txid);
+            }
+            if let Some(spend_status) = op_status.status {
+                insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
+            }
+        }
+    }
+
+    let _ =
+        tx_graph.apply_update(fetch_txs_with_txids(client, missing_txs, parallel_requests).await?);
+    Ok(tx_graph)
+}
+
 #[cfg(test)]
 mod test {
     use std::{collections::BTreeSet, time::Duration};
index 81ce76848bced35fb25fec47a82339d63545108c..6e3e25afe70e613dcc1b4fd337757f339db7c2e0 100644 (file)
@@ -29,9 +29,9 @@ pub trait EsploraExt {
     /// the maximum number of HTTP requests to make in parallel.
     ///
     /// Refer to [crate-level docs](crate) for more.
-    fn full_scan<K: Ord + Clone>(
+    fn full_scan<K: Ord + Clone, R: Into<FullScanRequest<K>>>(
         &self,
-        request: FullScanRequest<K>,
+        request: R,
         stop_gap: usize,
         parallel_requests: usize,
     ) -> Result<FullScanResult<K>, Error>;
@@ -43,97 +43,51 @@ pub trait EsploraExt {
     /// in parallel.
     ///
     /// Refer to [crate-level docs](crate) for more.
-    fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error>;
-
-    /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning
-    /// `keychain_spks` against Esplora.
-    ///
-    /// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts
-    /// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive
-    /// scripts with no transaction history is reached. `parallel_requests` specifies the maximum
-    /// number of HTTP requests to make in parallel.
-    ///
-    /// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active keychain
-    /// index (if any) is returned. The last active keychain index is the keychain's last script
-    /// pubkey that contains a non-empty transaction history.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
-        &self,
-        keychain_spks: I,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error>;
-
-    /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
-    /// against Esplora.
-    ///
-    /// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all
-    /// contained scripts will be scanned. `parallel_requests` specifies the maximum number of HTTP
-    /// requests to make in parallel.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
+    fn sync<I: 'static, R: Into<SyncRequest<I>>>(
         &self,
-        spks: I,
+        request: R,
         parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator;
-
-    /// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
-    /// against Esplora.
-    ///
-    /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
-        &self,
-        txids: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator;
-
-    /// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
-    /// `outpoints`.
-    ///
-    /// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
-    ///
-    /// Refer to [crate-level docs](crate) for more.
-    fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
-        &self,
-        outpoints: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator;
+    ) -> Result<SyncResult, Error>;
 }
 
 impl EsploraExt for esplora_client::BlockingClient {
-    fn full_scan<K: Ord + Clone>(
+    fn full_scan<K: Ord + Clone, R: Into<FullScanRequest<K>>>(
         &self,
-        request: FullScanRequest<K>,
+        request: R,
         stop_gap: usize,
         parallel_requests: usize,
     ) -> Result<FullScanResult<K>, Error> {
-        let latest_blocks = fetch_latest_blocks(self)?;
+        let mut request = request.into();
+
+        let chain_tip = request.chain_tip();
+        let latest_blocks = if chain_tip.is_some() {
+            Some(fetch_latest_blocks(self)?)
+        } else {
+            None
+        };
+
         let mut graph_update = TxGraph::default();
         let mut last_active_indices = BTreeMap::<K, u32>::new();
-        for (keychain, keychain_spks) in request.spks_by_keychain {
+        for keychain in request.keychains() {
+            let keychain_spks = request.iter_spks(keychain.clone());
             let (tx_graph, last_active_index) =
-                self.fetch_txs_with_keychain_spks(keychain_spks, stop_gap, parallel_requests)?;
+                fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests)?;
             let _ = graph_update.apply_update(tx_graph);
             if let Some(last_active_index) = last_active_index {
                 last_active_indices.insert(keychain, last_active_index);
             }
         }
-        let chain_update = chain_update(
-            self,
-            &latest_blocks,
-            &request.chain_tip,
-            graph_update.all_anchors(),
-        )?;
+
+        let chain_update = match (chain_tip, latest_blocks) {
+            (Some(chain_tip), Some(latest_blocks)) => Some(chain_update(
+                self,
+                &latest_blocks,
+                &chain_tip,
+                graph_update.all_anchors(),
+            )?),
+            _ => None,
+        };
+
         Ok(FullScanResult {
             chain_update,
             graph_update,
@@ -141,224 +95,51 @@ impl EsploraExt for esplora_client::BlockingClient {
         })
     }
 
-    fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error> {
-        let latest_blocks = fetch_latest_blocks(self)?;
-        let mut graph_update = TxGraph::default();
-        let _ =
-            graph_update.apply_update(self.fetch_txs_with_spks(request.spks, parallel_requests)?);
-        let _ =
-            graph_update.apply_update(self.fetch_txs_with_txids(request.txids, parallel_requests)?);
-        let _ = graph_update
-            .apply_update(self.fetch_txs_with_outpoints(request.outpoints, parallel_requests)?);
-        let chain_update = chain_update(
-            self,
-            &latest_blocks,
-            &request.chain_tip,
-            graph_update.all_anchors(),
-        )?;
-        Ok(SyncResult {
-            chain_update,
-            graph_update,
-        })
-    }
-
-    fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
+    fn sync<I: 'static, R: Into<SyncRequest<I>>>(
         &self,
-        mut keychain_spks: I,
-        stop_gap: usize,
+        request: R,
         parallel_requests: usize,
-    ) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
-        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
-
-        let mut tx_graph = TxGraph::default();
-        let mut last_index = Option::<u32>::None;
-        let mut last_active_index = Option::<u32>::None;
-
-        loop {
-            let handles = keychain_spks
-                .by_ref()
-                .take(parallel_requests)
-                .map(|(spk_index, spk)| {
-                    std::thread::spawn({
-                        let client = self.clone();
-                        move || -> Result<TxsOfSpkIndex, Error> {
-                            let mut last_seen = None;
-                            let mut spk_txs = Vec::new();
-                            loop {
-                                let txs = client.scripthash_txs(&spk, last_seen)?;
-                                let tx_count = txs.len();
-                                last_seen = txs.last().map(|tx| tx.txid);
-                                spk_txs.extend(txs);
-                                if tx_count < 25 {
-                                    break Ok((spk_index, spk_txs));
-                                }
-                            }
-                        }
-                    })
-                })
-                .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
-
-            if handles.is_empty() {
-                break;
-            }
+    ) -> Result<SyncResult, Error> {
+        let mut request: SyncRequest<I> = request.into();
 
-            for handle in handles {
-                let (index, txs) = handle.join().expect("thread must not panic")?;
-                last_index = Some(index);
-                if !txs.is_empty() {
-                    last_active_index = Some(index);
-                }
-                for tx in txs {
-                    let _ = tx_graph.insert_tx(tx.to_tx());
-                    insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
-                    insert_prevouts(&mut tx_graph, tx.vin);
-                }
-            }
-
-            let last_index = last_index.expect("Must be set since handles wasn't empty.");
-            let gap_limit_reached = if let Some(i) = last_active_index {
-                last_index >= i.saturating_add(stop_gap as u32)
-            } else {
-                last_index + 1 >= stop_gap as u32
-            };
-            if gap_limit_reached {
-                break;
-            }
-        }
-
-        Ok((tx_graph, last_active_index))
-    }
+        let chain_tip = request.chain_tip();
+        let latest_blocks = if chain_tip.is_some() {
+            Some(fetch_latest_blocks(self)?)
+        } else {
+            None
+        };
 
-    fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
-        &self,
-        spks: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator,
-    {
-        self.fetch_txs_with_keychain_spks(
-            spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
-            usize::MAX,
+        let mut graph_update = TxGraph::default();
+        let _ = graph_update.apply_update(fetch_txs_with_spks(
+            self,
+            request.iter_spks(),
             parallel_requests,
-        )
-        .map(|(tx_graph, _)| tx_graph)
-    }
-
-    fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
-        &self,
-        txids: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator,
-    {
-        enum EsploraResp {
-            TxStatus(TxStatus),
-            Tx(Option<Tx>),
-        }
-
-        let mut tx_graph = TxGraph::default();
-        let mut txids = txids.into_iter();
-        loop {
-            let handles = txids
-                .by_ref()
-                .take(parallel_requests)
-                .map(|txid| {
-                    let client = self.clone();
-                    let tx_already_exists = tx_graph.get_tx(txid).is_some();
-                    std::thread::spawn(move || {
-                        if tx_already_exists {
-                            client
-                                .get_tx_status(&txid)
-                                .map_err(Box::new)
-                                .map(|s| (txid, EsploraResp::TxStatus(s)))
-                        } else {
-                            client
-                                .get_tx_info(&txid)
-                                .map_err(Box::new)
-                                .map(|t| (txid, EsploraResp::Tx(t)))
-                        }
-                    })
-                })
-                .collect::<Vec<JoinHandle<Result<(Txid, EsploraResp), Error>>>>();
-
-            if handles.is_empty() {
-                break;
-            }
-
-            for handle in handles {
-                let (txid, resp) = handle.join().expect("thread must not panic")?;
-                match resp {
-                    EsploraResp::TxStatus(status) => {
-                        insert_anchor_from_status(&mut tx_graph, txid, status);
-                    }
-                    EsploraResp::Tx(Some(tx_info)) => {
-                        let _ = tx_graph.insert_tx(tx_info.to_tx());
-                        insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
-                        insert_prevouts(&mut tx_graph, tx_info.vin);
-                    }
-                    _ => continue,
-                }
-            }
-        }
-        Ok(tx_graph)
-    }
-
-    fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
-        &self,
-        outpoints: I,
-        parallel_requests: usize,
-    ) -> Result<TxGraph<ConfirmationBlockTime>, Error>
-    where
-        I::IntoIter: ExactSizeIterator,
-    {
-        let outpoints = outpoints.into_iter().collect::<Vec<_>>();
-
-        // make sure txs exists in graph and tx statuses are updated
-        // TODO: We should maintain a tx cache (like we do with Electrum).
-        let mut tx_graph =
-            self.fetch_txs_with_txids(outpoints.iter().map(|op| op.txid), parallel_requests)?;
-
-        // get outpoint spend-statuses
-        let mut outpoints = outpoints.into_iter();
-        let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
-        loop {
-            let handles = outpoints
-                .by_ref()
-                .take(parallel_requests)
-                .map(|op| {
-                    let client = self.clone();
-                    std::thread::spawn(move || {
-                        client
-                            .get_output_status(&op.txid, op.vout as _)
-                            .map_err(Box::new)
-                    })
-                })
-                .collect::<Vec<JoinHandle<Result<Option<OutputStatus>, Error>>>>();
-
-            if handles.is_empty() {
-                break;
-            }
-
-            for handle in handles {
-                if let Some(op_status) = handle.join().expect("thread must not panic")? {
-                    let spend_txid = match op_status.txid {
-                        Some(txid) => txid,
-                        None => continue,
-                    };
-                    if tx_graph.get_tx(spend_txid).is_none() {
-                        missing_txs.push(spend_txid);
-                    }
-                    if let Some(spend_status) = op_status.status {
-                        insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
-                    }
-                }
-            }
-        }
+        )?);
+        let _ = graph_update.apply_update(fetch_txs_with_txids(
+            self,
+            request.iter_txids(),
+            parallel_requests,
+        )?);
+        let _ = graph_update.apply_update(fetch_txs_with_outpoints(
+            self,
+            request.iter_outpoints(),
+            parallel_requests,
+        )?);
+
+        let chain_update = match (chain_tip, latest_blocks) {
+            (Some(chain_tip), Some(latest_blocks)) => Some(chain_update(
+                self,
+                &latest_blocks,
+                &chain_tip,
+                graph_update.all_anchors(),
+            )?),
+            _ => None,
+        };
 
-        let _ = tx_graph.apply_update(self.fetch_txs_with_txids(missing_txs, parallel_requests)?);
-        Ok(tx_graph)
+        Ok(SyncResult {
+            chain_update,
+            graph_update,
+        })
     }
 }
 
@@ -461,6 +242,224 @@ fn chain_update<A: Anchor>(
     Ok(tip)
 }
 
+fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
+    client: &esplora_client::BlockingClient,
+    mut keychain_spks: I,
+    stop_gap: usize,
+    parallel_requests: usize,
+) -> Result<(TxGraph<ConfirmationBlockTime>, Option<u32>), Error> {
+    type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+
+    let mut tx_graph = TxGraph::default();
+    let mut last_index = Option::<u32>::None;
+    let mut last_active_index = Option::<u32>::None;
+
+    loop {
+        let handles = keychain_spks
+            .by_ref()
+            .take(parallel_requests)
+            .map(|(spk_index, spk)| {
+                std::thread::spawn({
+                    let client = client.clone();
+                    move || -> Result<TxsOfSpkIndex, Error> {
+                        let mut last_seen = None;
+                        let mut spk_txs = Vec::new();
+                        loop {
+                            let txs = client.scripthash_txs(&spk, last_seen)?;
+                            let tx_count = txs.len();
+                            last_seen = txs.last().map(|tx| tx.txid);
+                            spk_txs.extend(txs);
+                            if tx_count < 25 {
+                                break Ok((spk_index, spk_txs));
+                            }
+                        }
+                    }
+                })
+            })
+            .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
+
+        if handles.is_empty() {
+            break;
+        }
+
+        for handle in handles {
+            let (index, txs) = handle.join().expect("thread must not panic")?;
+            last_index = Some(index);
+            if !txs.is_empty() {
+                last_active_index = Some(index);
+            }
+            for tx in txs {
+                let _ = tx_graph.insert_tx(tx.to_tx());
+                insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status);
+                insert_prevouts(&mut tx_graph, tx.vin);
+            }
+        }
+
+        let last_index = last_index.expect("Must be set since handles wasn't empty.");
+        let gap_limit_reached = if let Some(i) = last_active_index {
+            last_index >= i.saturating_add(stop_gap as u32)
+        } else {
+            last_index + 1 >= stop_gap as u32
+        };
+        if gap_limit_reached {
+            break;
+        }
+    }
+
+    Ok((tx_graph, last_active_index))
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks`
+/// against Esplora.
+///
+/// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all
+/// contained scripts will be scanned. `parallel_requests` specifies the maximum number of HTTP
+/// requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
+    client: &esplora_client::BlockingClient,
+    spks: I,
+    parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
+    fetch_txs_with_keychain_spks(
+        client,
+        spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
+        usize::MAX,
+        parallel_requests,
+    )
+    .map(|(tx_graph, _)| tx_graph)
+}
+
+/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids`
+/// against Esplora.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
+    client: &esplora_client::BlockingClient,
+    txids: I,
+    parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
+    enum EsploraResp {
+        TxStatus(TxStatus),
+        Tx(Option<Tx>),
+    }
+
+    let mut tx_graph = TxGraph::default();
+    let mut txids = txids.into_iter();
+    loop {
+        let handles = txids
+            .by_ref()
+            .take(parallel_requests)
+            .map(|txid| {
+                let client = client.clone();
+                let tx_already_exists = tx_graph.get_tx(txid).is_some();
+                std::thread::spawn(move || {
+                    if tx_already_exists {
+                        client
+                            .get_tx_status(&txid)
+                            .map_err(Box::new)
+                            .map(|s| (txid, EsploraResp::TxStatus(s)))
+                    } else {
+                        client
+                            .get_tx_info(&txid)
+                            .map_err(Box::new)
+                            .map(|t| (txid, EsploraResp::Tx(t)))
+                    }
+                })
+            })
+            .collect::<Vec<JoinHandle<Result<(Txid, EsploraResp), Error>>>>();
+
+        if handles.is_empty() {
+            break;
+        }
+
+        for handle in handles {
+            let (txid, resp) = handle.join().expect("thread must not panic")?;
+            match resp {
+                EsploraResp::TxStatus(status) => {
+                    insert_anchor_from_status(&mut tx_graph, txid, status);
+                }
+                EsploraResp::Tx(Some(tx_info)) => {
+                    let _ = tx_graph.insert_tx(tx_info.to_tx());
+                    insert_anchor_from_status(&mut tx_graph, txid, tx_info.status);
+                    insert_prevouts(&mut tx_graph, tx_info.vin);
+                }
+                _ => continue,
+            }
+        }
+    }
+    Ok(tx_graph)
+}
+
+/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided
+/// `outpoints`.
+///
+/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel.
+///
+/// Refer to [crate-level docs](crate) for more.
+fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
+    client: &esplora_client::BlockingClient,
+    outpoints: I,
+    parallel_requests: usize,
+) -> Result<TxGraph<ConfirmationBlockTime>, Error> {
+    let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+    // make sure txs exists in graph and tx statuses are updated
+    // TODO: We should maintain a tx cache (like we do with Electrum).
+    let mut tx_graph = fetch_txs_with_txids(
+        client,
+        outpoints.iter().map(|op| op.txid),
+        parallel_requests,
+    )?;
+
+    // get outpoint spend-statuses
+    let mut outpoints = outpoints.into_iter();
+    let mut missing_txs = Vec::<Txid>::with_capacity(outpoints.len());
+    loop {
+        let handles = outpoints
+            .by_ref()
+            .take(parallel_requests)
+            .map(|op| {
+                let client = client.clone();
+                std::thread::spawn(move || {
+                    client
+                        .get_output_status(&op.txid, op.vout as _)
+                        .map_err(Box::new)
+                })
+            })
+            .collect::<Vec<JoinHandle<Result<Option<OutputStatus>, Error>>>>();
+
+        if handles.is_empty() {
+            break;
+        }
+
+        for handle in handles {
+            if let Some(op_status) = handle.join().expect("thread must not panic")? {
+                let spend_txid = match op_status.txid {
+                    Some(txid) => txid,
+                    None => continue,
+                };
+                if tx_graph.get_tx(spend_txid).is_none() {
+                    missing_txs.push(spend_txid);
+                }
+                if let Some(spend_status) = op_status.status {
+                    insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status);
+                }
+            }
+        }
+    }
+
+    let _ = tx_graph.apply_update(fetch_txs_with_txids(
+        client,
+        missing_txs,
+        parallel_requests,
+    )?);
+    Ok(tx_graph)
+}
+
 #[cfg(test)]
 mod test {
     use crate::blocking_ext::{chain_update, fetch_latest_blocks};
index 9ce0c7be188f49177cb71bcbfa1ecec352c16694..c74fe74a11a7b02fcbcea73e3bb33388b28090a0 100644 (file)
@@ -1,22 +1,8 @@
 #![doc = include_str!("../README.md")]
-//! # Low-Level Methods
-//!
-//! [`EsploraExt::sync`] and [`EsploraExt::full_scan`] returns updates which are *complete* and can
-//! be used directly to determine confirmation statuses of each transaction. This is because a
-//! [`LocalChain`] update is contained in the returned update structures. However, sometimes the
-//! caller wishes to use a custom [`ChainOracle`] implementation (something other than
-//! [`LocalChain`]). The following methods ONLY returns an update [`TxGraph`]:
-//!
-//! * [`EsploraExt::fetch_txs_with_keychain_spks`]
-//! * [`EsploraExt::fetch_txs_with_spks`]
-//! * [`EsploraExt::fetch_txs_with_txids`]
-//! * [`EsploraExt::fetch_txs_with_outpoints`]
-//!
 //! # Stop Gap
 //!
-//! Methods [`EsploraExt::full_scan`] and [`EsploraExt::fetch_txs_with_keychain_spks`] takes in a
-//! `stop_gap` input which is defined as the maximum number of consecutive unused script pubkeys to
-//! scan transactions for before stopping.
+//! [`EsploraExt::full_scan`] takes in a `stop_gap` input which is defined as the maximum number of
+//! consecutive unused script pubkeys to scan transactions for before stopping.
 //!
 //! For example, with a `stop_gap` of 3, `full_scan` will keep scanning until it encounters 3
 //! consecutive script pubkeys with no associated transactions.
index 2258c9d60b7bcde57d17354c8a7674919e2d31ca..70d4641941b195a9d0eefca629fd47b71a846059 100644 (file)
@@ -55,7 +55,9 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
     let cp_tip = env.make_checkpoint_tip();
 
     let sync_update = {
-        let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
+        let request = SyncRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks(misc_spks);
         client.sync(request, 1).await?
     };
 
@@ -160,15 +162,17 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4
     // will.
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 3, 1).await?
     };
     assert!(full_scan_update.graph_update.full_txs().next().is_none());
     assert!(full_scan_update.last_active_indices.is_empty());
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 4, 1).await?
     };
     assert_eq!(
@@ -201,8 +205,9 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
     // The last active indice won't be updated in the first case but will in the second one.
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 5, 1).await?
     };
     let txs: HashSet<_> = full_scan_update
@@ -214,8 +219,9 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     assert!(txs.contains(&txid_4th_addr));
     assert_eq!(full_scan_update.last_active_indices[&0], 3);
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 6, 1).await?
     };
     let txs: HashSet<_> = full_scan_update
index 2e363f4e6de3e3713fe9b93b111b568552ed702f..818f1f5fb62f6315173261b15453e59c8246be87 100644 (file)
@@ -55,7 +55,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
     let cp_tip = env.make_checkpoint_tip();
 
     let sync_update = {
-        let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
+        let request = SyncRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks(misc_spks);
         client.sync(request, 1)?
     };
 
@@ -161,15 +163,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4
     // will.
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 3, 1)?
     };
     assert!(full_scan_update.graph_update.full_txs().next().is_none());
     assert!(full_scan_update.last_active_indices.is_empty());
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 4, 1)?
     };
     assert_eq!(
@@ -202,8 +206,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
     // The last active indice won't be updated in the first case but will in the second one.
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 5, 1)?
     };
     let txs: HashSet<_> = full_scan_update
@@ -215,8 +220,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
     assert!(txs.contains(&txid_4th_addr));
     assert_eq!(full_scan_update.last_active_indices[&0], 3);
     let full_scan_update = {
-        let request =
-            FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
+        let request = FullScanRequest::builder()
+            .chain_tip(cp_tip.clone())
+            .spks_for_keychain(0, spks.clone());
         client.full_scan(request, 6, 1)?
     };
     let txs: HashSet<_> = full_scan_update
index 4303ad298b84bb3c9126fbeb474c11ee985adaee..02936eaefdf72af1aa11c888469122bd00a2e157 100644 (file)
@@ -29,7 +29,10 @@ use bdk_chain::{
     local_chain::{
         self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain,
     },
-    spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult},
+    spk_client::{
+        FullScanRequest, FullScanRequestBuilder, FullScanResult, SyncRequest, SyncRequestBuilder,
+        SyncResult,
+    },
     tx_graph::{CanonicalTx, TxGraph, TxNode},
     BlockId, ChainPosition, ConfirmationBlockTime, ConfirmationTime, DescriptorExt, FullTxOut,
     Indexed, IndexedTxGraph, Merge,
@@ -151,7 +154,7 @@ impl From<FullScanResult<KeychainKind>> for Update {
         Self {
             last_active_indices: value.last_active_indices,
             graph: value.graph_update,
-            chain: Some(value.chain_update),
+            chain: value.chain_update,
         }
     }
 }
@@ -161,7 +164,7 @@ impl From<SyncResult> for Update {
         Self {
             last_active_indices: BTreeMap::new(),
             graph: value.graph_update,
-            chain: Some(value.chain_update),
+            chain: value.chain_update,
         }
     }
 }
@@ -2437,9 +2440,10 @@ impl Wallet {
     /// This is the first step when performing a spk-based wallet partial sync, the returned
     /// [`SyncRequest`] collects all revealed script pubkeys from the wallet keychain needed to
     /// start a blockchain sync with a spk based blockchain client.
-    pub fn start_sync_with_revealed_spks(&self) -> SyncRequest {
-        SyncRequest::from_chain_tip(self.chain.tip())
-            .populate_with_revealed_spks(&self.indexed_graph.index, ..)
+    pub fn start_sync_with_revealed_spks(&self) -> SyncRequestBuilder<(KeychainKind, u32)> {
+        SyncRequest::builder()
+            .chain_tip(self.chain.tip())
+            .revealed_spks_from_indexer(&self.indexed_graph.index, ..)
     }
 
     /// Create a [`FullScanRequest] for this wallet.
@@ -2450,8 +2454,10 @@ impl Wallet {
     ///
     /// This operation is generally only used when importing or restoring a previously used wallet
     /// in which the list of used scripts is not known.
-    pub fn start_full_scan(&self) -> FullScanRequest<KeychainKind> {
-        FullScanRequest::from_keychain_txout_index(self.chain.tip(), &self.indexed_graph.index)
+    pub fn start_full_scan(&self) -> FullScanRequestBuilder<KeychainKind> {
+        FullScanRequest::builder()
+            .chain_tip(self.chain.tip())
+            .spks_from_indexer(&self.indexed_graph.index)
     }
 }
 
index cda8c55264399492b43e37d9a3858502edfac547..bcb0b3ed53058b9d129748190c00a2510d7f9872 100644 (file)
@@ -1,7 +1,7 @@
 use std::io::{self, Write};
 
 use bdk_chain::{
-    bitcoin::{Address, Network, Txid},
+    bitcoin::Network,
     collections::BTreeSet,
     indexed_tx_graph,
     spk_client::{FullScanRequest, SyncRequest},
@@ -139,8 +139,9 @@ fn main() -> anyhow::Result<()> {
                 let graph = &*graph.lock().unwrap();
                 let chain = &*chain.lock().unwrap();
 
-                FullScanRequest::from_chain_tip(chain.tip())
-                    .set_spks_for_keychain(
+                FullScanRequest::builder()
+                    .chain_tip(chain.tip())
+                    .spks_for_keychain(
                         Keychain::External,
                         graph
                             .index
@@ -148,7 +149,7 @@ fn main() -> anyhow::Result<()> {
                             .into_iter()
                             .flatten(),
                     )
-                    .set_spks_for_keychain(
+                    .spks_for_keychain(
                         Keychain::Internal,
                         graph
                             .index
@@ -156,7 +157,7 @@ fn main() -> anyhow::Result<()> {
                             .into_iter()
                             .flatten(),
                     )
-                    .inspect_spks_for_all_keychains({
+                    .inspect({
                         let mut once = BTreeSet::new();
                         move |k, spk_i, _| {
                             if once.insert(k) {
@@ -199,99 +200,55 @@ fn main() -> anyhow::Result<()> {
             }
 
             let chain_tip = chain.tip();
-            let mut request = SyncRequest::from_chain_tip(chain_tip.clone());
+            let mut request =
+                SyncRequest::builder()
+                    .chain_tip(chain_tip.clone())
+                    .inspect(|item, progress| {
+                        let pc = (100 * progress.consumed()) as f32 / progress.total() as f32;
+                        eprintln!("[ SCANNING {:03.0}% ] {}", pc, item);
+                    });
 
             if all_spks {
-                let all_spks = graph
-                    .index
-                    .revealed_spks(..)
-                    .map(|(index, spk)| (index, spk.to_owned()))
-                    .collect::<Vec<_>>();
-                request = request.chain_spks(all_spks.into_iter().map(|((k, spk_i), spk)| {
-                    eprint!("Scanning {}: {}", k, spk_i);
-                    spk
-                }));
+                request = request.spks_with_labels(
+                    graph
+                        .index
+                        .revealed_spks(..)
+                        .map(|(index, spk)| (index, spk.to_owned())),
+                );
             }
             if unused_spks {
-                let unused_spks = graph
-                    .index
-                    .unused_spks()
-                    .map(|(index, spk)| (index, spk.to_owned()))
-                    .collect::<Vec<_>>();
-                request =
-                    request.chain_spks(unused_spks.into_iter().map(move |((k, spk_i), spk)| {
-                        eprint!(
-                            "Checking if address {} {}:{} has been used",
-                            Address::from_script(&spk, network).unwrap(),
-                            k,
-                            spk_i,
-                        );
-                        spk
-                    }));
+                request = request.spks_with_labels(
+                    graph
+                        .index
+                        .unused_spks()
+                        .map(|(index, spk)| (index, spk.to_owned())),
+                );
             }
 
             if utxos {
                 let init_outpoints = graph.index.outpoints();
-
-                let utxos = graph
-                    .graph()
-                    .filter_chain_unspents(
-                        &*chain,
-                        chain_tip.block_id(),
-                        init_outpoints.iter().cloned(),
-                    )
-                    .map(|(_, utxo)| utxo)
-                    .collect::<Vec<_>>();
-                request = request.chain_outpoints(utxos.into_iter().map(|utxo| {
-                    eprint!(
-                        "Checking if outpoint {} (value: {}) has been spent",
-                        utxo.outpoint, utxo.txout.value
-                    );
-                    utxo.outpoint
-                }));
+                request = request.outpoints(
+                    graph
+                        .graph()
+                        .filter_chain_unspents(
+                            &*chain,
+                            chain_tip.block_id(),
+                            init_outpoints.iter().cloned(),
+                        )
+                        .map(|(_, utxo)| utxo.outpoint),
+                );
             };
 
             if unconfirmed {
-                let unconfirmed_txids = graph
-                    .graph()
-                    .list_canonical_txs(&*chain, chain_tip.block_id())
-                    .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
-                    .map(|canonical_tx| canonical_tx.tx_node.txid)
-                    .collect::<Vec<Txid>>();
-
-                request = request.chain_txids(
-                    unconfirmed_txids
-                        .into_iter()
-                        .inspect(|txid| eprint!("Checking if {} is confirmed yet", txid)),
+                request = request.txids(
+                    graph
+                        .graph()
+                        .list_canonical_txs(&*chain, chain_tip.block_id())
+                        .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
+                        .map(|canonical_tx| canonical_tx.tx_node.txid),
                 );
             }
 
-            let total_spks = request.spks.len();
-            let total_txids = request.txids.len();
-            let total_ops = request.outpoints.len();
-            request = request
-                .inspect_spks({
-                    let mut visited = 0;
-                    move |_| {
-                        visited += 1;
-                        eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32)
-                    }
-                })
-                .inspect_txids({
-                    let mut visited = 0;
-                    move |_| {
-                        visited += 1;
-                        eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32)
-                    }
-                })
-                .inspect_outpoints({
-                    let mut visited = 0;
-                    move |_| {
-                        visited += 1;
-                        eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32)
-                    }
-                });
-
             let res = client
                 .sync(request, scan_options.batch_size, false)
                 .context("scanning the blockchain")?;
@@ -313,7 +270,7 @@ fn main() -> anyhow::Result<()> {
         let mut chain = chain.lock().unwrap();
         let mut graph = graph.lock().unwrap();
 
-        let chain_changeset = chain.apply_update(chain_update)?;
+        let chain_changeset = chain.apply_update(chain_update.expect("request has chain tip"))?;
 
         let mut indexed_tx_graph_changeset =
             indexed_tx_graph::ChangeSet::<ConfirmationBlockTime, _>::default();
index 608e58d11e8842cf1bc3cc807410a803771539d1..0ea99c775e095217a72418f6e1cb0b10185da344 100644 (file)
@@ -1,10 +1,11 @@
+use core::f32;
 use std::{
     collections::BTreeSet,
     io::{self, Write},
 };
 
 use bdk_chain::{
-    bitcoin::{Address, Network, Txid},
+    bitcoin::Network,
     spk_client::{FullScanRequest, SyncRequest},
     Merge,
 };
@@ -144,8 +145,10 @@ fn main() -> anyhow::Result<()> {
             let request = {
                 let chain_tip = chain.lock().expect("mutex must not be poisoned").tip();
                 let indexed_graph = &*graph.lock().expect("mutex must not be poisoned");
-                FullScanRequest::from_keychain_txout_index(chain_tip, &indexed_graph.index)
-                    .inspect_spks_for_all_keychains({
+                FullScanRequest::builder()
+                    .chain_tip(chain_tip)
+                    .spks_from_indexer(&indexed_graph.index)
+                    .inspect({
                         let mut once = BTreeSet::<Keychain>::new();
                         move |keychain, spk_i, _| {
                             if once.insert(keychain) {
@@ -156,6 +159,7 @@ fn main() -> anyhow::Result<()> {
                             let _ = io::stderr().flush();
                         }
                     })
+                    .build()
             };
 
             // The client scans keychain spks for transaction histories, stopping after `stop_gap`
@@ -176,14 +180,17 @@ fn main() -> anyhow::Result<()> {
             // deriviation indices. Usually before a scan you are on a fresh wallet with no
             // addresses derived so we need to derive up to last active addresses the scan found
             // before adding the transactions.
-            (chain.apply_update(update.chain_update)?, {
-                let index_changeset = graph
-                    .index
-                    .reveal_to_target_multi(&update.last_active_indices);
-                let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update);
-                indexed_tx_graph_changeset.merge(index_changeset.into());
-                indexed_tx_graph_changeset
-            })
+            (
+                chain.apply_update(update.chain_update.expect("request included chain tip"))?,
+                {
+                    let index_changeset = graph
+                        .index
+                        .reveal_to_target_multi(&update.last_active_indices);
+                    let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update);
+                    indexed_tx_graph_changeset.merge(index_changeset.into());
+                    indexed_tx_graph_changeset
+                },
+            )
         }
         EsploraCommands::Sync {
             mut unused_spks,
@@ -206,7 +213,15 @@ fn main() -> anyhow::Result<()> {
 
             let local_tip = chain.lock().expect("mutex must not be poisoned").tip();
             // Spks, outpoints and txids we want updates on will be accumulated here.
-            let mut request = SyncRequest::from_chain_tip(local_tip.clone());
+            let mut request =
+                SyncRequest::builder()
+                    .chain_tip(local_tip.clone())
+                    .inspect(|item, progress| {
+                        let pc = (100 * progress.consumed()) as f32 / progress.total() as f32;
+                        eprintln!("[ SCANNING {:03.0}% ] {}", pc, item);
+                        // Flush early to ensure we print at every iteration.
+                        let _ = io::stderr().flush();
+                    });
 
             // Get a short lock on the structures to get spks, utxos, and txs that we are interested
             // in.
@@ -215,108 +230,51 @@ fn main() -> anyhow::Result<()> {
                 let chain = chain.lock().unwrap();
 
                 if *all_spks {
-                    let all_spks = graph
-                        .index
-                        .revealed_spks(..)
-                        .map(|((k, i), spk)| (k, i, spk.to_owned()))
-                        .collect::<Vec<_>>();
-                    request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| {
-                        eprint!("scanning {}:{}", k, i);
-                        // Flush early to ensure we print at every iteration.
-                        let _ = io::stderr().flush();
-                        spk
-                    }));
+                    request = request.spks_with_labels(
+                        graph
+                            .index
+                            .revealed_spks(..)
+                            .map(|(i, spk)| (i, spk.to_owned())),
+                    );
                 }
                 if unused_spks {
-                    let unused_spks = graph
-                        .index
-                        .unused_spks()
-                        .map(|(index, spk)| (index, spk.to_owned()))
-                        .collect::<Vec<_>>();
-                    request =
-                        request.chain_spks(unused_spks.into_iter().map(move |((k, i), spk)| {
-                            eprint!(
-                                "Checking if address {} {}:{} has been used",
-                                Address::from_script(&spk, network).unwrap(),
-                                k,
-                                i,
-                            );
-                            // Flush early to ensure we print at every iteration.
-                            let _ = io::stderr().flush();
-                            spk
-                        }));
+                    request = request.spks_with_labels(
+                        graph
+                            .index
+                            .unused_spks()
+                            .map(|(index, spk)| (index, spk.to_owned())),
+                    );
                 }
                 if utxos {
                     // We want to search for whether the UTXO is spent, and spent by which
                     // transaction. We provide the outpoint of the UTXO to
                     // `EsploraExt::update_tx_graph_without_keychain`.
                     let init_outpoints = graph.index.outpoints();
-                    let utxos = graph
-                        .graph()
-                        .filter_chain_unspents(
-                            &*chain,
-                            local_tip.block_id(),
-                            init_outpoints.iter().cloned(),
-                        )
-                        .map(|(_, utxo)| utxo)
-                        .collect::<Vec<_>>();
-                    request = request.chain_outpoints(
-                        utxos
-                            .into_iter()
-                            .inspect(|utxo| {
-                                eprint!(
-                                    "Checking if outpoint {} (value: {}) has been spent",
-                                    utxo.outpoint, utxo.txout.value
-                                );
-                                // Flush early to ensure we print at every iteration.
-                                let _ = io::stderr().flush();
-                            })
-                            .map(|utxo| utxo.outpoint),
+                    request = request.outpoints(
+                        graph
+                            .graph()
+                            .filter_chain_unspents(
+                                &*chain,
+                                local_tip.block_id(),
+                                init_outpoints.iter().cloned(),
+                            )
+                            .map(|(_, utxo)| utxo.outpoint),
                     );
                 };
                 if unconfirmed {
                     // We want to search for whether the unconfirmed transaction is now confirmed.
                     // We provide the unconfirmed txids to
                     // `EsploraExt::update_tx_graph_without_keychain`.
-                    let unconfirmed_txids = graph
-                        .graph()
-                        .list_canonical_txs(&*chain, local_tip.block_id())
-                        .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
-                        .map(|canonical_tx| canonical_tx.tx_node.txid)
-                        .collect::<Vec<Txid>>();
-                    request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| {
-                        eprint!("Checking if {} is confirmed yet", txid);
-                        // Flush early to ensure we print at every iteration.
-                        let _ = io::stderr().flush();
-                    }));
+                    request = request.txids(
+                        graph
+                            .graph()
+                            .list_canonical_txs(&*chain, local_tip.block_id())
+                            .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
+                            .map(|canonical_tx| canonical_tx.tx_node.txid),
+                    );
                 }
             }
 
-            let total_spks = request.spks.len();
-            let total_txids = request.txids.len();
-            let total_ops = request.outpoints.len();
-            request = request
-                .inspect_spks({
-                    let mut visited = 0;
-                    move |_| {
-                        visited += 1;
-                        eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32)
-                    }
-                })
-                .inspect_txids({
-                    let mut visited = 0;
-                    move |_| {
-                        visited += 1;
-                        eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32)
-                    }
-                })
-                .inspect_outpoints({
-                    let mut visited = 0;
-                    move |_| {
-                        visited += 1;
-                        eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32)
-                    }
-                });
             let mut update = client.sync(request, scan_options.parallel_requests)?;
 
             // Update last seen unconfirmed
@@ -324,7 +282,10 @@ fn main() -> anyhow::Result<()> {
             let _ = update.graph_update.update_last_seen_unconfirmed(now);
 
             (
-                chain.lock().unwrap().apply_update(update.chain_update)?,
+                chain
+                    .lock()
+                    .unwrap()
+                    .apply_update(update.chain_update.expect("request has chain tip"))?,
                 graph.lock().unwrap().apply_update(update.graph_update),
             )
         }
index b1e7655de24d013395eb2f51123e06d824ba5c1e..cda1889d0b77ad0fcf516cb37f473702611499db 100644 (file)
@@ -52,19 +52,18 @@ fn main() -> Result<(), anyhow::Error> {
     // already have.
     client.populate_tx_cache(wallet.tx_graph());
 
-    let request = wallet
-        .start_full_scan()
-        .inspect_spks_for_all_keychains({
-            let mut once = HashSet::<KeychainKind>::new();
-            move |k, spk_i, _| {
-                if once.insert(k) {
-                    print!("\nScanning keychain [{:?}]", k)
-                } else {
-                    print!(" {:<3}", spk_i)
-                }
+    let request = wallet.start_full_scan().inspect({
+        let mut stdout = std::io::stdout();
+        let mut once = HashSet::<KeychainKind>::new();
+        move |k, spk_i, _| {
+            if once.insert(k) {
+                print!("\nScanning keychain [{:?}]", k)
+            } else {
+                print!(" {:<3}", spk_i)
             }
-        })
-        .inspect_spks_for_all_keychains(|_, _, _| std::io::stdout().flush().expect("must flush"));
+            stdout.flush().expect("must flush");
+        }
+    });
 
     let mut update = client.full_scan(request, STOP_GAP, BATCH_SIZE, false)?;
 
index 535abc6af294efc7b484415c699d36a69fb77302..ae73e603c08b0ea7398a41cce1ea148529049808 100644 (file)
@@ -45,14 +45,15 @@ async fn main() -> Result<(), anyhow::Error> {
     print!("Syncing...");
     let client = esplora_client::Builder::new(ESPLORA_URL).build_async()?;
 
-    let request = wallet.start_full_scan().inspect_spks_for_all_keychains({
+    let request = wallet.start_full_scan().inspect({
+        let mut stdout = std::io::stdout();
         let mut once = BTreeSet::<KeychainKind>::new();
         move |keychain, spk_i, _| {
             if once.insert(keychain) {
-                print!("\nScanning keychain [{:?}] ", keychain);
+                print!("\nScanning keychain [{:?}]", keychain);
             }
             print!(" {:<3}", spk_i);
-            std::io::stdout().flush().expect("must flush")
+            stdout.flush().expect("must flush")
         }
     });
 
index 7e825150d856b7f7dc551ba6831faa8f5b8f9b6f..9bfed70ee7c5315403cb0ff5ba490d5fd687941a 100644 (file)
@@ -47,14 +47,15 @@ fn main() -> Result<(), anyhow::Error> {
     print!("Syncing...");
     let client = esplora_client::Builder::new(ESPLORA_URL).build_blocking();
 
-    let request = wallet.start_full_scan().inspect_spks_for_all_keychains({
+    let request = wallet.start_full_scan().inspect({
+        let mut stdout = std::io::stdout();
         let mut once = BTreeSet::<KeychainKind>::new();
         move |keychain, spk_i, _| {
             if once.insert(keychain) {
                 print!("\nScanning keychain [{:?}] ", keychain);
             }
             print!(" {:<3}", spk_i);
-            std::io::stdout().flush().expect("must flush")
+            stdout.flush().expect("must flush")
         }
     });