#![warn(missing_docs)]
use bdk_core::{BlockId, CheckPoint};
-use bitcoin::{block::Header, Block, BlockHash, Transaction};
+use bitcoin::{Block, BlockHash, Transaction, Txid};
use bitcoincore_rpc::bitcoincore_rpc_json;
+use std::collections::HashSet;
pub mod bip158;
/// The last emitted block during our last mempool emission. This is used to determine whether
/// there has been a reorg since our last mempool emission.
last_mempool_tip: Option<u32>,
+
+ /// A set of txids currently assumed to still be in the mempool.
+ ///
+ /// This is used to detect mempool evictions by comparing the set against the latest mempool
+ /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
+ /// evicted.
+ ///
+ /// When the emitter emits a block, confirmed txids are removed from this set. This prevents
+ /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
+ expected_mempool_txids: HashSet<Txid>,
}
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
///
/// `start_height` starts emission from a given height (if there are no conflicts with the
/// original chain).
- pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
+ ///
+ /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
+ /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
+ pub fn new(
+ client: &'c C,
+ last_cp: CheckPoint,
+ start_height: u32,
+ expected_mempool_txids: HashSet<Txid>,
+ ) -> Self {
Self {
client,
start_height,
last_block: None,
last_mempool_time: 0,
last_mempool_tip: None,
+ expected_mempool_txids,
}
}
- /// Emit mempool transactions, alongside their first-seen unix timestamps.
+ /// Emit mempool transactions and any evicted [`Txid`]s.
+ ///
+ /// This method returns a [`MempoolEvent`] containing the full transactions (with their
+ /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
+ /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
+ /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
+ /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
+ /// return an empty `evicted_txids` set.
///
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
/// ancestors are already emitted.
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
/// at height `h`.
- pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
+ pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
let client = self.client;
// This is the emitted tip height during the last mempool emission.
// `start_height` has been emitted.
.unwrap_or(self.start_height.saturating_sub(1));
+ // Loop to make sure that the fetched mempool content and the fetched tip are consistent
+ // with one another.
+ let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
+ // Determine if height and hash matches the best block from the RPC. Evictions are deferred
+ // if we are not at the best block.
+ let height = client.get_block_count()?;
+ let hash = client.get_block_hash(height)?;
+
+ // Get the raw mempool result from the RPC client which will be used to determine if any
+ // transactions have been evicted.
+ let mp = client.get_raw_mempool_verbose()?;
+ let mp_txids: HashSet<Txid> = mp.keys().copied().collect();
+
+ if height == client.get_block_count()? && hash == client.get_block_hash(height)? {
+ break (mp, mp_txids, height, hash);
+ }
+ };
+
+ let at_tip =
+ rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash();
+
+ // If at tip, any expected txid missing from raw mempool is considered evicted;
+ // if not at tip, we don't evict anything.
+ let evicted_txids: HashSet<Txid> = if at_tip {
+ self.expected_mempool_txids
+ .difference(&raw_mempool_txids)
+ .copied()
+ .collect()
+ } else {
+ HashSet::new()
+ };
+
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;
- let txs_to_emit = client
- .get_raw_mempool_verbose()?
+ let new_txs = raw_mempool
.into_iter()
.filter_map({
let latest_time = &mut latest_time;
if tx_time > *latest_time {
*latest_time = tx_time;
}
-
- // Avoid emitting transactions that are already emitted if we can guarantee
- // blocks containing ancestors are already emitted. The bitcoind rpc interface
- // provides us with the block height that the tx is introduced to the mempool.
- // If we have already emitted the block of height, we can assume that all
- // ancestor txs have been processed by the receiver.
+ // Best-effort check to avoid re-emitting transactions we've already emitted.
+ //
+ // Complete suppression isn't possible, since a transaction may spend outputs
+ // owned by the wallet. To determine if such a transaction is relevant, we must
+ // have already seen its ancestor(s) that contain the spent prevouts.
+ //
+ // Fortunately, bitcoind provides the block height at which the transaction
+ // entered the mempool. If we've already emitted that block height, we can
+ // reasonably assume the receiver has seen all ancestor transactions.
let is_already_emitted = tx_time <= prev_mempool_time;
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
if is_already_emitted && is_within_height {
return None;
}
-
let tx = match client.get_raw_transaction(&txid, None) {
Ok(tx) => tx,
- // the tx is confirmed or evicted since `get_raw_mempool_verbose`
Err(err) if err.is_not_found_error() => return None,
Err(err) => return Some(Err(err)),
};
-
Some(Ok((tx, tx_time as u64)))
}
})
self.last_mempool_time = latest_time;
self.last_mempool_tip = Some(self.last_cp.height());
- Ok(txs_to_emit)
- }
+ // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
+ // still catching up to the tip and keep accumulating.
+ if at_tip {
+ self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
+ } else {
+ self.expected_mempool_txids
+ .extend(new_txs.iter().map(|(tx, _)| tx.compute_txid()));
+ }
- /// Emit the next block height and header (if any).
- pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
- Ok(poll(self, |hash| self.client.get_block_header(hash))?
- .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
+ Ok(MempoolEvent {
+ new_txs,
+ evicted_txids,
+ latest_update_time: latest_time as u64,
+ })
}
/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
- Ok(poll(self, |hash| self.client.get_block(hash))?
- .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
+ if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
+ // Stop tracking unconfirmed transactions that have been confirmed in this block.
+ for tx in &block.txdata {
+ self.expected_mempool_txids.remove(&tx.compute_txid());
+ }
+ return Ok(Some(BlockEvent { block, checkpoint }));
+ }
+ Ok(None)
}
}
+/// A new emission from mempool.
+#[derive(Debug)]
+pub struct MempoolEvent {
+ /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
+ ///
+ /// To understand the second condition, consider a receiver which filters transactions based on
+ /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
+ /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
+ /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
+ /// block at height `h`.
+ pub new_txs: Vec<(Transaction, u64)>,
+
+ /// [`Txid`]s of all transactions that have been evicted from mempool.
+ pub evicted_txids: HashSet<Txid>,
+
+ /// The latest timestamp of when a transaction entered the mempool.
+ ///
+ /// This is useful for setting the timestamp for evicted transactions.
+ pub latest_update_time: u64,
+}
+
/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
}
}
}
+
+#[cfg(test)]
+mod test {
+ use crate::{bitcoincore_rpc::RpcApi, Emitter};
+ use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
+ use bdk_chain::local_chain::LocalChain;
+ use bdk_testenv::{anyhow, TestEnv};
+ use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
+ use std::collections::HashSet;
+
+ #[test]
+ fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
+ let env = TestEnv::new()?;
+ let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
+ let chain_tip = chain.tip();
+ let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
+
+ env.mine_blocks(100, None)?;
+ while emitter.next_block()?.is_some() {}
+
+ let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
+ let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
+ let mut mempool_txids = HashSet::new();
+
+ // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
+ for _ in 0..10 {
+ let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
+ mempool_txids.insert(sent_txid);
+ emitter.mempool()?;
+ env.mine_blocks(1, None)?;
+
+ for txid in &mempool_txids {
+ assert!(
+ emitter.expected_mempool_txids.contains(txid),
+ "Expected txid {:?} missing",
+ txid
+ );
+ }
+ }
+
+ // Process each block and check that confirmed txids are removed from from
+ // expected_mempool_txids.
+ while let Some(block_event) = emitter.next_block()? {
+ let confirmed_txids: HashSet<Txid> = block_event
+ .block
+ .txdata
+ .iter()
+ .map(|tx| tx.compute_txid())
+ .collect();
+ mempool_txids = mempool_txids
+ .difference(&confirmed_txids)
+ .copied()
+ .collect::<HashSet<_>>();
+ for txid in confirmed_txids {
+ assert!(
+ !emitter.expected_mempool_txids.contains(&txid),
+ "Expected txid {:?} should have been removed",
+ txid
+ );
+ }
+ for txid in &mempool_txids {
+ assert!(
+ emitter.expected_mempool_txids.contains(txid),
+ "Expected txid {:?} missing",
+ txid
+ );
+ }
+ }
+
+ assert!(emitter.expected_mempool_txids.is_empty());
+
+ Ok(())
+ }
+}
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::{BTreeMap, BTreeSet, HashSet};
use bdk_bitcoind_rpc::Emitter;
use bdk_chain::{
let env = TestEnv::new()?;
let network_tip = env.rpc_client().get_block_count()?;
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
- let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
+ let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
// Mine some blocks and return the actual block hashes.
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
index
});
- let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
+ let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
while let Some(emission) = emitter.next_block()? {
let height = emission.block_height();
assert!(emitter.next_block()?.is_none());
let mempool_txs = emitter.mempool()?;
- let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
+ let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
assert_eq!(
indexed_additions
.tx_graph
hash: env.rpc_client().get_block_hash(0)?,
}),
EMITTER_START_HEIGHT as _,
+ HashSet::new(),
);
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
for reorg_count in 1..=10 {
let replaced_blocks = env.reorg_empty_blocks(reorg_count)?;
- let next_emission = emitter.next_header()?.expect("must emit block after reorg");
+ let next_emission = emitter.next_block()?.expect("must emit block after reorg");
assert_eq!(
(
next_emission.block_height() as usize,
replaced_blocks[0],
"block emitted after reorg should be at the reorg height"
);
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
}
Ok(())
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
+ HashSet::new(),
);
// setup addresses
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
+ HashSet::new(),
);
// mine blocks and sync up emitter
.get_new_address(None, None)?
.assume_checked();
env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?;
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
// have some random txs in mempool
let exp_txids = (0..MEMPOOL_TX_COUNT)
// the first emission should include all transactions
let emitted_txids = emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<Txid>>();
// second emission should be empty
assert!(
- emitter.mempool()?.is_empty(),
+ emitter.mempool()?.new_txs.is_empty(),
"second emission should be empty"
);
for _ in 0..BLOCKS_TO_MINE {
env.mine_empty_block()?;
}
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
assert!(
- emitter.mempool()?.is_empty(),
+ emitter.mempool()?.new_txs.is_empty(),
"third emission, after chain tip is extended, should also be empty"
);
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
+ HashSet::new(),
);
// mine blocks to get initial balance, sync emitter up to tip
.get_new_address(None, None)?
.assume_checked();
env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
// mine blocks to introduce txs to mempool at different heights
let tx_introductions = (0..MEMPOOL_TX_COUNT)
assert_eq!(
emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
assert_eq!(
emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
// At this point, the emitter has seen all mempool transactions. It should only re-emit those
// that have introduction heights less than the emitter's last-emitted block tip.
- while let Some(emission) = emitter.next_header()? {
+ while let Some(emission) = emitter.next_block()? {
let height = emission.block_height();
// We call `mempool()` twice.
// The second call (at height `h`) should skip the tx introduced at height `h`.
.collect::<BTreeSet<_>>();
let emitted_txids = emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
+ HashSet::new(),
);
// mine blocks to get initial balance
// sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
// from the mempool yet)
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
assert_eq!(
emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
.collect::<BTreeMap<_, _>>());
// `next_header` emits the replacement block of the reorg
- if let Some(emission) = emitter.next_header()? {
+ if let Some(emission) = emitter.next_block()? {
let height = emission.block_height();
// the mempool emission (that follows the first block emission after reorg) should only
// include mempool txs introduced at reorg height or greater
let mempool = emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
let mempool = emitter
.mempool()?
+ .new_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
}
// sync emitter to tip
- while emitter.next_header()?.is_some() {}
+ while emitter.next_block()?.is_some() {}
}
Ok(())
hash: env.rpc_client().get_block_hash(0)?,
}),
(PREMINE_COUNT - 2) as u32,
+ HashSet::new(),
);
// mine 101 blocks
env.mine_blocks(PREMINE_COUNT, None)?;
// emit block 99a
- let block_header_99a = emitter.next_header()?.expect("block 99a header").block;
+ let block_header_99a = emitter
+ .next_block()?
+ .expect("block 99a header")
+ .block
+ .header;
let block_hash_99a = block_header_99a.block_hash();
let block_hash_98a = block_header_99a.prev_blockhash;
// emit block 100a
- let block_header_100a = emitter.next_header()?.expect("block 100a header").block;
+ let block_header_100a = emitter.next_block()?.expect("block 100a header").block;
let block_hash_100a = block_header_100a.block_hash();
// get hash for block 101a
env.mine_blocks(3, None)?;
// emit block header 99b
- let block_header_99b = emitter.next_header()?.expect("block 99b header").block;
+ let block_header_99b = emitter
+ .next_block()?
+ .expect("block 99b header")
+ .block
+ .header;
let block_hash_99b = block_header_99b.block_hash();
let block_hash_98b = block_header_99b.prev_blockhash;
Ok(())
}
+
+/// Validates that when an unconfirmed transaction is double-spent (and thus evicted from the
+/// mempool), the emitter reports it in `evicted_txids`, and after inserting that eviction into the
+/// graph it no longer appears in the set of canonical transactions.
+///
+/// 1. Broadcast a first tx (tx1) and confirm it arrives in unconfirmed set.
+/// 2. Double-spend tx1 with tx1b and verify `mempool()` reports tx1 as evicted.
+/// 3. Insert the eviction into the graph and assert tx1 is no longer canonical.
+#[test]
+fn test_expect_tx_evicted() -> anyhow::Result<()> {
+ use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin;
+ use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput;
+ use bdk_chain::miniscript;
+ use bdk_chain::spk_txout::SpkTxOutIndex;
+ use bitcoin::constants::genesis_block;
+ use bitcoin::secp256k1::Secp256k1;
+ use bitcoin::Network;
+ use std::collections::HashMap;
+ let env = TestEnv::new()?;
+
+ let s = bdk_testenv::utils::DESCRIPTORS[0];
+ let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s)
+ .unwrap()
+ .0;
+ let spk = desc.at_derivation_index(0)?.script_pubkey();
+
+ let mut chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0;
+ let chain_tip = chain.tip().block_id();
+
+ let mut index = SpkTxOutIndex::default();
+ index.insert_spk((), spk.clone());
+ let mut graph = IndexedTxGraph::<BlockId, _>::new(index);
+
+ // Receive tx1.
+ let _ = env.mine_blocks(100, None)?;
+ let txid_1 = env.send(
+ &Address::from_script(&spk, Network::Regtest)?,
+ Amount::ONE_BTC,
+ )?;
+
+ let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1]));
+ while let Some(emission) = emitter.next_block()? {
+ let height = emission.block_height();
+ chain.apply_update(CheckPoint::from_header(&emission.block.header, height))?;
+ }
+
+ let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs);
+ assert!(changeset
+ .tx_graph
+ .txs
+ .iter()
+ .any(|tx| tx.compute_txid() == txid_1));
+
+ // Double spend tx1.
+
+ // Get `prevout` from core.
+ let core = env.rpc_client();
+ let tx1 = &core.get_raw_transaction(&txid_1, None)?;
+ let txin = &tx1.input[0];
+ let op = txin.previous_output;
+
+ // Create `tx1b` using the previous output from tx1.
+ let utxo = CreateRawTransactionInput {
+ txid: op.txid,
+ vout: op.vout,
+ sequence: None,
+ };
+ let addr = core.get_new_address(None, None)?.assume_checked();
+ let tx = core.create_raw_transaction(
+ &[utxo],
+ &HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]),
+ None,
+ None,
+ )?;
+ let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?;
+ let tx1b = res.transaction()?;
+
+ // Send the tx.
+ let _txid_2 = core.send_raw_transaction(&tx1b)?;
+
+ // Retrieve the expected unconfirmed txids and spks from the graph.
+ let exp_spk_txids = graph
+ .list_expected_spk_txids(&chain, chain_tip, ..)
+ .collect::<Vec<_>>();
+ assert_eq!(exp_spk_txids, vec![(spk, txid_1)]);
+
+ // Check that mempool emission contains evicted txid.
+ let mempool_event = emitter.mempool()?;
+ assert!(mempool_event.evicted_txids.contains(&txid_1));
+
+ // Update graph with evicted tx.
+ for txid in mempool_event.evicted_txids {
+ if graph.graph().get_tx_node(txid).is_some() {
+ let _ = graph.insert_evicted_at(txid, mempool_event.latest_update_time);
+ }
+ }
+
+ let canonical_txids = graph
+ .graph()
+ .list_canonical_txs(&chain, chain_tip, CanonicalizationParams::default())
+ .map(|tx| tx.tx_node.compute_txid())
+ .collect::<Vec<_>>();
+ // tx1 should no longer be canonical.
+ assert!(!canonical_txids.contains(&txid_1));
+
+ Ok(())
+}
use std::{
+ collections::HashSet,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
bitcoincore_rpc::{Auth, Client, RpcApi},
Emitter,
};
-use bdk_chain::{
- bitcoin::{Block, Transaction},
- local_chain, CanonicalizationParams, Merge,
-};
+use bdk_chain::{bitcoin::Block, local_chain, CanonicalizationParams, Merge};
use example_cli::{
anyhow,
clap::{self, Args, Subcommand},
#[derive(Debug)]
enum Emission {
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
- Mempool(Vec<(Transaction, u64)>),
+ Mempool(bdk_bitcoind_rpc::MempoolEvent),
Tip(u32),
}
let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?;
- let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
+ let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
let mut db_stage = ChangeSet::default();
let mut last_db_commit = Instant::now();
let graph_changeset = graph
.lock()
.unwrap()
- .batch_insert_relevant_unconfirmed(mempool_txs);
+ .batch_insert_relevant_unconfirmed(mempool_txs.new_txs);
{
let db = &mut *db.lock().unwrap();
db_stage.merge(ChangeSet {
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
let rpc_client = rpc_args.new_client()?;
- let mut emitter = Emitter::new(&rpc_client, last_cp, fallback_height);
+ let mut emitter =
+ Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
let mut block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;
(chain_changeset, graph_changeset)
}
Emission::Mempool(mempool_txs) => {
- let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs);
+ let mut graph_changeset =
+ graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone());
+ for txid in mempool_txs.evicted_txids {
+ graph_changeset.merge(
+ graph.insert_evicted_at(txid, mempool_txs.latest_update_time),
+ );
+ }
(local_chain::ChangeSet::default(), graph_changeset)
}
Emission::Tip(h) => {