]> Untitled Git - bdk/commitdiff
feat(bitcoind_rpc)!: Reduce friction of `Emitter` API.
author志宇 <hello@evanlinjin.me>
Thu, 1 May 2025 00:36:10 +0000 (10:36 +1000)
committerWei Chen <wzc110@gmail.com>
Thu, 1 May 2025 14:48:43 +0000 (14:48 +0000)
* Change signature of `Emitter::new` so that `expected_mempool_txids`
  can be more easily constructed from `TxGraph` methods.
* Change generic bounds of `C` within `Emitter<C>` to be `C: DeRef,
  C::Target: RpcApi`. This allows the caller to have `Arc<Client>` as
  `C` and does not force to caller to hold a lifetimed reference.

crates/bitcoind_rpc/src/lib.rs
crates/bitcoind_rpc/tests/test_emitter.rs
examples/example_bitcoind_rpc_polling/src/main.rs

index dc21997d0415da3c38d4e98c3488475b4bf8d6b9..dc35d5c123bb9d2c07442f5bfe5f39bfb42acf55 100644 (file)
@@ -4,15 +4,14 @@
 //! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
 //!
 //! To only get block updates (exclude mempool transactions), the caller can use
-//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
-//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
-//! mempool.
+//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
+//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
 #![warn(missing_docs)]
 
 use bdk_core::{BlockId, CheckPoint};
 use bitcoin::{Block, BlockHash, Transaction, Txid};
-use bitcoincore_rpc::bitcoincore_rpc_json;
-use std::collections::HashSet;
+use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
+use std::{collections::HashSet, ops::Deref};
 
 pub mod bip158;
 
@@ -23,8 +22,8 @@ pub use bitcoincore_rpc;
 /// Refer to [module-level documentation] for more.
 ///
 /// [module-level documentation]: crate
-pub struct Emitter<'c, C> {
-    client: &'c C,
+pub struct Emitter<C> {
+    client: C,
     start_height: u32,
 
     /// The checkpoint of the last-emitted block that is in the best chain. If it is later found
@@ -56,7 +55,17 @@ pub struct Emitter<'c, C> {
     expected_mempool_txids: HashSet<Txid>,
 }
 
-impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
+/// Indicates that there are no initially expected mempool transactions.
+///
+/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
+/// to start empty (i.e. with no unconfirmed transactions).
+pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();
+
+impl<C> Emitter<C>
+where
+    C: Deref,
+    C::Target: RpcApi,
+{
     /// Construct a new [`Emitter`].
     ///
     /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
@@ -66,12 +75,13 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
     /// original chain).
     ///
     /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
-    /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
+    /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
+    /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
     pub fn new(
-        client: &'c C,
+        client: C,
         last_cp: CheckPoint,
         start_height: u32,
-        expected_mempool_txids: HashSet<Txid>,
+        expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
     ) -> Self {
         Self {
             client,
@@ -80,7 +90,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
             last_block: None,
             last_mempool_time: 0,
             last_mempool_tip: None,
-            expected_mempool_txids,
+            expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
         }
     }
 
@@ -102,7 +112,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
     /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
     /// at height `h`.
     pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
-        let client = self.client;
+        let client = &*self.client;
 
         // This is the emitted tip height during the last mempool emission.
         let prev_mempool_tip = self
@@ -204,7 +214,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
 
     /// Emit the next block height and block (if any).
     pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
-        if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
+        if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
             // Stop tracking unconfirmed transactions that have been confirmed in this block.
             for tx in &block.txdata {
                 self.expected_mempool_txids.remove(&tx.compute_txid());
@@ -247,7 +257,7 @@ impl MempoolEvent {
 /// A newly emitted block from [`Emitter`].
 #[derive(Debug)]
 pub struct BlockEvent<B> {
-    /// Either a full [`Block`] or [`Header`] of the new block.
+    /// The block.
     pub block: B,
 
     /// The checkpoint of the new block.
@@ -299,9 +309,10 @@ enum PollResponse {
 
 fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
 where
-    C: bitcoincore_rpc::RpcApi,
+    C: Deref,
+    C::Target: RpcApi,
 {
-    let client = emitter.client;
+    let client = &*emitter.client;
 
     if let Some(last_res) = &emitter.last_block {
         let next_hash = if last_res.height < emitter.start_height as _ {
@@ -355,15 +366,16 @@ fn poll<C, V, F>(
     get_item: F,
 ) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
 where
-    C: bitcoincore_rpc::RpcApi,
-    F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
+    C: Deref,
+    C::Target: RpcApi,
+    F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
 {
     loop {
         match poll_once(emitter)? {
             PollResponse::Block(res) => {
                 let height = res.height as u32;
                 let hash = res.hash;
-                let item = get_item(&hash)?;
+                let item = get_item(&hash, &emitter.client)?;
 
                 let new_cp = emitter
                     .last_cp
@@ -432,11 +444,10 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
 
 #[cfg(test)]
 mod test {
-    use crate::{bitcoincore_rpc::RpcApi, Emitter};
-    use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
+    use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
     use bdk_chain::local_chain::LocalChain;
     use bdk_testenv::{anyhow, TestEnv};
-    use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
+    use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
     use std::collections::HashSet;
 
     #[test]
@@ -444,7 +455,12 @@ mod test {
         let env = TestEnv::new()?;
         let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
         let chain_tip = chain.tip();
-        let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
+        let mut emitter = Emitter::new(
+            env.rpc_client(),
+            chain_tip.clone(),
+            1,
+            NO_EXPECTED_MEMPOOL_TXIDS,
+        );
 
         env.mine_blocks(100, None)?;
         while emitter.next_block()?.is_some() {}
index 360a1f435eab87ddc8b39ad4ec1b2a1d3457dfe2..c6b0c86ac5349397f522f19c2e23447920d105ae 100644 (file)
@@ -1,6 +1,9 @@
-use std::collections::{BTreeMap, BTreeSet, HashSet};
+use std::{
+    collections::{BTreeMap, BTreeSet, HashSet},
+    ops::Deref,
+};
 
-use bdk_bitcoind_rpc::Emitter;
+use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
 use bdk_chain::{
     bitcoin::{Address, Amount, Txid},
     local_chain::{CheckPoint, LocalChain},
@@ -22,7 +25,12 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
     let env = TestEnv::new()?;
     let network_tip = env.rpc_client().get_block_count()?;
     let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
-    let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
+    let mut emitter = Emitter::new(
+        env.rpc_client(),
+        local_chain.tip(),
+        0,
+        NO_EXPECTED_MEMPOOL_TXIDS,
+    );
 
     // Mine some blocks and return the actual block hashes.
     // Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +164,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
         index
     });
 
-    let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
+    let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS);
 
     while let Some(emission) = emitter.next_block()? {
         let height = emission.block_height();
@@ -252,7 +260,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         EMITTER_START_HEIGHT as _,
-        HashSet::new(),
+        NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
     env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -292,7 +300,8 @@ fn sync_from_emitter<C>(
     emitter: &mut Emitter<C>,
 ) -> anyhow::Result<()>
 where
-    C: bitcoincore_rpc::RpcApi,
+    C: Deref,
+    C::Target: bitcoincore_rpc::RpcApi,
 {
     while let Some(emission) = emitter.next_block()? {
         let height = emission.block_height();
@@ -333,7 +342,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
-        HashSet::new(),
+        NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
     // setup addresses
@@ -425,7 +434,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
-        HashSet::new(),
+        NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
     // mine blocks and sync up emitter
@@ -492,7 +501,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
-        HashSet::new(),
+        NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
     // mine blocks to get initial balance, sync emitter up to tip
@@ -584,7 +593,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         0,
-        HashSet::new(),
+        NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
     // mine blocks to get initial balance
@@ -712,7 +721,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
             hash: env.rpc_client().get_block_hash(0)?,
         }),
         (PREMINE_COUNT - 2) as u32,
-        HashSet::new(),
+        NO_EXPECTED_MEMPOOL_TXIDS,
     );
 
     // mine 101 blocks
index 8cbe1939d6651bba54eb5b7f8c333de2943d0091..51e5bc23b71586f079099681ae72b155e6df1efc 100644 (file)
@@ -1,5 +1,4 @@
 use std::{
-    collections::HashSet,
     path::PathBuf,
     sync::{
         atomic::{AtomicBool, Ordering},
@@ -137,9 +136,24 @@ fn main() -> anyhow::Result<()> {
                 fallback_height, ..
             } = rpc_args;
 
-            let chain_tip = chain.lock().unwrap().tip();
             let rpc_client = rpc_args.new_client()?;
-            let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
+            let mut emitter = {
+                let chain = chain.lock().unwrap();
+                let graph = graph.lock().unwrap();
+                Emitter::new(
+                    &rpc_client,
+                    chain.tip(),
+                    fallback_height,
+                    graph
+                        .graph()
+                        .list_canonical_txs(
+                            &*chain,
+                            chain.tip().block_id(),
+                            CanonicalizationParams::default(),
+                        )
+                        .filter(|tx| tx.chain_position.is_unconfirmed()),
+                )
+            };
             let mut db_stage = ChangeSet::default();
 
             let mut last_db_commit = Instant::now();
@@ -222,7 +236,24 @@ fn main() -> anyhow::Result<()> {
             } = rpc_args;
             let sigterm_flag = start_ctrlc_handler();
 
-            let last_cp = chain.lock().unwrap().tip();
+            let rpc_client = Arc::new(rpc_args.new_client()?);
+            let mut emitter = {
+                let chain = chain.lock().unwrap();
+                let graph = graph.lock().unwrap();
+                Emitter::new(
+                    rpc_client.clone(),
+                    chain.tip(),
+                    fallback_height,
+                    graph
+                        .graph()
+                        .list_canonical_txs(
+                            &*chain,
+                            chain.tip().block_id(),
+                            CanonicalizationParams::default(),
+                        )
+                        .filter(|tx| tx.chain_position.is_unconfirmed()),
+                )
+            };
 
             println!(
                 "[{:>10}s] starting emitter thread...",
@@ -230,10 +261,6 @@ fn main() -> anyhow::Result<()> {
             );
             let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
             let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
-                let rpc_client = rpc_args.new_client()?;
-                let mut emitter =
-                    Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
-
                 let mut block_count = rpc_client.get_block_count()? as u32;
                 tx.send(Emission::Tip(block_count))?;