# For no-std, remember to enable the bitcoin/no-std feature
bitcoin = { version = "0.30", default-features = false }
bitcoincore-rpc = { version = "0.17" }
+bdk_chain = { path = "../chain", version = "0.5", default-features = false }
[dev-dependencies]
-bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] }
bitcoind = { version = "0.33", features = ["25_0"] }
anyhow = { version = "1" }
[features]
default = ["std"]
-std = ["bitcoin/std"]
-serde = ["bitcoin/serde"]
+std = ["bitcoin/std", "bdk_chain/std"]
+serde = ["bitcoin/serde", "bdk_chain/serde"]
//! mempool.
#![warn(missing_docs)]
-use std::collections::BTreeMap;
-
+use bdk_chain::{local_chain::CheckPoint, BlockId};
use bitcoin::{block::Header, Block, BlockHash, Transaction};
pub use bitcoincore_rpc;
use bitcoincore_rpc::bitcoincore_rpc_json;
client: &'c C,
start_height: u32,
- emitted_blocks: BTreeMap<u32, BlockHash>,
+ last_cp: Option<CheckPoint>,
last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
/// The latest first-seen epoch of emitted mempool transactions. This is used to determine
}
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
- /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
+ /// Construct a new [`Emitter`] with the given RPC `client` and `start_height`.
///
/// `start_height` is the block height to start emitting blocks from.
- pub fn new(client: &'c C, start_height: u32) -> Self {
+ pub fn from_height(client: &'c C, start_height: u32) -> Self {
Self {
client,
start_height,
- emitted_blocks: BTreeMap::new(),
+ last_cp: None,
+ last_block: None,
+ last_mempool_time: 0,
+ last_mempool_tip: None,
+ }
+ }
+
+ /// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`.
+ ///
+ /// `checkpoint` is used to find the latest block which is still part of the best chain. The
+ /// [`Emitter`] will emit blocks starting right above this block.
+ pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self {
+ Self {
+ client,
+ start_height: 0,
+ last_cp: Some(checkpoint),
last_block: None,
last_mempool_time: 0,
last_mempool_tip: None,
.collect::<Result<Vec<_>, _>>()?;
self.last_mempool_time = latest_time;
- self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height);
+ self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height());
Ok(txs_to_emit)
}
NoMoreBlocks,
/// Fetched block is not in the best chain.
BlockNotInBestChain,
- AgreementFound(bitcoincore_rpc_json::GetBlockResult),
+ AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
AgreementPointNotFound,
}
let client = emitter.client;
if let Some(last_res) = &emitter.last_block {
- assert!(!emitter.emitted_blocks.is_empty());
+ assert!(
+ emitter.last_cp.is_some(),
+ "must not have block result without last cp"
+ );
let next_hash = match last_res.nextblockhash {
None => return Ok(PollResponse::NoMoreBlocks),
return Ok(PollResponse::Block(res));
}
- if emitter.emitted_blocks.is_empty() {
+ if emitter.last_cp.is_none() {
let hash = client.get_block_hash(emitter.start_height as _)?;
let res = client.get_block_info(&hash)?;
return Ok(PollResponse::Block(res));
}
- for (&_, hash) in emitter.emitted_blocks.iter().rev() {
- let res = client.get_block_info(hash)?;
+ for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) {
+ let res = client.get_block_info(&cp.hash())?;
if res.confirmations < 0 {
// block is not in best chain
continue;
}
// agreement point found
- return Ok(PollResponse::AgreementFound(res));
+ return Ok(PollResponse::AgreementFound(res, cp));
}
Ok(PollResponse::AgreementPointNotFound)
match poll_once(emitter)? {
PollResponse::Block(res) => {
let height = res.height as u32;
- let item = get_item(&res.hash)?;
- assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None);
+ let hash = res.hash;
+ let item = get_item(&hash)?;
+
+ let this_id = BlockId { height, hash };
+ let prev_id = res.previousblockhash.map(|prev_hash| BlockId {
+ height: height - 1,
+ hash: prev_hash,
+ });
+
+ match (&mut emitter.last_cp, prev_id) {
+ (Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"),
+ (last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)),
+ // When the receiver constructs a local_chain update from a block, the previous
+ // checkpoint is also included in the update. We need to reflect this state in
+ // `Emitter::last_cp` as well.
+ (last_cp, Some(prev_id)) => {
+ *last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push"))
+ }
+ }
+
emitter.last_block = Some(res);
+
return Ok(Some((height, item)));
}
PollResponse::NoMoreBlocks => {
emitter.last_block = None;
continue;
}
- PollResponse::AgreementFound(res) => {
+ PollResponse::AgreementFound(res, cp) => {
let agreement_h = res.height as u32;
// get rid of evicted blocks
- emitter.emitted_blocks.split_off(&(agreement_h + 1));
+ emitter.last_cp = Some(cp);
// The tip during the last mempool emission needs to in the best chain, we reduce
// it if it is not.
continue;
}
PollResponse::AgreementPointNotFound => {
- emitter.emitted_blocks.clear();
+ // We want to clear `last_cp` and set `start_height` to the first checkpoint's
+ // height. This way, the first checkpoint in `LocalChain` can be replaced.
+ if let Some(last_cp) = emitter.last_cp.take() {
+ emitter.start_height = last_cp.height();
+ }
emitter.last_block = None;
continue;
}
pub fn test_sync_local_chain() -> anyhow::Result<()> {
let env = TestEnv::new()?;
let mut local_chain = LocalChain::default();
- let mut emitter = Emitter::new(&env.client, 0);
+ let mut emitter = Emitter::from_height(&env.client, 0);
// mine some blocks and returned the actual block hashes
let exp_hashes = {
index
});
- let emitter = &mut Emitter::new(&env.client, 0);
+ let emitter = &mut Emitter::from_height(&env.client, 0);
while let Some((height, block)) = emitter.next_block()? {
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
const CHAIN_TIP_HEIGHT: usize = 110;
let env = TestEnv::new()?;
- let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _);
+ let mut emitter = Emitter::from_height(&env.client, EMITTER_START_HEIGHT as _);
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
while emitter.next_header()?.is_some() {}
const SEND_AMOUNT: Amount = Amount::from_sat(10_000);
let env = TestEnv::new()?;
- let mut emitter = Emitter::new(&env.client, 0);
+ let mut emitter = Emitter::from_height(&env.client, 0);
// setup addresses
let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked();
const MEMPOOL_TX_COUNT: usize = 2;
let env = TestEnv::new()?;
- let mut emitter = Emitter::new(&env.client, 0);
+ let mut emitter = Emitter::from_height(&env.client, 0);
// mine blocks and sync up emitter
let addr = env.client.get_new_address(None, None)?.assume_checked();
const MEMPOOL_TX_COUNT: usize = 21;
let env = TestEnv::new()?;
- let mut emitter = Emitter::new(&env.client, 0);
+ let mut emitter = Emitter::from_height(&env.client, 0);
// mine blocks to get initial balance, sync emitter up to tip
let addr = env.client.get_new_address(None, None)?.assume_checked();
const PREMINE_COUNT: usize = 101;
let env = TestEnv::new()?;
- let mut emitter = Emitter::new(&env.client, 0);
+ let mut emitter = Emitter::from_height(&env.client, 0);
// mine blocks to get initial balance
let addr = env.client.get_new_address(None, None)?.assume_checked();
let env = TestEnv::new()?;
// start height is 99
- let mut emitter = Emitter::new(&env.client, (PREMINE_COUNT - 2) as u32);
+ let mut emitter = Emitter::from_height(&env.client, (PREMINE_COUNT - 2) as u32);
// mine 101 blocks
env.mine_blocks(PREMINE_COUNT, None)?;
const DB_PATH: &str = ".bdk_example_rpc.db";
const CHANNEL_BOUND: usize = 10;
-/// The block depth which we assume no reorgs can happen at.
-const ASSUME_FINAL_DEPTH: u32 = 6;
/// Delay for printing status to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
/// Delay between mempool emissions.
let mut db = db.lock().unwrap();
graph.index.set_lookahead_for_all(lookahead);
- // we start at a height lower than last-seen tip in case of reorgs
- let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
- cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
- });
let rpc_client = rpc_args.new_client()?;
- let mut emitter = Emitter::new(&rpc_client, start_height);
+ let mut emitter = match chain.tip() {
+ Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
+ None => Emitter::from_height(&rpc_client, fallback_height),
+ };
let mut last_db_commit = Instant::now();
let mut last_print = Instant::now();
while let Some((height, block)) = emitter.next_block()? {
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
- let chain_changeset = chain.apply_update(chain_update)?;
+ let chain_changeset = chain
+ .apply_update(chain_update)
+ .expect("must always apply as we recieve blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(block, height);
db.stage((chain_changeset, graph_changeset));
let sigterm_flag = start_ctrlc_handler();
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
- // we start at a height lower than last-seen tip in case of reorgs
- let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
- cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
- });
+ let last_cp = chain.lock().unwrap().tip();
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
println!("emitter thread started...");
let rpc_client = rpc_args.new_client()?;
- let mut emitter = Emitter::new(&rpc_client, start_height);
+ let mut emitter = match last_cp {
+ Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
+ None => Emitter::from_height(&rpc_client, fallback_height),
+ };
let mut block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;
Emission::Block { height, block } => {
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
- let chain_changeset = chain.apply_update(chain_update)?;
+ let chain_changeset = chain
+ .apply_update(chain_update)
+ .expect("must always apply as we recieve blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(block, height);
(chain_changeset, graph_changeset)
}