..new_tx(i)
};
let mut update = TxUpdate::default();
+ update.seen_ats = [(tx.compute_txid(), i as u64)].into();
update.txs = vec![Arc::new(tx)];
- let _ = tx_graph.apply_update_at(update, Some(i as u64));
+ let _ = tx_graph.apply_update(update);
}
}));
c.bench_function("many_conflicting_unconfirmed::list_canonical_txs", {
};
let txid = tx.compute_txid();
let mut update = TxUpdate::default();
+ update.seen_ats = [(txid, i as u64)].into();
update.txs = vec![Arc::new(tx)];
- let _ = tx_graph.apply_update_at(update, Some(i as u64));
+ let _ = tx_graph.apply_update(update);
// Store the next prevout.
previous_output = OutPoint::new(txid, 0);
}
/// Apply an `update` directly.
///
/// `update` is a [`tx_graph::TxUpdate<A>`] and the resultant changes is returned as [`ChangeSet`].
- #[cfg(feature = "std")]
- #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub fn apply_update(&mut self, update: tx_graph::TxUpdate<A>) -> ChangeSet<A, I::ChangeSet> {
let tx_graph = self.graph.apply_update(update);
let indexer = self.index_tx_graph_changeset(&tx_graph);
ChangeSet { tx_graph, indexer }
}
- /// Apply the given `update` with an optional `seen_at` timestamp.
- ///
- /// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the
- /// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The
- /// `last_seen` value is used internally to determine precedence of conflicting unconfirmed
- /// transactions (where the transaction with the lower `last_seen` value is omitted from the
- /// canonical history).
- ///
- /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will
- /// not be part of the canonical history of transactions.
- ///
- /// Use [`apply_update`](IndexedTxGraph::apply_update) to have the `seen_at` value automatically
- /// set to the current time.
- pub fn apply_update_at(
- &mut self,
- update: tx_graph::TxUpdate<A>,
- seen_at: Option<u64>,
- ) -> ChangeSet<A, I::ChangeSet> {
- let tx_graph = self.graph.apply_update_at(update, seen_at);
- let indexer = self.index_tx_graph_changeset(&tx_graph);
- ChangeSet { tx_graph, indexer }
- }
-
/// Insert a floating `txout` of given `outpoint`.
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.insert_txout(outpoint, txout);
impl<A: Anchor> From<TxUpdate<A>> for TxGraph<A> {
fn from(update: TxUpdate<A>) -> Self {
let mut graph = TxGraph::<A>::default();
- let _ = graph.apply_update_at(update, None);
+ let _ = graph.apply_update(update);
graph
}
}
///
/// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that
/// exist in `update` but not in `self`).
- #[cfg(feature = "std")]
- #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub fn apply_update(&mut self, update: TxUpdate<A>) -> ChangeSet<A> {
- use std::time::*;
- let now = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("current time must be greater than epoch anchor");
- self.apply_update_at(update, Some(now.as_secs()))
- }
-
- /// Extends this graph with the given `update` alongside an optional `seen_at` timestamp.
- ///
- /// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the
- /// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The
- /// `last_seen` value is used internally to determine precedence of conflicting unconfirmed
- /// transactions (where the transaction with the lower `last_seen` value is omitted from the
- /// canonical history).
- ///
- /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will
- /// not be part of the canonical history of transactions.
- ///
- /// Use [`apply_update`](TxGraph::apply_update) to have the `seen_at` value automatically set
- /// to the current time.
- pub fn apply_update_at(&mut self, update: TxUpdate<A>, seen_at: Option<u64>) -> ChangeSet<A> {
let mut changeset = ChangeSet::<A>::default();
- let mut unanchored_txs = HashSet::<Txid>::new();
for tx in update.txs {
- if unanchored_txs.insert(tx.compute_txid()) {
- changeset.merge(self.insert_tx(tx));
- }
+ changeset.merge(self.insert_tx(tx));
}
for (outpoint, txout) in update.txouts {
changeset.merge(self.insert_txout(outpoint, txout));
}
for (anchor, txid) in update.anchors {
- unanchored_txs.remove(&txid);
changeset.merge(self.insert_anchor(txid, anchor));
}
for (txid, seen_at) in update.seen_ats {
changeset.merge(self.insert_seen_at(txid, seen_at));
}
- if let Some(seen_at) = seen_at {
- for txid in unanchored_txs {
- changeset.merge(self.insert_seen_at(txid, seen_at));
- }
- }
changeset
}
// Insert partials transactions.
update.txouts.insert(*outpoint, txout.clone());
// Mark them unconfirmed.
- update.seen_ats.insert(outpoint.txid, unconf_seen_at);
+ update.seen_ats.insert((outpoint.txid, unconf_seen_at));
}
// Insert the full transaction.
for (test_name, update) in test_cases {
let mut tx_graph = TxGraph::<ConfirmationBlockTime>::default();
- let _ = tx_graph.apply_update_at(update.clone(), None);
+ let _ = tx_graph.apply_update(update.clone());
let update_from_tx_graph: TxUpdate<ConfirmationBlockTime> = tx_graph.into();
assert_eq!(
}
/// Builds a [`SyncRequest`].
+///
+/// Construct with [`SyncRequest::builder`].
#[must_use]
pub struct SyncRequestBuilder<I = ()> {
inner: SyncRequest<I>,
}
-impl<I> Default for SyncRequestBuilder<I> {
- fn default() -> Self {
- Self {
- inner: Default::default(),
- }
- }
-}
-
impl SyncRequestBuilder<()> {
/// Add [`Script`]s that will be synced against.
pub fn spks(self, spks: impl IntoIterator<Item = ScriptBuf>) -> Self {
/// ```
#[must_use]
pub struct SyncRequest<I = ()> {
+ start_time: u64,
chain_tip: Option<CheckPoint>,
spks: VecDeque<(I, ScriptBuf)>,
spks_consumed: usize,
inspect: Box<InspectSync<I>>,
}
-impl<I> Default for SyncRequest<I> {
- fn default() -> Self {
- Self {
- chain_tip: None,
- spks: VecDeque::new(),
- spks_consumed: 0,
- txids: VecDeque::new(),
- txids_consumed: 0,
- outpoints: VecDeque::new(),
- outpoints_consumed: 0,
- inspect: Box::new(|_, _| {}),
- }
- }
-}
-
impl<I> From<SyncRequestBuilder<I>> for SyncRequest<I> {
fn from(builder: SyncRequestBuilder<I>) -> Self {
builder.inner
}
impl<I> SyncRequest<I> {
- /// Start building a [`SyncRequest`].
- pub fn builder() -> SyncRequestBuilder<I> {
+ /// Start building a [`SyncRequest`] with a given `start_time`.
+ ///
+ /// `start_time` specifies the start time of sync. Chain sources can use this value to set
+ /// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction
+ /// without any `seen_ats` is assumed to be unseen in the mempool.
+ ///
+ /// Use [`SyncRequest::builder`] to use the current timestamp as `start_time` (this requires
+ /// `feature = "std"`).
+ pub fn builder_at(start_time: u64) -> SyncRequestBuilder<I> {
SyncRequestBuilder {
- inner: Default::default(),
+ inner: Self {
+ start_time,
+ chain_tip: None,
+ spks: VecDeque::new(),
+ spks_consumed: 0,
+ txids: VecDeque::new(),
+ txids_consumed: 0,
+ outpoints: VecDeque::new(),
+ outpoints_consumed: 0,
+ inspect: Box::new(|_, _| ()),
+ },
}
}
+ /// Start building a [`SyncRequest`] with the current timestamp as the `start_time`.
+ ///
+ /// Use [`SyncRequest::builder_at`] to manually set the `start_time`, or if `feature = "std"`
+ /// is not available.
+ #[cfg(feature = "std")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
+ pub fn builder() -> SyncRequestBuilder<I> {
+ let start_time = std::time::UNIX_EPOCH
+ .elapsed()
+ .expect("failed to get current timestamp")
+ .as_secs();
+ Self::builder_at(start_time)
+ }
+
+ /// When the sync-request was initiated.
+ pub fn start_time(&self) -> u64 {
+ self.start_time
+ }
+
/// Get the [`SyncProgress`] of this request.
pub fn progress(&self) -> SyncProgress {
SyncProgress {
}
/// Builds a [`FullScanRequest`].
+///
+/// Construct with [`FullScanRequest::builder`].
#[must_use]
pub struct FullScanRequestBuilder<K> {
inner: FullScanRequest<K>,
}
-impl<K> Default for FullScanRequestBuilder<K> {
- fn default() -> Self {
- Self {
- inner: Default::default(),
- }
- }
-}
-
impl<K: Ord> FullScanRequestBuilder<K> {
/// Set the initial chain tip for the full scan request.
///
/// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided).
#[must_use]
pub struct FullScanRequest<K> {
+ start_time: u64,
chain_tip: Option<CheckPoint>,
spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
inspect: Box<InspectFullScan<K>>,
}
}
-impl<K> Default for FullScanRequest<K> {
- fn default() -> Self {
- Self {
- chain_tip: None,
- spks_by_keychain: Default::default(),
- inspect: Box::new(|_, _, _| {}),
+impl<K: Ord + Clone> FullScanRequest<K> {
+ /// Start building a [`FullScanRequest`] with a given `start_time`.
+ ///
+ /// `start_time` specifies the start time of sync. Chain sources can use this value to set
+ /// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction
+ /// without any `seen_ats` is assumed to be unseen in the mempool.
+ ///
+ /// Use [`FullScanRequest::builder`] to use the current timestamp as `start_time` (this
+ /// requires `feature = "std"`).
+ pub fn builder_at(start_time: u64) -> FullScanRequestBuilder<K> {
+ FullScanRequestBuilder {
+ inner: Self {
+ start_time,
+ chain_tip: None,
+ spks_by_keychain: BTreeMap::new(),
+ inspect: Box::new(|_, _, _| ()),
+ },
}
}
-}
-impl<K: Ord + Clone> FullScanRequest<K> {
- /// Start building a [`FullScanRequest`].
+ /// Start building a [`FullScanRequest`] with the current timestamp as the `start_time`.
+ ///
+ /// Use [`FullScanRequest::builder_at`] to manually set the `start_time`, or if `feature =
+ /// "std"` is not available.
+ #[cfg(feature = "std")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub fn builder() -> FullScanRequestBuilder<K> {
- FullScanRequestBuilder {
- inner: Self::default(),
- }
+ let start_time = std::time::UNIX_EPOCH
+ .elapsed()
+ .expect("failed to get current timestamp")
+ .as_secs();
+ Self::builder_at(start_time)
+ }
+
+ /// When the full-scan-request was initiated.
+ pub fn start_time(&self) -> u64 {
+ self.start_time
}
/// Get the chain tip [`CheckPoint`] of this request (if any).
-use crate::collections::{BTreeMap, BTreeSet, HashMap};
+use crate::collections::{BTreeMap, BTreeSet, HashSet};
use alloc::{sync::Arc, vec::Vec};
use bitcoin::{OutPoint, Transaction, TxOut, Txid};
/// Full transactions. These are transactions that were determined to be relevant to the wallet
/// given the request.
pub txs: Vec<Arc<Transaction>>,
+
/// Floating txouts. These are `TxOut`s that exist but the whole transaction wasn't included in
/// `txs` since only knowing about the output is important. These are often used to help determine
/// the fee of a wallet transaction.
pub txouts: BTreeMap<OutPoint, TxOut>,
+
/// Transaction anchors. Anchors tells us a position in the chain where a transaction was
/// confirmed.
pub anchors: BTreeSet<(A, Txid)>,
- /// Seen at times for transactions. This records when a transaction was most recently seen in
- /// the user's mempool for the sake of tie-breaking other conflicting transactions.
- pub seen_ats: HashMap<Txid, u64>,
+
+ /// When transactions were seen in the mempool.
+ ///
+ /// An unconfirmed transaction can only be canonical with a `seen_at` value. It is the
+ /// responsibility of the chain-source to include the `seen_at` values for unconfirmed
+ /// (unanchored) transactions.
+ ///
+ /// [`FullScanRequest::start_time`](crate::spk_client::FullScanRequest::start_time) or
+ /// [`SyncRequest::start_time`](crate::spk_client::SyncRequest::start_time) can be used to
+ /// provide the `seen_at` value.
+ pub seen_ats: HashSet<(Txid, u64)>,
}
impl<A> Default for TxUpdate<A> {
fetch_prev_txouts: bool,
) -> Result<FullScanResponse<K>, Error> {
let mut request: FullScanRequest<K> = request.into();
+ let start_time = request.start_time();
let tip_and_latest_blocks = match request.chain_tip() {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
for keychain in request.keychains() {
let spks = request.iter_spks(keychain.clone());
if let Some(last_active_index) =
- self.populate_with_spks(&mut tx_update, spks, stop_gap, batch_size)?
+ self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)?
{
last_active_indices.insert(keychain, last_active_index);
}
fetch_prev_txouts: bool,
) -> Result<SyncResponse, Error> {
let mut request: SyncRequest<I> = request.into();
+ let start_time = request.start_time();
let tip_and_latest_blocks = match request.chain_tip() {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
self.populate_with_spks(
+ start_time,
&mut tx_update,
request
.iter_spks()
usize::MAX,
batch_size,
)?;
- self.populate_with_txids(&mut tx_update, request.iter_txids())?;
- self.populate_with_outpoints(&mut tx_update, request.iter_outpoints())?;
+ self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?;
+ self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?;
// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
/// also included.
fn populate_with_spks(
&self,
+ start_time: u64,
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
mut spks: impl Iterator<Item = (u32, ScriptBuf)>,
stop_gap: usize,
if unused_spk_count >= stop_gap {
return Ok(last_active_index);
}
- continue;
} else {
last_active_index = Some(spk_index);
unused_spk_count = 0;
for tx_res in spk_history {
tx_update.txs.push(self.fetch_tx(tx_res.tx_hash)?);
- if let Ok(height) = tx_res.height.try_into() {
- self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
+ match tx_res.height.try_into() {
+ // Returned heights 0 & -1 are reserved for unconfirmed txs.
+ Ok(height) if height > 0 => {
+ self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
+ }
+ _ => {
+ tx_update.seen_ats.insert((tx_res.tx_hash, start_time));
+ }
}
}
}
/// included. Anchors of the aforementioned transactions are included.
fn populate_with_outpoints(
&self,
+ start_time: u64,
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
outpoints: impl IntoIterator<Item = OutPoint>,
) -> Result<(), Error> {
if !has_residing && res.tx_hash == op_txid {
has_residing = true;
tx_update.txs.push(Arc::clone(&op_tx));
- if let Ok(height) = res.height.try_into() {
- self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+ match res.height.try_into() {
+ // Returned heights 0 & -1 are reserved for unconfirmed txs.
+ Ok(height) if height > 0 => {
+ self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+ }
+ _ => {
+ tx_update.seen_ats.insert((res.tx_hash, start_time));
+ }
}
}
continue;
}
tx_update.txs.push(Arc::clone(&res_tx));
- if let Ok(height) = res.height.try_into() {
- self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+ match res.height.try_into() {
+ // Returned heights 0 & -1 are reserved for unconfirmed txs.
+ Ok(height) if height > 0 => {
+ self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+ }
+ _ => {
+ tx_update.seen_ats.insert((res.tx_hash, start_time));
+ }
}
}
}
/// Populate the `tx_update` with transactions/anchors of the provided `txids`.
fn populate_with_txids(
&self,
+ start_time: u64,
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
txids: impl IntoIterator<Item = Txid>,
) -> Result<(), Error> {
.into_iter()
.find(|r| r.tx_hash == txid)
{
- if let Ok(height) = r.height.try_into() {
- self.validate_merkle_for_anchor(tx_update, txid, height)?;
+ match r.height.try_into() {
+ // Returned heights 0 & -1 are reserved for unconfirmed txs.
+ Ok(height) if height > 0 => {
+ self.validate_merkle_for_anchor(tx_update, txid, height)?;
+ }
+ _ => {
+ tx_update.seen_ats.insert((r.tx_hash, start_time));
+ }
}
}
env.mine_blocks(100, None)?;
// First unconfirmed tx.
- env.send(&tracked_addr, Amount::from_btc(1.0)?)?;
+ let txid1 = env.send(&tracked_addr, Amount::from_btc(1.0)?)?;
// Create second unconfirmed tx that spends the first.
let utxo = rpc_client
let signed_tx = rpc_client
.sign_raw_transaction_with_wallet(tx_that_spends_unconfirmed.raw_hex(), None, None)?
.transaction()?;
- rpc_client.send_raw_transaction(signed_tx.raw_hex())?;
+ let txid2 = rpc_client.send_raw_transaction(signed_tx.raw_hex())?;
env.wait_until_electrum_sees_txid(signed_tx.compute_txid(), Duration::from_secs(5))?;
);
let client = BdkElectrumClient::new(electrum_client);
- let request = SyncRequest::builder().spks(core::iter::once(tracked_addr.script_pubkey()));
- let _response = client.sync(request, 1, false)?;
+ let req = SyncRequest::builder()
+ .spks(core::iter::once(tracked_addr.script_pubkey()))
+ .build();
+ let req_time = req.start_time();
+ let response = client.sync(req, 1, false)?;
+ assert_eq!(
+ response.tx_update.seen_ats,
+ [(txid1, req_time), (txid2, req_time)].into(),
+ "both txids must have `seen_at` time match the request's `start_time`",
+ );
Ok(())
}
use esplora_client::Sleeper;
use futures::{stream::FuturesOrdered, TryStreamExt};
-use crate::{insert_anchor_from_status, insert_prevouts};
+use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts};
/// [`esplora_client::Error`]
type Error = Box<esplora_client::Error>;
parallel_requests: usize,
) -> Result<FullScanResponse<K>, Error> {
let mut request = request.into();
+ let start_time = request.start_time();
let keychains = request.keychains();
let chain_tip = request.chain_tip();
let keychain_spks = request.iter_spks(keychain.clone());
let (update, last_active_index) = fetch_txs_with_keychain_spks(
self,
+ start_time,
&mut inserted_txs,
keychain_spks,
stop_gap,
parallel_requests: usize,
) -> Result<SyncResponse, Error> {
let mut request = request.into();
+ let start_time = request.start_time();
let chain_tip = request.chain_tip();
let latest_blocks = if chain_tip.is_some() {
tx_update.extend(
fetch_txs_with_spks(
self,
+ start_time,
&mut inserted_txs,
request.iter_spks(),
parallel_requests,
tx_update.extend(
fetch_txs_with_txids(
self,
+ start_time,
&mut inserted_txs,
request.iter_txids(),
parallel_requests,
tx_update.extend(
fetch_txs_with_outpoints(
self,
+ start_time,
&mut inserted_txs,
request.iter_outpoints(),
parallel_requests,
/// Refer to [crate-level docs](crate) for more.
async fn fetch_txs_with_keychain_spks<I, S>(
client: &esplora_client::AsyncClient<S>,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
mut keychain_spks: I,
stop_gap: usize,
if inserted_txs.insert(tx.txid) {
update.txs.push(tx.to_tx().into());
}
- insert_anchor_from_status(&mut update, tx.txid, tx.status);
+ insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
insert_prevouts(&mut update, tx.vin);
}
}
/// Refer to [crate-level docs](crate) for more.
async fn fetch_txs_with_spks<I, S>(
client: &esplora_client::AsyncClient<S>,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
spks: I,
parallel_requests: usize,
{
fetch_txs_with_keychain_spks(
client,
+ start_time,
inserted_txs,
spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
usize::MAX,
/// Refer to [crate-level docs](crate) for more.
async fn fetch_txs_with_txids<I, S>(
client: &esplora_client::AsyncClient<S>,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
txids: I,
parallel_requests: usize,
if inserted_txs.insert(txid) {
update.txs.push(tx_info.to_tx().into());
}
- insert_anchor_from_status(&mut update, txid, tx_info.status);
+ insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status);
insert_prevouts(&mut update, tx_info.vin);
}
}
/// Refer to [crate-level docs](crate) for more.
async fn fetch_txs_with_outpoints<I, S>(
client: &esplora_client::AsyncClient<S>,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
outpoints: I,
parallel_requests: usize,
update.extend(
fetch_txs_with_txids(
client,
+ start_time,
inserted_txs,
outpoints.iter().copied().map(|op| op.txid),
parallel_requests,
missing_txs.push(spend_txid);
}
if let Some(spend_status) = op_status.status {
- insert_anchor_from_status(&mut update, spend_txid, spend_status);
+ insert_anchor_or_seen_at_from_status(
+ &mut update,
+ start_time,
+ spend_txid,
+ spend_status,
+ );
}
}
}
- update
- .extend(fetch_txs_with_txids(client, inserted_txs, missing_txs, parallel_requests).await?);
+ update.extend(
+ fetch_txs_with_txids(
+ client,
+ start_time,
+ inserted_txs,
+ missing_txs,
+ parallel_requests,
+ )
+ .await?,
+ );
Ok(update)
}
use esplora_client::{OutputStatus, Tx};
use std::thread::JoinHandle;
-use crate::{insert_anchor_from_status, insert_prevouts};
+use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts};
/// [`esplora_client::Error`]
pub type Error = Box<esplora_client::Error>;
parallel_requests: usize,
) -> Result<FullScanResponse<K>, Error> {
let mut request = request.into();
+ let start_time = request.start_time();
let chain_tip = request.chain_tip();
let latest_blocks = if chain_tip.is_some() {
let keychain_spks = request.iter_spks(keychain.clone());
let (update, last_active_index) = fetch_txs_with_keychain_spks(
self,
+ start_time,
&mut inserted_txs,
keychain_spks,
stop_gap,
parallel_requests: usize,
) -> Result<SyncResponse, Error> {
let mut request: SyncRequest<I> = request.into();
+ let start_time = request.start_time();
let chain_tip = request.chain_tip();
let latest_blocks = if chain_tip.is_some() {
let mut inserted_txs = HashSet::<Txid>::new();
tx_update.extend(fetch_txs_with_spks(
self,
+ start_time,
&mut inserted_txs,
request.iter_spks(),
parallel_requests,
)?);
tx_update.extend(fetch_txs_with_txids(
self,
+ start_time,
&mut inserted_txs,
request.iter_txids(),
parallel_requests,
)?);
tx_update.extend(fetch_txs_with_outpoints(
self,
+ start_time,
&mut inserted_txs,
request.iter_outpoints(),
parallel_requests,
fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
client: &esplora_client::BlockingClient,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
mut keychain_spks: I,
stop_gap: usize,
if inserted_txs.insert(tx.txid) {
update.txs.push(tx.to_tx().into());
}
- insert_anchor_from_status(&mut update, tx.txid, tx.status);
+ insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
insert_prevouts(&mut update, tx.vin);
}
}
/// Refer to [crate-level docs](crate) for more.
fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
client: &esplora_client::BlockingClient,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
spks: I,
parallel_requests: usize,
) -> Result<TxUpdate<ConfirmationBlockTime>, Error> {
fetch_txs_with_keychain_spks(
client,
+ start_time,
inserted_txs,
spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)),
usize::MAX,
/// Refer to [crate-level docs](crate) for more.
fn fetch_txs_with_txids<I: IntoIterator<Item = Txid>>(
client: &esplora_client::BlockingClient,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
txids: I,
parallel_requests: usize,
if inserted_txs.insert(txid) {
update.txs.push(tx_info.to_tx().into());
}
- insert_anchor_from_status(&mut update, txid, tx_info.status);
+ insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status);
insert_prevouts(&mut update, tx_info.vin);
}
}
/// Refer to [crate-level docs](crate) for more.
fn fetch_txs_with_outpoints<I: IntoIterator<Item = OutPoint>>(
client: &esplora_client::BlockingClient,
+ start_time: u64,
inserted_txs: &mut HashSet<Txid>,
outpoints: I,
parallel_requests: usize,
// TODO: We should maintain a tx cache (like we do with Electrum).
update.extend(fetch_txs_with_txids(
client,
+ start_time,
inserted_txs,
outpoints.iter().map(|op| op.txid),
parallel_requests,
missing_txs.push(spend_txid);
}
if let Some(spend_status) = op_status.status {
- insert_anchor_from_status(&mut update, spend_txid, spend_status);
+ insert_anchor_or_seen_at_from_status(
+ &mut update,
+ start_time,
+ spend_txid,
+ spend_status,
+ );
}
}
}
update.extend(fetch_txs_with_txids(
client,
+ start_time,
inserted_txs,
missing_txs,
parallel_requests,
#[cfg(feature = "async")]
pub use async_ext::*;
-fn insert_anchor_from_status(
+fn insert_anchor_or_seen_at_from_status(
update: &mut TxUpdate<ConfirmationBlockTime>,
+ start_time: u64,
txid: Txid,
status: TxStatus,
) {
confirmation_time: time,
};
update.anchors.insert((anchor, txid));
+ } else {
+ update.seen_ats.insert((txid, start_time));
}
}