use bdk_chain::{
- bitcoin::{OutPoint, ScriptBuf, Transaction, Txid},
- collections::{BTreeMap, HashMap, HashSet},
+ bitcoin::{block::Header, BlockHash, OutPoint, ScriptBuf, Transaction, Txid},
+ collections::{BTreeMap, HashMap},
local_chain::CheckPoint,
spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult},
tx_graph::TxGraph,
- BlockId, ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor,
+ Anchor, BlockId, ConfirmationTimeHeightAnchor,
};
-use core::str::FromStr;
use electrum_client::{ElectrumApi, Error, HeaderNotification};
-use std::sync::{Arc, Mutex};
+use std::{
+ collections::BTreeSet,
+ sync::{Arc, Mutex},
+};
/// We include a chain suffix of a certain length for the purpose of robustness.
const CHAIN_SUFFIX_LENGTH: u32 = 8;
pub inner: E,
/// The transaction cache
tx_cache: Mutex<HashMap<Txid, Arc<Transaction>>>,
+ /// The header cache
+ block_header_cache: Mutex<HashMap<u32, Header>>,
}
impl<E: ElectrumApi> BdkElectrumClient<E> {
Self {
inner: client,
tx_cache: Default::default(),
+ block_header_cache: Default::default(),
}
}
Ok(tx)
}
+ /// Fetch block header of given `height`.
+ ///
+ /// If it hits the cache it will return the cached version and avoid making the request.
+ fn fetch_header(&self, height: u32) -> Result<Header, Error> {
+ let block_header_cache = self.block_header_cache.lock().unwrap();
+
+ if let Some(header) = block_header_cache.get(&height) {
+ return Ok(*header);
+ }
+
+ drop(block_header_cache);
+
+ self.update_header(height)
+ }
+
+ /// Update a block header at given `height`. Returns the updated header.
+ fn update_header(&self, height: u32) -> Result<Header, Error> {
+ let header = self.inner.block_header(height as usize)?;
+
+ self.block_header_cache
+ .lock()
+ .unwrap()
+ .insert(height, header);
+
+ Ok(header)
+ }
+
/// Broadcasts a transaction to the network.
///
/// This is a re-export of [`ElectrumApi::transaction_broadcast`].
stop_gap: usize,
batch_size: usize,
fetch_prev_txouts: bool,
- ) -> Result<ElectrumFullScanResult<K>, Error> {
- let mut request_spks = request.spks_by_keychain;
-
- // We keep track of already-scanned spks just in case a reorg happens and we need to do a
- // rescan. We need to keep track of this as iterators in `keychain_spks` are "unbounded" so
- // cannot be collected. In addition, we keep track of whether an spk has an active tx
- // history for determining the `last_active_index`.
- // * key: (keychain, spk_index) that identifies the spk.
- // * val: (script_pubkey, has_tx_history).
- let mut scanned_spks = BTreeMap::<(K, u32), (ScriptBuf, bool)>::new();
-
- let update = loop {
- let (tip, _) = construct_update_tip(&self.inner, request.chain_tip.clone())?;
- let mut graph_update = TxGraph::<ConfirmationHeightAnchor>::default();
- let cps = tip
- .iter()
- .take(10)
- .map(|cp| (cp.height(), cp))
- .collect::<BTreeMap<u32, CheckPoint>>();
-
- if !request_spks.is_empty() {
- if !scanned_spks.is_empty() {
- scanned_spks.append(
- &mut self.populate_with_spks(
- &cps,
- &mut graph_update,
- &mut scanned_spks
- .iter()
- .map(|(i, (spk, _))| (i.clone(), spk.clone())),
- stop_gap,
- batch_size,
- )?,
- );
- }
- for (keychain, keychain_spks) in &mut request_spks {
- scanned_spks.extend(
- self.populate_with_spks(
- &cps,
- &mut graph_update,
- keychain_spks,
- stop_gap,
- batch_size,
- )?
- .into_iter()
- .map(|(spk_i, spk)| ((keychain.clone(), spk_i), spk)),
- );
- }
- }
-
- // check for reorgs during scan process
- let server_blockhash = self.inner.block_header(tip.height() as usize)?.block_hash();
- if tip.hash() != server_blockhash {
- continue; // reorg
+ ) -> Result<FullScanResult<K>, Error> {
+ let (tip, latest_blocks) =
+ fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
+ let mut graph_update = TxGraph::<ConfirmationTimeHeightAnchor>::default();
+ let mut last_active_indices = BTreeMap::<K, u32>::new();
+
+ for (keychain, keychain_spks) in request.spks_by_keychain {
+ if let Some(last_active_index) =
+ self.populate_with_spks(&mut graph_update, keychain_spks, stop_gap, batch_size)?
+ {
+ last_active_indices.insert(keychain, last_active_index);
}
+ }
- // Fetch previous `TxOut`s for fee calculation if flag is enabled.
- if fetch_prev_txouts {
- self.fetch_prev_txout(&mut graph_update)?;
- }
+ let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?;
- let chain_update = tip;
-
- let keychain_update = request_spks
- .into_keys()
- .filter_map(|k| {
- scanned_spks
- .range((k.clone(), u32::MIN)..=(k.clone(), u32::MAX))
- .rev()
- .find(|(_, (_, active))| *active)
- .map(|((_, i), _)| (k, *i))
- })
- .collect::<BTreeMap<_, _>>();
-
- break FullScanResult {
- graph_update,
- chain_update,
- last_active_indices: keychain_update,
- };
- };
+ // Fetch previous `TxOut`s for fee calculation if flag is enabled.
+ if fetch_prev_txouts {
+ self.fetch_prev_txout(&mut graph_update)?;
+ }
- Ok(ElectrumFullScanResult(update))
+ Ok(FullScanResult {
+ graph_update,
+ chain_update,
+ last_active_indices,
+ })
}
/// Sync a set of scripts with the blockchain (via an Electrum client) for the data specified
request: SyncRequest,
batch_size: usize,
fetch_prev_txouts: bool,
- ) -> Result<ElectrumSyncResult, Error> {
+ ) -> Result<SyncResult, Error> {
let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone())
.set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk)));
- let mut full_scan_res = self
- .full_scan(full_scan_req, usize::MAX, batch_size, false)?
- .with_confirmation_height_anchor();
+ let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?;
+ let (tip, latest_blocks) =
+ fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
- let (tip, _) = construct_update_tip(&self.inner, request.chain_tip)?;
- let cps = tip
- .iter()
- .take(10)
- .map(|cp| (cp.height(), cp))
- .collect::<BTreeMap<u32, CheckPoint>>();
+ self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?;
+ self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?;
- self.populate_with_txids(&cps, &mut full_scan_res.graph_update, request.txids)?;
- self.populate_with_outpoints(&cps, &mut full_scan_res.graph_update, request.outpoints)?;
+ let chain_update = chain_update(
+ tip,
+ &latest_blocks,
+ full_scan_res.graph_update.all_anchors(),
+ )?;
// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
self.fetch_prev_txout(&mut full_scan_res.graph_update)?;
}
- Ok(ElectrumSyncResult(SyncResult {
- chain_update: full_scan_res.chain_update,
+ Ok(SyncResult {
+ chain_update,
graph_update: full_scan_res.graph_update,
- }))
+ })
}
/// Populate the `graph_update` with transactions/anchors associated with the given `spks`.
/// Checkpoints (in `cps`) are used to create anchors. The `tx_cache` is self-explanatory.
fn populate_with_spks<I: Ord + Clone>(
&self,
- cps: &BTreeMap<u32, CheckPoint>,
- graph_update: &mut TxGraph<ConfirmationHeightAnchor>,
- spks: &mut impl Iterator<Item = (I, ScriptBuf)>,
+ graph_update: &mut TxGraph<ConfirmationTimeHeightAnchor>,
+ mut spks: impl Iterator<Item = (I, ScriptBuf)>,
stop_gap: usize,
batch_size: usize,
- ) -> Result<BTreeMap<I, (ScriptBuf, bool)>, Error> {
+ ) -> Result<Option<I>, Error> {
let mut unused_spk_count = 0_usize;
- let mut scanned_spks = BTreeMap::new();
+ let mut last_active_index = Option::<I>::None;
loop {
let spks = (0..batch_size)
.map_while(|_| spks.next())
.collect::<Vec<_>>();
if spks.is_empty() {
- return Ok(scanned_spks);
+ return Ok(last_active_index);
}
let spk_histories = self
.inner
.batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?;
- for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
+ for ((spk_index, _spk), spk_history) in spks.into_iter().zip(spk_histories) {
if spk_history.is_empty() {
- scanned_spks.insert(spk_index, (spk, false));
unused_spk_count += 1;
if unused_spk_count > stop_gap {
- return Ok(scanned_spks);
+ return Ok(last_active_index);
}
continue;
} else {
- scanned_spks.insert(spk_index, (spk, true));
+ last_active_index = Some(spk_index);
unused_spk_count = 0;
}
for tx_res in spk_history {
let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?);
- if let Some(anchor) = determine_tx_anchor(cps, tx_res.height, tx_res.tx_hash) {
- let _ = graph_update.insert_anchor(tx_res.tx_hash, anchor);
- }
+ self.validate_merkle_for_anchor(graph_update, tx_res.tx_hash, tx_res.height)?;
}
}
}
}
- // Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions,
- // which we do not have by default. This data is needed to calculate the transaction fee.
- fn fetch_prev_txout(
- &self,
- graph_update: &mut TxGraph<ConfirmationHeightAnchor>,
- ) -> Result<(), Error> {
- let full_txs: Vec<Arc<Transaction>> =
- graph_update.full_txs().map(|tx_node| tx_node.tx).collect();
- for tx in full_txs {
- for vin in &tx.input {
- let outpoint = vin.previous_output;
- let vout = outpoint.vout;
- let prev_tx = self.fetch_tx(outpoint.txid)?;
- let txout = prev_tx.output[vout as usize].clone();
- let _ = graph_update.insert_txout(outpoint, txout);
- }
- }
- Ok(())
- }
-
/// Populate the `graph_update` with associated transactions/anchors of `outpoints`.
///
/// Transactions in which the outpoint resides, and transactions that spend from the outpoint are
/// Checkpoints (in `cps`) are used to create anchors. The `tx_cache` is self-explanatory.
fn populate_with_outpoints(
&self,
- cps: &BTreeMap<u32, CheckPoint>,
- graph_update: &mut TxGraph<ConfirmationHeightAnchor>,
+ graph_update: &mut TxGraph<ConfirmationTimeHeightAnchor>,
outpoints: impl IntoIterator<Item = OutPoint>,
) -> Result<(), Error> {
for outpoint in outpoints {
if !has_residing && res.tx_hash == op_txid {
has_residing = true;
let _ = graph_update.insert_tx(Arc::clone(&op_tx));
- if let Some(anchor) = determine_tx_anchor(cps, res.height, res.tx_hash) {
- let _ = graph_update.insert_anchor(res.tx_hash, anchor);
- }
+ self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
}
if !has_spending && res.tx_hash != op_txid {
continue;
}
let _ = graph_update.insert_tx(Arc::clone(&res_tx));
- if let Some(anchor) = determine_tx_anchor(cps, res.height, res.tx_hash) {
- let _ = graph_update.insert_anchor(res.tx_hash, anchor);
- }
+ self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
}
}
}
/// Populate the `graph_update` with transactions/anchors of the provided `txids`.
fn populate_with_txids(
&self,
- cps: &BTreeMap<u32, CheckPoint>,
- graph_update: &mut TxGraph<ConfirmationHeightAnchor>,
+ graph_update: &mut TxGraph<ConfirmationTimeHeightAnchor>,
txids: impl IntoIterator<Item = Txid>,
) -> Result<(), Error> {
for txid in txids {
// because of restrictions of the Electrum API, we have to use the `script_get_history`
// call to get confirmation status of our transaction
- let anchor = match self
+ if let Some(r) = self
.inner
.script_get_history(spk)?
.into_iter()
.find(|r| r.tx_hash == txid)
{
- Some(r) => determine_tx_anchor(cps, r.height, txid),
- None => continue,
- };
+ self.validate_merkle_for_anchor(graph_update, txid, r.height)?;
+ }
let _ = graph_update.insert_tx(tx);
- if let Some(anchor) = anchor {
- let _ = graph_update.insert_anchor(txid, anchor);
- }
}
Ok(())
}
-}
-
-/// The result of [`BdkElectrumClient::full_scan`].
-///
-/// This can be transformed into a [`FullScanResult`] with either [`ConfirmationHeightAnchor`] or
-/// [`ConfirmationTimeHeightAnchor`] anchor types.
-pub struct ElectrumFullScanResult<K>(FullScanResult<K, ConfirmationHeightAnchor>);
-
-impl<K> ElectrumFullScanResult<K> {
- /// Return [`FullScanResult`] with [`ConfirmationHeightAnchor`].
- pub fn with_confirmation_height_anchor(self) -> FullScanResult<K, ConfirmationHeightAnchor> {
- self.0
- }
-
- /// Return [`FullScanResult`] with [`ConfirmationTimeHeightAnchor`].
- ///
- /// This requires additional calls to the Electrum server.
- pub fn with_confirmation_time_height_anchor(
- self,
- client: &BdkElectrumClient<impl ElectrumApi>,
- ) -> Result<FullScanResult<K, ConfirmationTimeHeightAnchor>, Error> {
- let res = self.0;
- Ok(FullScanResult {
- graph_update: try_into_confirmation_time_result(res.graph_update, &client.inner)?,
- chain_update: res.chain_update,
- last_active_indices: res.last_active_indices,
- })
- }
-}
-/// The result of [`BdkElectrumClient::sync`].
-///
-/// This can be transformed into a [`SyncResult`] with either [`ConfirmationHeightAnchor`] or
-/// [`ConfirmationTimeHeightAnchor`] anchor types.
-pub struct ElectrumSyncResult(SyncResult<ConfirmationHeightAnchor>);
+ // Helper function which checks if a transaction is confirmed by validating the merkle proof.
+ // An anchor is inserted if the transaction is validated to be in a confirmed block.
+ fn validate_merkle_for_anchor(
+ &self,
+ graph_update: &mut TxGraph<ConfirmationTimeHeightAnchor>,
+ txid: Txid,
+ confirmation_height: i32,
+ ) -> Result<(), Error> {
+ if let Ok(merkle_res) = self
+ .inner
+ .transaction_get_merkle(&txid, confirmation_height as usize)
+ {
+ let mut header = self.fetch_header(merkle_res.block_height as u32)?;
+ let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
+ &txid,
+ &header.merkle_root,
+ &merkle_res,
+ );
+
+ // Merkle validation will fail if the header in `block_header_cache` is outdated, so we
+ // want to check if there is a new header and validate against the new one.
+ if !is_confirmed_tx {
+ header = self.update_header(merkle_res.block_height as u32)?;
+ is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
+ &txid,
+ &header.merkle_root,
+ &merkle_res,
+ );
+ }
-impl ElectrumSyncResult {
- /// Return [`SyncResult`] with [`ConfirmationHeightAnchor`].
- pub fn with_confirmation_height_anchor(self) -> SyncResult<ConfirmationHeightAnchor> {
- self.0
+ if is_confirmed_tx {
+ let _ = graph_update.insert_anchor(
+ txid,
+ ConfirmationTimeHeightAnchor {
+ confirmation_height: merkle_res.block_height as u32,
+ confirmation_time: header.time as u64,
+ anchor_block: BlockId {
+ height: merkle_res.block_height as u32,
+ hash: header.block_hash(),
+ },
+ },
+ );
+ }
+ }
+ Ok(())
}
- /// Return [`SyncResult`] with [`ConfirmationTimeHeightAnchor`].
- ///
- /// This requires additional calls to the Electrum server.
- pub fn with_confirmation_time_height_anchor(
- self,
- client: &BdkElectrumClient<impl ElectrumApi>,
- ) -> Result<SyncResult<ConfirmationTimeHeightAnchor>, Error> {
- let res = self.0;
- Ok(SyncResult {
- graph_update: try_into_confirmation_time_result(res.graph_update, &client.inner)?,
- chain_update: res.chain_update,
- })
+ // Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions,
+ // which we do not have by default. This data is needed to calculate the transaction fee.
+ fn fetch_prev_txout(
+ &self,
+ graph_update: &mut TxGraph<ConfirmationTimeHeightAnchor>,
+ ) -> Result<(), Error> {
+ let full_txs: Vec<Arc<Transaction>> =
+ graph_update.full_txs().map(|tx_node| tx_node.tx).collect();
+ for tx in full_txs {
+ for vin in &tx.input {
+ let outpoint = vin.previous_output;
+ let vout = outpoint.vout;
+ let prev_tx = self.fetch_tx(outpoint.txid)?;
+ let txout = prev_tx.output[vout as usize].clone();
+ let _ = graph_update.insert_txout(outpoint, txout);
+ }
+ }
+ Ok(())
}
}
-fn try_into_confirmation_time_result(
- graph_update: TxGraph<ConfirmationHeightAnchor>,
- client: &impl ElectrumApi,
-) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
- let relevant_heights = graph_update
- .all_anchors()
- .iter()
- .map(|(a, _)| a.confirmation_height)
- .collect::<HashSet<_>>();
-
- let height_to_time = relevant_heights
- .clone()
- .into_iter()
- .zip(
- client
- .batch_block_header(relevant_heights)?
- .into_iter()
- .map(|bh| bh.time as u64),
- )
- .collect::<HashMap<u32, u64>>();
-
- Ok(graph_update.map_anchors(|a| ConfirmationTimeHeightAnchor {
- anchor_block: a.anchor_block,
- confirmation_height: a.confirmation_height,
- confirmation_time: height_to_time[&a.confirmation_height],
- }))
-}
-
-/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`.
-fn construct_update_tip(
+/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`. The latest blocks are
+/// fetched to construct checkpoint updates with the proper [`BlockHash`] in case of re-org.
+fn fetch_tip_and_latest_blocks(
client: &impl ElectrumApi,
prev_tip: CheckPoint,
-) -> Result<(CheckPoint, Option<u32>), Error> {
+) -> Result<(CheckPoint, BTreeMap<u32, BlockHash>), Error> {
let HeaderNotification { height, .. } = client.block_headers_subscribe()?;
let new_tip_height = height as u32;
// If electrum returns a tip height that is lower than our previous tip, then checkpoints do
// not need updating. We just return the previous tip and use that as the point of agreement.
if new_tip_height < prev_tip.height() {
- return Ok((prev_tip.clone(), Some(prev_tip.height())));
+ return Ok((prev_tip, BTreeMap::new()));
}
// Atomically fetch the latest `CHAIN_SUFFIX_LENGTH` count of blocks from Electrum. We use this
let agreement_height = agreement_cp.as_ref().map(CheckPoint::height);
let new_tip = new_blocks
+ .clone()
.into_iter()
// Prune `new_blocks` to only include blocks that are actually new.
.filter(|(height, _)| Some(*height) > agreement_height)
})
.expect("must have at least one checkpoint");
- Ok((new_tip, agreement_height))
+ Ok((new_tip, new_blocks))
}
-/// A [tx status] comprises of a concatenation of `tx_hash:height:`s. We transform a single one of
-/// these concatenations into a [`ConfirmationHeightAnchor`] if possible.
-///
-/// We use the lowest possible checkpoint as the anchor block (from `cps`). If an anchor block
-/// cannot be found, or the transaction is unconfirmed, [`None`] is returned.
-///
-/// [tx status](https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status)
-fn determine_tx_anchor(
- cps: &BTreeMap<u32, CheckPoint>,
- raw_height: i32,
- txid: Txid,
-) -> Option<ConfirmationHeightAnchor> {
- // The electrum API has a weird quirk where an unconfirmed transaction is presented with a
- // height of 0. To avoid invalid representation in our data structures, we manually set
- // transactions residing in the genesis block to have height 0, then interpret a height of 0 as
- // unconfirmed for all other transactions.
- if txid
- == Txid::from_str("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b")
- .expect("must deserialize genesis coinbase txid")
- {
- let anchor_block = cps.values().next()?.block_id();
- return Some(ConfirmationHeightAnchor {
- anchor_block,
- confirmation_height: 0,
- });
- }
- match raw_height {
- h if h <= 0 => {
- debug_assert!(h == 0 || h == -1, "unexpected height ({}) from electrum", h);
- None
- }
- h => {
- let h = h as u32;
- let anchor_block = cps.range(h..).next().map(|(_, cp)| cp.block_id())?;
- if h > anchor_block.height {
- None
- } else {
- Some(ConfirmationHeightAnchor {
- anchor_block,
- confirmation_height: h,
- })
- }
+// Add a corresponding checkpoint per anchor height if it does not yet exist. Checkpoints should not
+// surpass `latest_blocks`.
+fn chain_update<A: Anchor>(
+ mut tip: CheckPoint,
+ latest_blocks: &BTreeMap<u32, BlockHash>,
+ anchors: &BTreeSet<(A, Txid)>,
+) -> Result<CheckPoint, Error> {
+ for anchor in anchors {
+ let height = anchor.0.anchor_block().height;
+
+ // Checkpoint uses the `BlockHash` from `latest_blocks` so that the hash will be consistent
+ // in case of a re-org.
+ if tip.get(height).is_none() && height <= tip.height() {
+ let hash = match latest_blocks.get(&height) {
+ Some(&hash) => hash,
+ None => anchor.0.anchor_block().hash,
+ };
+ tip = tip.insert(BlockId { hash, height });
}
}
+ Ok(tip)
}