//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
//!
//! To only get block updates (exclude mempool transactions), the caller can use
-//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
-//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
-//! mempool.
+//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
+//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
#![warn(missing_docs)]
use bdk_core::{BlockId, CheckPoint};
use bitcoin::{Block, BlockHash, Transaction, Txid};
-use bitcoincore_rpc::bitcoincore_rpc_json;
-use std::collections::HashSet;
+use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
+use std::{collections::HashSet, ops::Deref};
pub mod bip158;
/// Refer to [module-level documentation] for more.
///
/// [module-level documentation]: crate
-pub struct Emitter<'c, C> {
- client: &'c C,
+pub struct Emitter<C> {
+ client: C,
start_height: u32,
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
expected_mempool_txids: HashSet<Txid>,
}
-impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
+/// Indicates that there are no initially expected mempool transactions.
+///
+/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
+/// to start empty (i.e. with no unconfirmed transactions).
+pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();
+
+impl<C> Emitter<C>
+where
+ C: Deref,
+ C::Target: RpcApi,
+{
/// Construct a new [`Emitter`].
///
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
/// original chain).
///
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
- /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
+ /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
+ /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
pub fn new(
- client: &'c C,
+ client: C,
last_cp: CheckPoint,
start_height: u32,
- expected_mempool_txids: HashSet<Txid>,
+ expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
) -> Self {
Self {
client,
last_block: None,
last_mempool_time: 0,
last_mempool_tip: None,
- expected_mempool_txids,
+ expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
}
}
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
/// at height `h`.
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
- let client = self.client;
+ let client = &*self.client;
// This is the emitted tip height during the last mempool emission.
let prev_mempool_tip = self
/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
- if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
+ if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
// Stop tracking unconfirmed transactions that have been confirmed in this block.
for tx in &block.txdata {
self.expected_mempool_txids.remove(&tx.compute_txid());
/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
- /// Either a full [`Block`] or [`Header`] of the new block.
+ /// The block.
pub block: B,
/// The checkpoint of the new block.
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
where
- C: bitcoincore_rpc::RpcApi,
+ C: Deref,
+ C::Target: RpcApi,
{
- let client = emitter.client;
+ let client = &*emitter.client;
if let Some(last_res) = &emitter.last_block {
let next_hash = if last_res.height < emitter.start_height as _ {
get_item: F,
) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
where
- C: bitcoincore_rpc::RpcApi,
- F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
+ C: Deref,
+ C::Target: RpcApi,
+ F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
{
loop {
match poll_once(emitter)? {
PollResponse::Block(res) => {
let height = res.height as u32;
let hash = res.hash;
- let item = get_item(&hash)?;
+ let item = get_item(&hash, &emitter.client)?;
let new_cp = emitter
.last_cp
#[cfg(test)]
mod test {
- use crate::{bitcoincore_rpc::RpcApi, Emitter};
- use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
+ use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
use bdk_chain::local_chain::LocalChain;
use bdk_testenv::{anyhow, TestEnv};
- use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
+ use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
use std::collections::HashSet;
#[test]
let env = TestEnv::new()?;
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
let chain_tip = chain.tip();
- let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
+ let mut emitter = Emitter::new(
+ env.rpc_client(),
+ chain_tip.clone(),
+ 1,
+ NO_EXPECTED_MEMPOOL_TXIDS,
+ );
env.mine_blocks(100, None)?;
while emitter.next_block()?.is_some() {}
-use std::collections::{BTreeMap, BTreeSet, HashSet};
+use std::{
+ collections::{BTreeMap, BTreeSet, HashSet},
+ ops::Deref,
+};
-use bdk_bitcoind_rpc::Emitter;
+use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
use bdk_chain::{
bitcoin::{Address, Amount, Txid},
local_chain::{CheckPoint, LocalChain},
let env = TestEnv::new()?;
let network_tip = env.rpc_client().get_block_count()?;
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
- let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
+ let mut emitter = Emitter::new(
+ env.rpc_client(),
+ local_chain.tip(),
+ 0,
+ NO_EXPECTED_MEMPOOL_TXIDS,
+ );
// Mine some blocks and return the actual block hashes.
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
index
});
- let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
+ let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS);
while let Some(emission) = emitter.next_block()? {
let height = emission.block_height();
hash: env.rpc_client().get_block_hash(0)?,
}),
EMITTER_START_HEIGHT as _,
- HashSet::new(),
+ NO_EXPECTED_MEMPOOL_TXIDS,
);
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
emitter: &mut Emitter<C>,
) -> anyhow::Result<()>
where
- C: bitcoincore_rpc::RpcApi,
+ C: Deref,
+ C::Target: bitcoincore_rpc::RpcApi,
{
while let Some(emission) = emitter.next_block()? {
let height = emission.block_height();
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
- HashSet::new(),
+ NO_EXPECTED_MEMPOOL_TXIDS,
);
// setup addresses
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
- HashSet::new(),
+ NO_EXPECTED_MEMPOOL_TXIDS,
);
// mine blocks and sync up emitter
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
- HashSet::new(),
+ NO_EXPECTED_MEMPOOL_TXIDS,
);
// mine blocks to get initial balance, sync emitter up to tip
hash: env.rpc_client().get_block_hash(0)?,
}),
0,
- HashSet::new(),
+ NO_EXPECTED_MEMPOOL_TXIDS,
);
// mine blocks to get initial balance
hash: env.rpc_client().get_block_hash(0)?,
}),
(PREMINE_COUNT - 2) as u32,
- HashSet::new(),
+ NO_EXPECTED_MEMPOOL_TXIDS,
);
// mine 101 blocks
use std::{
- collections::HashSet,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
fallback_height, ..
} = rpc_args;
- let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?;
- let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
+ let mut emitter = {
+ let chain = chain.lock().unwrap();
+ let graph = graph.lock().unwrap();
+ Emitter::new(
+ &rpc_client,
+ chain.tip(),
+ fallback_height,
+ graph
+ .graph()
+ .list_canonical_txs(
+ &*chain,
+ chain.tip().block_id(),
+ CanonicalizationParams::default(),
+ )
+ .filter(|tx| tx.chain_position.is_unconfirmed()),
+ )
+ };
let mut db_stage = ChangeSet::default();
let mut last_db_commit = Instant::now();
} = rpc_args;
let sigterm_flag = start_ctrlc_handler();
- let last_cp = chain.lock().unwrap().tip();
+ let rpc_client = Arc::new(rpc_args.new_client()?);
+ let mut emitter = {
+ let chain = chain.lock().unwrap();
+ let graph = graph.lock().unwrap();
+ Emitter::new(
+ rpc_client.clone(),
+ chain.tip(),
+ fallback_height,
+ graph
+ .graph()
+ .list_canonical_txs(
+ &*chain,
+ chain.tip().block_id(),
+ CanonicalizationParams::default(),
+ )
+ .filter(|tx| tx.chain_position.is_unconfirmed()),
+ )
+ };
println!(
"[{:>10}s] starting emitter thread...",
);
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
- let rpc_client = rpc_args.new_client()?;
- let mut emitter =
- Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
-
let mut block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;