/// We include a chain suffix of a certain length for the purpose of robustness.
const CHAIN_SUFFIX_LENGTH: u32 = 8;
+/// Maximum number of transactions whose Merkle proofs are processed per chunk
+/// when validating confirmation anchors.
+const MAX_BATCH_SIZE: usize = 100;
+
/// Wrapper around an [`electrum_client::ElectrumApi`] which includes an internal in-memory
/// transaction cache to avoid re-fetching already downloaded transactions.
#[derive(Debug)]
tx_cache: Mutex<HashMap<Txid, Arc<Transaction>>>,
/// The header cache
block_header_cache: Mutex<HashMap<u32, Header>>,
+ /// Cache of transaction anchors
+ anchor_cache: Mutex<HashMap<(Txid, BlockHash), ConfirmationBlockTime>>,
}
impl<E: ElectrumApi> BdkElectrumClient<E> {
inner: client,
tx_cache: Default::default(),
block_header_cache: Default::default(),
+ anchor_cache: Default::default(),
}
}
let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
let mut last_active_indices = BTreeMap::<K, u32>::default();
+ let mut pending_anchors = Vec::new();
for keychain in request.keychains() {
let spks = request
.iter_spks(keychain.clone())
.map(|(spk_i, spk)| (spk_i, SpkWithExpectedTxids::from(spk)));
- if let Some(last_active_index) =
- self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)?
- {
+ if let Some(last_active_index) = self.populate_with_spks(
+ start_time,
+ &mut tx_update,
+ spks,
+ stop_gap,
+ batch_size,
+ &mut pending_anchors,
+ )? {
last_active_indices.insert(keychain, last_active_index);
}
}
self.fetch_prev_txout(&mut tx_update)?;
}
+ if !pending_anchors.is_empty() {
+ let anchors = self.batch_fetch_anchors(&pending_anchors)?;
+ for (txid, anchor) in anchors {
+ tx_update.anchors.insert((anchor, txid));
+ }
+ }
+
let chain_update = match tip_and_latest_blocks {
Some((chain_tip, latest_blocks)) => Some(chain_update(
chain_tip,
};
let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
+ let mut pending_anchors = Vec::new();
self.populate_with_spks(
start_time,
&mut tx_update,
.map(|(i, spk)| (i as u32, spk)),
usize::MAX,
batch_size,
+ &mut pending_anchors,
+ )?;
+ self.populate_with_txids(
+ start_time,
+ &mut tx_update,
+ request.iter_txids(),
+ &mut pending_anchors,
+ )?;
+ self.populate_with_outpoints(
+ start_time,
+ &mut tx_update,
+ request.iter_outpoints(),
+ &mut pending_anchors,
)?;
- self.populate_with_txids(start_time, &mut tx_update, request.iter_txids())?;
- self.populate_with_outpoints(start_time, &mut tx_update, request.iter_outpoints())?;
// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
self.fetch_prev_txout(&mut tx_update)?;
}
+ if !pending_anchors.is_empty() {
+ let anchors = self.batch_fetch_anchors(&pending_anchors)?;
+ for (txid, anchor) in anchors {
+ tx_update.anchors.insert((anchor, txid));
+ }
+ }
+
let chain_update = match tip_and_latest_blocks {
Some((chain_tip, latest_blocks)) => Some(chain_update(
chain_tip,
mut spks_with_expected_txids: impl Iterator<Item = (u32, SpkWithExpectedTxids)>,
stop_gap: usize,
batch_size: usize,
+ pending_anchors: &mut Vec<(Txid, usize)>,
) -> Result<Option<u32>, Error> {
- let mut unused_spk_count = 0_usize;
- let mut last_active_index = Option::<u32>::None;
+ let mut unused_spk_count = 0;
+ let mut last_active_index = None;
loop {
let spks = (0..batch_size)
.map_while(|_| spks_with_expected_txids.next())
.collect::<Vec<_>>();
if spks.is_empty() {
- return Ok(last_active_index);
+ break;
}
let spk_histories = self
for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
if spk_history.is_empty() {
- unused_spk_count = unused_spk_count.saturating_add(1);
+ unused_spk_count += 1;
if unused_spk_count >= stop_gap {
- return Ok(last_active_index);
+ break;
}
} else {
last_active_index = Some(spk_index);
match tx_res.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
- self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
+ pending_anchors.push((tx_res.tx_hash, height));
}
_ => {
tx_update.seen_ats.insert((tx_res.tx_hash, start_time));
}
}
}
+
+ Ok(last_active_index)
}
/// Populate the `tx_update` with associated transactions/anchors of `outpoints`.
start_time: u64,
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
outpoints: impl IntoIterator<Item = OutPoint>,
+ pending_anchors: &mut Vec<(Txid, usize)>,
) -> Result<(), Error> {
for outpoint in outpoints {
let op_txid = outpoint.txid;
match res.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
- self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+ pending_anchors.push((res.tx_hash, height));
}
_ => {
tx_update.seen_ats.insert((res.tx_hash, start_time));
match res.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
- self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
+ pending_anchors.push((res.tx_hash, height));
}
_ => {
tx_update.seen_ats.insert((res.tx_hash, start_time));
}
}
}
+
Ok(())
}
start_time: u64,
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
txids: impl IntoIterator<Item = Txid>,
+ pending_anchors: &mut Vec<(Txid, usize)>,
) -> Result<(), Error> {
for txid in txids {
let tx = match self.fetch_tx(txid) {
match r.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
- self.validate_merkle_for_anchor(tx_update, txid, height)?;
+ pending_anchors.push((txid, height));
}
_ => {
tx_update.seen_ats.insert((r.tx_hash, start_time));
tx_update.txs.push(tx);
}
+
Ok(())
}
- // Helper function which checks if a transaction is confirmed by validating the merkle proof.
- // An anchor is inserted if the transaction is validated to be in a confirmed block.
- fn validate_merkle_for_anchor(
+    /// Batch validate Merkle proofs, cache each confirmation anchor, and return them.
+    ///
+    /// Proof fetching/validation is best-effort: a transaction whose proof cannot be
+    /// fetched or validated is simply omitted from the returned list (mirroring the
+    /// tolerant behavior of the previous per-tx helper) rather than failing the sync.
+    ///
+    /// Header fetches (`fetch_header`/`update_header`) still return `Err`, since those
+    /// indicate the chain data itself is unavailable.
+    fn batch_fetch_anchors(
        &self,
-        tx_update: &mut TxUpdate<ConfirmationBlockTime>,
-        txid: Txid,
-        confirmation_height: usize,
-    ) -> Result<(), Error> {
-        if let Ok(merkle_res) = self
-            .inner
-            .transaction_get_merkle(&txid, confirmation_height)
+        txs_with_heights: &[(Txid, usize)],
+    ) -> Result<Vec<(Txid, ConfirmationBlockTime)>, Error> {
+        let mut results = Vec::with_capacity(txs_with_heights.len());
+        let mut to_fetch = Vec::new();
+
+        // Build a map of height -> block hash so each distinct height costs at most one
+        // `fetch_header` call when probing the anchor cache below.
+        let mut height_to_hash: HashMap<u32, BlockHash> = HashMap::new();
+        for &(_, height) in txs_with_heights {
+            let h = height as u32;
+            if !height_to_hash.contains_key(&h) {
+                // Try to obtain hash from the header cache, or fetch the header if absent.
+                let hash = self.fetch_header(h)?.block_hash();
+                height_to_hash.insert(h, hash);
+            }
+        }
+
+        // Check cache.
        {
-            let mut header = self.fetch_header(merkle_res.block_height as u32)?;
-            let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
-                &txid,
-                &header.merkle_root,
-                &merkle_res,
-            );
-
-            // Merkle validation will fail if the header in `block_header_cache` is outdated, so we
-            // want to check if there is a new header and validate against the new one.
-            if !is_confirmed_tx {
-                header = self.update_header(merkle_res.block_height as u32)?;
-                is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
+            let anchor_cache = self.anchor_cache.lock().unwrap();
+            for &(txid, height) in txs_with_heights {
+                let h = height as u32;
+                let hash = height_to_hash[&h];
+                if let Some(anchor) = anchor_cache.get(&(txid, hash)) {
+                    results.push((txid, *anchor));
+                } else {
+                    to_fetch.push((txid, height));
+                }
+            }
+        }
+
+        // Fetch and validate the missing proofs. NOTE: each proof is still requested
+        // with an individual `transaction_get_merkle` round trip; the chunking merely
+        // bounds per-iteration work until a true batch RPC is wired in.
+        for chunk in to_fetch.chunks(MAX_BATCH_SIZE) {
+            for &(txid, height) in chunk {
+                // Fetch the raw proof. A per-tx failure (e.g. the server cannot serve
+                // the proof) skips this tx instead of aborting the whole sync.
+                let proof = match self.inner.transaction_get_merkle(&txid, height) {
+                    Ok(proof) => proof,
+                    Err(_) => continue,
+                };
+
+                // Validate against the cached header first.
+                let mut header = self.fetch_header(height as u32)?;
+                let mut valid = electrum_client::utils::validate_merkle_proof(
                    &txid,
                    &header.merkle_root,
-                    &merkle_res,
+                    &proof,
                );
-            }
+                // Merkle validation fails if the header in `block_header_cache` is
+                // outdated, so refresh the header once and re-validate.
+                if !valid {
+                    header = self.update_header(height as u32)?;
+                    valid = electrum_client::utils::validate_merkle_proof(
+                        &txid,
+                        &header.merkle_root,
+                        &proof,
+                    );
+                }
-            if is_confirmed_tx {
-                tx_update.anchors.insert((
-                    ConfirmationBlockTime {
+                // Build and cache the anchor if the merkle proof is valid.
+                if valid {
+                    let anchor = ConfirmationBlockTime {
                        confirmation_time: header.time as u64,
                        block_id: BlockId {
-                            height: merkle_res.block_height as u32,
+                            height: height as u32,
                            hash: header.block_hash(),
                        },
-                    },
-                    txid,
-                ));
+                    };
+                    // Key the cache on the hash of the header that actually validated
+                    // the proof, so the key always matches `anchor.block_id.hash` even
+                    // when `update_header` replaced a stale cached header above.
+                    self.anchor_cache
+                        .lock()
+                        .unwrap()
+                        .insert((txid, anchor.block_id.hash), anchor);
+                    results.push((txid, anchor));
+                }
            }
        }
+
+        Ok(results)
+    }
+
+    /// Validate the Merkle proof of a single transaction, caching its confirmation
+    /// anchor and inserting it into `tx_update` when validation succeeds.
+    #[allow(dead_code)]
+    fn validate_anchor_for_update(
+        &self,
+        tx_update: &mut TxUpdate<ConfirmationBlockTime>,
+        txid: Txid,
+        confirmation_height: usize,
+    ) -> Result<(), Error> {
+        // Delegate to the batch helper with a one-element slice, then flip each
+        // `(txid, anchor)` pair into the `(anchor, txid)` shape `anchors` expects.
+        let fetched = self.batch_fetch_anchors(&[(txid, confirmation_height)])?;
+        tx_update
+            .anchors
+            .extend(fetched.into_iter().map(|(id, anchor)| (anchor, id)));
        Ok(())
    }