From: Wei Chen Date: Mon, 26 May 2025 23:18:07 +0000 (+0000) Subject: feat(electrum): batched `Header`s and `script_get_history` X-Git-Tag: bitcoind_rpc-0.21.0~17^2~2 X-Git-Url: http://internal-gitweb-vhost/script/%22https:/database/scripts/trait.StdError.html?a=commitdiff_plain;h=ec4fd971c81e7d3bcf8fbdac15c6df63b934cd70;p=bdk feat(electrum): batched `Header`s and `script_get_history` --- diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 4e89fdb8..d68f62e1 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -70,33 +70,6 @@ impl BdkElectrumClient { Ok(tx) } - /// Fetch block header of given `height`. - /// - /// If it hits the cache it will return the cached version and avoid making the request. - fn fetch_header(&self, height: u32) -> Result { - let block_header_cache = self.block_header_cache.lock().unwrap(); - - if let Some(header) = block_header_cache.get(&height) { - return Ok(*header); - } - - drop(block_header_cache); - - self.update_header(height) - } - - /// Update a block header at given `height`. Returns the updated header. - fn update_header(&self, height: u32) -> Result { - let header = self.inner.block_header(height as usize)?; - - self.block_header_cache - .lock() - .unwrap() - .insert(height, header); - - Ok(header) - } - /// Broadcasts a transaction to the network. /// /// This is a re-export of [`ElectrumApi::transaction_broadcast`]. @@ -292,7 +265,7 @@ impl BdkElectrumClient { let mut unused_spk_count = 0; let mut last_active_index = None; - loop { + 'batch_loop: loop { let spks = (0..batch_size) .map_while(|_| spks_with_expected_txids.next()) .collect::>(); @@ -308,7 +281,7 @@ impl BdkElectrumClient { if spk_history.is_empty() { unused_spk_count += 1; if unused_spk_count >= stop_gap { - break; + break 'batch_loop; } } else { last_active_index = Some(spk_index); @@ -339,13 +312,7 @@ impl BdkElectrumClient { } } } - - if unused_spk_count >= stop_gap { - break; - } } - - Ok(last_active_index) } /// Populate the `tx_update` with associated transactions/anchors of `outpoints`. @@ -359,56 +326,74 @@ impl BdkElectrumClient { outpoints: impl IntoIterator, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { - for outpoint in outpoints { - let op_txid = outpoint.txid; - let op_tx = self.fetch_tx(op_txid)?; - let op_txout = match op_tx.output.get(outpoint.vout as usize) { - Some(txout) => txout, - None => continue, - }; - debug_assert_eq!(op_tx.compute_txid(), op_txid); - - // attempt to find the following transactions (alongside their chain positions), and - // add to our sparsechain `update`: - let mut has_residing = false; // tx in which the outpoint resides - let mut has_spending = false; // tx that spends the outpoint - for res in self.inner.script_get_history(&op_txout.script_pubkey)? { - if has_residing && has_spending { - break; + // Collect valid outpoints with their corresponding `spk` and `tx`. + let mut ops_spks_txs = Vec::new(); + for op in outpoints { + if let Ok(tx) = self.fetch_tx(op.txid) { + if let Some(txout) = tx.output.get(op.vout as usize) { + ops_spks_txs.push((op, txout.script_pubkey.clone(), tx)); } + } + } - if !has_residing && res.tx_hash == op_txid { - has_residing = true; - tx_update.txs.push(Arc::clone(&op_tx)); - match res.height.try_into() { - // Returned heights 0 & -1 are reserved for unconfirmed txs. - Ok(height) if height > 0 => { - pending_anchors.push((res.tx_hash, height)); - } - _ => { - tx_update.seen_ats.insert((res.tx_hash, start_time)); - } + // Dedup `spk`s, batch-fetch all histories in one call, and store them in a map. + let unique_spks: Vec<_> = ops_spks_txs + .iter() + .map(|(_, spk, _)| spk.clone()) + .collect::>() + .into_iter() + .collect(); + let histories = self + .inner + .batch_script_get_history(unique_spks.iter().map(|spk| spk.as_script()))?; + let mut spk_map = HashMap::new(); + for (spk, history) in unique_spks.into_iter().zip(histories.into_iter()) { + spk_map.insert(spk, history); + } + + for (outpoint, spk, tx) in ops_spks_txs { + if let Some(spk_history) = spk_map.get(&spk) { + let mut has_residing = false; // tx in which the outpoint resides + let mut has_spending = false; // tx that spends the outpoint + + for res in spk_history { + if has_residing && has_spending { + break; } - } - if !has_spending && res.tx_hash != op_txid { - let res_tx = self.fetch_tx(res.tx_hash)?; - // we exclude txs/anchors that do not spend our specified outpoint(s) - has_spending = res_tx - .input - .iter() - .any(|txin| txin.previous_output == outpoint); - if !has_spending { - continue; + if !has_residing && res.tx_hash == outpoint.txid { + has_residing = true; + tx_update.txs.push(Arc::clone(&tx)); + match res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + pending_anchors.push((res.tx_hash, height)); + } + _ => { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } + } } - tx_update.txs.push(Arc::clone(&res_tx)); - match res.height.try_into() { - // Returned heights 0 & -1 are reserved for unconfirmed txs. - Ok(height) if height > 0 => { - pending_anchors.push((res.tx_hash, height)); + + if !has_spending && res.tx_hash != outpoint.txid { + let res_tx = self.fetch_tx(res.tx_hash)?; + // we exclude txs/anchors that do not spend our specified outpoint(s) + has_spending = res_tx + .input + .iter() + .any(|txin| txin.previous_output == outpoint); + if !has_spending { + continue; } - _ => { - tx_update.seen_ats.insert((res.tx_hash, start_time)); + tx_update.txs.push(Arc::clone(&res_tx)); + match res.height.try_into() { + // Returned heights 0 & -1 are reserved for unconfirmed txs. + Ok(height) if height > 0 => { + pending_anchors.push((res.tx_hash, height)); + } + _ => { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } } } @@ -426,39 +411,47 @@ impl BdkElectrumClient { txids: impl IntoIterator, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { + let mut txs = Vec::<(Txid, Arc)>::new(); + let mut scripts = Vec::new(); for txid in txids { - let tx = match self.fetch_tx(txid) { - Ok(tx) => tx, - Err(electrum_client::Error::Protocol(_)) => continue, - Err(other_err) => return Err(other_err), - }; + match self.fetch_tx(txid) { + Ok(tx) => { + let spk = tx + .output + .first() + .map(|txo| &txo.script_pubkey) + .expect("tx must have an output") + .clone(); + txs.push((txid, tx)); + scripts.push(spk); + } + Err(electrum_client::Error::Protocol(_)) => { + continue; + } + Err(e) => return Err(e), + } + } - let spk = tx - .output - .first() - .map(|txo| &txo.script_pubkey) - .expect("tx must have an output"); + // because of restrictions of the Electrum API, we have to use the `script_get_history` + // call to get confirmation status of our transaction + let spk_histories = self + .inner + .batch_script_get_history(scripts.iter().map(|spk| spk.as_script()))?; - // because of restrictions of the Electrum API, we have to use the `script_get_history` - // call to get confirmation status of our transaction - if let Some(r) = self - .inner - .script_get_history(spk)? - .into_iter() - .find(|r| r.tx_hash == txid) - { - match r.height.try_into() { + for (tx, spk_history) in txs.into_iter().zip(spk_histories) { + if let Some(res) = spk_history.into_iter().find(|res| res.tx_hash == tx.0) { + match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - pending_anchors.push((txid, height)); + pending_anchors.push((tx.0, height)); } _ => { - tx_update.seen_ats.insert((r.tx_hash, start_time)); + tx_update.seen_ats.insert((res.tx_hash, start_time)); } } } - tx_update.txs.push(tx); + tx_update.txs.push(tx.1); } Ok(()) @@ -472,19 +465,37 @@ impl BdkElectrumClient { let mut results = Vec::with_capacity(txs_with_heights.len()); let mut to_fetch = Vec::new(); - // Build a map for height to block hash conversions. This is for obtaining block hash data - // with minimum `fetch_header` calls. - let mut height_to_hash: HashMap = HashMap::new(); - for &(_, height) in txs_with_heights { - let h = height as u32; - if !height_to_hash.contains_key(&h) { - // Try to obtain hash from the header cache, or fetch the header if absent. - let hash = self.fetch_header(h)?.block_hash(); - height_to_hash.insert(h, hash); + // Figure out which block heights we need headers for. + let mut needed_heights: Vec = + txs_with_heights.iter().map(|&(_, h)| h as u32).collect(); + needed_heights.sort_unstable(); + needed_heights.dedup(); + + let mut height_to_hash = HashMap::with_capacity(needed_heights.len()); + + // Collect headers of missing heights, and build `height_to_hash` map. + { + let mut cache = self.block_header_cache.lock().unwrap(); + + let mut missing_heights = Vec::new(); + for &height in &needed_heights { + if let Some(header) = cache.get(&height) { + height_to_hash.insert(height, header.block_hash()); + } else { + missing_heights.push(height); + } + } + + if !missing_heights.is_empty() { + let headers = self.inner.batch_block_header(missing_heights.clone())?; + for (height, header) in missing_heights.into_iter().zip(headers) { + height_to_hash.insert(height, header.block_hash()); + cache.insert(height, header); + } } } - // Check cache. + // Check our anchor cache and queue up any proofs we still need. { let anchor_cache = self.anchor_cache.lock().unwrap(); for &(txid, height) in txs_with_heights { @@ -505,14 +516,22 @@ impl BdkElectrumClient { let proof = self.inner.transaction_get_merkle(&txid, height)?; // Validate against header, retrying once on stale header. - let mut header = self.fetch_header(height as u32)?; + let mut header = { + let cache = self.block_header_cache.lock().unwrap(); + cache[&(height as u32)] + }; let mut valid = electrum_client::utils::validate_merkle_proof( &txid, &header.merkle_root, &proof, ); if !valid { - header = self.update_header(height as u32)?; + let new_header = self.inner.block_header(height)?; + self.block_header_cache + .lock() + .unwrap() + .insert(height as u32, new_header); + header = new_header; valid = electrum_client::utils::validate_merkle_proof( &txid, &header.merkle_root, @@ -526,7 +545,7 @@ impl BdkElectrumClient { confirmation_time: header.time as u64, block_id: BlockId { height: height as u32, - hash: header.block_hash(), + hash, }, }; self.anchor_cache