]> Untitled Git - bdk/commitdiff
feat!: Improve spk-based syncing flow
author志宇 <hello@evanlinjin.me>
Thu, 13 Feb 2025 11:08:18 +0000 (22:08 +1100)
committer志宇 <hello@evanlinjin.me>
Fri, 28 Feb 2025 03:12:03 +0000 (14:12 +1100)
* Change `TxUpdate::seen_ats` to be a `HashSet<(Txid, u64)>` so we can
  introduce multiple timestamps per tx. This is useful to introduce both
  `first_seen` and `last_seen` timestamps to `TxGraph`. This is also a
  better API for chain-sources as they can just insert timestamps into
  the field without checking previous values.
* Change sync/full-scan flow to have the request structure introduce the
  sync time instead of introducing the timestamp when apply the
  `TxUpdate`. This simplifies the `apply_update{_at}` logic and makes
  `evicted_at` easier to reason about (in the future).

crates/chain/benches/canonicalization.rs
crates/chain/src/indexed_tx_graph.rs
crates/chain/src/tx_graph.rs
crates/chain/tests/test_tx_graph.rs
crates/core/src/spk_client.rs
crates/core/src/tx_update.rs
crates/electrum/src/bdk_electrum_client.rs
crates/electrum/tests/test_electrum.rs
crates/esplora/src/async_ext.rs
crates/esplora/src/blocking_ext.rs
crates/esplora/src/lib.rs

index 52cbf51d68f8293cc4f5944d9bf726ebbf4469c6..6893e6df885c6209bb7a89b10b0be622f48d8306 100644 (file)
@@ -133,8 +133,9 @@ pub fn many_conflicting_unconfirmed(c: &mut Criterion) {
                 ..new_tx(i)
             };
             let mut update = TxUpdate::default();
+            update.seen_ats = [(tx.compute_txid(), i as u64)].into();
             update.txs = vec![Arc::new(tx)];
-            let _ = tx_graph.apply_update_at(update, Some(i as u64));
+            let _ = tx_graph.apply_update(update);
         }
     }));
     c.bench_function("many_conflicting_unconfirmed::list_canonical_txs", {
@@ -168,8 +169,9 @@ pub fn many_chained_unconfirmed(c: &mut Criterion) {
             };
             let txid = tx.compute_txid();
             let mut update = TxUpdate::default();
+            update.seen_ats = [(txid, i as u64)].into();
             update.txs = vec![Arc::new(tx)];
-            let _ = tx_graph.apply_update_at(update, Some(i as u64));
+            let _ = tx_graph.apply_update(update);
             // Store the next prevout.
             previous_output = OutPoint::new(txid, 0);
         }
index 039924c9217dc497c98b5bded676eb4350f124e7..45ed92aeeedf73b70688a6eefb37d9cbd9fb8231 100644 (file)
@@ -91,37 +91,12 @@ where
     /// Apply an `update` directly.
     ///
     /// `update` is a [`tx_graph::TxUpdate<A>`] and the resultant changes is returned as [`ChangeSet`].
-    #[cfg(feature = "std")]
-    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
     pub fn apply_update(&mut self, update: tx_graph::TxUpdate<A>) -> ChangeSet<A, I::ChangeSet> {
         let tx_graph = self.graph.apply_update(update);
         let indexer = self.index_tx_graph_changeset(&tx_graph);
         ChangeSet { tx_graph, indexer }
     }
 
-    /// Apply the given `update` with an optional `seen_at` timestamp.
-    ///
-    /// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the
-    /// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The
-    /// `last_seen` value is used internally to determine precedence of conflicting unconfirmed
-    /// transactions (where the transaction with the lower `last_seen` value is omitted from the
-    /// canonical history).
-    ///
-    /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will
-    /// not be part of the canonical history of transactions.
-    ///
-    /// Use [`apply_update`](IndexedTxGraph::apply_update) to have the `seen_at` value automatically
-    /// set to the current time.
-    pub fn apply_update_at(
-        &mut self,
-        update: tx_graph::TxUpdate<A>,
-        seen_at: Option<u64>,
-    ) -> ChangeSet<A, I::ChangeSet> {
-        let tx_graph = self.graph.apply_update_at(update, seen_at);
-        let indexer = self.index_tx_graph_changeset(&tx_graph);
-        ChangeSet { tx_graph, indexer }
-    }
-
     /// Insert a floating `txout` of given `outpoint`.
     pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
         let graph = self.graph.insert_txout(outpoint, txout);
index d0f7380a1db0a983335a5059d159731f1ade7c66..d40ee49d3cb5cabdc511ed30398c9d26395616b5 100644 (file)
@@ -129,7 +129,7 @@ impl<A: Ord> From<TxGraph<A>> for TxUpdate<A> {
 impl<A: Anchor> From<TxUpdate<A>> for TxGraph<A> {
     fn from(update: TxUpdate<A>) -> Self {
         let mut graph = TxGraph::<A>::default();
-        let _ = graph.apply_update_at(update, None);
+        let _ = graph.apply_update(update);
         graph
     }
 }
@@ -719,52 +719,20 @@ impl<A: Anchor> TxGraph<A> {
     ///
     /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that
     /// exist in `update` but not in `self`).
-    #[cfg(feature = "std")]
-    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
     pub fn apply_update(&mut self, update: TxUpdate<A>) -> ChangeSet<A> {
-        use std::time::*;
-        let now = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("current time must be greater than epoch anchor");
-        self.apply_update_at(update, Some(now.as_secs()))
-    }
-
-    /// Extends this graph with the given `update` alongside an optional `seen_at` timestamp.
-    ///
-    /// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the
-    /// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The
-    /// `last_seen` value is used internally to determine precedence of conflicting unconfirmed
-    /// transactions (where the transaction with the lower `last_seen` value is omitted from the
-    /// canonical history).
-    ///
-    /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will
-    /// not be part of the canonical history of transactions.
-    ///
-    /// Use [`apply_update`](TxGraph::apply_update) to have the `seen_at` value automatically set
-    /// to the current time.
-    pub fn apply_update_at(&mut self, update: TxUpdate<A>, seen_at: Option<u64>) -> ChangeSet<A> {
         let mut changeset = ChangeSet::<A>::default();
-        let mut unanchored_txs = HashSet::<Txid>::new();
         for tx in update.txs {
-            if unanchored_txs.insert(tx.compute_txid()) {
-                changeset.merge(self.insert_tx(tx));
-            }
+            changeset.merge(self.insert_tx(tx));
         }
         for (outpoint, txout) in update.txouts {
             changeset.merge(self.insert_txout(outpoint, txout));
         }
         for (anchor, txid) in update.anchors {
-            unanchored_txs.remove(&txid);
             changeset.merge(self.insert_anchor(txid, anchor));
         }
         for (txid, seen_at) in update.seen_ats {
             changeset.merge(self.insert_seen_at(txid, seen_at));
         }
-        if let Some(seen_at) = seen_at {
-            for txid in unanchored_txs {
-                changeset.merge(self.insert_seen_at(txid, seen_at));
-            }
-        }
         changeset
     }
 
index 16d2e6c6d4bdcef21b967437e7d8c003d36dfb4c..eef5e223981f3a4f7910cc592677f466464592b8 100644 (file)
@@ -94,7 +94,7 @@ fn insert_txouts() {
             // Insert partials transactions.
             update.txouts.insert(*outpoint, txout.clone());
             // Mark them unconfirmed.
-            update.seen_ats.insert(outpoint.txid, unconf_seen_at);
+            update.seen_ats.insert((outpoint.txid, unconf_seen_at));
         }
 
         // Insert the full transaction.
@@ -1289,7 +1289,7 @@ fn tx_graph_update_conversion() {
 
     for (test_name, update) in test_cases {
         let mut tx_graph = TxGraph::<ConfirmationBlockTime>::default();
-        let _ = tx_graph.apply_update_at(update.clone(), None);
+        let _ = tx_graph.apply_update(update.clone());
         let update_from_tx_graph: TxUpdate<ConfirmationBlockTime> = tx_graph.into();
 
         assert_eq!(
index a5ec813c97a93df680ef8f1930385df9f72c8c7e..dce3b7ae1886ae4e747e256958d2a1a11972cfb3 100644 (file)
@@ -87,19 +87,13 @@ impl SyncProgress {
 }
 
 /// Builds a [`SyncRequest`].
+///
+/// Construct with [`SyncRequest::builder`].
 #[must_use]
 pub struct SyncRequestBuilder<I = ()> {
     inner: SyncRequest<I>,
 }
 
-impl<I> Default for SyncRequestBuilder<I> {
-    fn default() -> Self {
-        Self {
-            inner: Default::default(),
-        }
-    }
-}
-
 impl SyncRequestBuilder<()> {
     /// Add [`Script`]s that will be synced against.
     pub fn spks(self, spks: impl IntoIterator<Item = ScriptBuf>) -> Self {
@@ -210,6 +204,7 @@ impl<I> SyncRequestBuilder<I> {
 /// ```
 #[must_use]
 pub struct SyncRequest<I = ()> {
+    start_time: u64,
     chain_tip: Option<CheckPoint>,
     spks: VecDeque<(I, ScriptBuf)>,
     spks_consumed: usize,
@@ -220,21 +215,6 @@ pub struct SyncRequest<I = ()> {
     inspect: Box<InspectSync<I>>,
 }
 
-impl<I> Default for SyncRequest<I> {
-    fn default() -> Self {
-        Self {
-            chain_tip: None,
-            spks: VecDeque::new(),
-            spks_consumed: 0,
-            txids: VecDeque::new(),
-            txids_consumed: 0,
-            outpoints: VecDeque::new(),
-            outpoints_consumed: 0,
-            inspect: Box::new(|_, _| {}),
-        }
-    }
-}
-
 impl<I> From<SyncRequestBuilder<I>> for SyncRequest<I> {
     fn from(builder: SyncRequestBuilder<I>) -> Self {
         builder.inner
@@ -242,13 +222,49 @@ impl<I> From<SyncRequestBuilder<I>> for SyncRequest<I> {
 }
 
 impl<I> SyncRequest<I> {
-    /// Start building a [`SyncRequest`].
-    pub fn builder() -> SyncRequestBuilder<I> {
+    /// Start building [`SyncRequest`] with a given `start_time`.
+    ///
+    /// `start_time` specifies the start time of sync. Chain sources can use this value to set
+    /// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction
+    /// without any `seen_ats` is assumed to be unseen in the mempool.
+    ///
+    /// Use [`SyncRequest::builder`] to use the current timestamp as `start_time` (this requires
+    /// `feature = "std"`).
+    pub fn builder_at(start_time: u64) -> SyncRequestBuilder<I> {
         SyncRequestBuilder {
-            inner: Default::default(),
+            inner: Self {
+                start_time,
+                chain_tip: None,
+                spks: VecDeque::new(),
+                spks_consumed: 0,
+                txids: VecDeque::new(),
+                txids_consumed: 0,
+                outpoints: VecDeque::new(),
+                outpoints_consumed: 0,
+                inspect: Box::new(|_, _| ()),
+            },
         }
     }
 
+    /// Start building [`SyncRequest`] with the current timestamp as the `start_time`.
+    ///
+    /// Use [`SyncRequest::builder_at`] to manually set the `start_time`, or if `feature = "std"`
+    /// is not available.
+    #[cfg(feature = "std")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
+    pub fn builder() -> SyncRequestBuilder<I> {
+        let start_time = std::time::UNIX_EPOCH
+            .elapsed()
+            .expect("failed to get current timestamp")
+            .as_secs();
+        Self::builder_at(start_time)
+    }
+
+    /// When the sync-request was initiated.
+    pub fn start_time(&self) -> u64 {
+        self.start_time
+    }
+
     /// Get the [`SyncProgress`] of this request.
     pub fn progress(&self) -> SyncProgress {
         SyncProgress {
@@ -339,19 +355,13 @@ impl<A> Default for SyncResponse<A> {
 }
 
 /// Builds a [`FullScanRequest`].
+///
+/// Construct with [`FullScanRequest::builder`].
 #[must_use]
 pub struct FullScanRequestBuilder<K> {
     inner: FullScanRequest<K>,
 }
 
-impl<K> Default for FullScanRequestBuilder<K> {
-    fn default() -> Self {
-        Self {
-            inner: Default::default(),
-        }
-    }
-}
-
 impl<K: Ord> FullScanRequestBuilder<K> {
     /// Set the initial chain tip for the full scan request.
     ///
@@ -397,6 +407,7 @@ impl<K: Ord> FullScanRequestBuilder<K> {
 /// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided).
 #[must_use]
 pub struct FullScanRequest<K> {
+    start_time: u64,
     chain_tip: Option<CheckPoint>,
     spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
     inspect: Box<InspectFullScan<K>>,
@@ -408,22 +419,43 @@ impl<K> From<FullScanRequestBuilder<K>> for FullScanRequest<K> {
     }
 }
 
-impl<K> Default for FullScanRequest<K> {
-    fn default() -> Self {
-        Self {
-            chain_tip: None,
-            spks_by_keychain: Default::default(),
-            inspect: Box::new(|_, _, _| {}),
+impl<K: Ord + Clone> FullScanRequest<K> {
+    /// Start building a [`FullScanRequest`] with a given `start_time`.
+    ///
+    /// `start_time` specifies the start time of sync. Chain sources can use this value to set
+    /// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction
+    /// without any `seen_ats` is assumed to be unseen in the mempool.
+    ///
+    /// Use [`FullScanRequest::builder`] to use the current timestamp as `start_time` (this
+    /// requires `feature = "std`).
+    pub fn builder_at(start_time: u64) -> FullScanRequestBuilder<K> {
+        FullScanRequestBuilder {
+            inner: Self {
+                start_time,
+                chain_tip: None,
+                spks_by_keychain: BTreeMap::new(),
+                inspect: Box::new(|_, _, _| ()),
+            },
         }
     }
-}
 
-impl<K: Ord + Clone> FullScanRequest<K> {
-    /// Start building a [`FullScanRequest`].
+    /// Start building a [`FullScanRequest`] with the current timestamp as the `start_time`.
+    ///
+    /// Use [`FullScanRequest::builder_at`] to manually set the `start_time`, or if `feature =
+    /// "std"` is not available.
+    #[cfg(feature = "std")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
     pub fn builder() -> FullScanRequestBuilder<K> {
-        FullScanRequestBuilder {
-            inner: Self::default(),
-        }
+        let start_time = std::time::UNIX_EPOCH
+            .elapsed()
+            .expect("failed to get current timestamp")
+            .as_secs();
+        Self::builder_at(start_time)
+    }
+
+    /// When the full-scan-request was initiated.
+    pub fn start_time(&self) -> u64 {
+        self.start_time
     }
 
     /// Get the chain tip [`CheckPoint`] of this request (if any).
index 5da2bff8770acb0ac13c83909fe7df7c5698bc64..0b548313a3604201bae5a3ed64f3822bfba9789a 100644 (file)
@@ -1,4 +1,4 @@
-use crate::collections::{BTreeMap, BTreeSet, HashMap};
+use crate::collections::{BTreeMap, BTreeSet, HashSet};
 use alloc::{sync::Arc, vec::Vec};
 use bitcoin::{OutPoint, Transaction, TxOut, Txid};
 
@@ -24,16 +24,26 @@ pub struct TxUpdate<A = ()> {
     /// Full transactions. These are transactions that were determined to be relevant to the wallet
     /// given the request.
     pub txs: Vec<Arc<Transaction>>,
+
     /// Floating txouts. These are `TxOut`s that exist but the whole transaction wasn't included in
     /// `txs` since only knowing about the output is important. These are often used to help determine
     /// the fee of a wallet transaction.
     pub txouts: BTreeMap<OutPoint, TxOut>,
+
     /// Transaction anchors. Anchors tells us a position in the chain where a transaction was
     /// confirmed.
     pub anchors: BTreeSet<(A, Txid)>,
-    /// Seen at times for transactions. This records when a transaction was most recently seen in
-    /// the user's mempool for the sake of tie-breaking other conflicting transactions.
-    pub seen_ats: HashMap<Txid, u64>,
+
+    /// When transactions were seen in the mempool.
+    ///
+    /// An unconfirmed transaction can only be canonical with a `seen_at` value. It is the
+    /// responsibility of the chain-source to include the `seen_at` values for unconfirmed
+    /// (unanchored) transactions.
+    ///
+    /// [`FullScanRequest::start_time`](crate::spk_client::FullScanRequest::start_time) or
+    /// [`SyncRequest::start_time`](crate::spk_client::SyncRequest::start_time) can be used to
+    /// provide the `seen_at` value.
+    pub seen_ats: HashSet<(Txid, u64)>,
 }
 
 impl<A> Default for TxUpdate<A> {
index e187bf3667ce65f463d78158aea82cef18aa47e0..163854ad30288197ff7772586c4941f56feef477 100644 (file)
@@ -128,6 +128,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         fetch_prev_txouts: bool,
     ) -> Result<FullScanResponse<K>, Error> {
         let mut request: FullScanRequest<K> = request.into();
+        let start_time = request.start_time();
 
         let tip_and_latest_blocks = match request.chain_tip() {
             Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
@@ -139,7 +140,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         for keychain in request.keychains() {
             let spks = request.iter_spks(keychain.clone());
             if let Some(last_active_index) =
-                self.populate_with_spks(&mut tx_update, spks, stop_gap, batch_size)?
+                self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)?
             {
                 last_active_indices.insert(keychain, last_active_index);
             }
@@ -196,6 +197,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
         fetch_prev_txouts: bool,
     ) -> Result<SyncResponse, Error> {
         let mut request: SyncRequest<I> = request.into();
+        let start_time = request.start_time();
 
         let tip_and_latest_blocks = match request.chain_tip() {
             Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
@@ -204,6 +206,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
 
         let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
         self.populate_with_spks(
+            start_time,
             &mut tx_update,
             request
                 .iter_spks()
@@ -212,8 +215,8 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
             usize::MAX,
             batch_size,
         )?;
-        self.populate_with_txids(&mut tx_update, request.iter_txids())?;
-        self.populate_with_outpoints(&mut tx_update, request.iter_outpoints())?;
+        self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?;
+        self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?;
 
         // Fetch previous `TxOut`s for fee calculation if flag is enabled.
         if fetch_prev_txouts {
@@ -242,6 +245,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
     /// also included.
     fn populate_with_spks(
         &self,
+        start_time: u64,
         tx_update: &mut TxUpdate<ConfirmationBlockTime>,
         mut spks: impl Iterator<Item = (u32, ScriptBuf)>,
         stop_gap: usize,
@@ -268,7 +272,6 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
                     if unused_spk_count >= stop_gap {
                         return Ok(last_active_index);
                     }
-                    continue;
                 } else {
                     last_active_index = Some(spk_index);
                     unused_spk_count = 0;
@@ -276,8 +279,14 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
 
                 for tx_res in spk_history {
                     tx_update.txs.push(self.fetch_tx(tx_res.tx_hash)?);
-                    if let Ok(height) = tx_res.height.try_into() {
-                        self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
+                    match tx_res.height.try_into() {
+                        // Returned heights 0 & -1 are reserved for unconfirmed txs.
+                        Ok(height) if height > 0 => {
+                            self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
+                        }
+                        _ => {
+                            tx_update.seen_ats.insert((tx_res.tx_hash, start_time));
+                        }
                     }
                 }
             }
@@ -290,6 +299,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
     /// included. Anchors of the aforementioned transactions are included.
     fn populate_with_outpoints(
         &self,
+        start_time: u64,
         tx_update: &mut TxUpdate<ConfirmationBlockTime>,
         outpoints: impl IntoIterator<Item = OutPoint>,
     ) -> Result<(), Error> {
@@ -314,8 +324,14 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
                 if !has_residing && res.tx_hash == op_txid {
                     has_residing = true;
                     tx_update.txs.push(Arc::clone(&op_tx));
-                    if let Ok(height) = res.height.try_into() {
-                        self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+                    match res.height.try_into() {
+                        // Returned heights 0 & -1 are reserved for unconfirmed txs.
+                        Ok(height) if height > 0 => {
+                            self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+                        }
+                        _ => {
+                            tx_update.seen_ats.insert((res.tx_hash, start_time));
+                        }
                     }
                 }
 
@@ -330,8 +346,14 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
                         continue;
                     }
                     tx_update.txs.push(Arc::clone(&res_tx));
-                    if let Ok(height) = res.height.try_into() {
-                        self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+                    match res.height.try_into() {
+                        // Returned heights 0 & -1 are reserved for unconfirmed txs.
+                        Ok(height) if height > 0 => {
+                            self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+                        }
+                        _ => {
+                            tx_update.seen_ats.insert((res.tx_hash, start_time));
+                        }
                     }
                 }
             }
@@ -342,6 +364,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
     /// Populate the `tx_update` with transactions/anchors of the provided `txids`.
     fn populate_with_txids(
         &self,
+        start_time: u64,
         tx_update: &mut TxUpdate<ConfirmationBlockTime>,
         txids: impl IntoIterator<Item = Txid>,
     ) -> Result<(), Error> {
@@ -366,8 +389,14 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
                 .into_iter()
                 .find(|r| r.tx_hash == txid)
             {
-                if let Ok(height) = r.height.try_into() {
-                    self.validate_merkle_for_anchor(tx_update, txid, height)?;
+                match r.height.try_into() {
+                    // Returned heights 0 & -1 are reserved for unconfirmed txs.
+                    Ok(height) if height > 0 => {
+                        self.validate_merkle_for_anchor(tx_update, txid, height)?;
+                    }
+                    _ => {
+                        tx_update.seen_ats.insert((r.tx_hash, start_time));
+                    }
                 }
             }
 
index 8c89605e4464032d37964ac434d2f87266f3b9e3..da15e98035d13ad1944014abacfc9655300d17c6 100644 (file)
@@ -75,7 +75,7 @@ pub fn chained_mempool_tx_sync() -> anyhow::Result<()> {
     env.mine_blocks(100, None)?;
 
     // First unconfirmed tx.
-    env.send(&tracked_addr, Amount::from_btc(1.0)?)?;
+    let txid1 = env.send(&tracked_addr, Amount::from_btc(1.0)?)?;
 
     // Create second unconfirmed tx that spends the first.
     let utxo = rpc_client
@@ -100,7 +100,7 @@ pub fn chained_mempool_tx_sync() -> anyhow::Result<()> {
     let signed_tx = rpc_client
         .sign_raw_transaction_with_wallet(tx_that_spends_unconfirmed.raw_hex(), None, None)?
         .transaction()?;
-    rpc_client.send_raw_transaction(signed_tx.raw_hex())?;
+    let txid2 = rpc_client.send_raw_transaction(signed_tx.raw_hex())?;
 
     env.wait_until_electrum_sees_txid(signed_tx.compute_txid(), Duration::from_secs(5))?;
 
@@ -111,8 +111,16 @@ pub fn chained_mempool_tx_sync() -> anyhow::Result<()> {
     );
 
     let client = BdkElectrumClient::new(electrum_client);
-    let request = SyncRequest::builder().spks(core::iter::once(tracked_addr.script_pubkey()));
-    let _response = client.sync(request, 1, false)?;
+    let req = SyncRequest::builder()
+        .spks(core::iter::once(tracked_addr.script_pubkey()))
+        .build();
+    let req_time = req.start_time();
+    let response = client.sync(req, 1, false)?;
+    assert_eq!(
+        response.tx_update.seen_ats,
+        [(txid1, req_time), (txid2, req_time)].into(),
+        "both txids must have `seen_at` time match the request's `start_time`",
+    );
 
     Ok(())
 }
index 4c1bd0ad791b93c05f736edd54e7692dedfcd28a..4cb34ad80f1545dd94d4a5a5632fbeca444896f7 100644 (file)
@@ -8,7 +8,7 @@ use bdk_core::{
 use esplora_client::Sleeper;
 use futures::{stream::FuturesOrdered, TryStreamExt};
 
-use crate::{insert_anchor_from_status, insert_prevouts};
+use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts};
 
 /// [`esplora_client::Error`]
 type Error = Box<esplora_client::Error>;
@@ -63,6 +63,7 @@ where
         parallel_requests: usize,
     ) -> Result<FullScanResponse<K>, Error> {
         let mut request = request.into();
+        let start_time = request.start_time();
         let keychains = request.keychains();
 
         let chain_tip = request.chain_tip();
@@ -79,6 +80,7 @@ where
             let keychain_spks = request.iter_spks(keychain.clone());
             let (update, last_active_index) = fetch_txs_with_keychain_spks(
                 self,
+                start_time,
                 &mut inserted_txs,
                 keychain_spks,
                 stop_gap,
@@ -111,6 +113,7 @@ where
         parallel_requests: usize,
     ) -> Result<SyncResponse, Error> {
         let mut request = request.into();
+        let start_time = request.start_time();
 
         let chain_tip = request.chain_tip();
         let latest_blocks = if chain_tip.is_some() {
@@ -124,6 +127,7 @@ where
         tx_update.extend(
             fetch_txs_with_spks(
                 self,
+                start_time,
                 &mut inserted_txs,
                 request.iter_spks(),
                 parallel_requests,
@@ -133,6 +137,7 @@ where
         tx_update.extend(
             fetch_txs_with_txids(
                 self,
+                start_time,
                 &mut inserted_txs,
                 request.iter_txids(),
                 parallel_requests,
@@ -142,6 +147,7 @@ where
         tx_update.extend(
             fetch_txs_with_outpoints(
                 self,
+                start_time,
                 &mut inserted_txs,
                 request.iter_outpoints(),
                 parallel_requests,
@@ -278,6 +284,7 @@ async fn chain_update<S: Sleeper>(
 /// Refer to [crate-level docs](crate) for more.
 async fn fetch_txs_with_keychain_spks<I, S>(
     client: &esplora_client::AsyncClient<S>,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     mut keychain_spks: I,
     stop_gap: usize,
@@ -328,7 +335,7 @@ where
                 if inserted_txs.insert(tx.txid) {
                     update.txs.push(tx.to_tx().into());
                 }
-                insert_anchor_from_status(&mut update, tx.txid, tx.status);
+                insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
                 insert_prevouts(&mut update, tx.vin);
             }
         }
@@ -357,6 +364,7 @@ where
 /// Refer to [crate-level docs](crate) for more.
 async fn fetch_txs_with_spks<I, S>(
     client: &esplora_client::AsyncClient<S>,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     spks: I,
     parallel_requests: usize,
@@ -368,6 +376,7 @@ where
 {
     fetch_txs_with_keychain_spks(
         client,
+        start_time,
         inserted_txs,
         spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
         usize::MAX,
@@ -385,6 +394,7 @@ where
 /// Refer to [crate-level docs](crate) for more.
 async fn fetch_txs_with_txids<I, S>(
     client: &esplora_client::AsyncClient<S>,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     txids: I,
     parallel_requests: usize,
@@ -420,7 +430,7 @@ where
                 if inserted_txs.insert(txid) {
                     update.txs.push(tx_info.to_tx().into());
                 }
-                insert_anchor_from_status(&mut update, txid, tx_info.status);
+                insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status);
                 insert_prevouts(&mut update, tx_info.vin);
             }
         }
@@ -436,6 +446,7 @@ where
 /// Refer to [crate-level docs](crate) for more.
 async fn fetch_txs_with_outpoints<I, S>(
     client: &esplora_client::AsyncClient<S>,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     outpoints: I,
     parallel_requests: usize,
@@ -453,6 +464,7 @@ where
     update.extend(
         fetch_txs_with_txids(
             client,
+            start_time,
             inserted_txs,
             outpoints.iter().copied().map(|op| op.txid),
             parallel_requests,
@@ -486,13 +498,26 @@ where
                 missing_txs.push(spend_txid);
             }
             if let Some(spend_status) = op_status.status {
-                insert_anchor_from_status(&mut update, spend_txid, spend_status);
+                insert_anchor_or_seen_at_from_status(
+                    &mut update,
+                    start_time,
+                    spend_txid,
+                    spend_status,
+                );
             }
         }
     }
 
-    update
-        .extend(fetch_txs_with_txids(client, inserted_txs, missing_txs, parallel_requests).await?);
+    update.extend(
+        fetch_txs_with_txids(
+            client,
+            start_time,
+            inserted_txs,
+            missing_txs,
+            parallel_requests,
+        )
+        .await?,
+    );
     Ok(update)
 }
 
index 655055b330657752eaa0b429e97f9dd486c43575..36c97195a89a360a2563b0b76fffba8ff2d83441 100644 (file)
@@ -7,7 +7,7 @@ use bdk_core::{
 use esplora_client::{OutputStatus, Tx};
 use std::thread::JoinHandle;
 
-use crate::{insert_anchor_from_status, insert_prevouts};
+use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts};
 
 /// [`esplora_client::Error`]
 pub type Error = Box<esplora_client::Error>;
@@ -54,6 +54,7 @@ impl EsploraExt for esplora_client::BlockingClient {
         parallel_requests: usize,
     ) -> Result<FullScanResponse<K>, Error> {
         let mut request = request.into();
+        let start_time = request.start_time();
 
         let chain_tip = request.chain_tip();
         let latest_blocks = if chain_tip.is_some() {
@@ -69,6 +70,7 @@ impl EsploraExt for esplora_client::BlockingClient {
             let keychain_spks = request.iter_spks(keychain.clone());
             let (update, last_active_index) = fetch_txs_with_keychain_spks(
                 self,
+                start_time,
                 &mut inserted_txs,
                 keychain_spks,
                 stop_gap,
@@ -103,6 +105,7 @@ impl EsploraExt for esplora_client::BlockingClient {
         parallel_requests: usize,
     ) -> Result<SyncResponse, Error> {
         let mut request: SyncRequest<I> = request.into();
+        let start_time = request.start_time();
 
         let chain_tip = request.chain_tip();
         let latest_blocks = if chain_tip.is_some() {
@@ -115,18 +118,21 @@ impl EsploraExt for esplora_client::BlockingClient {
         let mut inserted_txs = HashSet::<Txid>::new();
         tx_update.extend(fetch_txs_with_spks(
             self,
+            start_time,
             &mut inserted_txs,
             request.iter_spks(),
             parallel_requests,
         )?);
         tx_update.extend(fetch_txs_with_txids(
             self,
+            start_time,
             &mut inserted_txs,
             request.iter_txids(),
             parallel_requests,
         )?);
         tx_update.extend(fetch_txs_with_outpoints(
             self,
+            start_time,
             &mut inserted_txs,
             request.iter_outpoints(),
             parallel_requests,
@@ -250,6 +256,7 @@ fn chain_update(
 
 fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
     client: &esplora_client::BlockingClient,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     mut keychain_spks: I,
     stop_gap: usize,
@@ -299,7 +306,7 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
                 if inserted_txs.insert(tx.txid) {
                     update.txs.push(tx.to_tx().into());
                 }
-                insert_anchor_from_status(&mut update, tx.txid, tx.status);
+                insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
                 insert_prevouts(&mut update, tx.vin);
             }
         }
@@ -328,12 +335,14 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
 /// Refer to [crate-level docs](crate) for more.
 fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
     client: &esplora_client::BlockingClient,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     spks: I,
     parallel_requests: usize,
 ) -> Result<TxUpdate<ConfirmationBlockTime>, Error> {
     fetch_txs_with_keychain_spks(
         client,
+        start_time,
         inserted_txs,
         spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
         usize::MAX,
@@ -350,6 +359,7 @@ fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
 /// Refer to [crate-level docs](crate) for more.
 fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
     client: &esplora_client::BlockingClient,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     txids: I,
     parallel_requests: usize,
@@ -386,7 +396,7 @@ fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
                 if inserted_txs.insert(txid) {
                     update.txs.push(tx_info.to_tx().into());
                 }
-                insert_anchor_from_status(&mut update, txid, tx_info.status);
+                insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status);
                 insert_prevouts(&mut update, tx_info.vin);
             }
         }
@@ -402,6 +412,7 @@ fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
 /// Refer to [crate-level docs](crate) for more.
 fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
     client: &esplora_client::BlockingClient,
+    start_time: u64,
     inserted_txs: &mut HashSet<Txid>,
     outpoints: I,
     parallel_requests: usize,
@@ -413,6 +424,7 @@ fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
     // TODO: We should maintain a tx cache (like we do with Electrum).
     update.extend(fetch_txs_with_txids(
         client,
+        start_time,
         inserted_txs,
         outpoints.iter().map(|op| op.txid),
         parallel_requests,
@@ -449,7 +461,12 @@ fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
                     missing_txs.push(spend_txid);
                 }
                 if let Some(spend_status) = op_status.status {
-                    insert_anchor_from_status(&mut update, spend_txid, spend_status);
+                    insert_anchor_or_seen_at_from_status(
+                        &mut update,
+                        start_time,
+                        spend_txid,
+                        spend_status,
+                    );
                 }
             }
         }
@@ -457,6 +474,7 @@ fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
 
     update.extend(fetch_txs_with_txids(
         client,
+        start_time,
         inserted_txs,
         missing_txs,
         parallel_requests,
index a166b6f9fb9c7eb8f1f1b094fbd411b080d9981d..a4d4b9a5b1a483188fa516f1cde15726177a9242 100644 (file)
@@ -36,8 +36,9 @@ mod async_ext;
 #[cfg(feature = "async")]
 pub use async_ext::*;
 
-fn insert_anchor_from_status(
+fn insert_anchor_or_seen_at_from_status(
     update: &mut TxUpdate<ConfirmationBlockTime>,
+    start_time: u64,
     txid: Txid,
     status: TxStatus,
 ) {
@@ -53,6 +54,8 @@ fn insert_anchor_from_status(
             confirmation_time: time,
         };
         update.anchors.insert((anchor, txid));
+    } else {
+        update.seen_ats.insert((txid, start_time));
     }
 }