/// The last emitted block during our last mempool emission. This is used to determine whether
/// there has been a reorg since our last mempool emission.
- last_mempool_tip: Option<(u32, BlockHash)>,
+ last_mempool_tip: Option<u32>,
}
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
let client = self.client;
- let prev_mempool_tip = match self.last_mempool_tip {
- // use 'avoid-re-emission' logic if there is no reorg
- Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height,
- _ => 0,
- };
-
+ // This is the emitted tip height during the last mempool emission.
+ let prev_mempool_tip = self
+ .last_mempool_tip
+ // We use `start_height - 1` as we cannot guarantee that the block at
+ // `start_height` has been emitted.
+ .unwrap_or(self.start_height.saturating_sub(1));
+
+ // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
+ // track of the latest mempool tx's timestamp to determine whether we have seen a tx
+ // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
+ // be the new latest timestamp.
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;
.collect::<Result<Vec<_>, _>>()?;
self.last_mempool_time = latest_time;
- self.last_mempool_tip = self
- .emitted_blocks
- .iter()
- .last()
- .map(|(&height, &hash)| (height, hash));
+ self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);
Ok(txs_to_emit)
}
continue;
}
PollResponse::AgreementFound(res) => {
- emitter.emitted_blocks.split_off(&(res.height as u32 + 1));
+ let agreement_h = res.height as u32;
+
+ // get rid of evicted blocks
+ emitter.emitted_blocks.split_off(&(agreement_h + 1));
+
+ // The tip during the last mempool emission needs to in the best chain, we reduce
+ // it if it is not.
+ if let Some(h) = emitter.last_mempool_tip.as_mut() {
+ if *h > agreement_h {
+ *h = agreement_h;
+ }
+ }
emitter.last_block = Some(res);
continue;
}
// must receive mined block which will confirm the transactions.
{
let (height, block) = emitter.next_block()?.expect("must get mined block");
- let _ = chain.apply_update(block_to_chain_update(&block, height))?;
+ let _ = chain
+ .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
assert!(indexed_additions.graph.txs.is_empty());
assert!(indexed_additions.graph.txouts.is_empty());
env.send(&addr, Amount::from_sat(2100))?;
}
- // perform reorgs at different heights
- for reorg_count in 1..TIP_DIFF {
- // sync emitter to tip
- while emitter.next_header()?.is_some() {}
+ // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
+ // from the mempool yet)
+ while emitter.next_header()?.is_some() {}
+ assert_eq!(
+ emitter
+ .mempool()?
+ .into_iter()
+ .map(|(tx, _)| tx.txid())
+ .collect::<BTreeSet<_>>(),
+ env.client
+ .get_raw_mempool()?
+ .into_iter()
+ .collect::<BTreeSet<_>>(),
+ "first mempool emission should include all txs",
+ );
+ // perform reorgs at different heights, these reorgs will not comfirm transactions in the
+ // mempool
+ for reorg_count in 1..TIP_DIFF {
println!("REORG COUNT: {}", reorg_count);
env.reorg_empty_blocks(reorg_count)?;
- // we recalculate this at every loop as reorgs may evict transactions from mempool
- let tx_introductions = env
+ // This is a map of mempool txids to tip height where the tx was introduced to the mempool
+ // we recalculate this at every loop as reorgs may evict transactions from mempool. We use
+ // the introduction height to determine whether we expect a tx to appear in a mempool
+ // emission.
+ // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first?
+ let tx_introductions = dbg!(env
.client
.get_raw_mempool_verbose()?
.into_iter()
.map(|(txid, entry)| (txid, entry.height as usize))
- .collect::<BTreeMap<_, _>>();
+ .collect::<BTreeMap<_, _>>());
+ // `next_header` emits the replacement block of the reorg
if let Some((height, _)) = emitter.next_header()? {
- // the mempool emission (that follows the first block emission after reorg) should return
- // the entire mempool contents
+ println!("\t- replacement height: {}", height);
+
+ // the mempool emission (that follows the first block emission after reorg) should only
+ // include mempool txs introduced at reorg height or greater
let mempool = emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>();
- let exp_mempool = tx_introductions.keys().copied().collect::<BTreeSet<_>>();
+ let exp_mempool = tx_introductions
+ .iter()
+ .filter(|(_, &intro_h)| intro_h >= (height as usize))
+ .map(|(&txid, _)| txid)
+ .collect::<BTreeSet<_>>();
assert_eq!(
mempool, exp_mempool,
- "the first mempool emission after reorg should include all mempool txs"
+ "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater"
);
let mempool = emitter
.collect::<Vec<_>>(),
);
}
+
+ // sync emitter to tip
+ while emitter.next_header()?.is_some() {}
}
Ok(())