]> Untitled Git - bdk/commitdiff
fix(bitcoind_rpc)!: Simplify emitter
author志宇 <hello@evanlinjin.me>
Thu, 3 Jul 2025 23:58:53 +0000 (23:58 +0000)
committer志宇 <hello@evanlinjin.me>
Thu, 10 Jul 2025 02:13:09 +0000 (02:13 +0000)
Instead of having an avoid-reemission logic based on timestamps returned
by bitcoind RPC, we wrap all transactions in `Arc` and always emit the
entire mempool. This makes emission cheap while avoiding the flawed
avoid-reemission logic.

crates/bitcoind_rpc/src/lib.rs
crates/bitcoind_rpc/tests/test_emitter.rs
examples/example_bitcoind_rpc_polling/src/main.rs

index 250ff430691dfcc2e789bb461aa254030da8b9aa..d72c22c7a60ffd24310fea435232f0f55f3f72a8 100644 (file)
 use bdk_core::{BlockId, CheckPoint};
 use bitcoin::{Block, BlockHash, Transaction, Txid};
 use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
-use std::{collections::HashSet, ops::Deref};
+use std::{
+    collections::{HashMap, HashSet},
+    ops::Deref,
+    sync::Arc,
+};
 
 pub mod bip158;
 
@@ -37,30 +41,23 @@ pub struct Emitter<C> {
     /// gives us an opportunity to re-fetch this result.
     last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
 
-    /// The latest first-seen epoch of emitted mempool transactions. This is used to determine
-    /// whether a mempool transaction is already emitted.
-    last_mempool_time: usize,
-
-    /// The last emitted block during our last mempool emission. This is used to determine whether
-    /// there has been a reorg since our last mempool emission.
-    last_mempool_tip: Option<u32>,
-
-    /// A set of txids currently assumed to still be in the mempool.
+    /// The last snapshot of mempool transactions.
     ///
-    /// This is used to detect mempool evictions by comparing the set against the latest mempool
-    /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is
-    /// considered evicted.
+    /// This is used to detect mempool evictions and as a cache for transactions to emit.
     ///
-    /// When the emitter emits a block, confirmed txids are removed from this set. This prevents
-    /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
-    expected_mempool_txids: HashSet<Txid>,
+    /// For mempool evictions, the latest call to `getrawmempool` is compared against this field.
+    /// Any transaction that is missing from this field is considered evicted. The exception is if
+    /// the transaction is confirmed into a block - therefore, we only emit evictions when we are
+    /// sure the tip block is already emitted. When a block is emitted, the transactions in the
+    /// block are removed from this field.
+    mempool_snapshot: HashMap<Txid, Arc<Transaction>>,
 }
 
 /// Indicates that there are no initially expected mempool transactions.
 ///
 /// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
 /// to start empty (i.e. with no unconfirmed transactions).
-pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();
+pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Arc<Transaction>> = core::iter::empty();
 
 impl<C> Emitter<C>
 where
@@ -75,23 +72,27 @@ where
     /// `start_height` starts emission from a given height (if there are no conflicts with the
     /// original chain).
     ///
-    /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
-    /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
-    /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
+    /// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the
+    /// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
+    /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
     pub fn new(
         client: C,
         last_cp: CheckPoint,
         start_height: u32,
-        expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
+        expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
     ) -> Self {
         Self {
             client,
             start_height,
             last_cp,
             last_block: None,
-            last_mempool_time: 0,
-            last_mempool_tip: None,
-            expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
+            mempool_snapshot: expected_mempool_txs
+                .into_iter()
+                .map(|tx| {
+                    let tx: Arc<Transaction> = tx.into();
+                    (tx.compute_txid(), tx)
+                })
+                .collect(),
         }
     }
 
@@ -115,102 +116,81 @@ where
     pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
         let client = &*self.client;
 
-        // This is the emitted tip height during the last mempool emission.
-        let prev_mempool_tip = self
-            .last_mempool_tip
-            // We use `start_height - 1` as we cannot guarantee that the block at
-            // `start_height` has been emitted.
-            .unwrap_or(self.start_height.saturating_sub(1));
-
-        // Loop to make sure that the fetched mempool content and the fetched tip are consistent
-        // with one another.
-        let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
-            // Determine if height and hash matches the best block from the RPC. Evictions are
-            // deferred if we are not at the best block.
-            let height = client.get_block_count()?;
-            let hash = client.get_block_hash(height)?;
-
-            // Get the raw mempool result from the RPC client which will be used to determine if any
-            // transactions have been evicted.
-            let mp = client.get_raw_mempool_verbose()?;
-            let mp_txids: HashSet<Txid> = mp.keys().copied().collect();
-
-            if height == client.get_block_count()? && hash == client.get_block_hash(height)? {
-                break (mp, mp_txids, height, hash);
+        let mut rpc_tip_height;
+        let mut rpc_tip_hash;
+        let mut rpc_mempool;
+        let mut rpc_mempool_txids;
+
+        // Ensure we get a mempool snapshot consistent with `rpc_tip_hash` as the tip.
+        loop {
+            rpc_tip_height = client.get_block_count()?;
+            rpc_tip_hash = client.get_block_hash(rpc_tip_height)?;
+            rpc_mempool = client.get_raw_mempool_verbose()?;
+            rpc_mempool_txids = rpc_mempool.keys().copied().collect::<HashSet<Txid>>();
+            let is_still_at_tip = rpc_tip_hash == client.get_block_hash(rpc_tip_height)?
+                && rpc_tip_height == client.get_block_count()?;
+            if is_still_at_tip {
+                break;
             }
-        };
-
-        let at_tip =
-            rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash();
-
-        // If at tip, any expected txid missing from raw mempool is considered evicted;
-        // if not at tip, we don't evict anything.
-        let evicted_txids: HashSet<Txid> = if at_tip {
-            self.expected_mempool_txids
-                .difference(&raw_mempool_txids)
-                .copied()
-                .collect()
-        } else {
-            HashSet::new()
-        };
+        }
 
-        // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
-        // track of the latest mempool tx's timestamp to determine whether we have seen a tx
-        // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
-        // be the new latest timestamp.
-        let prev_mempool_time = self.last_mempool_time;
-        let mut latest_time = prev_mempool_time;
+        let mut mempool_event = MempoolEvent::default();
+        let update_time = &mut 0_u64;
 
-        let new_txs = raw_mempool
+        mempool_event.update = rpc_mempool
             .into_iter()
             .filter_map({
-                let latest_time = &mut latest_time;
-                move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
-                    let tx_time = tx_entry.time as usize;
-                    if tx_time > *latest_time {
-                        *latest_time = tx_time;
-                    }
-                    // Best-effort check to avoid re-emitting transactions we've already emitted.
-                    //
-                    // Complete suppression isn't possible, since a transaction may spend outputs
-                    // owned by the wallet. To determine if such a transaction is relevant, we must
-                    // have already seen its ancestor(s) that contain the spent prevouts.
-                    //
-                    // Fortunately, bitcoind provides the block height at which the transaction
-                    // entered the mempool. If we've already emitted that block height, we can
-                    // reasonably assume the receiver has seen all ancestor transactions.
-                    let is_already_emitted = tx_time <= prev_mempool_time;
-                    let is_within_height = tx_entry.height <= prev_mempool_tip as _;
-                    if is_already_emitted && is_within_height {
-                        return None;
-                    }
-                    let tx = match client.get_raw_transaction(&txid, None) {
-                        Ok(tx) => tx,
-                        Err(err) if err.is_not_found_error() => return None,
-                        Err(err) => return Some(Err(err)),
+                |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
+                    *update_time = u64::max(*update_time, tx_entry.time);
+                    let tx = match self.mempool_snapshot.get(&txid) {
+                        Some(tx) => tx.clone(),
+                        None => match client.get_raw_transaction(&txid, None) {
+                            Ok(tx) => {
+                                let tx = Arc::new(tx);
+                                self.mempool_snapshot.insert(txid, tx.clone());
+                                tx
+                            }
+                            Err(err) if err.is_not_found_error() => return None,
+                            Err(err) => return Some(Err(err)),
+                        },
                     };
-                    Some(Ok((tx, tx_time as u64)))
+                    Some(Ok((tx, tx_entry.time)))
                 }
             })
             .collect::<Result<Vec<_>, _>>()?;
 
-        self.last_mempool_time = latest_time;
-        self.last_mempool_tip = Some(self.last_cp.height());
+        let at_tip =
+            rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash();
 
-        // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
-        // still catching up to the tip and keep accumulating.
         if at_tip {
-            self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
+            // We only emit evicted transactions when we have already emitted the RPC tip. This is
+            // because we cannot differenciate between transactions that are confirmed and
+            // transactions that are evicted, so we rely on emitted blocks to remove
+            // transactions from the `mempool_snapshot`.
+            mempool_event.evicted = self
+                .mempool_snapshot
+                .keys()
+                .filter(|&txid| !rpc_mempool_txids.contains(txid))
+                .map(|&txid| (txid, *update_time))
+                .collect();
+            self.mempool_snapshot = mempool_event
+                .update
+                .iter()
+                .map(|(tx, _)| (tx.compute_txid(), tx.clone()))
+                .collect();
         } else {
-            self.expected_mempool_txids
-                .extend(new_txs.iter().map(|(tx, _)| tx.compute_txid()));
-        }
+            // Since we are still catching up to the tip (a.k.a tip has not been emitted), we
+            // accumulate more transactions in `mempool_snapshot` so that we can emit evictions in
+            // a batch once we catch up.
+            self.mempool_snapshot.extend(
+                mempool_event
+                    .update
+                    .iter()
+                    .map(|(tx, _)| (tx.compute_txid(), tx.clone())),
+            );
+        };
 
-        Ok(MempoolEvent {
-            new_txs,
-            evicted_txids,
-            latest_update_time: latest_time as u64,
-        })
+        Ok(mempool_event)
     }
 
     /// Emit the next block height and block (if any).
@@ -218,7 +198,7 @@ where
         if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
             // Stop tracking unconfirmed transactions that have been confirmed in this block.
             for tx in &block.txdata {
-                self.expected_mempool_txids.remove(&tx.compute_txid());
+                self.mempool_snapshot.remove(&tx.compute_txid());
             }
             return Ok(Some(BlockEvent { block, checkpoint }));
         }
@@ -227,32 +207,13 @@ where
 }
 
 /// A new emission from mempool.
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct MempoolEvent {
-    /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
-    ///
-    /// To understand the second condition, consider a receiver which filters transactions based on
-    /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
-    /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up
-    /// to block of height `h-1`, we want to re-emit this transaction until the receiver has
-    /// seen the block at height `h`.
-    pub new_txs: Vec<(Transaction, u64)>,
-
-    /// [`Txid`]s of all transactions that have been evicted from mempool.
-    pub evicted_txids: HashSet<Txid>,
+    /// Transactions currently in the mempool alongside their seen-at timestamp.
+    pub update: Vec<(Arc<Transaction>, u64)>,
 
-    /// The latest timestamp of when a transaction entered the mempool.
-    ///
-    /// This is useful for setting the timestamp for evicted transactions.
-    pub latest_update_time: u64,
-}
-
-impl MempoolEvent {
-    /// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions.
-    pub fn evicted_ats(&self) -> impl ExactSizeIterator<Item = (Txid, u64)> + '_ {
-        let time = self.latest_update_time;
-        self.evicted_txids.iter().map(move |&txid| (txid, time))
-    }
+    /// Transactions evicted from the mempool alongside their evicted-at timestamp.
+    pub evicted: Vec<(Txid, u64)>,
 }
 
 /// A newly emitted block from [`Emitter`].
@@ -396,16 +357,6 @@ where
                 continue;
             }
             PollResponse::AgreementFound(res, cp) => {
-                let agreement_h = res.height as u32;
-
-                // The tip during the last mempool emission needs to in the best chain, we reduce
-                // it if it is not.
-                if let Some(h) = emitter.last_mempool_tip.as_mut() {
-                    if *h > agreement_h {
-                        *h = agreement_h;
-                    }
-                }
-
                 // get rid of evicted blocks
                 emitter.last_cp = cp;
                 emitter.last_block = Some(res);
@@ -479,7 +430,7 @@ mod test {
 
             for txid in &mempool_txids {
                 assert!(
-                    emitter.expected_mempool_txids.contains(txid),
+                    emitter.mempool_snapshot.contains_key(txid),
                     "Expected txid {txid:?} missing"
                 );
             }
@@ -500,19 +451,19 @@ mod test {
                 .collect::<HashSet<_>>();
             for txid in confirmed_txids {
                 assert!(
-                    !emitter.expected_mempool_txids.contains(&txid),
+                    !emitter.mempool_snapshot.contains_key(&txid),
                     "Expected txid {txid:?} should have been removed"
                 );
             }
             for txid in &mempool_txids {
                 assert!(
-                    emitter.expected_mempool_txids.contains(txid),
+                    emitter.mempool_snapshot.contains_key(txid),
                     "Expected txid {txid:?} missing"
                 );
             }
         }
 
-        assert!(emitter.expected_mempool_txids.is_empty());
+        assert!(emitter.mempool_snapshot.is_empty());
 
         Ok(())
     }
index 187325a884449655bb6c9fbbdabcfa4ef29e6187..0ccb7fe61b849f676b5cce27639dba2ecc31110a 100644 (file)
@@ -1,7 +1,4 @@
-use std::{
-    collections::{BTreeMap, BTreeSet, HashSet},
-    ops::Deref,
-};
+use std::{collections::BTreeSet, ops::Deref};
 
 use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
 use bdk_chain::{
@@ -197,7 +194,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
         assert!(emitter.next_block()?.is_none());
 
         let mempool_txs = emitter.mempool()?;
-        let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
+        let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.update);
         assert_eq!(
             indexed_additions
                 .tx_graph
@@ -449,250 +446,36 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
         .map(|_| env.send(&addr, Amount::from_sat(2100)))
         .collect::<Result<BTreeSet<Txid>, _>>()?;
 
-    // the first emission should include all transactions
-    let emitted_txids = emitter
-        .mempool()?
-        .new_txs
-        .into_iter()
-        .map(|(tx, _)| tx.compute_txid())
-        .collect::<BTreeSet<Txid>>();
-    assert_eq!(
-        emitted_txids, exp_txids,
-        "all mempool txs should be emitted"
-    );
-
-    // second emission should be empty
-    assert!(
-        emitter.mempool()?.new_txs.is_empty(),
-        "second emission should be empty"
-    );
-
-    // mine empty blocks + sync up our emitter -> we should still not re-emit
-    for _ in 0..BLOCKS_TO_MINE {
-        env.mine_empty_block()?;
-    }
-    while emitter.next_block()?.is_some() {}
-    assert!(
-        emitter.mempool()?.new_txs.is_empty(),
-        "third emission, after chain tip is extended, should also be empty"
-    );
-
-    Ok(())
-}
-
-/// Ensure mempool tx is still re-emitted if [`Emitter`] has not reached the tx's introduction
-/// height.
-///
-/// We introduce a mempool tx after each block, where blocks are empty (does not confirm previous
-/// mempool txs). Then we emit blocks from [`Emitter`] (intertwining `mempool` calls). We check
-/// that `mempool` should always re-emit txs that have introduced at a height greater than the last
-/// emitted block height.
-#[test]
-fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()> {
-    const PREMINE_COUNT: usize = 101;
-    const MEMPOOL_TX_COUNT: usize = 21;
-
-    let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(
-        env.rpc_client(),
-        CheckPoint::new(BlockId {
-            height: 0,
-            hash: env.rpc_client().get_block_hash(0)?,
-        }),
-        0,
-        NO_EXPECTED_MEMPOOL_TXIDS,
-    );
-
-    // mine blocks to get initial balance, sync emitter up to tip
-    let addr = env
-        .rpc_client()
-        .get_new_address(None, None)?
-        .assume_checked();
-    env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;
-    while emitter.next_block()?.is_some() {}
-
-    // mine blocks to introduce txs to mempool at different heights
-    let tx_introductions = (0..MEMPOOL_TX_COUNT)
-        .map(|_| -> anyhow::Result<_> {
-            let (height, _) = env.mine_empty_block()?;
-            let txid = env.send(&addr, Amount::from_sat(2100))?;
-            Ok((height, txid))
-        })
-        .collect::<anyhow::Result<BTreeSet<_>>>()?;
-
-    assert_eq!(
-        emitter
-            .mempool()?
-            .new_txs
-            .into_iter()
-            .map(|(tx, _)| tx.compute_txid())
-            .collect::<BTreeSet<_>>(),
-        tx_introductions.iter().map(|&(_, txid)| txid).collect(),
-        "first mempool emission should include all txs",
-    );
-    assert_eq!(
-        emitter
+    // First two emissions should include all transactions.
+    for _ in 0..2 {
+        let emitted_txids = emitter
             .mempool()?
-            .new_txs
+            .update
             .into_iter()
             .map(|(tx, _)| tx.compute_txid())
-            .collect::<BTreeSet<_>>(),
-        tx_introductions.iter().map(|&(_, txid)| txid).collect(),
-        "second mempool emission should still include all txs",
-    );
-
-    // At this point, the emitter has seen all mempool transactions. It should only re-emit those
-    // that have introduction heights less than the emitter's last-emitted block tip.
-    while let Some(emission) = emitter.next_block()? {
-        let height = emission.block_height();
-        // We call `mempool()` twice.
-        // The second call (at height `h`) should skip the tx introduced at height `h`.
-        for try_index in 0..2 {
-            let exp_txids = tx_introductions
-                .range((height as usize + try_index, Txid::all_zeros())..)
-                .map(|&(_, txid)| txid)
-                .collect::<BTreeSet<_>>();
-            let emitted_txids = emitter
-                .mempool()?
-                .new_txs
-                .into_iter()
-                .map(|(tx, _)| tx.compute_txid())
-                .collect::<BTreeSet<_>>();
-            assert_eq!(
-                emitted_txids, exp_txids,
-                "\n emission {} (try {}) must only contain txs introduced at that height or lower: \n\t missing: {:?} \n\t extra: {:?}",
-                height,
-                try_index,
-                exp_txids
-                    .difference(&emitted_txids)
-                    .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap()))
-                    .collect::<Vec<_>>(),
-                emitted_txids
-                    .difference(&exp_txids)
-                    .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap()))
-                    .collect::<Vec<_>>(),
-            );
-        }
+            .collect::<BTreeSet<Txid>>();
+        assert_eq!(
+            emitted_txids, exp_txids,
+            "all mempool txs should be emitted"
+        );
     }
 
-    Ok(())
-}
-
-/// Ensure we force re-emit all mempool txs after reorg.
-#[test]
-fn mempool_during_reorg() -> anyhow::Result<()> {
-    const TIP_DIFF: usize = 10;
-    const PREMINE_COUNT: usize = 101;
-
-    let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(
-        env.rpc_client(),
-        CheckPoint::new(BlockId {
-            height: 0,
-            hash: env.rpc_client().get_block_hash(0)?,
-        }),
-        0,
-        NO_EXPECTED_MEMPOOL_TXIDS,
-    );
-
-    // mine blocks to get initial balance
-    let addr = env
-        .rpc_client()
-        .get_new_address(None, None)?
-        .assume_checked();
-    env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;
-
-    // introduce mempool tx at each block extension
-    for _ in 0..TIP_DIFF {
+    // mine empty blocks + sync up our emitter -> we should still not re-emit
+    for _ in 0..BLOCKS_TO_MINE {
         env.mine_empty_block()?;
-        env.send(&addr, Amount::from_sat(2100))?;
     }
-
-    // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
-    // from the mempool yet)
     while emitter.next_block()?.is_some() {}
+    let emitted_txids = emitter
+        .mempool()?
+        .update
+        .into_iter()
+        .map(|(tx, _)| tx.compute_txid())
+        .collect::<BTreeSet<Txid>>();
     assert_eq!(
-        emitter
-            .mempool()?
-            .new_txs
-            .into_iter()
-            .map(|(tx, _)| tx.compute_txid())
-            .collect::<BTreeSet<_>>(),
-        env.rpc_client()
-            .get_raw_mempool()?
-            .into_iter()
-            .collect::<BTreeSet<_>>(),
-        "first mempool emission should include all txs",
+        emitted_txids, exp_txids,
+        "all mempool txs should be emitted"
     );
 
-    // perform reorgs at different heights, these reorgs will not confirm transactions in the
-    // mempool
-    for reorg_count in 1..TIP_DIFF {
-        env.reorg_empty_blocks(reorg_count)?;
-
-        // This is a map of mempool txids to tip height where the tx was introduced to the mempool
-        // we recalculate this at every loop as reorgs may evict transactions from mempool. We use
-        // the introduction height to determine whether we expect a tx to appear in a mempool
-        // emission.
-        // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first?
-        let tx_introductions = dbg!(env
-            .rpc_client()
-            .get_raw_mempool_verbose()?
-            .into_iter()
-            .map(|(txid, entry)| (txid, entry.height as usize))
-            .collect::<BTreeMap<_, _>>());
-
-        // `next_header` emits the replacement block of the reorg
-        if let Some(emission) = emitter.next_block()? {
-            let height = emission.block_height();
-
-            // the mempool emission (that follows the first block emission after reorg) should only
-            // include mempool txs introduced at reorg height or greater
-            let mempool = emitter
-                .mempool()?
-                .new_txs
-                .into_iter()
-                .map(|(tx, _)| tx.compute_txid())
-                .collect::<BTreeSet<_>>();
-            let exp_mempool = tx_introductions
-                .iter()
-                .filter(|(_, &intro_h)| intro_h >= (height as usize))
-                .map(|(&txid, _)| txid)
-                .collect::<BTreeSet<_>>();
-            assert_eq!(
-                mempool, exp_mempool,
-                "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater"
-            );
-
-            let mempool = emitter
-                .mempool()?
-                .new_txs
-                .into_iter()
-                .map(|(tx, _)| tx.compute_txid())
-                .collect::<BTreeSet<_>>();
-            let exp_mempool = tx_introductions
-                .iter()
-                .filter(|&(_, &intro_height)| intro_height > (height as usize))
-                .map(|(&txid, _)| txid)
-                .collect::<BTreeSet<_>>();
-            assert_eq!(
-                mempool, exp_mempool,
-                "following mempool emissions after reorg should exclude mempool introduction heights <= last emitted block height: \n\t missing: {:?} \n\t extra: {:?}",
-                exp_mempool
-                    .difference(&mempool)
-                    .map(|txid| (txid, tx_introductions.get(txid).unwrap()))
-                    .collect::<Vec<_>>(),
-                mempool
-                    .difference(&exp_mempool)
-                    .map(|txid| (txid, tx_introductions.get(txid).unwrap()))
-                    .collect::<Vec<_>>(),
-            );
-        }
-
-        // sync emitter to tip
-        while emitter.next_block()?.is_some() {}
-    }
-
     Ok(())
 }
 
@@ -803,14 +586,18 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
         &Address::from_script(&spk, Network::Regtest)?,
         Amount::ONE_BTC,
     )?;
+    let tx_1 = env
+        .rpc_client()
+        .get_transaction(&txid_1, None)?
+        .transaction()?;
 
-    let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1]));
+    let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, core::iter::once(tx_1));
     while let Some(emission) = emitter.next_block()? {
         let height = emission.block_height();
         chain.apply_update(CheckPoint::from_header(&emission.block.header, height))?;
     }
 
-    let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs);
+    let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.update);
     assert!(changeset
         .tx_graph
         .txs
@@ -852,10 +639,13 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
 
     // Check that mempool emission contains evicted txid.
     let mempool_event = emitter.mempool()?;
-    assert!(mempool_event.evicted_txids.contains(&txid_1));
+    assert!(mempool_event
+        .evicted
+        .iter()
+        .any(|(txid, _)| txid == &txid_1));
 
     // Update graph with evicted tx.
-    let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted_ats());
+    let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted);
 
     let canonical_txids = graph
         .graph()
@@ -888,11 +678,11 @@ fn detect_new_mempool_txs() -> anyhow::Result<()> {
         NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
-    while let Some(_) = emitter.next_block()? {}
+    while emitter.next_block()?.is_some() {}
 
     for n in 0..5 {
         let txid = env.send(&addr, Amount::ONE_BTC)?;
-        let new_txs = emitter.mempool()?.new_txs;
+        let new_txs = emitter.mempool()?.update;
         assert!(
             new_txs.iter().any(|(tx, _)| tx.compute_txid() == txid),
             "must detect new tx {n}"
index 51e5bc23b71586f079099681ae72b155e6df1efc..cb710151a858422e494d4691d8a6e7adece8795b 100644 (file)
@@ -217,7 +217,7 @@ fn main() -> anyhow::Result<()> {
             let graph_changeset = graph
                 .lock()
                 .unwrap()
-                .batch_insert_relevant_unconfirmed(mempool_txs.new_txs);
+                .batch_insert_relevant_unconfirmed(mempool_txs.update);
             {
                 let db = &mut *db.lock().unwrap();
                 db_stage.merge(ChangeSet {
@@ -315,10 +315,9 @@ fn main() -> anyhow::Result<()> {
                     }
                     Emission::Mempool(mempool_txs) => {
                         let mut graph_changeset =
-                            graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone());
-                        graph_changeset.merge(
-                            graph.batch_insert_relevant_evicted_at(mempool_txs.evicted_ats()),
-                        );
+                            graph.batch_insert_relevant_unconfirmed(mempool_txs.update.clone());
+                        graph_changeset
+                            .merge(graph.batch_insert_relevant_evicted_at(mempool_txs.evicted));
                         (local_chain::ChangeSet::default(), graph_changeset)
                     }
                     Emission::Tip(h) => {