From: 志宇 Date: Thu, 1 May 2025 00:36:10 +0000 (+1000) Subject: feat(bitcoind_rpc)!: Reduce friction of `Emitter` API. X-Git-Tag: core-0.5.0~1^2 X-Git-Url: http://internal-gitweb-vhost/script/%22https:/database/scripts/enum.FromScriptError.html?a=commitdiff_plain;h=8513d83a2ef2fe748e9007984187a7f08d256ee6;p=bdk feat(bitcoind_rpc)!: Reduce friction of `Emitter` API. * Change signature of `Emitter::new` so that `expected_mempool_txids` can be more easily constructed from `TxGraph` methods. * Change generic bounds of `C` within `Emitter` to be `C: DeRef, C::Target: RpcApi`. This allows the caller to have `Arc` as `C` and does not force to caller to hold a lifetimed reference. --- diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index dc21997d..dc35d5c1 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -4,15 +4,14 @@ //! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. //! //! To only get block updates (exclude mempool transactions), the caller can use -//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means -//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole -//! mempool. +//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A +//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool. #![warn(missing_docs)] use bdk_core::{BlockId, CheckPoint}; use bitcoin::{Block, BlockHash, Transaction, Txid}; -use bitcoincore_rpc::bitcoincore_rpc_json; -use std::collections::HashSet; +use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi}; +use std::{collections::HashSet, ops::Deref}; pub mod bip158; @@ -23,8 +22,8 @@ pub use bitcoincore_rpc; /// Refer to [module-level documentation] for more. /// /// [module-level documentation]: crate -pub struct Emitter<'c, C> { - client: &'c C, +pub struct Emitter { + client: C, start_height: u32, /// The checkpoint of the last-emitted block that is in the best chain. If it is later found @@ -56,7 +55,17 @@ pub struct Emitter<'c, C> { expected_mempool_txids: HashSet, } -impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { +/// Indicates that there are no initially expected mempool transactions. +/// +/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known +/// to start empty (i.e. with no unconfirmed transactions). +pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty = core::iter::empty(); + +impl Emitter +where + C: Deref, + C::Target: RpcApi, +{ /// Construct a new [`Emitter`]. /// /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter @@ -66,12 +75,13 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// original chain). /// /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet. - /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. + /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is + /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used. pub fn new( - client: &'c C, + client: C, last_cp: CheckPoint, start_height: u32, - expected_mempool_txids: HashSet, + expected_mempool_txids: impl IntoIterator>, ) -> Self { Self { client, @@ -80,7 +90,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { last_block: None, last_mempool_time: 0, last_mempool_tip: None, - expected_mempool_txids, + expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(), } } @@ -102,7 +112,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block /// at height `h`. pub fn mempool(&mut self) -> Result { - let client = self.client; + let client = &*self.client; // This is the emitted tip height during the last mempool emission. let prev_mempool_tip = self @@ -204,7 +214,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// Emit the next block height and block (if any). pub fn next_block(&mut self) -> Result>, bitcoincore_rpc::Error> { - if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? { + if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? { // Stop tracking unconfirmed transactions that have been confirmed in this block. for tx in &block.txdata { self.expected_mempool_txids.remove(&tx.compute_txid()); @@ -247,7 +257,7 @@ impl MempoolEvent { /// A newly emitted block from [`Emitter`]. #[derive(Debug)] pub struct BlockEvent { - /// Either a full [`Block`] or [`Header`] of the new block. + /// The block. pub block: B, /// The checkpoint of the new block. @@ -299,9 +309,10 @@ enum PollResponse { fn poll_once(emitter: &Emitter) -> Result where - C: bitcoincore_rpc::RpcApi, + C: Deref, + C::Target: RpcApi, { - let client = emitter.client; + let client = &*emitter.client; if let Some(last_res) = &emitter.last_block { let next_hash = if last_res.height < emitter.start_height as _ { @@ -355,15 +366,16 @@ fn poll( get_item: F, ) -> Result, bitcoincore_rpc::Error> where - C: bitcoincore_rpc::RpcApi, - F: Fn(&BlockHash) -> Result, + C: Deref, + C::Target: RpcApi, + F: Fn(&BlockHash, &C::Target) -> Result, { loop { match poll_once(emitter)? { PollResponse::Block(res) => { let height = res.height as u32; let hash = res.hash; - let item = get_item(&hash)?; + let item = get_item(&hash, &emitter.client)?; let new_cp = emitter .last_cp @@ -432,11 +444,10 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { #[cfg(test)] mod test { - use crate::{bitcoincore_rpc::RpcApi, Emitter}; - use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid; + use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; use bdk_chain::local_chain::LocalChain; use bdk_testenv::{anyhow, TestEnv}; - use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash}; + use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash}; use std::collections::HashSet; #[test] @@ -444,7 +455,12 @@ mod test { let env = TestEnv::new()?; let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0; let chain_tip = chain.tip(); - let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new()); + let mut emitter = Emitter::new( + env.rpc_client(), + chain_tip.clone(), + 1, + NO_EXPECTED_MEMPOOL_TXIDS, + ); env.mine_blocks(100, None)?; while emitter.next_block()?.is_some() {} diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 360a1f43..c6b0c86a 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -1,6 +1,9 @@ -use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::{ + collections::{BTreeMap, BTreeSet, HashSet}, + ops::Deref, +}; -use bdk_bitcoind_rpc::Emitter; +use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; use bdk_chain::{ bitcoin::{Address, Amount, Txid}, local_chain::{CheckPoint, LocalChain}, @@ -22,7 +25,12 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { let env = TestEnv::new()?; let network_tip = env.rpc_client().get_block_count()?; let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); - let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new()); + let mut emitter = Emitter::new( + env.rpc_client(), + local_chain.tip(), + 0, + NO_EXPECTED_MEMPOOL_TXIDS, + ); // Mine some blocks and return the actual block hashes. // Because initializing `ElectrsD` already mines some blocks, we must include those too when @@ -156,7 +164,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { index }); - let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new()); + let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); @@ -252,7 +260,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), EMITTER_START_HEIGHT as _, - HashSet::new(), + NO_EXPECTED_MEMPOOL_TXIDS, ); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; @@ -292,7 +300,8 @@ fn sync_from_emitter( emitter: &mut Emitter, ) -> anyhow::Result<()> where - C: bitcoincore_rpc::RpcApi, + C: Deref, + C::Target: bitcoincore_rpc::RpcApi, { while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); @@ -333,7 +342,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, - HashSet::new(), + NO_EXPECTED_MEMPOOL_TXIDS, ); // setup addresses @@ -425,7 +434,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, - HashSet::new(), + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine blocks and sync up emitter @@ -492,7 +501,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() hash: env.rpc_client().get_block_hash(0)?, }), 0, - HashSet::new(), + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine blocks to get initial balance, sync emitter up to tip @@ -584,7 +593,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, - HashSet::new(), + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine blocks to get initial balance @@ -712,7 +721,7 @@ fn no_agreement_point() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), (PREMINE_COUNT - 2) as u32, - HashSet::new(), + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine 101 blocks diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 8cbe1939..51e5bc23 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashSet, path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, @@ -137,9 +136,24 @@ fn main() -> anyhow::Result<()> { fallback_height, .. } = rpc_args; - let chain_tip = chain.lock().unwrap().tip(); let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new()); + let mut emitter = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + Emitter::new( + &rpc_client, + chain.tip(), + fallback_height, + graph + .graph() + .list_canonical_txs( + &*chain, + chain.tip().block_id(), + CanonicalizationParams::default(), + ) + .filter(|tx| tx.chain_position.is_unconfirmed()), + ) + }; let mut db_stage = ChangeSet::default(); let mut last_db_commit = Instant::now(); @@ -222,7 +236,24 @@ fn main() -> anyhow::Result<()> { } = rpc_args; let sigterm_flag = start_ctrlc_handler(); - let last_cp = chain.lock().unwrap().tip(); + let rpc_client = Arc::new(rpc_args.new_client()?); + let mut emitter = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + Emitter::new( + rpc_client.clone(), + chain.tip(), + fallback_height, + graph + .graph() + .list_canonical_txs( + &*chain, + chain.tip().block_id(), + CanonicalizationParams::default(), + ) + .filter(|tx| tx.chain_position.is_unconfirmed()), + ) + }; println!( "[{:>10}s] starting emitter thread...", @@ -230,10 +261,6 @@ fn main() -> anyhow::Result<()> { ); let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { - let rpc_client = rpc_args.new_client()?; - let mut emitter = - Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new()); - let mut block_count = rpc_client.get_block_count()? as u32; tx.send(Emission::Tip(block_count))?;