]> Untitled Git - bdk/commitdiff
feat(bitcoind_rpc)!: emissions include checkpoint and connected_to data
author志宇 <hello@evanlinjin.me>
Sat, 30 Dec 2023 12:48:20 +0000 (20:48 +0800)
committer志宇 <hello@evanlinjin.me>
Mon, 15 Jan 2024 16:27:02 +0000 (00:27 +0800)
Previously, emissions are purely blocks + the block height. This means
emitted blocks can only connect to previous-adjacent blocks. Hence, sync
must start from genesis and include every block.

crates/bitcoind_rpc/src/lib.rs
crates/bitcoind_rpc/tests/test_emitter.rs
example-crates/example_bitcoind_rpc_polling/src/main.rs

index e790b8a8ed08c96c2be1676327b9e618054d9d72..ce5e863bb0773f8c708276cd8754cc74262a8cad 100644 (file)
@@ -43,11 +43,13 @@ pub struct Emitter<'c, C> {
 }
 
 impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
-    /// Construct a new [`Emitter`] with the given RPC `client`, `last_cp` and `start_height`.
+    /// Construct a new [`Emitter`].
     ///
-    /// * `last_cp` is the check point used to find the latest block which is still part of the best
-    ///   chain.
-    /// * `start_height` is the block height to start emitting blocks from.
+    /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
+    /// can start emission from a block that connects to the original chain.
+    ///
+    /// `start_height` starts emission from a given height (if there are no conflicts with the
+    /// original chain).
     pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
         Self {
             client,
@@ -127,13 +129,58 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
     }
 
     /// Emit the next block height and header (if any).
-    pub fn next_header(&mut self) -> Result<Option<(u32, Header)>, bitcoincore_rpc::Error> {
-        poll(self, |hash| self.client.get_block_header(hash))
+    pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
+        Ok(poll(self, |hash| self.client.get_block_header(hash))?
+            .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
     }
 
     /// Emit the next block height and block (if any).
-    pub fn next_block(&mut self) -> Result<Option<(u32, Block)>, bitcoincore_rpc::Error> {
-        poll(self, |hash| self.client.get_block(hash))
+    pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
+        Ok(poll(self, |hash| self.client.get_block(hash))?
+            .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
+    }
+}
+
+/// A newly emitted block from [`Emitter`].
+#[derive(Debug)]
+pub struct BlockEvent<B> {
+    /// Either a full [`Block`] or [`Header`] of the new block.
+    pub block: B,
+
+    /// The checkpoint of the new block.
+    ///
+    /// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to
+    /// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since then.
+    /// These blocks are guaranteed to be of the same chain.
+    ///
+    /// This is important as BDK structures require block-to-apply to be connected with another
+    /// block in the original chain.
+    pub checkpoint: CheckPoint,
+}
+
+impl<B> BlockEvent<B> {
+    /// The block height of this new block.
+    pub fn block_height(&self) -> u32 {
+        self.checkpoint.height()
+    }
+
+    /// The block hash of this new block.
+    pub fn block_hash(&self) -> BlockHash {
+        self.checkpoint.hash()
+    }
+
+    /// The [`BlockId`] of a previous block that this block connects to.
+    ///
+    /// This either returns a [`BlockId`] of a previously emitted block or from the chain we started
+    /// with (passed in as `last_cp` in [`Emitter::new`]).
+    ///
+    /// This value is derived from [`BlockEvent::checkpoint`].
+    pub fn connected_to(&self) -> BlockId {
+        match self.checkpoint.prev() {
+            Some(prev_cp) => prev_cp.block_id(),
+            // there is no previous checkpoint, so just connect with itself
+            None => self.checkpoint.block_id(),
+        }
     }
 }
 
@@ -203,7 +250,7 @@ where
 fn poll<C, V, F>(
     emitter: &mut Emitter<C>,
     get_item: F,
-) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
+) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
 where
     C: bitcoincore_rpc::RpcApi,
     F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
@@ -215,13 +262,14 @@ where
                 let hash = res.hash;
                 let item = get_item(&hash)?;
 
-                emitter.last_cp = emitter
+                let new_cp = emitter
                     .last_cp
                     .clone()
                     .push(BlockId { height, hash })
                     .expect("must push");
+                emitter.last_cp = new_cp.clone();
                 emitter.last_block = Some(res);
-                return Ok(Some((height, item)));
+                return Ok(Some((new_cp, item)));
             }
             PollResponse::NoMoreBlocks => {
                 emitter.last_block = None;
index 521124e5d80804b6c95b57e9e6abe84a892a8e11..384df92d069068984dab6acbae1f0ea8dd5f687d 100644 (file)
@@ -157,28 +157,6 @@ impl TestEnv {
     }
 }
 
-fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update {
-    let this_id = BlockId {
-        height,
-        hash: block.block_hash(),
-    };
-    let tip = if block.header.prev_blockhash == BlockHash::all_zeros() {
-        CheckPoint::new(this_id)
-    } else {
-        CheckPoint::new(BlockId {
-            height: height - 1,
-            hash: block.header.prev_blockhash,
-        })
-        .extend(core::iter::once(this_id))
-        .expect("must construct checkpoint")
-    };
-
-    local_chain::Update {
-        tip,
-        introduce_older_blocks: false,
-    }
-}
-
 /// Ensure that blocks are emitted in order even after reorg.
 ///
 /// 1. Mine 101 blocks.
@@ -200,17 +178,21 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
 
     // see if the emitter outputs the right blocks
     println!("first sync:");
-    while let Some((height, block)) = emitter.next_block()? {
+    while let Some(emission) = emitter.next_block()? {
+        let height = emission.block_height();
+        let hash = emission.block_hash();
         assert_eq!(
-            block.block_hash(),
+            emission.block_hash(),
             exp_hashes[height as usize],
             "emitted block hash is unexpected"
         );
 
-        let chain_update = block_to_chain_update(&block, height);
         assert_eq!(
-            local_chain.apply_update(chain_update)?,
-            BTreeMap::from([(height, Some(block.block_hash()))]),
+            local_chain.apply_update(local_chain::Update {
+                tip: emission.checkpoint,
+                introduce_older_blocks: false,
+            })?,
+            BTreeMap::from([(height, Some(hash))]),
             "chain update changeset is unexpected",
         );
     }
@@ -237,27 +219,30 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
     // see if the emitter outputs the right blocks
     println!("after reorg:");
     let mut exp_height = exp_hashes.len() - reorged_blocks.len();
-    while let Some((height, block)) = emitter.next_block()? {
+    while let Some(emission) = emitter.next_block()? {
+        let height = emission.block_height();
+        let hash = emission.block_hash();
         assert_eq!(
             height, exp_height as u32,
             "emitted block has unexpected height"
         );
 
         assert_eq!(
-            block.block_hash(),
-            exp_hashes[height as usize],
+            hash, exp_hashes[height as usize],
             "emitted block is unexpected"
         );
 
-        let chain_update = block_to_chain_update(&block, height);
         assert_eq!(
-            local_chain.apply_update(chain_update)?,
+            local_chain.apply_update(local_chain::Update {
+                tip: emission.checkpoint,
+                introduce_older_blocks: false,
+            })?,
             if exp_height == exp_hashes.len() - reorged_blocks.len() {
-                core::iter::once((height, Some(block.block_hash())))
+                core::iter::once((height, Some(hash)))
                     .chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None)))
                     .collect::<bdk_chain::local_chain::ChangeSet>()
             } else {
-                BTreeMap::from([(height, Some(block.block_hash()))])
+                BTreeMap::from([(height, Some(hash))])
             },
             "chain update changeset is unexpected",
         );
@@ -307,9 +292,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
 
     let emitter = &mut Emitter::new(&env.client, chain.tip(), 0);
 
-    while let Some((height, block)) = emitter.next_block()? {
-        let _ = chain.apply_update(block_to_chain_update(&block, height))?;
-        let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
+    while let Some(emission) = emitter.next_block()? {
+        let height = emission.block_height();
+        let _ = chain.apply_update(local_chain::Update {
+            tip: emission.checkpoint,
+            introduce_older_blocks: false,
+        })?;
+        let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height);
         assert!(indexed_additions.is_empty());
     }
 
@@ -367,10 +356,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
 
     // must receive mined block which will confirm the transactions.
     {
-        let (height, block) = emitter.next_block()?.expect("must get mined block");
-        let _ = chain
-            .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
-        let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
+        let emission = emitter.next_block()?.expect("must get mined block");
+        let height = emission.block_height();
+        let _ = chain.apply_update(local_chain::Update {
+            tip: emission.checkpoint,
+            introduce_older_blocks: false,
+        })?;
+        let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height);
         assert!(indexed_additions.graph.txs.is_empty());
         assert!(indexed_additions.graph.txouts.is_empty());
         assert_eq!(indexed_additions.graph.anchors, exp_anchors);
@@ -407,9 +399,12 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
 
     for reorg_count in 1..=10 {
         let replaced_blocks = env.reorg_empty_blocks(reorg_count)?;
-        let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg");
+        let next_emission = emitter.next_header()?.expect("must emit block after reorg");
         assert_eq!(
-            (height as usize, next_header.block_hash()),
+            (
+                next_emission.block_height() as usize,
+                next_emission.block_hash()
+            ),
             replaced_blocks[0],
             "block emitted after reorg should be at the reorg height"
         );
@@ -439,8 +434,9 @@ fn sync_from_emitter<C>(
 where
     C: bitcoincore_rpc::RpcApi,
 {
-    while let Some((height, block)) = emitter.next_block()? {
-        process_block(recv_chain, recv_graph, block, height)?;
+    while let Some(emission) = emitter.next_block()? {
+        let height = emission.block_height();
+        process_block(recv_chain, recv_graph, emission.block, height)?;
     }
     Ok(())
 }
@@ -660,7 +656,8 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
 
     // At this point, the emitter has seen all mempool transactions. It should only re-emit those
     // that have introduction heights less than the emitter's last-emitted block tip.
-    while let Some((height, _)) = emitter.next_header()? {
+    while let Some(emission) = emitter.next_header()? {
+        let height = emission.block_height();
         // We call `mempool()` twice.
         // The second call (at height `h`) should skip the tx introduced at height `h`.
         for try_index in 0..2 {
@@ -754,7 +751,8 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
             .collect::<BTreeMap<_, _>>());
 
         // `next_header` emits the replacement block of the reorg
-        if let Some((height, _)) = emitter.next_header()? {
+        if let Some(emission) = emitter.next_header()? {
+            let height = emission.block_height();
             println!("\t- replacement height: {}", height);
 
             // the mempool emission (that follows the first block emission after reorg) should only
@@ -835,12 +833,12 @@ fn no_agreement_point() -> anyhow::Result<()> {
     env.mine_blocks(PREMINE_COUNT, None)?;
 
     // emit block 99a
-    let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header");
+    let block_header_99a = emitter.next_header()?.expect("block 99a header").block;
     let block_hash_99a = block_header_99a.block_hash();
     let block_hash_98a = block_header_99a.prev_blockhash;
 
     // emit block 100a
-    let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header");
+    let block_header_100a = emitter.next_header()?.expect("block 100a header").block;
     let block_hash_100a = block_header_100a.block_hash();
 
     // get hash for block 101a
@@ -855,7 +853,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
     env.mine_blocks(3, None)?;
 
     // emit block header 99b
-    let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header");
+    let block_header_99b = emitter.next_header()?.expect("block 99b header").block;
     let block_hash_99b = block_header_99b.block_hash();
     let block_hash_98b = block_header_99b.prev_blockhash;
 
index 449242e41fccf543ceaf1e9c5254b024a8917deb..648962c284522ffa0708da29ce74f74083918527 100644 (file)
@@ -14,7 +14,7 @@ use bdk_bitcoind_rpc::{
 use bdk_chain::{
     bitcoin::{constants::genesis_block, Block, Transaction},
     indexed_tx_graph, keychain,
-    local_chain::{self, CheckPoint, LocalChain},
+    local_chain::{self, LocalChain},
     ConfirmationTimeHeightAnchor, IndexedTxGraph,
 };
 use example_cli::{
@@ -42,7 +42,7 @@ type ChangeSet = (
 
 #[derive(Debug)]
 enum Emission {
-    Block { height: u32, block: Block },
+    Block(bdk_bitcoind_rpc::BlockEvent<Block>),
     Mempool(Vec<(Transaction, u64)>),
     Tip(u32),
 }
@@ -178,17 +178,20 @@ fn main() -> anyhow::Result<()> {
             let mut last_db_commit = Instant::now();
             let mut last_print = Instant::now();
 
-            while let Some((height, block)) = emitter.next_block()? {
+            while let Some(emission) = emitter.next_block()? {
+                let height = emission.block_height();
+
                 let mut chain = chain.lock().unwrap();
                 let mut graph = graph.lock().unwrap();
                 let mut db = db.lock().unwrap();
 
-                let chain_update =
-                    CheckPoint::from_header(&block.header, height).into_update(false);
                 let chain_changeset = chain
-                    .apply_update(chain_update)
+                    .apply_update(local_chain::Update {
+                        tip: emission.checkpoint,
+                        introduce_older_blocks: false,
+                    })
                     .expect("must always apply as we receive blocks in order from emitter");
-                let graph_changeset = graph.apply_block_relevant(block, height);
+                let graph_changeset = graph.apply_block_relevant(emission.block, height);
                 db.stage((chain_changeset, graph_changeset));
 
                 // commit staged db changes in intervals
@@ -256,7 +259,8 @@ fn main() -> anyhow::Result<()> {
 
                 loop {
                     match emitter.next_block()? {
-                        Some((height, block)) => {
+                        Some(block_emission) => {
+                            let height = block_emission.block_height();
                             if sigterm_flag.load(Ordering::Acquire) {
                                 break;
                             }
@@ -264,7 +268,7 @@ fn main() -> anyhow::Result<()> {
                                 block_count = rpc_client.get_block_count()? as u32;
                                 tx.send(Emission::Tip(block_count))?;
                             }
-                            tx.send(Emission::Block { height, block })?;
+                            tx.send(Emission::Block(block_emission))?;
                         }
                         None => {
                             if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
@@ -293,13 +297,17 @@ fn main() -> anyhow::Result<()> {
                 let mut chain = chain.lock().unwrap();
 
                 let changeset = match emission {
-                    Emission::Block { height, block } => {
-                        let chain_update =
-                            CheckPoint::from_header(&block.header, height).into_update(false);
+                    Emission::Block(block_emission) => {
+                        let height = block_emission.block_height();
+                        let chain_update = local_chain::Update {
+                            tip: block_emission.checkpoint,
+                            introduce_older_blocks: false,
+                        };
                         let chain_changeset = chain
                             .apply_update(chain_update)
                             .expect("must always apply as we receive blocks in order from emitter");
-                        let graph_changeset = graph.apply_block_relevant(block, height);
+                        let graph_changeset =
+                            graph.apply_block_relevant(block_emission.block, height);
                         (chain_changeset, graph_changeset)
                     }
                     Emission::Mempool(mempool_txs) => {