]> Untitled Git - bdk/commitdiff
refactor(rpc)!: update `mempool` interface and test code
authorvalued mammal <valuedmammal@protonmail.com>
Sat, 1 Feb 2025 15:15:55 +0000 (10:15 -0500)
committerWei Chen <wzc110@gmail.com>
Thu, 1 May 2025 14:41:55 +0000 (14:41 +0000)
crates/bitcoind_rpc/src/lib.rs
crates/bitcoind_rpc/tests/test_emitter.rs
examples/example_bitcoind_rpc_polling/src/main.rs

index 3fa17ef1913c84478475a6432e0d913fbd3d378d..7e363560ba1d469a6351d9719a1767c8a74de849 100644 (file)
@@ -10,8 +10,9 @@
 #![warn(missing_docs)]
 
 use bdk_core::{BlockId, CheckPoint};
-use bitcoin::{block::Header, Block, BlockHash, Transaction};
+use bitcoin::{Block, BlockHash, Transaction, Txid};
 use bitcoincore_rpc::bitcoincore_rpc_json;
+use std::collections::HashSet;
 
 pub mod bip158;
 
@@ -43,6 +44,16 @@ pub struct Emitter<'c, C> {
     /// The last emitted block during our last mempool emission. This is used to determine whether
     /// there has been a reorg since our last mempool emission.
     last_mempool_tip: Option<u32>,
+
+    /// A set of txids currently assumed to still be in the mempool.
+    ///
+    /// This is used to detect mempool evictions by comparing the set against the latest mempool
+    /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
+    /// evicted.
+    ///
+    /// When the emitter emits a block, confirmed txids are removed from this set. This prevents
+    /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
+    expected_mempool_txids: HashSet<Txid>,
 }
 
 impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,7 +64,15 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
     ///
     /// `start_height` starts emission from a given height (if there are no conflicts with the
     /// original chain).
-    pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
+    ///
+    /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
+    /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
+    pub fn new(
+        client: &'c C,
+        last_cp: CheckPoint,
+        start_height: u32,
+        expected_mempool_txids: HashSet<Txid>,
+    ) -> Self {
         Self {
             client,
             start_height,
@@ -61,10 +80,18 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
             last_block: None,
             last_mempool_time: 0,
             last_mempool_tip: None,
+            expected_mempool_txids,
         }
     }
 
-    /// Emit mempool transactions, alongside their first-seen unix timestamps.
+    /// Emit mempool transactions and any evicted [`Txid`]s.
+    ///
+    /// This method returns a [`MempoolEvent`] containing the full transactions (with their
+    /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
+    /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
+    /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
+    /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
+    /// return an empty `evicted_txids` set.
     ///
     /// This method emits each transaction only once, unless we cannot guarantee the transaction's
     /// ancestors are already emitted.
@@ -74,7 +101,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
     /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
     /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
     /// at height `h`.
-    pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
+    pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
         let client = self.client;
 
         // This is the emitted tip height during the last mempool emission.
@@ -84,6 +111,38 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
             // `start_height` has been emitted.
             .unwrap_or(self.start_height.saturating_sub(1));
 
+        // Loop to make sure that the fetched mempool content and the fetched tip are consistent
+        // with one another.
+        let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
+            // Determine if height and hash matches the best block from the RPC. Evictions are deferred
+            // if we are not at the best block.
+            let height = client.get_block_count()?;
+            let hash = client.get_block_hash(height)?;
+
+            // Get the raw mempool result from the RPC client which will be used to determine if any
+            // transactions have been evicted.
+            let mp = client.get_raw_mempool_verbose()?;
+            let mp_txids: HashSet<Txid> = mp.keys().copied().collect();
+
+            if height == client.get_block_count()? && hash == client.get_block_hash(height)? {
+                break (mp, mp_txids, height, hash);
+            }
+        };
+
+        let at_tip =
+            rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash();
+
+        // If at tip, any expected txid missing from raw mempool is considered evicted;
+        // if not at tip, we don't evict anything.
+        let evicted_txids: HashSet<Txid> = if at_tip {
+            self.expected_mempool_txids
+                .difference(&raw_mempool_txids)
+                .copied()
+                .collect()
+        } else {
+            HashSet::new()
+        };
+
         // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
         // track of the latest mempool tx's timestamp to determine whether we have seen a tx
         // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
@@ -91,8 +150,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
         let prev_mempool_time = self.last_mempool_time;
         let mut latest_time = prev_mempool_time;
 
-        let txs_to_emit = client
-            .get_raw_mempool_verbose()?
+        let new_txs = raw_mempool
             .into_iter()
             .filter_map({
                 let latest_time = &mut latest_time;
@@ -101,25 +159,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
                     if tx_time > *latest_time {
                         *latest_time = tx_time;
                     }
-
-                    // Avoid emitting transactions that are already emitted if we can guarantee
-                    // blocks containing ancestors are already emitted. The bitcoind rpc interface
-                    // provides us with the block height that the tx is introduced to the mempool.
-                    // If we have already emitted the block of height, we can assume that all
-                    // ancestor txs have been processed by the receiver.
+                    // Best-effort check to avoid re-emitting transactions we've already emitted.
+                    //
+                    // Complete suppression isn't possible, since a transaction may spend outputs
+                    // owned by the wallet. To determine if such a transaction is relevant, we must
+                    // have already seen its ancestor(s) that contain the spent prevouts.
+                    //
+                    // Fortunately, bitcoind provides the block height at which the transaction
+                    // entered the mempool. If we've already emitted that block height, we can
+                    // reasonably assume the receiver has seen all ancestor transactions.
                     let is_already_emitted = tx_time <= prev_mempool_time;
                     let is_within_height = tx_entry.height <= prev_mempool_tip as _;
                     if is_already_emitted && is_within_height {
                         return None;
                     }
-
                     let tx = match client.get_raw_transaction(&txid, None) {
                         Ok(tx) => tx,
-                        // the tx is confirmed or evicted since `get_raw_mempool_verbose`
                         Err(err) if err.is_not_found_error() => return None,
                         Err(err) => return Some(Err(err)),
                     };
-
                     Some(Ok((tx, tx_time as u64)))
                 }
             })
@@ -128,22 +186,56 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
         self.last_mempool_time = latest_time;
         self.last_mempool_tip = Some(self.last_cp.height());
 
-        Ok(txs_to_emit)
-    }
+        // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
+        // still catching up to the tip and keep accumulating.
+        if at_tip {
+            self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
+        } else {
+            self.expected_mempool_txids
+                .extend(new_txs.iter().map(|(tx, _)| tx.compute_txid()));
+        }
 
-    /// Emit the next block height and header (if any).
-    pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
-        Ok(poll(self, |hash| self.client.get_block_header(hash))?
-            .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
+        Ok(MempoolEvent {
+            new_txs,
+            evicted_txids,
+            latest_update_time: latest_time as u64,
+        })
     }
 
     /// Emit the next block height and block (if any).
     pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
-        Ok(poll(self, |hash| self.client.get_block(hash))?
-            .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
+        if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
+            // Stop tracking unconfirmed transactions that have been confirmed in this block.
+            for tx in &block.txdata {
+                self.expected_mempool_txids.remove(&tx.compute_txid());
+            }
+            return Ok(Some(BlockEvent { block, checkpoint }));
+        }
+        Ok(None)
     }
 }
 
+/// A new emission from mempool.
+#[derive(Debug)]
+pub struct MempoolEvent {
+    /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
+    ///
+    /// To understand the second condition, consider a receiver which filters transactions based on
+    /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
+    /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
+    /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
+    /// block at height `h`.
+    pub new_txs: Vec<(Transaction, u64)>,
+
+    /// [`Txid`]s of all transactions that have been evicted from mempool.
+    pub evicted_txids: HashSet<Txid>,
+
+    /// The latest timestamp of when a transaction entered the mempool.
+    ///
+    /// This is useful for setting the timestamp for evicted transactions.
+    pub latest_update_time: u64,
+}
+
 /// A newly emitted block from [`Emitter`].
 #[derive(Debug)]
 pub struct BlockEvent<B> {
@@ -329,3 +421,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use crate::{bitcoincore_rpc::RpcApi, Emitter};
+    use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
+    use bdk_chain::local_chain::LocalChain;
+    use bdk_testenv::{anyhow, TestEnv};
+    use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
+    use std::collections::HashSet;
+
+    #[test]
+    fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
+        let env = TestEnv::new()?;
+        let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
+        let chain_tip = chain.tip();
+        let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
+
+        env.mine_blocks(100, None)?;
+        while emitter.next_block()?.is_some() {}
+
+        let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
+        let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
+        let mut mempool_txids = HashSet::new();
+
+        // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
+        for _ in 0..10 {
+            let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
+            mempool_txids.insert(sent_txid);
+            emitter.mempool()?;
+            env.mine_blocks(1, None)?;
+
+            for txid in &mempool_txids {
+                assert!(
+                    emitter.expected_mempool_txids.contains(txid),
+                    "Expected txid {:?} missing",
+                    txid
+                );
+            }
+        }
+
+        // Process each block and check that confirmed txids are removed from from
+        // expected_mempool_txids.
+        while let Some(block_event) = emitter.next_block()? {
+            let confirmed_txids: HashSet<Txid> = block_event
+                .block
+                .txdata
+                .iter()
+                .map(|tx| tx.compute_txid())
+                .collect();
+            mempool_txids = mempool_txids
+                .difference(&confirmed_txids)
+                .copied()
+                .collect::<HashSet<_>>();
+            for txid in confirmed_txids {
+                assert!(
+                    !emitter.expected_mempool_txids.contains(&txid),
+                    "Expected txid {:?} should have been removed",
+                    txid
+                );
+            }
+            for txid in &mempool_txids {
+                assert!(
+                    emitter.expected_mempool_txids.contains(txid),
+                    "Expected txid {:?} missing",
+                    txid
+                );
+            }
+        }
+
+        assert!(emitter.expected_mempool_txids.is_empty());
+
+        Ok(())
+    }
+}
index 5753b82f879d89fe356f66a472c38406bb9a10c2..0c30f5a803b9c85587c1ab9f1cf7e41f199d8e12 100644 (file)
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::{BTreeMap, BTreeSet, HashSet};
 
 use bdk_bitcoind_rpc::Emitter;
 use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
     let env = TestEnv::new()?;
     let network_tip = env.rpc_client().get_block_count()?;
     let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
-    let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
+    let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
 
     // Mine some blocks and return the actual block hashes.
     // Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
         index
     });
 
-    let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
+    let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
 
     while let Some(emission) = emitter.next_block()? {
         let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
         assert!(emitter.next_block()?.is_none());
 
         let mempool_txs = emitter.mempool()?;
-        let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
+        let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
         assert_eq!(
             indexed_additions
                 .tx_graph
@@ -252,14 +252,15 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         EMITTER_START_HEIGHT as _,
+        HashSet::new(),
     );
 
     env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
-    while emitter.next_header()?.is_some() {}
+    while emitter.next_block()?.is_some() {}
 
     for reorg_count in 1..=10 {
         let replaced_blocks = env.reorg_empty_blocks(reorg_count)?;
-        let next_emission = emitter.next_header()?.expect("must emit block after reorg");
+        let next_emission = emitter.next_block()?.expect("must emit block after reorg");
         assert_eq!(
             (
                 next_emission.block_height() as usize,
@@ -268,7 +269,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
             replaced_blocks[0],
             "block emitted after reorg should be at the reorg height"
         );
-        while emitter.next_header()?.is_some() {}
+        while emitter.next_block()?.is_some() {}
     }
 
     Ok(())
@@ -332,6 +333,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
+        HashSet::new(),
     );
 
     // setup addresses
@@ -423,6 +425,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
+        HashSet::new(),
     );
 
     // mine blocks and sync up emitter
@@ -431,7 +434,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
         .get_new_address(None, None)?
         .assume_checked();
     env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?;
-    while emitter.next_header()?.is_some() {}
+    while emitter.next_block()?.is_some() {}
 
     // have some random txs in mempool
     let exp_txids = (0..MEMPOOL_TX_COUNT)
@@ -441,6 +444,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
     // the first emission should include all transactions
     let emitted_txids = emitter
         .mempool()?
+        .new_txs
         .into_iter()
         .map(|(tx, _)| tx.compute_txid())
         .collect::<BTreeSet<Txid>>();
@@ -451,7 +455,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
 
     // second emission should be empty
     assert!(
-        emitter.mempool()?.is_empty(),
+        emitter.mempool()?.new_txs.is_empty(),
         "second emission should be empty"
     );
 
@@ -459,9 +463,9 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
     for _ in 0..BLOCKS_TO_MINE {
         env.mine_empty_block()?;
     }
-    while emitter.next_header()?.is_some() {}
+    while emitter.next_block()?.is_some() {}
     assert!(
-        emitter.mempool()?.is_empty(),
+        emitter.mempool()?.new_txs.is_empty(),
         "third emission, after chain tip is extended, should also be empty"
     );
 
@@ -488,6 +492,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
+        HashSet::new(),
     );
 
     // mine blocks to get initial balance, sync emitter up to tip
@@ -496,7 +501,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
         .get_new_address(None, None)?
         .assume_checked();
     env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;
-    while emitter.next_header()?.is_some() {}
+    while emitter.next_block()?.is_some() {}
 
     // mine blocks to introduce txs to mempool at different heights
     let tx_introductions = (0..MEMPOOL_TX_COUNT)
@@ -510,6 +515,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
     assert_eq!(
         emitter
             .mempool()?
+            .new_txs
             .into_iter()
             .map(|(tx, _)| tx.compute_txid())
             .collect::<BTreeSet<_>>(),
@@ -519,6 +525,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
     assert_eq!(
         emitter
             .mempool()?
+            .new_txs
             .into_iter()
             .map(|(tx, _)| tx.compute_txid())
             .collect::<BTreeSet<_>>(),
@@ -528,7 +535,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
 
     // At this point, the emitter has seen all mempool transactions. It should only re-emit those
     // that have introduction heights less than the emitter's last-emitted block tip.
-    while let Some(emission) = emitter.next_header()? {
+    while let Some(emission) = emitter.next_block()? {
         let height = emission.block_height();
         // We call `mempool()` twice.
         // The second call (at height `h`) should skip the tx introduced at height `h`.
@@ -539,6 +546,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
                 .collect::<BTreeSet<_>>();
             let emitted_txids = emitter
                 .mempool()?
+                .new_txs
                 .into_iter()
                 .map(|(tx, _)| tx.compute_txid())
                 .collect::<BTreeSet<_>>();
@@ -576,6 +584,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
+        HashSet::new(),
     );
 
     // mine blocks to get initial balance
@@ -593,10 +602,11 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
 
     // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
     // from the mempool yet)
-    while emitter.next_header()?.is_some() {}
+    while emitter.next_block()?.is_some() {}
     assert_eq!(
         emitter
             .mempool()?
+            .new_txs
             .into_iter()
             .map(|(tx, _)| tx.compute_txid())
             .collect::<BTreeSet<_>>(),
@@ -625,13 +635,14 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
             .collect::<BTreeMap<_, _>>());
 
         // `next_header` emits the replacement block of the reorg
-        if let Some(emission) = emitter.next_header()? {
+        if let Some(emission) = emitter.next_block()? {
             let height = emission.block_height();
 
             // the mempool emission (that follows the first block emission after reorg) should only
             // include mempool txs introduced at reorg height or greater
             let mempool = emitter
                 .mempool()?
+                .new_txs
                 .into_iter()
                 .map(|(tx, _)| tx.compute_txid())
                 .collect::<BTreeSet<_>>();
@@ -647,6 +658,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
 
             let mempool = emitter
                 .mempool()?
+                .new_txs
                 .into_iter()
                 .map(|(tx, _)| tx.compute_txid())
                 .collect::<BTreeSet<_>>();
@@ -670,7 +682,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
         }
 
         // sync emitter to tip
-        while emitter.next_header()?.is_some() {}
+        while emitter.next_block()?.is_some() {}
     }
 
     Ok(())
@@ -700,18 +712,23 @@ fn no_agreement_point() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         (PREMINE_COUNT - 2) as u32,
+        HashSet::new(),
     );
 
     // mine 101 blocks
     env.mine_blocks(PREMINE_COUNT, None)?;
 
     // emit block 99a
-    let block_header_99a = emitter.next_header()?.expect("block 99a header").block;
+    let block_header_99a = emitter
+        .next_block()?
+        .expect("block 99a header")
+        .block
+        .header;
     let block_hash_99a = block_header_99a.block_hash();
     let block_hash_98a = block_header_99a.prev_blockhash;
 
     // emit block 100a
-    let block_header_100a = emitter.next_header()?.expect("block 100a header").block;
+    let block_header_100a = emitter.next_block()?.expect("block 100a header").block;
     let block_hash_100a = block_header_100a.block_hash();
 
     // get hash for block 101a
@@ -726,7 +743,11 @@ fn no_agreement_point() -> anyhow::Result<()> {
     env.mine_blocks(3, None)?;
 
     // emit block header 99b
-    let block_header_99b = emitter.next_header()?.expect("block 99b header").block;
+    let block_header_99b = emitter
+        .next_block()?
+        .expect("block 99b header")
+        .block
+        .header;
     let block_hash_99b = block_header_99b.block_hash();
     let block_hash_98b = block_header_99b.prev_blockhash;
 
@@ -735,3 +756,110 @@ fn no_agreement_point() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+/// Validates that when an unconfirmed transaction is double-spent (and thus evicted from the
+/// mempool), the emitter reports it in `evicted_txids`, and after inserting that eviction into the
+/// graph it no longer appears in the set of canonical transactions.
+///
+/// 1. Broadcast a first tx (tx1) and confirm it arrives in unconfirmed set.
+/// 2. Double-spend tx1 with tx1b and verify `mempool()` reports tx1 as evicted.
+/// 3. Insert the eviction into the graph and assert tx1 is no longer canonical.
+#[test]
+fn test_expect_tx_evicted() -> anyhow::Result<()> {
+    use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin;
+    use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput;
+    use bdk_chain::miniscript;
+    use bdk_chain::spk_txout::SpkTxOutIndex;
+    use bitcoin::constants::genesis_block;
+    use bitcoin::secp256k1::Secp256k1;
+    use bitcoin::Network;
+    use std::collections::HashMap;
+    let env = TestEnv::new()?;
+
+    let s = bdk_testenv::utils::DESCRIPTORS[0];
+    let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s)
+        .unwrap()
+        .0;
+    let spk = desc.at_derivation_index(0)?.script_pubkey();
+
+    let mut chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0;
+    let chain_tip = chain.tip().block_id();
+
+    let mut index = SpkTxOutIndex::default();
+    index.insert_spk((), spk.clone());
+    let mut graph = IndexedTxGraph::<BlockId, _>::new(index);
+
+    // Receive tx1.
+    let _ = env.mine_blocks(100, None)?;
+    let txid_1 = env.send(
+        &Address::from_script(&spk, Network::Regtest)?,
+        Amount::ONE_BTC,
+    )?;
+
+    let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1]));
+    while let Some(emission) = emitter.next_block()? {
+        let height = emission.block_height();
+        chain.apply_update(CheckPoint::from_header(&emission.block.header, height))?;
+    }
+
+    let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs);
+    assert!(changeset
+        .tx_graph
+        .txs
+        .iter()
+        .any(|tx| tx.compute_txid() == txid_1));
+
+    // Double spend tx1.
+
+    // Get `prevout` from core.
+    let core = env.rpc_client();
+    let tx1 = &core.get_raw_transaction(&txid_1, None)?;
+    let txin = &tx1.input[0];
+    let op = txin.previous_output;
+
+    // Create `tx1b` using the previous output from tx1.
+    let utxo = CreateRawTransactionInput {
+        txid: op.txid,
+        vout: op.vout,
+        sequence: None,
+    };
+    let addr = core.get_new_address(None, None)?.assume_checked();
+    let tx = core.create_raw_transaction(
+        &[utxo],
+        &HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]),
+        None,
+        None,
+    )?;
+    let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?;
+    let tx1b = res.transaction()?;
+
+    // Send the tx.
+    let _txid_2 = core.send_raw_transaction(&tx1b)?;
+
+    // Retrieve the expected unconfirmed txids and spks from the graph.
+    let exp_spk_txids = graph
+        .list_expected_spk_txids(&chain, chain_tip, ..)
+        .collect::<Vec<_>>();
+    assert_eq!(exp_spk_txids, vec![(spk, txid_1)]);
+
+    // Check that mempool emission contains evicted txid.
+    let mempool_event = emitter.mempool()?;
+    assert!(mempool_event.evicted_txids.contains(&txid_1));
+
+    // Update graph with evicted tx.
+    for txid in mempool_event.evicted_txids {
+        if graph.graph().get_tx_node(txid).is_some() {
+            let _ = graph.insert_evicted_at(txid, mempool_event.latest_update_time);
+        }
+    }
+
+    let canonical_txids = graph
+        .graph()
+        .list_canonical_txs(&chain, chain_tip, CanonicalizationParams::default())
+        .map(|tx| tx.tx_node.compute_txid())
+        .collect::<Vec<_>>();
+    // tx1 should no longer be canonical.
+    assert!(!canonical_txids.contains(&txid_1));
+
+    Ok(())
+}
index 5eb3d3eb1902d136f36b648bef6b7a4b31713d64..3684edbfcbcf3f762cb0865b8b65497d95d82be1 100644 (file)
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashSet,
     path::PathBuf,
     sync::{
         atomic::{AtomicBool, Ordering},
@@ -11,10 +12,7 @@ use bdk_bitcoind_rpc::{
     bitcoincore_rpc::{Auth, Client, RpcApi},
     Emitter,
 };
-use bdk_chain::{
-    bitcoin::{Block, Transaction},
-    local_chain, CanonicalizationParams, Merge,
-};
+use bdk_chain::{bitcoin::Block, local_chain, CanonicalizationParams, Merge};
 use example_cli::{
     anyhow,
     clap::{self, Args, Subcommand},
@@ -36,7 +34,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
 #[derive(Debug)]
 enum Emission {
     Block(bdk_bitcoind_rpc::BlockEvent<Block>),
-    Mempool(Vec<(Transaction, u64)>),
+    Mempool(bdk_bitcoind_rpc::MempoolEvent),
     Tip(u32),
 }
 
@@ -141,7 +139,7 @@ fn main() -> anyhow::Result<()> {
 
             let chain_tip = chain.lock().unwrap().tip();
             let rpc_client = rpc_args.new_client()?;
-            let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
+            let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
             let mut db_stage = ChangeSet::default();
 
             let mut last_db_commit = Instant::now();
@@ -205,7 +203,7 @@ fn main() -> anyhow::Result<()> {
             let graph_changeset = graph
                 .lock()
                 .unwrap()
-                .batch_insert_relevant_unconfirmed(mempool_txs);
+                .batch_insert_relevant_unconfirmed(mempool_txs.new_txs);
             {
                 let db = &mut *db.lock().unwrap();
                 db_stage.merge(ChangeSet {
@@ -233,7 +231,8 @@ fn main() -> anyhow::Result<()> {
             let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
             let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
                 let rpc_client = rpc_args.new_client()?;
-                let mut emitter = Emitter::new(&rpc_client, last_cp, fallback_height);
+                let mut emitter =
+                    Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
 
                 let mut block_count = rpc_client.get_block_count()? as u32;
                 tx.send(Emission::Tip(block_count))?;
@@ -288,7 +287,13 @@ fn main() -> anyhow::Result<()> {
                         (chain_changeset, graph_changeset)
                     }
                     Emission::Mempool(mempool_txs) => {
-                        let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs);
+                        let mut graph_changeset =
+                            graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone());
+                        for txid in mempool_txs.evicted_txids {
+                            graph_changeset.merge(
+                                graph.insert_evicted_at(txid, mempool_txs.latest_update_time),
+                            );
+                        }
                         (local_chain::ChangeSet::default(), graph_changeset)
                     }
                     Emission::Tip(h) => {