]> Untitled Git - bdk/commitdiff
bitcoind_rpc: rm `BlockHash` from `Emitter::last_mempool_tip`
author志宇 <hello@evanlinjin.me>
Fri, 6 Oct 2023 09:39:22 +0000 (17:39 +0800)
committer志宇 <hello@evanlinjin.me>
Mon, 9 Oct 2023 14:14:04 +0000 (22:14 +0800)
Instead of comparing the blockhash against the emitted_blocks map
to see whether the block is part of the emitter's best chain, we
reduce the `last_mempool_tip` height to the last agreement height
during the polling logic.

The benefits of this is we have tighter bounds for avoiding re-
emission. Also, it will be easier to replace `emitted_blocks` to
a `CheckPoint` (since we no longer rely on map lookup).

crates/bitcoind_rpc/src/lib.rs
crates/bitcoind_rpc/tests/test_emitter.rs

index a4b28c8e8c7ccdfef8ce34e28b26299942d53bca..8ed646c81a20b19be4f4ab708804b0e3f17cbd47 100644 (file)
@@ -33,7 +33,7 @@ pub struct Emitter<'c, C> {
 
     /// The last emitted block during our last mempool emission. This is used to determine whether
     /// there has been a reorg since our last mempool emission.
-    last_mempool_tip: Option<(u32, BlockHash)>,
+    last_mempool_tip: Option<u32>,
 }
 
 impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -65,12 +65,17 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
     pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
         let client = self.client;
 
-        let prev_mempool_tip = match self.last_mempool_tip {
-            // use 'avoid-re-emission' logic if there is no reorg
-            Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height,
-            _ => 0,
-        };
-
+        // This is the emitted tip height during the last mempool emission.
+        let prev_mempool_tip = self
+            .last_mempool_tip
+            // We use `start_height - 1` as we cannot guarantee that the block at
+            // `start_height` has been emitted.
+            .unwrap_or(self.start_height.saturating_sub(1));
+
+        // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
+        // track of the latest mempool tx's timestamp to determine whether we have seen a tx
+        // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
+        // be the new latest timestamp.
         let prev_mempool_time = self.last_mempool_time;
         let mut latest_time = prev_mempool_time;
 
@@ -109,11 +114,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
             .collect::<Result<Vec<_>, _>>()?;
 
         self.last_mempool_time = latest_time;
-        self.last_mempool_tip = self
-            .emitted_blocks
-            .iter()
-            .last()
-            .map(|(&height, &hash)| (height, hash));
+        self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);
 
         Ok(txs_to_emit)
     }
@@ -209,7 +210,18 @@ where
                 continue;
             }
             PollResponse::AgreementFound(res) => {
-                emitter.emitted_blocks.split_off(&(res.height as u32 + 1));
+                let agreement_h = res.height as u32;
+
+                // get rid of evicted blocks
+                emitter.emitted_blocks.split_off(&(agreement_h + 1));
+
+                // The tip during the last mempool emission needs to in the best chain, we reduce
+                // it if it is not.
+                if let Some(h) = emitter.last_mempool_tip.as_mut() {
+                    if *h > agreement_h {
+                        *h = agreement_h;
+                    }
+                }
                 emitter.last_block = Some(res);
                 continue;
             }
index 601fb5616038264fc24a0556650d8c4b53e0e62b..5d57bde18854c7a1b008fb88d54d2395d4b92530 100644 (file)
@@ -368,7 +368,8 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
     // must receive mined block which will confirm the transactions.
     {
         let (height, block) = emitter.next_block()?.expect("must get mined block");
-        let _ = chain.apply_update(block_to_chain_update(&block, height))?;
+        let _ = chain
+            .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
         let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
         assert!(indexed_additions.graph.txs.is_empty());
         assert!(indexed_additions.graph.txouts.is_empty());
@@ -685,34 +686,59 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
         env.send(&addr, Amount::from_sat(2100))?;
     }
 
-    // perform reorgs at different heights
-    for reorg_count in 1..TIP_DIFF {
-        // sync emitter to tip
-        while emitter.next_header()?.is_some() {}
+    // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
+    // from the mempool yet)
+    while emitter.next_header()?.is_some() {}
+    assert_eq!(
+        emitter
+            .mempool()?
+            .into_iter()
+            .map(|(tx, _)| tx.txid())
+            .collect::<BTreeSet<_>>(),
+        env.client
+            .get_raw_mempool()?
+            .into_iter()
+            .collect::<BTreeSet<_>>(),
+        "first mempool emission should include all txs",
+    );
 
+    // perform reorgs at different heights, these reorgs will not comfirm transactions in the
+    // mempool
+    for reorg_count in 1..TIP_DIFF {
         println!("REORG COUNT: {}", reorg_count);
         env.reorg_empty_blocks(reorg_count)?;
 
-        // we recalculate this at every loop as reorgs may evict transactions from mempool
-        let tx_introductions = env
+        // This is a map of mempool txids to tip height where the tx was introduced to the mempool
+        // we recalculate this at every loop as reorgs may evict transactions from mempool. We use
+        // the introduction height to determine whether we expect a tx to appear in a mempool
+        // emission.
+        // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first?
+        let tx_introductions = dbg!(env
             .client
             .get_raw_mempool_verbose()?
             .into_iter()
             .map(|(txid, entry)| (txid, entry.height as usize))
-            .collect::<BTreeMap<_, _>>();
+            .collect::<BTreeMap<_, _>>());
 
+        // `next_header` emits the replacement block of the reorg
         if let Some((height, _)) = emitter.next_header()? {
-            // the mempool emission (that follows the first block emission after reorg) should return
-            // the entire mempool contents
+            println!("\t- replacement height: {}", height);
+
+            // the mempool emission (that follows the first block emission after reorg) should only
+            // include mempool txs introduced at reorg height or greater
             let mempool = emitter
                 .mempool()?
                 .into_iter()
                 .map(|(tx, _)| tx.txid())
                 .collect::<BTreeSet<_>>();
-            let exp_mempool = tx_introductions.keys().copied().collect::<BTreeSet<_>>();
+            let exp_mempool = tx_introductions
+                .iter()
+                .filter(|(_, &intro_h)| intro_h >= (height as usize))
+                .map(|(&txid, _)| txid)
+                .collect::<BTreeSet<_>>();
             assert_eq!(
                 mempool, exp_mempool,
-                "the first mempool emission after reorg should include all mempool txs"
+                "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater"
             );
 
             let mempool = emitter
@@ -738,6 +764,9 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
                     .collect::<Vec<_>>(),
             );
         }
+
+        // sync emitter to tip
+        while emitter.next_header()?.is_some() {}
     }
 
     Ok(())