From 800f3580f8518e15ccaf9622fdd2e5141a50d5e5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 13 Feb 2025 22:08:18 +1100 Subject: [PATCH] feat!: Improve spk-based syncing flow * Change `TxUpdate::seen_ats` to be a `HashSet<(Txid, u64)>` so we can introduce multiple timestamps per tx. This is useful to introduce both `first_seen` and `last_seen` timestamps to `TxGraph`. This is also a better API for chain-sources as they can just insert timestamps into the field without checking previous values. * Change sync/full-scan flow to have the request structure introduce the sync time instead of introducing the timestamp when apply the `TxUpdate`. This simplifies the `apply_update{_at}` logic and makes `evicted_at` easier to reason about (in the future). --- crates/chain/benches/canonicalization.rs | 6 +- crates/chain/src/indexed_tx_graph.rs | 25 ----- crates/chain/src/tx_graph.rs | 36 +----- crates/chain/tests/test_tx_graph.rs | 4 +- crates/core/src/spk_client.rs | 124 +++++++++++++-------- crates/core/src/tx_update.rs | 18 ++- crates/electrum/src/bdk_electrum_client.rs | 53 +++++++-- crates/electrum/tests/test_electrum.rs | 16 ++- crates/esplora/src/async_ext.rs | 37 +++++- crates/esplora/src/blocking_ext.rs | 26 ++++- crates/esplora/src/lib.rs | 5 +- 11 files changed, 210 insertions(+), 140 deletions(-) diff --git a/crates/chain/benches/canonicalization.rs b/crates/chain/benches/canonicalization.rs index 52cbf51d..6893e6df 100644 --- a/crates/chain/benches/canonicalization.rs +++ b/crates/chain/benches/canonicalization.rs @@ -133,8 +133,9 @@ pub fn many_conflicting_unconfirmed(c: &mut Criterion) { ..new_tx(i) }; let mut update = TxUpdate::default(); + update.seen_ats = [(tx.compute_txid(), i as u64)].into(); update.txs = vec![Arc::new(tx)]; - let _ = tx_graph.apply_update_at(update, Some(i as u64)); + let _ = tx_graph.apply_update(update); } })); c.bench_function("many_conflicting_unconfirmed::list_canonical_txs", { @@ -168,8 +169,9 @@ pub fn many_chained_unconfirmed(c: &mut Criterion) { }; let txid = tx.compute_txid(); let mut update = TxUpdate::default(); + update.seen_ats = [(txid, i as u64)].into(); update.txs = vec![Arc::new(tx)]; - let _ = tx_graph.apply_update_at(update, Some(i as u64)); + let _ = tx_graph.apply_update(update); // Store the next prevout. previous_output = OutPoint::new(txid, 0); } diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 039924c9..45ed92ae 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -91,37 +91,12 @@ where /// Apply an `update` directly. /// /// `update` is a [`tx_graph::TxUpdate`] and the resultant changes is returned as [`ChangeSet`]. - #[cfg(feature = "std")] - #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn apply_update(&mut self, update: tx_graph::TxUpdate) -> ChangeSet { let tx_graph = self.graph.apply_update(update); let indexer = self.index_tx_graph_changeset(&tx_graph); ChangeSet { tx_graph, indexer } } - /// Apply the given `update` with an optional `seen_at` timestamp. - /// - /// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the - /// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The - /// `last_seen` value is used internally to determine precedence of conflicting unconfirmed - /// transactions (where the transaction with the lower `last_seen` value is omitted from the - /// canonical history). - /// - /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will - /// not be part of the canonical history of transactions. - /// - /// Use [`apply_update`](IndexedTxGraph::apply_update) to have the `seen_at` value automatically - /// set to the current time. - pub fn apply_update_at( - &mut self, - update: tx_graph::TxUpdate, - seen_at: Option, - ) -> ChangeSet { - let tx_graph = self.graph.apply_update_at(update, seen_at); - let indexer = self.index_tx_graph_changeset(&tx_graph); - ChangeSet { tx_graph, indexer } - } - /// Insert a floating `txout` of given `outpoint`. pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet { let graph = self.graph.insert_txout(outpoint, txout); diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index d0f7380a..d40ee49d 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -129,7 +129,7 @@ impl From> for TxUpdate { impl From> for TxGraph { fn from(update: TxUpdate) -> Self { let mut graph = TxGraph::::default(); - let _ = graph.apply_update_at(update, None); + let _ = graph.apply_update(update); graph } } @@ -719,52 +719,20 @@ impl TxGraph { /// /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that /// exist in `update` but not in `self`). - #[cfg(feature = "std")] - #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn apply_update(&mut self, update: TxUpdate) -> ChangeSet { - use std::time::*; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("current time must be greater than epoch anchor"); - self.apply_update_at(update, Some(now.as_secs())) - } - - /// Extends this graph with the given `update` alongside an optional `seen_at` timestamp. - /// - /// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the - /// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The - /// `last_seen` value is used internally to determine precedence of conflicting unconfirmed - /// transactions (where the transaction with the lower `last_seen` value is omitted from the - /// canonical history). - /// - /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will - /// not be part of the canonical history of transactions. - /// - /// Use [`apply_update`](TxGraph::apply_update) to have the `seen_at` value automatically set - /// to the current time. - pub fn apply_update_at(&mut self, update: TxUpdate, seen_at: Option) -> ChangeSet { let mut changeset = ChangeSet::::default(); - let mut unanchored_txs = HashSet::::new(); for tx in update.txs { - if unanchored_txs.insert(tx.compute_txid()) { - changeset.merge(self.insert_tx(tx)); - } + changeset.merge(self.insert_tx(tx)); } for (outpoint, txout) in update.txouts { changeset.merge(self.insert_txout(outpoint, txout)); } for (anchor, txid) in update.anchors { - unanchored_txs.remove(&txid); changeset.merge(self.insert_anchor(txid, anchor)); } for (txid, seen_at) in update.seen_ats { changeset.merge(self.insert_seen_at(txid, seen_at)); } - if let Some(seen_at) = seen_at { - for txid in unanchored_txs { - changeset.merge(self.insert_seen_at(txid, seen_at)); - } - } changeset } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index 16d2e6c6..eef5e223 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -94,7 +94,7 @@ fn insert_txouts() { // Insert partials transactions. update.txouts.insert(*outpoint, txout.clone()); // Mark them unconfirmed. - update.seen_ats.insert(outpoint.txid, unconf_seen_at); + update.seen_ats.insert((outpoint.txid, unconf_seen_at)); } // Insert the full transaction. @@ -1289,7 +1289,7 @@ fn tx_graph_update_conversion() { for (test_name, update) in test_cases { let mut tx_graph = TxGraph::::default(); - let _ = tx_graph.apply_update_at(update.clone(), None); + let _ = tx_graph.apply_update(update.clone()); let update_from_tx_graph: TxUpdate = tx_graph.into(); assert_eq!( diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index a5ec813c..dce3b7ae 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -87,19 +87,13 @@ impl SyncProgress { } /// Builds a [`SyncRequest`]. +/// +/// Construct with [`SyncRequest::builder`]. #[must_use] pub struct SyncRequestBuilder { inner: SyncRequest, } -impl Default for SyncRequestBuilder { - fn default() -> Self { - Self { - inner: Default::default(), - } - } -} - impl SyncRequestBuilder<()> { /// Add [`Script`]s that will be synced against. pub fn spks(self, spks: impl IntoIterator) -> Self { @@ -210,6 +204,7 @@ impl SyncRequestBuilder { /// ``` #[must_use] pub struct SyncRequest { + start_time: u64, chain_tip: Option, spks: VecDeque<(I, ScriptBuf)>, spks_consumed: usize, @@ -220,21 +215,6 @@ pub struct SyncRequest { inspect: Box>, } -impl Default for SyncRequest { - fn default() -> Self { - Self { - chain_tip: None, - spks: VecDeque::new(), - spks_consumed: 0, - txids: VecDeque::new(), - txids_consumed: 0, - outpoints: VecDeque::new(), - outpoints_consumed: 0, - inspect: Box::new(|_, _| {}), - } - } -} - impl From> for SyncRequest { fn from(builder: SyncRequestBuilder) -> Self { builder.inner @@ -242,13 +222,49 @@ impl From> for SyncRequest { } impl SyncRequest { - /// Start building a [`SyncRequest`]. - pub fn builder() -> SyncRequestBuilder { + /// Start building [`SyncRequest`] with a given `start_time`. + /// + /// `start_time` specifies the start time of sync. Chain sources can use this value to set + /// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction + /// without any `seen_ats` is assumed to be unseen in the mempool. + /// + /// Use [`SyncRequest::builder`] to use the current timestamp as `start_time` (this requires + /// `feature = "std"`). + pub fn builder_at(start_time: u64) -> SyncRequestBuilder { SyncRequestBuilder { - inner: Default::default(), + inner: Self { + start_time, + chain_tip: None, + spks: VecDeque::new(), + spks_consumed: 0, + txids: VecDeque::new(), + txids_consumed: 0, + outpoints: VecDeque::new(), + outpoints_consumed: 0, + inspect: Box::new(|_, _| ()), + }, } } + /// Start building [`SyncRequest`] with the current timestamp as the `start_time`. + /// + /// Use [`SyncRequest::builder_at`] to manually set the `start_time`, or if `feature = "std"` + /// is not available. + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + pub fn builder() -> SyncRequestBuilder { + let start_time = std::time::UNIX_EPOCH + .elapsed() + .expect("failed to get current timestamp") + .as_secs(); + Self::builder_at(start_time) + } + + /// When the sync-request was initiated. + pub fn start_time(&self) -> u64 { + self.start_time + } + /// Get the [`SyncProgress`] of this request. pub fn progress(&self) -> SyncProgress { SyncProgress { @@ -339,19 +355,13 @@ impl Default for SyncResponse { } /// Builds a [`FullScanRequest`]. +/// +/// Construct with [`FullScanRequest::builder`]. #[must_use] pub struct FullScanRequestBuilder { inner: FullScanRequest, } -impl Default for FullScanRequestBuilder { - fn default() -> Self { - Self { - inner: Default::default(), - } - } -} - impl FullScanRequestBuilder { /// Set the initial chain tip for the full scan request. /// @@ -397,6 +407,7 @@ impl FullScanRequestBuilder { /// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided). #[must_use] pub struct FullScanRequest { + start_time: u64, chain_tip: Option, spks_by_keychain: BTreeMap> + Send>>, inspect: Box>, @@ -408,22 +419,43 @@ impl From> for FullScanRequest { } } -impl Default for FullScanRequest { - fn default() -> Self { - Self { - chain_tip: None, - spks_by_keychain: Default::default(), - inspect: Box::new(|_, _, _| {}), +impl FullScanRequest { + /// Start building a [`FullScanRequest`] with a given `start_time`. + /// + /// `start_time` specifies the start time of sync. Chain sources can use this value to set + /// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction + /// without any `seen_ats` is assumed to be unseen in the mempool. + /// + /// Use [`FullScanRequest::builder`] to use the current timestamp as `start_time` (this + /// requires `feature = "std`). + pub fn builder_at(start_time: u64) -> FullScanRequestBuilder { + FullScanRequestBuilder { + inner: Self { + start_time, + chain_tip: None, + spks_by_keychain: BTreeMap::new(), + inspect: Box::new(|_, _, _| ()), + }, } } -} -impl FullScanRequest { - /// Start building a [`FullScanRequest`]. + /// Start building a [`FullScanRequest`] with the current timestamp as the `start_time`. + /// + /// Use [`FullScanRequest::builder_at`] to manually set the `start_time`, or if `feature = + /// "std"` is not available. + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn builder() -> FullScanRequestBuilder { - FullScanRequestBuilder { - inner: Self::default(), - } + let start_time = std::time::UNIX_EPOCH + .elapsed() + .expect("failed to get current timestamp") + .as_secs(); + Self::builder_at(start_time) + } + + /// When the full-scan-request was initiated. + pub fn start_time(&self) -> u64 { + self.start_time } /// Get the chain tip [`CheckPoint`] of this request (if any). diff --git a/crates/core/src/tx_update.rs b/crates/core/src/tx_update.rs index 5da2bff8..0b548313 100644 --- a/crates/core/src/tx_update.rs +++ b/crates/core/src/tx_update.rs @@ -1,4 +1,4 @@ -use crate::collections::{BTreeMap, BTreeSet, HashMap}; +use crate::collections::{BTreeMap, BTreeSet, HashSet}; use alloc::{sync::Arc, vec::Vec}; use bitcoin::{OutPoint, Transaction, TxOut, Txid}; @@ -24,16 +24,26 @@ pub struct TxUpdate { /// Full transactions. These are transactions that were determined to be relevant to the wallet /// given the request. pub txs: Vec>, + /// Floating txouts. These are `TxOut`s that exist but the whole transaction wasn't included in /// `txs` since only knowing about the output is important. These are often used to help determine /// the fee of a wallet transaction. pub txouts: BTreeMap, + /// Transaction anchors. Anchors tells us a position in the chain where a transaction was /// confirmed. pub anchors: BTreeSet<(A, Txid)>, - /// Seen at times for transactions. This records when a transaction was most recently seen in - /// the user's mempool for the sake of tie-breaking other conflicting transactions. - pub seen_ats: HashMap, + + /// When transactions were seen in the mempool. + /// + /// An unconfirmed transaction can only be canonical with a `seen_at` value. It is the + /// responsibility of the chain-source to include the `seen_at` values for unconfirmed + /// (unanchored) transactions. + /// + /// [`FullScanRequest::start_time`](crate::spk_client::FullScanRequest::start_time) or + /// [`SyncRequest::start_time`](crate::spk_client::SyncRequest::start_time) can be used to + /// provide the `seen_at` value. + pub seen_ats: HashSet<(Txid, u64)>, } impl Default for TxUpdate { diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index e187bf36..163854ad 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -128,6 +128,7 @@ impl BdkElectrumClient { fetch_prev_txouts: bool, ) -> Result, Error> { let mut request: FullScanRequest = request.into(); + let start_time = request.start_time(); let tip_and_latest_blocks = match request.chain_tip() { Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), @@ -139,7 +140,7 @@ impl BdkElectrumClient { for keychain in request.keychains() { let spks = request.iter_spks(keychain.clone()); if let Some(last_active_index) = - self.populate_with_spks(&mut tx_update, spks, stop_gap, batch_size)? + self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)? { last_active_indices.insert(keychain, last_active_index); } @@ -196,6 +197,7 @@ impl BdkElectrumClient { fetch_prev_txouts: bool, ) -> Result { let mut request: SyncRequest = request.into(); + let start_time = request.start_time(); let tip_and_latest_blocks = match request.chain_tip() { Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), @@ -204,6 +206,7 @@ impl BdkElectrumClient { let mut tx_update = TxUpdate::::default(); self.populate_with_spks( + start_time, &mut tx_update, request .iter_spks() @@ -212,8 +215,8 @@ impl BdkElectrumClient { usize::MAX, batch_size, )?; - self.populate_with_txids(&mut tx_update, request.iter_txids())?; - self.populate_with_outpoints(&mut tx_update, request.iter_outpoints())?; + self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?; + self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?; // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { @@ -242,6 +245,7 @@ impl BdkElectrumClient { /// also included. fn populate_with_spks( &self, + start_time: u64, tx_update: &mut TxUpdate, mut spks: impl Iterator, stop_gap: usize, @@ -268,7 +272,6 @@ impl BdkElectrumClient { if unused_spk_count >= stop_gap { return Ok(last_active_index); } - continue; } else { last_active_index = Some(spk_index); unused_spk_count = 0; @@ -276,8 +279,14 @@ impl BdkElectrumClient { for tx_res in spk_history { tx_update.txs.push(self.fetch_tx(tx_res.tx_hash)?); - if let Ok(height) = tx_res.height.try_into() { - self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?; + match tx_res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?; + } + _ => { + tx_update.seen_ats.insert((tx_res.tx_hash, start_time)); + } } } } @@ -290,6 +299,7 @@ impl BdkElectrumClient { /// included. Anchors of the aforementioned transactions are included. fn populate_with_outpoints( &self, + start_time: u64, tx_update: &mut TxUpdate, outpoints: impl IntoIterator, ) -> Result<(), Error> { @@ -314,8 +324,14 @@ impl BdkElectrumClient { if !has_residing && res.tx_hash == op_txid { has_residing = true; tx_update.txs.push(Arc::clone(&op_tx)); - if let Ok(height) = res.height.try_into() { - self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?; + match res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?; + } + _ => { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } } @@ -330,8 +346,14 @@ impl BdkElectrumClient { continue; } tx_update.txs.push(Arc::clone(&res_tx)); - if let Ok(height) = res.height.try_into() { - self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?; + match res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?; + } + _ => { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } } } @@ -342,6 +364,7 @@ impl BdkElectrumClient { /// Populate the `tx_update` with transactions/anchors of the provided `txids`. fn populate_with_txids( &self, + start_time: u64, tx_update: &mut TxUpdate, txids: impl IntoIterator, ) -> Result<(), Error> { @@ -366,8 +389,14 @@ impl BdkElectrumClient { .into_iter() .find(|r| r.tx_hash == txid) { - if let Ok(height) = r.height.try_into() { - self.validate_merkle_for_anchor(tx_update, txid, height)?; + match r.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + self.validate_merkle_for_anchor(tx_update, txid, height)?; + } + _ => { + tx_update.seen_ats.insert((r.tx_hash, start_time)); + } } } diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 8c89605e..da15e980 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -75,7 +75,7 @@ pub fn chained_mempool_tx_sync() -> anyhow::Result<()> { env.mine_blocks(100, None)?; // First unconfirmed tx. - env.send(&tracked_addr, Amount::from_btc(1.0)?)?; + let txid1 = env.send(&tracked_addr, Amount::from_btc(1.0)?)?; // Create second unconfirmed tx that spends the first. let utxo = rpc_client @@ -100,7 +100,7 @@ pub fn chained_mempool_tx_sync() -> anyhow::Result<()> { let signed_tx = rpc_client .sign_raw_transaction_with_wallet(tx_that_spends_unconfirmed.raw_hex(), None, None)? .transaction()?; - rpc_client.send_raw_transaction(signed_tx.raw_hex())?; + let txid2 = rpc_client.send_raw_transaction(signed_tx.raw_hex())?; env.wait_until_electrum_sees_txid(signed_tx.compute_txid(), Duration::from_secs(5))?; @@ -111,8 +111,16 @@ pub fn chained_mempool_tx_sync() -> anyhow::Result<()> { ); let client = BdkElectrumClient::new(electrum_client); - let request = SyncRequest::builder().spks(core::iter::once(tracked_addr.script_pubkey())); - let _response = client.sync(request, 1, false)?; + let req = SyncRequest::builder() + .spks(core::iter::once(tracked_addr.script_pubkey())) + .build(); + let req_time = req.start_time(); + let response = client.sync(req, 1, false)?; + assert_eq!( + response.tx_update.seen_ats, + [(txid1, req_time), (txid2, req_time)].into(), + "both txids must have `seen_at` time match the request's `start_time`", + ); Ok(()) } diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 4c1bd0ad..4cb34ad8 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -8,7 +8,7 @@ use bdk_core::{ use esplora_client::Sleeper; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::{insert_anchor_from_status, insert_prevouts}; +use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts}; /// [`esplora_client::Error`] type Error = Box; @@ -63,6 +63,7 @@ where parallel_requests: usize, ) -> Result, Error> { let mut request = request.into(); + let start_time = request.start_time(); let keychains = request.keychains(); let chain_tip = request.chain_tip(); @@ -79,6 +80,7 @@ where let keychain_spks = request.iter_spks(keychain.clone()); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, + start_time, &mut inserted_txs, keychain_spks, stop_gap, @@ -111,6 +113,7 @@ where parallel_requests: usize, ) -> Result { let mut request = request.into(); + let start_time = request.start_time(); let chain_tip = request.chain_tip(); let latest_blocks = if chain_tip.is_some() { @@ -124,6 +127,7 @@ where tx_update.extend( fetch_txs_with_spks( self, + start_time, &mut inserted_txs, request.iter_spks(), parallel_requests, @@ -133,6 +137,7 @@ where tx_update.extend( fetch_txs_with_txids( self, + start_time, &mut inserted_txs, request.iter_txids(), parallel_requests, @@ -142,6 +147,7 @@ where tx_update.extend( fetch_txs_with_outpoints( self, + start_time, &mut inserted_txs, request.iter_outpoints(), parallel_requests, @@ -278,6 +284,7 @@ async fn chain_update( /// Refer to [crate-level docs](crate) for more. async fn fetch_txs_with_keychain_spks( client: &esplora_client::AsyncClient, + start_time: u64, inserted_txs: &mut HashSet, mut keychain_spks: I, stop_gap: usize, @@ -328,7 +335,7 @@ where if inserted_txs.insert(tx.txid) { update.txs.push(tx.to_tx().into()); } - insert_anchor_from_status(&mut update, tx.txid, tx.status); + insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status); insert_prevouts(&mut update, tx.vin); } } @@ -357,6 +364,7 @@ where /// Refer to [crate-level docs](crate) for more. async fn fetch_txs_with_spks( client: &esplora_client::AsyncClient, + start_time: u64, inserted_txs: &mut HashSet, spks: I, parallel_requests: usize, @@ -368,6 +376,7 @@ where { fetch_txs_with_keychain_spks( client, + start_time, inserted_txs, spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), usize::MAX, @@ -385,6 +394,7 @@ where /// Refer to [crate-level docs](crate) for more. async fn fetch_txs_with_txids( client: &esplora_client::AsyncClient, + start_time: u64, inserted_txs: &mut HashSet, txids: I, parallel_requests: usize, @@ -420,7 +430,7 @@ where if inserted_txs.insert(txid) { update.txs.push(tx_info.to_tx().into()); } - insert_anchor_from_status(&mut update, txid, tx_info.status); + insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status); insert_prevouts(&mut update, tx_info.vin); } } @@ -436,6 +446,7 @@ where /// Refer to [crate-level docs](crate) for more. async fn fetch_txs_with_outpoints( client: &esplora_client::AsyncClient, + start_time: u64, inserted_txs: &mut HashSet, outpoints: I, parallel_requests: usize, @@ -453,6 +464,7 @@ where update.extend( fetch_txs_with_txids( client, + start_time, inserted_txs, outpoints.iter().copied().map(|op| op.txid), parallel_requests, @@ -486,13 +498,26 @@ where missing_txs.push(spend_txid); } if let Some(spend_status) = op_status.status { - insert_anchor_from_status(&mut update, spend_txid, spend_status); + insert_anchor_or_seen_at_from_status( + &mut update, + start_time, + spend_txid, + spend_status, + ); } } } - update - .extend(fetch_txs_with_txids(client, inserted_txs, missing_txs, parallel_requests).await?); + update.extend( + fetch_txs_with_txids( + client, + start_time, + inserted_txs, + missing_txs, + parallel_requests, + ) + .await?, + ); Ok(update) } diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 655055b3..36c97195 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -7,7 +7,7 @@ use bdk_core::{ use esplora_client::{OutputStatus, Tx}; use std::thread::JoinHandle; -use crate::{insert_anchor_from_status, insert_prevouts}; +use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts}; /// [`esplora_client::Error`] pub type Error = Box; @@ -54,6 +54,7 @@ impl EsploraExt for esplora_client::BlockingClient { parallel_requests: usize, ) -> Result, Error> { let mut request = request.into(); + let start_time = request.start_time(); let chain_tip = request.chain_tip(); let latest_blocks = if chain_tip.is_some() { @@ -69,6 +70,7 @@ impl EsploraExt for esplora_client::BlockingClient { let keychain_spks = request.iter_spks(keychain.clone()); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, + start_time, &mut inserted_txs, keychain_spks, stop_gap, @@ -103,6 +105,7 @@ impl EsploraExt for esplora_client::BlockingClient { parallel_requests: usize, ) -> Result { let mut request: SyncRequest = request.into(); + let start_time = request.start_time(); let chain_tip = request.chain_tip(); let latest_blocks = if chain_tip.is_some() { @@ -115,18 +118,21 @@ impl EsploraExt for esplora_client::BlockingClient { let mut inserted_txs = HashSet::::new(); tx_update.extend(fetch_txs_with_spks( self, + start_time, &mut inserted_txs, request.iter_spks(), parallel_requests, )?); tx_update.extend(fetch_txs_with_txids( self, + start_time, &mut inserted_txs, request.iter_txids(), parallel_requests, )?); tx_update.extend(fetch_txs_with_outpoints( self, + start_time, &mut inserted_txs, request.iter_outpoints(), parallel_requests, @@ -250,6 +256,7 @@ fn chain_update( fn fetch_txs_with_keychain_spks>>( client: &esplora_client::BlockingClient, + start_time: u64, inserted_txs: &mut HashSet, mut keychain_spks: I, stop_gap: usize, @@ -299,7 +306,7 @@ fn fetch_txs_with_keychain_spks>>( if inserted_txs.insert(tx.txid) { update.txs.push(tx.to_tx().into()); } - insert_anchor_from_status(&mut update, tx.txid, tx.status); + insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status); insert_prevouts(&mut update, tx.vin); } } @@ -328,12 +335,14 @@ fn fetch_txs_with_keychain_spks>>( /// Refer to [crate-level docs](crate) for more. fn fetch_txs_with_spks>( client: &esplora_client::BlockingClient, + start_time: u64, inserted_txs: &mut HashSet, spks: I, parallel_requests: usize, ) -> Result, Error> { fetch_txs_with_keychain_spks( client, + start_time, inserted_txs, spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), usize::MAX, @@ -350,6 +359,7 @@ fn fetch_txs_with_spks>( /// Refer to [crate-level docs](crate) for more. fn fetch_txs_with_txids>( client: &esplora_client::BlockingClient, + start_time: u64, inserted_txs: &mut HashSet, txids: I, parallel_requests: usize, @@ -386,7 +396,7 @@ fn fetch_txs_with_txids>( if inserted_txs.insert(txid) { update.txs.push(tx_info.to_tx().into()); } - insert_anchor_from_status(&mut update, txid, tx_info.status); + insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status); insert_prevouts(&mut update, tx_info.vin); } } @@ -402,6 +412,7 @@ fn fetch_txs_with_txids>( /// Refer to [crate-level docs](crate) for more. fn fetch_txs_with_outpoints>( client: &esplora_client::BlockingClient, + start_time: u64, inserted_txs: &mut HashSet, outpoints: I, parallel_requests: usize, @@ -413,6 +424,7 @@ fn fetch_txs_with_outpoints>( // TODO: We should maintain a tx cache (like we do with Electrum). update.extend(fetch_txs_with_txids( client, + start_time, inserted_txs, outpoints.iter().map(|op| op.txid), parallel_requests, @@ -449,7 +461,12 @@ fn fetch_txs_with_outpoints>( missing_txs.push(spend_txid); } if let Some(spend_status) = op_status.status { - insert_anchor_from_status(&mut update, spend_txid, spend_status); + insert_anchor_or_seen_at_from_status( + &mut update, + start_time, + spend_txid, + spend_status, + ); } } } @@ -457,6 +474,7 @@ fn fetch_txs_with_outpoints>( update.extend(fetch_txs_with_txids( client, + start_time, inserted_txs, missing_txs, parallel_requests, diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index a166b6f9..a4d4b9a5 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -36,8 +36,9 @@ mod async_ext; #[cfg(feature = "async")] pub use async_ext::*; -fn insert_anchor_from_status( +fn insert_anchor_or_seen_at_from_status( update: &mut TxUpdate, + start_time: u64, txid: Txid, status: TxStatus, ) { @@ -53,6 +54,8 @@ fn insert_anchor_from_status( confirmation_time: time, }; update.anchors.insert((anchor, txid)); + } else { + update.seen_ats.insert((txid, start_time)); } } -- 2.49.0