]> Untitled Git - bdk/commitdiff
bitcoind_rpc!: bring back `CheckPoint`s to `Emitter`
author志宇 <hello@evanlinjin.me>
Fri, 6 Oct 2023 16:56:01 +0000 (00:56 +0800)
committer志宇 <hello@evanlinjin.me>
Mon, 9 Oct 2023 14:14:04 +0000 (22:14 +0800)
* `bdk_chain` dependency is added. In the future, we will introduce a
  separate `bdk_core` crate to contain shared types.
* replace `Emitter::new` with `from_height` and `from_checkpoint`
  * `from_height` emits from the given start height
  * `from_checkpoint` uses the provided cp to find agreement point
* introduce logic that ensures emitted blocks can connect with
  receiver's `LocalChain`
* in our rpc example, we can now `expect()` chain updates to always
  since we are using checkpoints and receiving blocks in order

crates/bitcoind_rpc/Cargo.toml
crates/bitcoind_rpc/src/lib.rs
crates/bitcoind_rpc/tests/test_emitter.rs
example-crates/example_bitcoind_rpc_polling/src/main.rs

index eeb9de581f0dd33464d30d347a4b27444cce4a18..f04469d27fe5e894d9159452efd33b6e1443ce43 100644 (file)
@@ -9,13 +9,13 @@ edition = "2021"
 # For no-std, remember to enable the bitcoin/no-std feature
 bitcoin = { version = "0.30", default-features = false }
 bitcoincore-rpc = { version = "0.17" }
+bdk_chain = { path = "../chain", version = "0.5", default-features = false }
 
 [dev-dependencies]
-bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] }
 bitcoind = { version = "0.33", features = ["25_0"] }
 anyhow = { version = "1" }
 
 [features]
 default = ["std"]
-std = ["bitcoin/std"]
-serde = ["bitcoin/serde"]
+std = ["bitcoin/std", "bdk_chain/std"]
+serde = ["bitcoin/serde", "bdk_chain/serde"]
index 8ed646c81a20b19be4f4ab708804b0e3f17cbd47..f200550bd390bf4f5f386023d8aac105aa98fca9 100644 (file)
@@ -9,8 +9,7 @@
 //! mempool.
 #![warn(missing_docs)]
 
-use std::collections::BTreeMap;
-
+use bdk_chain::{local_chain::CheckPoint, BlockId};
 use bitcoin::{block::Header, Block, BlockHash, Transaction};
 pub use bitcoincore_rpc;
 use bitcoincore_rpc::bitcoincore_rpc_json;
@@ -24,7 +23,7 @@ pub struct Emitter<'c, C> {
     client: &'c C,
     start_height: u32,
 
-    emitted_blocks: BTreeMap<u32, BlockHash>,
+    last_cp: Option<CheckPoint>,
     last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
 
     /// The latest first-seen epoch of emitted mempool transactions. This is used to determine
@@ -37,14 +36,29 @@ pub struct Emitter<'c, C> {
 }
 
 impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
-    /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
+    /// Construct a new [`Emitter`] with the given RPC `client` and `start_height`.
     ///
     /// `start_height` is the block height to start emitting blocks from.
-    pub fn new(client: &'c C, start_height: u32) -> Self {
+    pub fn from_height(client: &'c C, start_height: u32) -> Self {
         Self {
             client,
             start_height,
-            emitted_blocks: BTreeMap::new(),
+            last_cp: None,
+            last_block: None,
+            last_mempool_time: 0,
+            last_mempool_tip: None,
+        }
+    }
+
+    /// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`.
+    ///
+    /// `checkpoint` is used to find the latest block which is still part of the best chain. The
+    /// [`Emitter`] will emit blocks starting right above this block.
+    pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self {
+        Self {
+            client,
+            start_height: 0,
+            last_cp: Some(checkpoint),
             last_block: None,
             last_mempool_time: 0,
             last_mempool_tip: None,
@@ -114,7 +128,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
             .collect::<Result<Vec<_>, _>>()?;
 
         self.last_mempool_time = latest_time;
-        self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);
+        self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height());
 
         Ok(txs_to_emit)
     }
@@ -135,7 +149,7 @@ enum PollResponse {
     NoMoreBlocks,
     /// Fetched block is not in the best chain.
     BlockNotInBestChain,
-    AgreementFound(bitcoincore_rpc_json::GetBlockResult),
+    AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
     AgreementPointNotFound,
 }
 
@@ -146,7 +160,10 @@ where
     let client = emitter.client;
 
     if let Some(last_res) = &emitter.last_block {
-        assert!(!emitter.emitted_blocks.is_empty());
+        assert!(
+            emitter.last_cp.is_some(),
+            "must not have block result without last cp"
+        );
 
         let next_hash = match last_res.nextblockhash {
             None => return Ok(PollResponse::NoMoreBlocks),
@@ -160,7 +177,7 @@ where
         return Ok(PollResponse::Block(res));
     }
 
-    if emitter.emitted_blocks.is_empty() {
+    if emitter.last_cp.is_none() {
         let hash = client.get_block_hash(emitter.start_height as _)?;
 
         let res = client.get_block_info(&hash)?;
@@ -170,15 +187,15 @@ where
         return Ok(PollResponse::Block(res));
     }
 
-    for (&_, hash) in emitter.emitted_blocks.iter().rev() {
-        let res = client.get_block_info(hash)?;
+    for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) {
+        let res = client.get_block_info(&cp.hash())?;
         if res.confirmations < 0 {
             // block is not in best chain
             continue;
         }
 
         // agreement point found
-        return Ok(PollResponse::AgreementFound(res));
+        return Ok(PollResponse::AgreementFound(res, cp));
     }
 
     Ok(PollResponse::AgreementPointNotFound)
@@ -196,9 +213,28 @@ where
         match poll_once(emitter)? {
             PollResponse::Block(res) => {
                 let height = res.height as u32;
-                let item = get_item(&res.hash)?;
-                assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None);
+                let hash = res.hash;
+                let item = get_item(&hash)?;
+
+                let this_id = BlockId { height, hash };
+                let prev_id = res.previousblockhash.map(|prev_hash| BlockId {
+                    height: height - 1,
+                    hash: prev_hash,
+                });
+
+                match (&mut emitter.last_cp, prev_id) {
+                    (Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"),
+                    (last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)),
+                    // When the receiver constructs a local_chain update from a block, the previous
+                    // checkpoint is also included in the update. We need to reflect this state in
+                    // `Emitter::last_cp` as well.
+                    (last_cp, Some(prev_id)) => {
+                        *last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push"))
+                    }
+                }
+
                 emitter.last_block = Some(res);
+
                 return Ok(Some((height, item)));
             }
             PollResponse::NoMoreBlocks => {
@@ -209,11 +245,11 @@ where
                 emitter.last_block = None;
                 continue;
             }
-            PollResponse::AgreementFound(res) => {
+            PollResponse::AgreementFound(res, cp) => {
                 let agreement_h = res.height as u32;
 
                 // get rid of evicted blocks
-                emitter.emitted_blocks.split_off(&(agreement_h + 1));
+                emitter.last_cp = Some(cp);
 
                 // The tip during the last mempool emission needs to in the best chain, we reduce
                 // it if it is not.
@@ -226,7 +262,11 @@ where
                 continue;
             }
             PollResponse::AgreementPointNotFound => {
-                emitter.emitted_blocks.clear();
+                // We want to clear `last_cp` and set `start_height` to the first checkpoint's
+                // height. This way, the first checkpoint in `LocalChain` can be replaced.
+                if let Some(last_cp) = emitter.last_cp.take() {
+                    emitter.start_height = last_cp.height();
+                }
                 emitter.last_block = None;
                 continue;
             }
index 5d57bde18854c7a1b008fb88d54d2395d4b92530..f0bbd3d15806b40f479d399fc9a2c42f872aea48 100644 (file)
@@ -189,7 +189,7 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up
 pub fn test_sync_local_chain() -> anyhow::Result<()> {
     let env = TestEnv::new()?;
     let mut local_chain = LocalChain::default();
-    let mut emitter = Emitter::new(&env.client, 0);
+    let mut emitter = Emitter::from_height(&env.client, 0);
 
     // mine some blocks and returned the actual block hashes
     let exp_hashes = {
@@ -305,7 +305,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
         index
     });
 
-    let emitter = &mut Emitter::new(&env.client, 0);
+    let emitter = &mut Emitter::from_height(&env.client, 0);
 
     while let Some((height, block)) = emitter.next_block()? {
         let _ = chain.apply_update(block_to_chain_update(&block, height))?;
@@ -393,7 +393,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
     const CHAIN_TIP_HEIGHT: usize = 110;
 
     let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _);
+    let mut emitter = Emitter::from_height(&env.client, EMITTER_START_HEIGHT as _);
 
     env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
     while emitter.next_header()?.is_some() {}
@@ -461,7 +461,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
     const SEND_AMOUNT: Amount = Amount::from_sat(10_000);
 
     let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(&env.client, 0);
+    let mut emitter = Emitter::from_height(&env.client, 0);
 
     // setup addresses
     let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked();
@@ -542,7 +542,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
     const MEMPOOL_TX_COUNT: usize = 2;
 
     let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(&env.client, 0);
+    let mut emitter = Emitter::from_height(&env.client, 0);
 
     // mine blocks and sync up emitter
     let addr = env.client.get_new_address(None, None)?.assume_checked();
@@ -597,7 +597,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
     const MEMPOOL_TX_COUNT: usize = 21;
 
     let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(&env.client, 0);
+    let mut emitter = Emitter::from_height(&env.client, 0);
 
     // mine blocks to get initial balance, sync emitter up to tip
     let addr = env.client.get_new_address(None, None)?.assume_checked();
@@ -674,7 +674,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
     const PREMINE_COUNT: usize = 101;
 
     let env = TestEnv::new()?;
-    let mut emitter = Emitter::new(&env.client, 0);
+    let mut emitter = Emitter::from_height(&env.client, 0);
 
     // mine blocks to get initial balance
     let addr = env.client.get_new_address(None, None)?.assume_checked();
@@ -789,7 +789,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
     let env = TestEnv::new()?;
 
     // start height is 99
-    let mut emitter = Emitter::new(&env.client, (PREMINE_COUNT - 2) as u32);
+    let mut emitter = Emitter::from_height(&env.client, (PREMINE_COUNT - 2) as u32);
 
     // mine 101 blocks
     env.mine_blocks(PREMINE_COUNT, None)?;
index c9bcc9728374bc724aaf1d0c23bdefc5f2e0642a..ad77030aec85a7582e65f6ad2de43260cedd89a6 100644 (file)
@@ -27,8 +27,6 @@ const DB_MAGIC: &[u8] = b"bdk_example_rpc";
 const DB_PATH: &str = ".bdk_example_rpc.db";
 
 const CHANNEL_BOUND: usize = 10;
-/// The block depth which we assume no reorgs can happen at.
-const ASSUME_FINAL_DEPTH: u32 = 6;
 /// Delay for printing status to stdout.
 const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
 /// Delay between mempool emissions.
@@ -160,13 +158,12 @@ fn main() -> anyhow::Result<()> {
             let mut db = db.lock().unwrap();
 
             graph.index.set_lookahead_for_all(lookahead);
-            // we start at a height lower than last-seen tip in case of reorgs
-            let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
-                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
-            });
 
             let rpc_client = rpc_args.new_client()?;
-            let mut emitter = Emitter::new(&rpc_client, start_height);
+            let mut emitter = match chain.tip() {
+                Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
+                None => Emitter::from_height(&rpc_client, fallback_height),
+            };
 
             let mut last_db_commit = Instant::now();
             let mut last_print = Instant::now();
@@ -174,7 +171,9 @@ fn main() -> anyhow::Result<()> {
             while let Some((height, block)) = emitter.next_block()? {
                 let chain_update =
                     CheckPoint::from_header(&block.header, height).into_update(false);
-                let chain_changeset = chain.apply_update(chain_update)?;
+                let chain_changeset = chain
+                    .apply_update(chain_update)
+                    .expect("must always apply as we recieve blocks in order from emitter");
                 let graph_changeset = graph.apply_block_relevant(block, height);
                 db.stage((chain_changeset, graph_changeset));
 
@@ -227,17 +226,17 @@ fn main() -> anyhow::Result<()> {
             let sigterm_flag = start_ctrlc_handler();
 
             graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
-            // we start at a height lower than last-seen tip in case of reorgs
-            let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
-                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
-            });
+            let last_cp = chain.lock().unwrap().tip();
 
             let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
             let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
                 println!("emitter thread started...");
 
                 let rpc_client = rpc_args.new_client()?;
-                let mut emitter = Emitter::new(&rpc_client, start_height);
+                let mut emitter = match last_cp {
+                    Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
+                    None => Emitter::from_height(&rpc_client, fallback_height),
+                };
 
                 let mut block_count = rpc_client.get_block_count()? as u32;
                 tx.send(Emission::Tip(block_count))?;
@@ -284,7 +283,9 @@ fn main() -> anyhow::Result<()> {
                     Emission::Block { height, block } => {
                         let chain_update =
                             CheckPoint::from_header(&block.header, height).into_update(false);
-                        let chain_changeset = chain.apply_update(chain_update)?;
+                        let chain_changeset = chain
+                            .apply_update(chain_update)
+                            .expect("must always apply as we recieve blocks in order from emitter");
                         let graph_changeset = graph.apply_block_relevant(block, height);
                         (chain_changeset, graph_changeset)
                     }