]> Untitled Git - bdk/commitdiff
feat(chain)!: Change `TxGraph` to understand evicted-at & last-evicted
author志宇 <hello@evanlinjin.me>
Fri, 24 Jan 2025 08:45:00 +0000 (19:45 +1100)
committer志宇 <hello@evanlinjin.me>
Fri, 14 Mar 2025 02:15:16 +0000 (13:15 +1100)
The evicted-at and last-evicted timestamp informs the `TxGraph` when the
transaction was last deemed as missing (evicted) from the mempool.

The canonicalization algorithm is changed to disregard transactions with
a last-evicted timestamp greater or equal to it's last-seen timestamp.
The exception is when we have a canonical descendant due to rules of
transitivity.

Update rusqlite_impl to persist `last_evicted`.

Also update docs:
* Remove duplicate paragraphs about `ChangeSet`s.
* Add "Canonicalization" section which expands on methods that require
  canonicalization and the associated data types used in the
  canonicalization algorithm.

crates/chain/src/rusqlite_impl.rs
crates/chain/src/tx_graph.rs
crates/chain/tests/test_tx_graph.rs

index 7b39f53c02399e3cd031f40460dc09b68d9c39b1..3bc105d0bcd11a6c042a350031029e2cfa3894fb 100644 (file)
@@ -264,12 +264,20 @@ impl tx_graph::ChangeSet<ConfirmationBlockTime> {
         format!("{add_confirmation_time_column}; {extract_confirmation_time_from_anchor_column}; {drop_anchor_column}")
     }
 
+    /// Get v2 of sqlite [tx_graph::ChangeSet] schema
+    pub fn schema_v2() -> String {
+        format!(
+            "ALTER TABLE {} ADD COLUMN last_evicted INTEGER",
+            Self::TXS_TABLE_NAME,
+        )
+    }
+
     /// Initialize sqlite tables.
     pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
         migrate_schema(
             db_tx,
             Self::SCHEMA_NAME,
-            &[&Self::schema_v0(), &Self::schema_v1()],
+            &[&Self::schema_v0(), &Self::schema_v1(), &Self::schema_v2()],
         )
     }
 
@@ -280,7 +288,7 @@ impl tx_graph::ChangeSet<ConfirmationBlockTime> {
         let mut changeset = Self::default();
 
         let mut statement = db_tx.prepare(&format!(
-            "SELECT txid, raw_tx, last_seen FROM {}",
+            "SELECT txid, raw_tx, last_seen, last_evicted FROM {}",
             Self::TXS_TABLE_NAME,
         ))?;
         let row_iter = statement.query_map([], |row| {
@@ -288,16 +296,20 @@ impl tx_graph::ChangeSet<ConfirmationBlockTime> {
                 row.get::<_, Impl<bitcoin::Txid>>("txid")?,
                 row.get::<_, Option<Impl<bitcoin::Transaction>>>("raw_tx")?,
                 row.get::<_, Option<u64>>("last_seen")?,
+                row.get::<_, Option<u64>>("last_evicted")?,
             ))
         })?;
         for row in row_iter {
-            let (Impl(txid), tx, last_seen) = row?;
+            let (Impl(txid), tx, last_seen, last_evicted) = row?;
             if let Some(Impl(tx)) = tx {
                 changeset.txs.insert(Arc::new(tx));
             }
             if let Some(last_seen) = last_seen {
                 changeset.last_seen.insert(txid, last_seen);
             }
+            if let Some(last_evicted) = last_evicted {
+                changeset.last_evicted.insert(txid, last_evicted);
+            }
         }
 
         let mut statement = db_tx.prepare(&format!(
@@ -377,6 +389,19 @@ impl tx_graph::ChangeSet<ConfirmationBlockTime> {
             })?;
         }
 
+        let mut statement = db_tx
+            .prepare_cached(&format!(
+                "INSERT INTO {}(txid, last_evicted) VALUES(:txid, :last_evicted) ON CONFLICT(txid) DO UPDATE SET last_evicted=:last_evicted",
+                Self::TXS_TABLE_NAME,
+            ))?;
+        for (&txid, &last_evicted) in &self.last_evicted {
+            let checked_time = last_evicted.to_sql()?;
+            statement.execute(named_params! {
+                ":txid": Impl(txid),
+                ":last_evicted": Some(checked_time),
+            })?;
+        }
+
         let mut statement = db_tx.prepare_cached(&format!(
             "REPLACE INTO {}(txid, vout, value, script) VALUES(:txid, :vout, :value, :script)",
             Self::TXOUTS_TABLE_NAME,
@@ -628,7 +653,7 @@ mod test {
     }
 
     #[test]
-    fn v0_to_v1_schema_migration_is_backward_compatible() -> anyhow::Result<()> {
+    fn v0_to_v2_schema_migration_is_backward_compatible() -> anyhow::Result<()> {
         type ChangeSet = tx_graph::ChangeSet<ConfirmationBlockTime>;
         let mut conn = rusqlite::Connection::open_in_memory()?;
 
@@ -697,13 +722,17 @@ mod test {
             }
         }
 
-        // Apply v1 sqlite schema to tables with data
+        // Apply v1 & v2 sqlite schema to tables with data
         {
             let db_tx = conn.transaction()?;
             migrate_schema(
                 &db_tx,
                 ChangeSet::SCHEMA_NAME,
-                &[&ChangeSet::schema_v0(), &ChangeSet::schema_v1()],
+                &[
+                    &ChangeSet::schema_v0(),
+                    &ChangeSet::schema_v1(),
+                    &ChangeSet::schema_v2(),
+                ],
             )?;
             db_tx.commit()?;
         }
@@ -718,4 +747,43 @@ mod test {
 
         Ok(())
     }
+
+    #[test]
+    fn can_persist_last_evicted() -> anyhow::Result<()> {
+        use bitcoin::hashes::Hash;
+
+        type ChangeSet = tx_graph::ChangeSet<ConfirmationBlockTime>;
+        let mut conn = rusqlite::Connection::open_in_memory()?;
+
+        // Init tables
+        {
+            let db_tx = conn.transaction()?;
+            ChangeSet::init_sqlite_tables(&db_tx)?;
+            db_tx.commit()?;
+        }
+
+        let txid = bitcoin::Txid::all_zeros();
+        let last_evicted = 100;
+
+        // Persist `last_evicted`
+        {
+            let changeset = ChangeSet {
+                last_evicted: [(txid, last_evicted)].into(),
+                ..Default::default()
+            };
+            let db_tx = conn.transaction()?;
+            changeset.persist_to_sqlite(&db_tx)?;
+            db_tx.commit()?;
+        }
+
+        // Load from sqlite should succeed
+        {
+            let db_tx = conn.transaction()?;
+            let changeset = ChangeSet::from_sqlite(&db_tx)?;
+            db_tx.commit()?;
+            assert_eq!(changeset.last_evicted.get(&txid), Some(&last_evicted));
+        }
+
+        Ok(())
+    }
 }
index d40ee49d3cb5cabdc511ed30398c9d26395616b5..092bda7cbeddb8fa9cc61a107a40ba8c3e717047 100644 (file)
 //! documentation for more details), and the timestamp of the last time we saw the transaction as
 //! unconfirmed.
 //!
-//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. This is useful for
-//! identifying and traversing conflicts and descendants of a given transaction. Some [`TxGraph`]
-//! methods only consider transactions that are "canonical" (i.e., in the best chain or in mempool).
-//! We decide which transactions are canonical based on the transaction's anchors and the
-//! `last_seen` (as unconfirmed) timestamp.
+//! # Canonicalization
 //!
-//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to
-//! persistent storage, or to be applied to another [`TxGraph`].
+//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. A process called
+//! canonicalization is required to get a conflict-free view of transactions.
+//!
+//! * [`list_canonical_txs`](TxGraph::list_canonical_txs) lists canonical transactions.
+//! * [`filter_chain_txouts`](TxGraph::filter_chain_txouts) filters out canonical outputs from a
+//!     list of outpoints.
+//! * [`filter_chain_unspents`](TxGraph::filter_chain_unspents) filters out canonical unspent
+//!     outputs from a list of outpoints.
+//! * [`balance`](TxGraph::balance) gets the total sum of unspent outputs filtered from a list of
+//!     outpoints.
+//! * [`canonical_iter`](TxGraph::canonical_iter) returns the [`CanonicalIter`] which contains all
+//!     of the canonicalization logic.
+//!
+//! All these methods require a `chain` and `chain_tip` argument. The `chain` must be a
+//! [`ChainOracle`] implementation (such as [`LocalChain`](crate::local_chain::LocalChain)) which
+//! identifies which blocks exist under a given `chain_tip`.
+//!
+//! The canonicalization algorithm uses the following associated data to determine which
+//! transactions have precedence over others:
 //!
-//! Lastly, you can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of
-//! a given transaction, respectively.
+//! * [`Anchor`] - This bit of data represents that a transaction is anchored in a given block. If
+//!     the transaction is anchored in chain of `chain_tip`, or is an ancestor of a transaction
+//!     anchored in chain of `chain_tip`, then the transaction must be canonical.
+//! * `last_seen` - This is the timestamp of when a transaction is last-seen in the mempool. This
+//!     value is updated by [`insert_seen_at`](TxGraph::insert_seen_at) and
+//!     [`apply_update`](TxGraph::apply_update). Transactions that are seen later have higher
+//!     priority than those that are seen earlier. `last_seen` values are transitive. This means
+//!     that the actual `last_seen` value of a transaction is the max of all the `last_seen` values
+//!     from it's descendants.
+//! * `last_evicted` - This is the timestamp of when a transaction last went missing from the
+//!     mempool. If this value is equal to or higher than the transaction's `last_seen` value, then
+//!     it will not be considered canonical.
+//!
+//! # Graph traversal
+//!
+//! You can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of a given
+//! transaction, respectively.
 //!
 //! # Applying changes
 //!
+//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to
+//! persistent storage, or to be applied to another [`TxGraph`].
+//!
 //! Methods that change the state of [`TxGraph`] will return [`ChangeSet`]s.
-//! [`ChangeSet`]s can be applied back to a [`TxGraph`] or be used to inform persistent storage
-//! of the changes to [`TxGraph`].
 //!
 //! # Generics
 //!
@@ -122,6 +151,7 @@ impl<A: Ord> From<TxGraph<A>> for TxUpdate<A> {
             .flat_map(|(txid, anchors)| anchors.into_iter().map(move |a| (a, txid)))
             .collect();
         tx_update.seen_ats = graph.last_seen.into_iter().collect();
+        tx_update.evicted_ats = graph.last_evicted.into_iter().collect();
         tx_update
     }
 }
@@ -145,6 +175,7 @@ pub struct TxGraph<A = ConfirmationBlockTime> {
     spends: BTreeMap<OutPoint, HashSet<Txid>>,
     anchors: HashMap<Txid, BTreeSet<A>>,
     last_seen: HashMap<Txid, u64>,
+    last_evicted: HashMap<Txid, u64>,
 
     txs_by_highest_conf_heights: BTreeSet<(u32, Txid)>,
     txs_by_last_seen: BTreeSet<(u64, Txid)>,
@@ -162,6 +193,7 @@ impl<A> Default for TxGraph<A> {
             spends: Default::default(),
             anchors: Default::default(),
             last_seen: Default::default(),
+            last_evicted: Default::default(),
             txs_by_highest_conf_heights: Default::default(),
             txs_by_last_seen: Default::default(),
             empty_outspends: Default::default(),
@@ -715,6 +747,34 @@ impl<A: Anchor> TxGraph<A> {
         changeset
     }
 
+    /// Inserts the given `evicted_at` for `txid` into [`TxGraph`].
+    ///
+    /// The `evicted_at` timestamp represents the last known time when the transaction was observed
+    /// to be missing from the mempool. If `txid` was previously recorded with an earlier
+    /// `evicted_at` value, it is updated only if the new value is greater.
+    pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet<A> {
+        let is_changed = match self.last_evicted.entry(txid) {
+            hash_map::Entry::Occupied(mut e) => {
+                let last_evicted = e.get_mut();
+                let change = *last_evicted < evicted_at;
+                if change {
+                    *last_evicted = evicted_at;
+                }
+                change
+            }
+            hash_map::Entry::Vacant(e) => {
+                e.insert(evicted_at);
+                true
+            }
+        };
+
+        let mut changeset = ChangeSet::<A>::default();
+        if is_changed {
+            changeset.last_evicted.insert(txid, evicted_at);
+        }
+        changeset
+    }
+
     /// Extends this graph with the given `update`.
     ///
     /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that
@@ -733,6 +793,9 @@ impl<A: Anchor> TxGraph<A> {
         for (txid, seen_at) in update.seen_ats {
             changeset.merge(self.insert_seen_at(txid, seen_at));
         }
+        for (txid, evicted_at) in update.evicted_ats {
+            changeset.merge(self.insert_evicted_at(txid, evicted_at));
+        }
         changeset
     }
 
@@ -750,6 +813,7 @@ impl<A: Anchor> TxGraph<A> {
                 .flat_map(|(txid, anchors)| anchors.iter().map(|a| (a.clone(), *txid)))
                 .collect(),
             last_seen: self.last_seen.iter().map(|(&k, &v)| (k, v)).collect(),
+            last_evicted: self.last_evicted.iter().map(|(&k, &v)| (k, v)).collect(),
         }
     }
 
@@ -767,6 +831,9 @@ impl<A: Anchor> TxGraph<A> {
         for (txid, seen_at) in changeset.last_seen {
             let _ = self.insert_seen_at(txid, seen_at);
         }
+        for (txid, evicted_at) in changeset.last_evicted {
+            let _ = self.insert_evicted_at(txid, evicted_at);
+        }
     }
 }
 
@@ -937,9 +1004,17 @@ impl<A: Anchor> TxGraph<A> {
 
     /// List txids by descending last-seen order.
     ///
-    /// Transactions without last-seens are excluded.
-    pub fn txids_by_descending_last_seen(&self) -> impl ExactSizeIterator<Item = (u64, Txid)> + '_ {
-        self.txs_by_last_seen.iter().copied().rev()
+    /// Transactions without last-seens are excluded. Transactions with a last-evicted timestamp
+    /// equal or higher than it's last-seen timestamp are excluded.
+    pub fn txids_by_descending_last_seen(&self) -> impl Iterator<Item = (u64, Txid)> + '_ {
+        self.txs_by_last_seen
+            .iter()
+            .copied()
+            .rev()
+            .filter(|(last_seen, txid)| match self.last_evicted.get(txid) {
+                Some(last_evicted) => last_evicted < last_seen,
+                None => true,
+            })
     }
 
     /// Returns a [`CanonicalIter`].
@@ -1107,6 +1182,9 @@ pub struct ChangeSet<A = ()> {
     pub anchors: BTreeSet<(A, Txid)>,
     /// Added last-seen unix timestamps of transactions.
     pub last_seen: BTreeMap<Txid, u64>,
+    /// Added timestamps of when a transaction is last evicted from the mempool.
+    #[cfg_attr(feature = "serde", serde(default))]
+    pub last_evicted: BTreeMap<Txid, u64>,
 }
 
 impl<A> Default for ChangeSet<A> {
@@ -1116,6 +1194,7 @@ impl<A> Default for ChangeSet<A> {
             txouts: Default::default(),
             anchors: Default::default(),
             last_seen: Default::default(),
+            last_evicted: Default::default(),
         }
     }
 }
@@ -1170,6 +1249,14 @@ impl<A: Ord> Merge for ChangeSet<A> {
                 .filter(|(txid, update_ls)| self.last_seen.get(txid) < Some(update_ls))
                 .collect::<Vec<_>>(),
         );
+        // last_evicted timestamps should only increase
+        self.last_evicted.extend(
+            other
+                .last_evicted
+                .into_iter()
+                .filter(|(txid, update_lm)| self.last_evicted.get(txid) < Some(update_lm))
+                .collect::<Vec<_>>(),
+        );
     }
 
     fn is_empty(&self) -> bool {
@@ -1177,6 +1264,7 @@ impl<A: Ord> Merge for ChangeSet<A> {
             && self.txouts.is_empty()
             && self.anchors.is_empty()
             && self.last_seen.is_empty()
+            && self.last_evicted.is_empty()
     }
 }
 
@@ -1196,6 +1284,7 @@ impl<A: Ord> ChangeSet<A> {
                 self.anchors.into_iter().map(|(a, txid)| (f(a), txid)),
             ),
             last_seen: self.last_seen,
+            last_evicted: self.last_evicted,
         }
     }
 }
index eef5e223981f3a4f7910cc592677f466464592b8..4461478213a2cbbff0c59ed13db74d1831a34f3a 100644 (file)
@@ -115,7 +115,8 @@ fn insert_txouts() {
             txs: [Arc::new(update_tx.clone())].into(),
             txouts: update_ops.clone().into(),
             anchors: [(conf_anchor, update_tx.compute_txid()),].into(),
-            last_seen: [(hash!("tx2"), 1000000)].into()
+            last_seen: [(hash!("tx2"), 1000000)].into(),
+            last_evicted: [].into(),
         }
     );
 
@@ -168,7 +169,8 @@ fn insert_txouts() {
             txs: [Arc::new(update_tx.clone())].into(),
             txouts: update_ops.into_iter().chain(original_ops).collect(),
             anchors: [(conf_anchor, update_tx.compute_txid()),].into(),
-            last_seen: [(hash!("tx2"), 1000000)].into()
+            last_seen: [(hash!("tx2"), 1000000)].into(),
+            last_evicted: [].into(),
         }
     );
 }