From: valued mammal Date: Sat, 1 Feb 2025 15:15:55 +0000 (-0500) Subject: refactor(rpc)!: update `mempool` interface and test code X-Git-Tag: core-0.5.0~1^2~3 X-Git-Url: http://internal-gitweb-vhost/script/%22https:/struct.DecoderReader.html?a=commitdiff_plain;h=28ef7c9a22c4364925329628fa84fd09ecc82e2e;p=bdk refactor(rpc)!: update `mempool` interface and test code --- diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3fa17ef1..7e363560 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -10,8 +10,9 @@ #![warn(missing_docs)] use bdk_core::{BlockId, CheckPoint}; -use bitcoin::{block::Header, Block, BlockHash, Transaction}; +use bitcoin::{Block, BlockHash, Transaction, Txid}; use bitcoincore_rpc::bitcoincore_rpc_json; +use std::collections::HashSet; pub mod bip158; @@ -43,6 +44,16 @@ pub struct Emitter<'c, C> { /// The last emitted block during our last mempool emission. This is used to determine whether /// there has been a reorg since our last mempool emission. last_mempool_tip: Option, + + /// A set of txids currently assumed to still be in the mempool. + /// + /// This is used to detect mempool evictions by comparing the set against the latest mempool + /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered + /// evicted. + /// + /// When the emitter emits a block, confirmed txids are removed from this set. This prevents + /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp. + expected_mempool_txids: HashSet, } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { @@ -53,7 +64,15 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// /// `start_height` starts emission from a given height (if there are no conflicts with the /// original chain). - pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self { + /// + /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet. + /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. + pub fn new( + client: &'c C, + last_cp: CheckPoint, + start_height: u32, + expected_mempool_txids: HashSet, + ) -> Self { Self { client, start_height, @@ -61,10 +80,18 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { last_block: None, last_mempool_time: 0, last_mempool_tip: None, + expected_mempool_txids, } } - /// Emit mempool transactions, alongside their first-seen unix timestamps. + /// Emit mempool transactions and any evicted [`Txid`]s. + /// + /// This method returns a [`MempoolEvent`] containing the full transactions (with their + /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are + /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids + /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height + /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always + /// return an empty `evicted_txids` set. /// /// This method emits each transaction only once, unless we cannot guarantee the transaction's /// ancestors are already emitted. @@ -74,7 +101,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block /// at height `h`. - pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { + pub fn mempool(&mut self) -> Result { let client = self.client; // This is the emitted tip height during the last mempool emission. @@ -84,6 +111,38 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { // `start_height` has been emitted. .unwrap_or(self.start_height.saturating_sub(1)); + // Loop to make sure that the fetched mempool content and the fetched tip are consistent + // with one another. + let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop { + // Determine if height and hash matches the best block from the RPC. Evictions are deferred + // if we are not at the best block. + let height = client.get_block_count()?; + let hash = client.get_block_hash(height)?; + + // Get the raw mempool result from the RPC client which will be used to determine if any + // transactions have been evicted. + let mp = client.get_raw_mempool_verbose()?; + let mp_txids: HashSet = mp.keys().copied().collect(); + + if height == client.get_block_count()? && hash == client.get_block_hash(height)? { + break (mp, mp_txids, height, hash); + } + }; + + let at_tip = + rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash(); + + // If at tip, any expected txid missing from raw mempool is considered evicted; + // if not at tip, we don't evict anything. + let evicted_txids: HashSet = if at_tip { + self.expected_mempool_txids + .difference(&raw_mempool_txids) + .copied() + .collect() + } else { + HashSet::new() + }; + // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep // track of the latest mempool tx's timestamp to determine whether we have seen a tx // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will @@ -91,8 +150,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { let prev_mempool_time = self.last_mempool_time; let mut latest_time = prev_mempool_time; - let txs_to_emit = client - .get_raw_mempool_verbose()? + let new_txs = raw_mempool .into_iter() .filter_map({ let latest_time = &mut latest_time; @@ -101,25 +159,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { if tx_time > *latest_time { *latest_time = tx_time; } - - // Avoid emitting transactions that are already emitted if we can guarantee - // blocks containing ancestors are already emitted. The bitcoind rpc interface - // provides us with the block height that the tx is introduced to the mempool. - // If we have already emitted the block of height, we can assume that all - // ancestor txs have been processed by the receiver. + // Best-effort check to avoid re-emitting transactions we've already emitted. + // + // Complete suppression isn't possible, since a transaction may spend outputs + // owned by the wallet. To determine if such a transaction is relevant, we must + // have already seen its ancestor(s) that contain the spent prevouts. + // + // Fortunately, bitcoind provides the block height at which the transaction + // entered the mempool. If we've already emitted that block height, we can + // reasonably assume the receiver has seen all ancestor transactions. let is_already_emitted = tx_time <= prev_mempool_time; let is_within_height = tx_entry.height <= prev_mempool_tip as _; if is_already_emitted && is_within_height { return None; } - let tx = match client.get_raw_transaction(&txid, None) { Ok(tx) => tx, - // the tx is confirmed or evicted since `get_raw_mempool_verbose` Err(err) if err.is_not_found_error() => return None, Err(err) => return Some(Err(err)), }; - Some(Ok((tx, tx_time as u64))) } }) @@ -128,22 +186,56 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { self.last_mempool_time = latest_time; self.last_mempool_tip = Some(self.last_cp.height()); - Ok(txs_to_emit) - } + // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re + // still catching up to the tip and keep accumulating. + if at_tip { + self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect(); + } else { + self.expected_mempool_txids + .extend(new_txs.iter().map(|(tx, _)| tx.compute_txid())); + } - /// Emit the next block height and header (if any). - pub fn next_header(&mut self) -> Result>, bitcoincore_rpc::Error> { - Ok(poll(self, |hash| self.client.get_block_header(hash))? - .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) + Ok(MempoolEvent { + new_txs, + evicted_txids, + latest_update_time: latest_time as u64, + }) } /// Emit the next block height and block (if any). pub fn next_block(&mut self) -> Result>, bitcoincore_rpc::Error> { - Ok(poll(self, |hash| self.client.get_block(hash))? - .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) + if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? { + // Stop tracking unconfirmed transactions that have been confirmed in this block. + for tx in &block.txdata { + self.expected_mempool_txids.remove(&tx.compute_txid()); + } + return Ok(Some(BlockEvent { block, checkpoint })); + } + Ok(None) } } +/// A new emission from mempool. +#[derive(Debug)] +pub struct MempoolEvent { + /// Unemitted transactions or transactions with ancestors that are unseen by the receiver. + /// + /// To understand the second condition, consider a receiver which filters transactions based on + /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction + /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to + /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the + /// block at height `h`. + pub new_txs: Vec<(Transaction, u64)>, + + /// [`Txid`]s of all transactions that have been evicted from mempool. + pub evicted_txids: HashSet, + + /// The latest timestamp of when a transaction entered the mempool. + /// + /// This is useful for setting the timestamp for evicted transactions. + pub latest_update_time: u64, +} + /// A newly emitted block from [`Emitter`]. #[derive(Debug)] pub struct BlockEvent { @@ -329,3 +421,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { } } } + +#[cfg(test)] +mod test { + use crate::{bitcoincore_rpc::RpcApi, Emitter}; + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid; + use bdk_chain::local_chain::LocalChain; + use bdk_testenv::{anyhow, TestEnv}; + use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash}; + use std::collections::HashSet; + + #[test] + fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0; + let chain_tip = chain.tip(); + let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new()); + + env.mine_blocks(100, None)?; + while emitter.next_block()?.is_some() {} + + let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros()); + let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?; + let mut mempool_txids = HashSet::new(); + + // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids. + for _ in 0..10 { + let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?; + mempool_txids.insert(sent_txid); + emitter.mempool()?; + env.mine_blocks(1, None)?; + + for txid in &mempool_txids { + assert!( + emitter.expected_mempool_txids.contains(txid), + "Expected txid {:?} missing", + txid + ); + } + } + + // Process each block and check that confirmed txids are removed from from + // expected_mempool_txids. + while let Some(block_event) = emitter.next_block()? { + let confirmed_txids: HashSet = block_event + .block + .txdata + .iter() + .map(|tx| tx.compute_txid()) + .collect(); + mempool_txids = mempool_txids + .difference(&confirmed_txids) + .copied() + .collect::>(); + for txid in confirmed_txids { + assert!( + !emitter.expected_mempool_txids.contains(&txid), + "Expected txid {:?} should have been removed", + txid + ); + } + for txid in &mempool_txids { + assert!( + emitter.expected_mempool_txids.contains(txid), + "Expected txid {:?} missing", + txid + ); + } + } + + assert!(emitter.expected_mempool_txids.is_empty()); + + Ok(()) + } +} diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 5753b82f..0c30f5a8 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use bdk_bitcoind_rpc::Emitter; use bdk_chain::{ @@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { let env = TestEnv::new()?; let network_tip = env.rpc_client().get_block_count()?; let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); - let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0); + let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new()); // Mine some blocks and return the actual block hashes. // Because initializing `ElectrsD` already mines some blocks, we must include those too when @@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { index }); - let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0); + let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new()); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); @@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs); assert_eq!( indexed_additions .tx_graph @@ -252,14 +252,15 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), EMITTER_START_HEIGHT as _, + HashSet::new(), ); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} for reorg_count in 1..=10 { let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; - let next_emission = emitter.next_header()?.expect("must emit block after reorg"); + let next_emission = emitter.next_block()?.expect("must emit block after reorg"); assert_eq!( ( next_emission.block_height() as usize, @@ -268,7 +269,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { replaced_blocks[0], "block emitted after reorg should be at the reorg height" ); - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} } Ok(()) @@ -332,6 +333,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, + HashSet::new(), ); // setup addresses @@ -423,6 +425,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, + HashSet::new(), ); // mine blocks and sync up emitter @@ -431,7 +434,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { .get_new_address(None, None)? .assume_checked(); env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?; - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} // have some random txs in mempool let exp_txids = (0..MEMPOOL_TX_COUNT) @@ -441,6 +444,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // the first emission should include all transactions let emitted_txids = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -451,7 +455,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // second emission should be empty assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.new_txs.is_empty(), "second emission should be empty" ); @@ -459,9 +463,9 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { for _ in 0..BLOCKS_TO_MINE { env.mine_empty_block()?; } - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.new_txs.is_empty(), "third emission, after chain tip is extended, should also be empty" ); @@ -488,6 +492,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() hash: env.rpc_client().get_block_hash(0)?, }), 0, + HashSet::new(), ); // mine blocks to get initial balance, sync emitter up to tip @@ -496,7 +501,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() .get_new_address(None, None)? .assume_checked(); env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} // mine blocks to introduce txs to mempool at different heights let tx_introductions = (0..MEMPOOL_TX_COUNT) @@ -510,6 +515,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -519,6 +525,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -528,7 +535,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() // At this point, the emitter has seen all mempool transactions. It should only re-emit those // that have introduction heights less than the emitter's last-emitted block tip. - while let Some(emission) = emitter.next_header()? { + while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); // We call `mempool()` twice. // The second call (at height `h`) should skip the tx introduced at height `h`. @@ -539,6 +546,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() .collect::>(); let emitted_txids = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -576,6 +584,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, + HashSet::new(), ); // mine blocks to get initial balance @@ -593,10 +602,11 @@ fn mempool_during_reorg() -> anyhow::Result<()> { // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted // from the mempool yet) - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} assert_eq!( emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -625,13 +635,14 @@ fn mempool_during_reorg() -> anyhow::Result<()> { .collect::>()); // `next_header` emits the replacement block of the reorg - if let Some(emission) = emitter.next_header()? { + if let Some(emission) = emitter.next_block()? { let height = emission.block_height(); // the mempool emission (that follows the first block emission after reorg) should only // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -647,6 +658,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { let mempool = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -670,7 +682,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { } // sync emitter to tip - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} } Ok(()) @@ -700,18 +712,23 @@ fn no_agreement_point() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), (PREMINE_COUNT - 2) as u32, + HashSet::new(), ); // mine 101 blocks env.mine_blocks(PREMINE_COUNT, None)?; // emit block 99a - let block_header_99a = emitter.next_header()?.expect("block 99a header").block; + let block_header_99a = emitter + .next_block()? + .expect("block 99a header") + .block + .header; let block_hash_99a = block_header_99a.block_hash(); let block_hash_98a = block_header_99a.prev_blockhash; // emit block 100a - let block_header_100a = emitter.next_header()?.expect("block 100a header").block; + let block_header_100a = emitter.next_block()?.expect("block 100a header").block; let block_hash_100a = block_header_100a.block_hash(); // get hash for block 101a @@ -726,7 +743,11 @@ fn no_agreement_point() -> anyhow::Result<()> { env.mine_blocks(3, None)?; // emit block header 99b - let block_header_99b = emitter.next_header()?.expect("block 99b header").block; + let block_header_99b = emitter + .next_block()? + .expect("block 99b header") + .block + .header; let block_hash_99b = block_header_99b.block_hash(); let block_hash_98b = block_header_99b.prev_blockhash; @@ -735,3 +756,110 @@ fn no_agreement_point() -> anyhow::Result<()> { Ok(()) } + +/// Validates that when an unconfirmed transaction is double-spent (and thus evicted from the +/// mempool), the emitter reports it in `evicted_txids`, and after inserting that eviction into the +/// graph it no longer appears in the set of canonical transactions. +/// +/// 1. Broadcast a first tx (tx1) and confirm it arrives in unconfirmed set. +/// 2. Double-spend tx1 with tx1b and verify `mempool()` reports tx1 as evicted. +/// 3. Insert the eviction into the graph and assert tx1 is no longer canonical. +#[test] +fn test_expect_tx_evicted() -> anyhow::Result<()> { + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin; + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput; + use bdk_chain::miniscript; + use bdk_chain::spk_txout::SpkTxOutIndex; + use bitcoin::constants::genesis_block; + use bitcoin::secp256k1::Secp256k1; + use bitcoin::Network; + use std::collections::HashMap; + let env = TestEnv::new()?; + + let s = bdk_testenv::utils::DESCRIPTORS[0]; + let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s) + .unwrap() + .0; + let spk = desc.at_derivation_index(0)?.script_pubkey(); + + let mut chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0; + let chain_tip = chain.tip().block_id(); + + let mut index = SpkTxOutIndex::default(); + index.insert_spk((), spk.clone()); + let mut graph = IndexedTxGraph::::new(index); + + // Receive tx1. + let _ = env.mine_blocks(100, None)?; + let txid_1 = env.send( + &Address::from_script(&spk, Network::Regtest)?, + Amount::ONE_BTC, + )?; + + let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1])); + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + chain.apply_update(CheckPoint::from_header(&emission.block.header, height))?; + } + + let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs); + assert!(changeset + .tx_graph + .txs + .iter() + .any(|tx| tx.compute_txid() == txid_1)); + + // Double spend tx1. + + // Get `prevout` from core. + let core = env.rpc_client(); + let tx1 = &core.get_raw_transaction(&txid_1, None)?; + let txin = &tx1.input[0]; + let op = txin.previous_output; + + // Create `tx1b` using the previous output from tx1. + let utxo = CreateRawTransactionInput { + txid: op.txid, + vout: op.vout, + sequence: None, + }; + let addr = core.get_new_address(None, None)?.assume_checked(); + let tx = core.create_raw_transaction( + &[utxo], + &HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]), + None, + None, + )?; + let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?; + let tx1b = res.transaction()?; + + // Send the tx. + let _txid_2 = core.send_raw_transaction(&tx1b)?; + + // Retrieve the expected unconfirmed txids and spks from the graph. + let exp_spk_txids = graph + .list_expected_spk_txids(&chain, chain_tip, ..) + .collect::>(); + assert_eq!(exp_spk_txids, vec![(spk, txid_1)]); + + // Check that mempool emission contains evicted txid. + let mempool_event = emitter.mempool()?; + assert!(mempool_event.evicted_txids.contains(&txid_1)); + + // Update graph with evicted tx. + for txid in mempool_event.evicted_txids { + if graph.graph().get_tx_node(txid).is_some() { + let _ = graph.insert_evicted_at(txid, mempool_event.latest_update_time); + } + } + + let canonical_txids = graph + .graph() + .list_canonical_txs(&chain, chain_tip, CanonicalizationParams::default()) + .map(|tx| tx.tx_node.compute_txid()) + .collect::>(); + // tx1 should no longer be canonical. + assert!(!canonical_txids.contains(&txid_1)); + + Ok(()) +} diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 5eb3d3eb..3684edbf 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, @@ -11,10 +12,7 @@ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, Emitter, }; -use bdk_chain::{ - bitcoin::{Block, Transaction}, - local_chain, CanonicalizationParams, Merge, -}; +use bdk_chain::{bitcoin::Block, local_chain, CanonicalizationParams, Merge}; use example_cli::{ anyhow, clap::{self, Args, Subcommand}, @@ -36,7 +34,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); #[derive(Debug)] enum Emission { Block(bdk_bitcoind_rpc::BlockEvent), - Mempool(Vec<(Transaction, u64)>), + Mempool(bdk_bitcoind_rpc::MempoolEvent), Tip(u32), } @@ -141,7 +139,7 @@ fn main() -> anyhow::Result<()> { let chain_tip = chain.lock().unwrap().tip(); let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height); + let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new()); let mut db_stage = ChangeSet::default(); let mut last_db_commit = Instant::now(); @@ -205,7 +203,7 @@ fn main() -> anyhow::Result<()> { let graph_changeset = graph .lock() .unwrap() - .batch_insert_relevant_unconfirmed(mempool_txs); + .batch_insert_relevant_unconfirmed(mempool_txs.new_txs); { let db = &mut *db.lock().unwrap(); db_stage.merge(ChangeSet { @@ -233,7 +231,8 @@ fn main() -> anyhow::Result<()> { let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, last_cp, fallback_height); + let mut emitter = + Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new()); let mut block_count = rpc_client.get_block_count()? as u32; tx.send(Emission::Tip(block_count))?; @@ -288,7 +287,13 @@ fn main() -> anyhow::Result<()> { (chain_changeset, graph_changeset) } Emission::Mempool(mempool_txs) => { - let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs); + let mut graph_changeset = + graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone()); + for txid in mempool_txs.evicted_txids { + graph_changeset.merge( + graph.insert_evicted_at(txid, mempool_txs.latest_update_time), + ); + } (local_chain::ChangeSet::default(), graph_changeset) } Emission::Tip(h) => {