use bdk_chain::{
indexed_tx_graph::IndexedAdditions,
keychain::{KeychainTxOutIndex, LocalChangeSet, LocalUpdate},
- local_chain::{self, LocalChain, UpdateNotConnectedError},
+ local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain},
tx_graph::{CanonicalTx, TxGraph},
Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut,
IndexedTxGraph, Persist, PersistBackend,
use bitcoin::secp256k1::Secp256k1;
use bitcoin::util::psbt;
use bitcoin::{
- Address, BlockHash, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script,
- Sequence, Transaction, TxOut, Txid, Witness,
+ Address, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script, Sequence,
+ Transaction, TxOut, Txid, Witness,
};
use core::fmt;
use core::ops::Deref;
};
let changeset = db.load_from_persistence().map_err(NewError::Persist)?;
- chain.apply_changeset(changeset.chain_changeset);
+ chain.apply_changeset(&changeset.chain_changeset);
indexed_graph.apply_additions(changeset.indexed_additions);
let persist = Persist::new(db);
.graph()
.filter_chain_unspents(
&self.chain,
- self.chain.tip().unwrap_or_default(),
+ self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
self.indexed_graph.index.outpoints().iter().cloned(),
)
.map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo))
}
/// Get all the checkpoints the wallet is currently storing indexed by height.
- pub fn checkpoints(&self) -> &BTreeMap<u32, BlockHash> {
- self.chain.blocks()
+ pub fn checkpoints(&self) -> CheckPointIter {
+ self.chain.iter_checkpoints()
}
/// Returns the latest checkpoint.
- pub fn latest_checkpoint(&self) -> Option<BlockId> {
+ pub fn latest_checkpoint(&self) -> Option<CheckPoint> {
self.chain.tip()
}
.graph()
.filter_chain_unspents(
&self.chain,
- self.chain.tip().unwrap_or_default(),
+ self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
core::iter::once((spk_i, op)),
)
.map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo))
let canonical_tx = CanonicalTx {
observed_as: graph.get_chain_position(
&self.chain,
- self.chain.tip().unwrap_or_default(),
+ self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
txid,
)?,
node: graph.get_tx_node(txid)?,
pub fn insert_checkpoint(
&mut self,
block_id: BlockId,
- ) -> Result<bool, local_chain::InsertBlockNotMatchingError>
+ ) -> Result<bool, local_chain::InsertBlockError>
where
D: PersistBackend<ChangeSet>,
{
// anchor tx to checkpoint with lowest height that is >= position's height
let anchor = self
.chain
- .blocks()
+ .heights()
.range(height..)
.next()
.ok_or(InsertTxError::ConfirmationHeightCannotBeGreaterThanTip {
- tip_height: self.chain.tip().map(|b| b.height),
+ tip_height: self.chain.tip().map(|b| b.height()),
tx_height: height,
})
- .map(|(&anchor_height, &anchor_hash)| ConfirmationTimeAnchor {
+ .map(|(&anchor_height, &hash)| ConfirmationTimeAnchor {
anchor_block: BlockId {
height: anchor_height,
- hash: anchor_hash,
+ hash,
},
confirmation_height: height,
confirmation_time: time,
pub fn transactions(
&self,
) -> impl Iterator<Item = CanonicalTx<'_, Transaction, ConfirmationTimeAnchor>> + '_ {
- self.indexed_graph
- .graph()
- .list_chain_txs(&self.chain, self.chain.tip().unwrap_or_default())
+ self.indexed_graph.graph().list_chain_txs(
+ &self.chain,
+ self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
+ )
}
/// Return the balance, separated into available, trusted-pending, untrusted-pending and immature
pub fn get_balance(&self) -> Balance {
self.indexed_graph.graph().balance(
&self.chain,
- self.chain.tip().unwrap_or_default(),
+ self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
self.indexed_graph.index.outpoints().iter().cloned(),
|&(k, _), _| k == KeychainKind::Internal,
)
None => self
.chain
.tip()
- .and_then(|cp| cp.height.into())
- .map(|height| LockTime::from_height(height).expect("Invalid height")),
+ .map(|cp| LockTime::from_height(cp.height()).expect("Invalid height")),
h => h,
};
) -> Result<TxBuilder<'_, D, DefaultCoinSelectionAlgorithm, BumpFee>, Error> {
let graph = self.indexed_graph.graph();
let txout_index = &self.indexed_graph.index;
- let chain_tip = self.chain.tip().unwrap_or_default();
+ let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
let mut tx = graph
.get_tx(txid)
psbt: &mut psbt::PartiallySignedTransaction,
sign_options: SignOptions,
) -> Result<bool, Error> {
- let chain_tip = self.chain.tip().unwrap_or_default();
+ let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
let tx = &psbt.unsigned_tx;
let mut finished = true;
});
let current_height = sign_options
.assume_height
- .or(self.chain.tip().map(|b| b.height));
+ .or(self.chain.tip().map(|b| b.height()));
debug!(
"Input #{} - {}, using `confirmation_height` = {:?}, `current_height` = {:?}",
must_only_use_confirmed_tx: bool,
current_height: Option<u32>,
) -> (Vec<WeightedUtxo>, Vec<WeightedUtxo>) {
- let chain_tip = self.chain.tip().unwrap_or_default();
+ let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
// must_spend <- manually selected utxos
// may_spend <- all other available utxos
let mut may_spend = self.get_available_utxos();
}
/// Applies an update to the wallet and stages the changes (but does not [`commit`] them).
- ///
- /// This returns whether the `update` resulted in any changes.
+ /// Returns whether the `update` resulted in any changes.
///
/// Usually you create an `update` by interacting with some blockchain data source and inserting
/// transactions related to your wallet into it.
///
/// [`commit`]: Self::commit
- pub fn apply_update(&mut self, update: Update) -> Result<bool, UpdateNotConnectedError>
+ pub fn apply_update(&mut self, update: Update) -> Result<bool, CannotConnectError>
where
D: PersistBackend<ChangeSet>,
{
- let mut changeset: ChangeSet = self.chain.apply_update(update.chain)?.into();
+ let mut changeset = ChangeSet::from(self.chain.apply_update(update.chain)?);
let (_, index_additions) = self
.indexed_graph
.index
.reveal_to_target_multi(&update.keychain);
changeset.append(ChangeSet::from(IndexedAdditions::from(index_additions)));
- changeset.append(self.indexed_graph.apply_update(update.graph).into());
+ changeset.append(ChangeSet::from(
+ self.indexed_graph.apply_update(update.graph),
+ ));
let changed = !changeset.is_empty();
self.persist.stage(changeset);
fn receive_output_in_latest_block(wallet: &mut Wallet, value: u64) -> OutPoint {
let height = match wallet.latest_checkpoint() {
- Some(BlockId { height, .. }) => ConfirmationTime::Confirmed { height, time: 0 },
+ Some(cp) => ConfirmationTime::Confirmed {
+ height: cp.height(),
+ time: 0,
+ },
None => ConfirmationTime::Unconfirmed { last_seen: 0 },
};
receive_output(wallet, value, height)
// If there's no current_height we're left with using the last sync height
assert_eq!(
psbt.unsigned_tx.lock_time.0,
- wallet.latest_checkpoint().unwrap().height
+ wallet.latest_checkpoint().unwrap().height()
);
}
fn test_create_tx_drain_to_and_utxos() {
let (mut wallet, _) = get_funded_wallet(get_test_wpkh());
let addr = wallet.get_address(New);
- let utxos: Vec<_> = wallet
- .list_unspent()
- .into_iter()
- .map(|u| u.outpoint)
- .collect();
+ let utxos: Vec<_> = wallet.list_unspent().map(|u| u.outpoint).collect();
let mut builder = wallet.build_tx();
builder
.drain_to(addr.script_pubkey())
.insert_tx(
tx.clone(),
ConfirmationTime::Confirmed {
- height: wallet.latest_checkpoint().unwrap().height,
+ height: wallet.latest_checkpoint().unwrap().height(),
time: 42_000,
},
)
//! [`SpkTxOutIndex`]: crate::SpkTxOutIndex
use crate::{
- collections::BTreeMap,
- indexed_tx_graph::IndexedAdditions,
- local_chain::{self, LocalChain},
- tx_graph::TxGraph,
+ collections::BTreeMap, indexed_tx_graph::IndexedAdditions, local_chain, tx_graph::TxGraph,
Anchor, Append,
};
}
}
-/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`]
-/// atomically.
-#[derive(Debug, Clone, PartialEq)]
+/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically.
+///
+/// [`LocalChain`]: local_chain::LocalChain
+#[derive(Debug, Clone)]
pub struct LocalUpdate<K, A> {
/// Last active derivation index per keychain (`K`).
pub keychain: BTreeMap<K, u32>,
+
/// Update for the [`TxGraph`].
pub graph: TxGraph<A>,
+
/// Update for the [`LocalChain`].
- pub chain: LocalChain,
+ ///
+ /// [`LocalChain`]: local_chain::LocalChain
+ pub chain: local_chain::Update,
}
-impl<K, A> Default for LocalUpdate<K, A> {
- fn default() -> Self {
+impl<K, A> LocalUpdate<K, A> {
+ /// Construct a [`LocalUpdate`] with a given [`CheckPoint`] tip.
+ ///
+ /// [`CheckPoint`]: local_chain::CheckPoint
+ pub fn new(chain_update: local_chain::Update) -> Self {
Self {
- keychain: Default::default(),
- graph: Default::default(),
- chain: Default::default(),
+ keychain: BTreeMap::new(),
+ graph: TxGraph::default(),
+ chain: chain_update,
}
}
}
)]
pub struct LocalChangeSet<K, A> {
/// Changes to the [`LocalChain`].
+ ///
+ /// [`LocalChain`]: local_chain::LocalChain
pub chain_changeset: local_chain::ChangeSet,
/// Additions to [`IndexedTxGraph`].
use core::convert::Infallible;
-use alloc::collections::BTreeMap;
+use crate::collections::BTreeMap;
+use crate::{BlockId, ChainOracle};
+use alloc::sync::Arc;
use bitcoin::BlockHash;
-use crate::{BlockId, ChainOracle};
+/// A structure that represents changes to [`LocalChain`].
+pub type ChangeSet = BTreeMap<u32, Option<BlockHash>>;
+
+/// A blockchain of [`LocalChain`].
+///
+/// The in a linked-list with newer blocks pointing to older ones.
+#[derive(Debug, Clone)]
+pub struct CheckPoint(Arc<CPInner>);
+
+/// The internal contents of [`CheckPoint`].
+#[derive(Debug, Clone)]
+struct CPInner {
+ /// Block id (hash and height).
+ block: BlockId,
+ /// Previous checkpoint (if any).
+ prev: Option<Arc<CPInner>>,
+}
+
+impl CheckPoint {
+ /// Construct a new base block at the front of a linked list.
+ pub fn new(block: BlockId) -> Self {
+ Self(Arc::new(CPInner { block, prev: None }))
+ }
+
+ /// Puts another checkpoint onto the linked list representing the blockchain.
+ ///
+ /// Returns an `Err(self)` if the block you are pushing on is not at a greater height that the one you
+ /// are pushing on to.
+ pub fn push(self, block: BlockId) -> Result<Self, Self> {
+ if self.height() < block.height {
+ Ok(Self(Arc::new(CPInner {
+ block,
+ prev: Some(self.0),
+ })))
+ } else {
+ Err(self)
+ }
+ }
+
+ /// Extends the checkpoint linked list by a iterator of block ids.
+ ///
+ /// Returns an `Err(self)` if there is block which does not have a greater height than the
+ /// previous one.
+ pub fn extend_with_blocks(
+ self,
+ blocks: impl IntoIterator<Item = BlockId>,
+ ) -> Result<Self, Self> {
+ let mut curr = self.clone();
+ for block in blocks {
+ curr = curr.push(block).map_err(|_| self.clone())?;
+ }
+ Ok(curr)
+ }
+
+ /// Get the [`BlockId`] of the checkpoint.
+ pub fn block_id(&self) -> BlockId {
+ self.0.block
+ }
+
+ /// Get the height of the checkpoint.
+ pub fn height(&self) -> u32 {
+ self.0.block.height
+ }
+
+ /// Get the block hash of the checkpoint.
+ pub fn hash(&self) -> BlockHash {
+ self.0.block.hash
+ }
+
+ /// Get the previous checkpoint in the chain
+ pub fn prev(&self) -> Option<CheckPoint> {
+ self.0.prev.clone().map(CheckPoint)
+ }
+
+ /// Iterate from this checkpoint in descending height.
+ pub fn iter(&self) -> CheckPointIter {
+ self.clone().into_iter()
+ }
+}
+
+/// A structure that iterates over checkpoints backwards.
+pub struct CheckPointIter {
+ current: Option<Arc<CPInner>>,
+}
+
+impl Iterator for CheckPointIter {
+ type Item = CheckPoint;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let current = self.current.clone()?;
+ self.current = current.prev.clone();
+ Some(CheckPoint(current))
+ }
+}
+
+impl IntoIterator for CheckPoint {
+ type Item = CheckPoint;
+ type IntoIter = CheckPointIter;
+
+ fn into_iter(self) -> Self::IntoIter {
+ CheckPointIter {
+ current: Some(self.0),
+ }
+ }
+}
+
+/// Represents an update to [`LocalChain`].
+#[derive(Debug, Clone)]
+pub struct Update {
+ /// The update's new [`CheckPoint`] tip.
+ pub tip: CheckPoint,
+
+ /// Whether the update allows for introducing older blocks.
+ ///
+ /// Refer to [`LocalChain::apply_update`] for more.
+ ///
+ /// [`LocalChain::apply_update`]: crate::local_chain::LocalChain::apply_update
+ pub introduce_older_blocks: bool,
+}
/// This is a local implementation of [`ChainOracle`].
-#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Default, Clone)]
pub struct LocalChain {
- blocks: BTreeMap<u32, BlockHash>,
+ tip: Option<CheckPoint>,
+ index: BTreeMap<u32, BlockHash>,
+}
+
+impl PartialEq for LocalChain {
+ fn eq(&self, other: &Self) -> bool {
+ self.index == other.index
+ }
+}
+
+impl From<LocalChain> for BTreeMap<u32, BlockHash> {
+ fn from(value: LocalChain) -> Self {
+ value.index
+ }
+}
+
+impl From<ChangeSet> for LocalChain {
+ fn from(value: ChangeSet) -> Self {
+ Self::from_changeset(value)
+ }
+}
+
+impl From<BTreeMap<u32, BlockHash>> for LocalChain {
+ fn from(value: BTreeMap<u32, BlockHash>) -> Self {
+ Self::from_blocks(value)
+ }
}
impl ChainOracle for LocalChain {
fn is_block_in_chain(
&self,
block: BlockId,
- static_block: BlockId,
+ chain_tip: BlockId,
) -> Result<Option<bool>, Self::Error> {
- if block.height > static_block.height {
+ if block.height > chain_tip.height {
return Ok(None);
}
Ok(
match (
- self.blocks.get(&block.height),
- self.blocks.get(&static_block.height),
+ self.index.get(&block.height),
+ self.index.get(&chain_tip.height),
) {
- (Some(&hash), Some(&static_hash)) => {
- Some(hash == block.hash && static_hash == static_block.hash)
- }
+ (Some(cp), Some(tip_cp)) => Some(*cp == block.hash && *tip_cp == chain_tip.hash),
_ => None,
},
)
}
fn get_chain_tip(&self) -> Result<Option<BlockId>, Self::Error> {
- Ok(self.tip())
- }
-}
-
-impl AsRef<BTreeMap<u32, BlockHash>> for LocalChain {
- fn as_ref(&self) -> &BTreeMap<u32, BlockHash> {
- &self.blocks
- }
-}
-
-impl From<LocalChain> for BTreeMap<u32, BlockHash> {
- fn from(value: LocalChain) -> Self {
- value.blocks
- }
-}
-
-impl From<BTreeMap<u32, BlockHash>> for LocalChain {
- fn from(value: BTreeMap<u32, BlockHash>) -> Self {
- Self { blocks: value }
+ Ok(self.tip.as_ref().map(|tip| tip.block_id()))
}
}
impl LocalChain {
- /// Contruct a [`LocalChain`] from a list of [`BlockId`]s.
- pub fn from_blocks<B>(blocks: B) -> Self
- where
- B: IntoIterator<Item = BlockId>,
- {
- Self {
- blocks: blocks.into_iter().map(|b| (b.height, b.hash)).collect(),
- }
- }
+ /// Construct a [`LocalChain`] from an initial `changeset`.
+ pub fn from_changeset(changeset: ChangeSet) -> Self {
+ let mut chain = Self::default();
+ chain.apply_changeset(&changeset);
- /// Get a reference to a map of block height to hash.
- pub fn blocks(&self) -> &BTreeMap<u32, BlockHash> {
- &self.blocks
- }
+ #[cfg(debug_assertions)]
+ chain._check_consistency(Some(&changeset));
- /// Get the chain tip.
- pub fn tip(&self) -> Option<BlockId> {
- self.blocks
- .iter()
- .last()
- .map(|(&height, &hash)| BlockId { height, hash })
+ chain
}
- /// This is like the sparsechain's logic, expect we must guarantee that all invalidated heights
- /// are to be re-filled.
- pub fn determine_changeset(&self, update: &Self) -> Result<ChangeSet, UpdateNotConnectedError> {
- let update = update.as_ref();
- let update_tip = match update.keys().last().cloned() {
- Some(tip) => tip,
- None => return Ok(ChangeSet::default()),
- };
-
- // this is the latest height where both the update and local chain has the same block hash
- let agreement_height = update
- .iter()
- .rev()
- .find(|&(u_height, u_hash)| self.blocks.get(u_height) == Some(u_hash))
- .map(|(&height, _)| height);
-
- // the lower bound of the range to invalidate
- let invalidate_lb = match agreement_height {
- Some(height) if height == update_tip => u32::MAX,
- Some(height) => height + 1,
- None => 0,
+ /// Construct a [`LocalChain`] from a given `checkpoint` tip.
+ pub fn from_tip(tip: CheckPoint) -> Self {
+ let mut _self = Self {
+ tip: Some(tip),
+ ..Default::default()
};
+ _self.reindex(0);
- // the first block's height to invalidate in the local chain
- let invalidate_from_height = self.blocks.range(invalidate_lb..).next().map(|(&h, _)| h);
+ #[cfg(debug_assertions)]
+ _self._check_consistency(None);
- // the first block of height to invalidate (if any) should be represented in the update
- if let Some(first_invalid_height) = invalidate_from_height {
- if !update.contains_key(&first_invalid_height) {
- return Err(UpdateNotConnectedError(first_invalid_height));
- }
- }
+ _self
+ }
- let mut changeset: BTreeMap<u32, Option<BlockHash>> = match invalidate_from_height {
- Some(first_invalid_height) => {
- // the first block of height to invalidate should be represented in the update
- if !update.contains_key(&first_invalid_height) {
- return Err(UpdateNotConnectedError(first_invalid_height));
+ /// Constructs a [`LocalChain`] from a [`BTreeMap`] of height to [`BlockHash`].
+ ///
+ /// The [`BTreeMap`] enforces the height order. However, the caller must ensure the blocks are
+ /// all of the same chain.
+ pub fn from_blocks(blocks: BTreeMap<u32, BlockHash>) -> Self {
+ let mut tip: Option<CheckPoint> = None;
+
+ for block in &blocks {
+ match tip {
+ Some(curr) => {
+ tip = Some(
+ curr.push(BlockId::from(block))
+ .expect("BTreeMap is ordered"),
+ )
}
- self.blocks
- .range(first_invalid_height..)
- .map(|(height, _)| (*height, None))
- .collect()
- }
- None => BTreeMap::new(),
- };
- for (height, update_hash) in update {
- let original_hash = self.blocks.get(height);
- if Some(update_hash) != original_hash {
- changeset.insert(*height, Some(*update_hash));
+ None => tip = Some(CheckPoint::new(BlockId::from(block))),
}
}
- Ok(changeset)
+ let chain = Self { index: blocks, tip };
+
+ #[cfg(debug_assertions)]
+ chain._check_consistency(None);
+
+ chain
}
- /// Applies the given `changeset`.
- pub fn apply_changeset(&mut self, changeset: ChangeSet) {
- for (height, blockhash) in changeset {
- match blockhash {
- Some(blockhash) => self.blocks.insert(height, blockhash),
- None => self.blocks.remove(&height),
- };
- }
+ /// Get the highest checkpoint.
+ pub fn tip(&self) -> Option<CheckPoint> {
+ self.tip.clone()
}
- /// Updates [`LocalChain`] with an update [`LocalChain`].
+ /// Returns whether the [`LocalChain`] is empty (has no checkpoints).
+ pub fn is_empty(&self) -> bool {
+ self.tip.is_none()
+ }
+
+ /// Updates [`Self`] with the given `update_tip`.
///
- /// This is equivalent to calling [`determine_changeset`] and [`apply_changeset`] in sequence.
+ /// `introduce_older_blocks` specifies whether the `update_tip`'s history can introduce blocks
+ /// below the original chain's tip without invalidating blocks. Block-by-block syncing
+ /// mechanisms would typically create updates that builds upon the previous tip. In this case,
+ /// this paramater would be false. Script-pubkey based syncing mechanisms may not introduce
+ /// transactions in a chronological order so some updates require introducing older blocks (to
+ /// anchor older transactions). For script-pubkey based syncing, this parameter would typically
+ /// be true.
///
- /// [`determine_changeset`]: Self::determine_changeset
- /// [`apply_changeset`]: Self::apply_changeset
- pub fn apply_update(&mut self, update: Self) -> Result<ChangeSet, UpdateNotConnectedError> {
- let changeset = self.determine_changeset(&update)?;
- self.apply_changeset(changeset.clone());
- Ok(changeset)
+ /// The method returns [`ChangeSet`] on success. This represents the applied changes to
+ /// [`Self`].
+ ///
+ /// To update, the `update_tip` must *connect* with `self`. If `self` and `update_tip` has a
+ /// mutual checkpoint (same height and hash), it can connect if:
+ /// * The mutual checkpoint is the tip of `self`.
+ /// * An ancestor of `update_tip` has a height which is of the checkpoint one higher than the
+ /// mutual checkpoint from `self`.
+ ///
+ /// Additionally:
+ /// * If `self` is empty, `update_tip` will always connect.
+ /// * If `self` only has one checkpoint, `update_tip` must have an ancestor checkpoint with the
+ /// same height as it.
+ ///
+ /// To invalidate from a given checkpoint, `update_tip` must contain an ancestor checkpoint with
+ /// the same height but different hash.
+ ///
+ /// # Errors
+ ///
+ /// An error will occur if the update does not correctly connect with `self`.
+ ///
+ /// Refer to [module-level documentation] for more.
+ ///
+ /// [module-level documentation]: crate::local_chain
+ pub fn apply_update(&mut self, update: Update) -> Result<ChangeSet, CannotConnectError> {
+ match self.tip() {
+ Some(original_tip) => {
+ let changeset = merge_chains(
+ original_tip,
+ update.tip.clone(),
+ update.introduce_older_blocks,
+ )?;
+ self.apply_changeset(&changeset);
+
+ // return early as `apply_changeset` already calls `check_consistency`
+ Ok(changeset)
+ }
+ None => {
+ *self = Self::from_tip(update.tip);
+ let changeset = self.initial_changeset();
+
+ #[cfg(debug_assertions)]
+ self._check_consistency(Some(&changeset));
+ Ok(changeset)
+ }
+ }
}
- /// Derives a [`ChangeSet`] that assumes that there are no preceding changesets.
- ///
- /// The changeset returned will record additions of all blocks included in [`Self`].
- pub fn initial_changeset(&self) -> ChangeSet {
- self.blocks
- .iter()
- .map(|(&height, &hash)| (height, Some(hash)))
- .collect()
+ /// Apply the given `changeset`.
+ pub fn apply_changeset(&mut self, changeset: &ChangeSet) {
+ if let Some(start_height) = changeset.keys().next().cloned() {
+ let mut extension = BTreeMap::default();
+ let mut base: Option<CheckPoint> = None;
+ for cp in self.iter_checkpoints() {
+ if cp.height() >= start_height {
+ extension.insert(cp.height(), cp.hash());
+ } else {
+ base = Some(cp);
+ break;
+ }
+ }
+
+ for (&height, &hash) in changeset {
+ match hash {
+ Some(hash) => {
+ extension.insert(height, hash);
+ }
+ None => {
+ extension.remove(&height);
+ }
+ };
+ }
+ let new_tip = match base {
+ Some(base) => Some(
+ base.extend_with_blocks(extension.into_iter().map(BlockId::from))
+ .expect("extension is strictly greater than base"),
+ ),
+ None => LocalChain::from_blocks(extension).tip(),
+ };
+ self.tip = new_tip;
+ self.reindex(start_height);
+
+ #[cfg(debug_assertions)]
+ self._check_consistency(Some(changeset));
+ }
}
- /// Insert a block of [`BlockId`] into the [`LocalChain`].
+ /// Insert a [`BlockId`].
///
- /// # Error
+ /// # Errors
///
- /// If the insertion height already contains a block, and the block has a different blockhash,
- /// this will result in an [`InsertBlockNotMatchingError`].
- pub fn insert_block(
- &mut self,
- block_id: BlockId,
- ) -> Result<ChangeSet, InsertBlockNotMatchingError> {
- let mut update = Self::from_blocks(self.tip());
-
- if let Some(original_hash) = update.blocks.insert(block_id.height, block_id.hash) {
+ /// Replacing the block hash of an existing checkpoint will result in an error.
+ pub fn insert_block(&mut self, block_id: BlockId) -> Result<ChangeSet, InsertBlockError> {
+ if let Some(&original_hash) = self.index.get(&block_id.height) {
if original_hash != block_id.hash {
- return Err(InsertBlockNotMatchingError {
+ return Err(InsertBlockError {
height: block_id.height,
original_hash,
update_hash: block_id.hash,
});
+ } else {
+ return Ok(ChangeSet::default());
}
}
- Ok(self.apply_update(update).expect("should always connect"))
+ let mut changeset = ChangeSet::default();
+ changeset.insert(block_id.height, Some(block_id.hash));
+ self.apply_changeset(&changeset);
+ Ok(changeset)
}
-}
-/// This is the return value of [`determine_changeset`] and represents changes to [`LocalChain`].
-///
-/// [`determine_changeset`]: LocalChain::determine_changeset
-pub type ChangeSet = BTreeMap<u32, Option<BlockHash>>;
+ /// Reindex the heights in the chain from (and including) `from` height
+ fn reindex(&mut self, from: u32) {
+ let _ = self.index.split_off(&from);
+ for cp in self.iter_checkpoints() {
+ if cp.height() < from {
+ break;
+ }
+ self.index.insert(cp.height(), cp.hash());
+ }
+ }
-/// Represents an update failure of [`LocalChain`] due to the update not connecting to the original
-/// chain.
-///
-/// The update cannot be applied to the chain because the chain suffix it represents did not
-/// connect to the existing chain. This error case contains the checkpoint height to include so
-/// that the chains can connect.
-#[derive(Clone, Debug, PartialEq)]
-pub struct UpdateNotConnectedError(pub u32);
+ /// Derives an initial [`ChangeSet`], meaning that it can be applied to an empty chain to
+ /// recover the current chain.
+ pub fn initial_changeset(&self) -> ChangeSet {
+ self.index.iter().map(|(k, v)| (*k, Some(*v))).collect()
+ }
-impl core::fmt::Display for UpdateNotConnectedError {
- fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
- write!(
- f,
- "the update cannot connect with the chain, try include block at height {}",
- self.0
- )
+ /// Iterate over checkpoints in decending height order.
+ pub fn iter_checkpoints(&self) -> CheckPointIter {
+ CheckPointIter {
+ current: self.tip.as_ref().map(|tip| tip.0.clone()),
+ }
}
-}
-#[cfg(feature = "std")]
-impl std::error::Error for UpdateNotConnectedError {}
+ /// Get a reference to the internal index mapping the height to block hash
+ pub fn heights(&self) -> &BTreeMap<u32, BlockHash> {
+ &self.index
+ }
+
+ /// Checkpoints that exist under `self.tip` and blocks indexed in `self.index` should be equal.
+ /// Additionally, if a `changeset` is provided, the changes specified in the `changeset` should
+ /// be reflected in `self.index`.
+ #[cfg(debug_assertions)]
+ fn _check_consistency(&self, changeset: Option<&ChangeSet>) {
+ debug_assert_eq!(
+ self.tip
+ .iter()
+ .flat_map(CheckPoint::iter)
+ .map(|cp| (cp.height(), cp.hash()))
+ .collect::<BTreeMap<_, _>>(),
+ self.index,
+ "checkpoint history and index must be consistent"
+ );
+
+ if let Some(changeset) = changeset {
+ for (height, exp_hash) in changeset {
+ let hash = self.index.get(height);
+ assert_eq!(
+ hash,
+ exp_hash.as_ref(),
+ "changeset changes should be reflected in the internal index"
+ );
+ }
+ }
+ }
+}
/// Represents a failure when trying to insert a checkpoint into [`LocalChain`].
#[derive(Clone, Debug, PartialEq)]
-pub struct InsertBlockNotMatchingError {
+pub struct InsertBlockError {
/// The checkpoints' height.
pub height: u32,
/// Original checkpoint's block hash.
pub update_hash: BlockHash,
}
-impl core::fmt::Display for InsertBlockNotMatchingError {
+impl core::fmt::Display for InsertBlockError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
}
#[cfg(feature = "std")]
-impl std::error::Error for InsertBlockNotMatchingError {}
+impl std::error::Error for InsertBlockError {}
+
+/// Occurs when an update does not have a common checkpoint with the original chain.
+#[derive(Clone, Debug, PartialEq)]
+pub struct CannotConnectError {
+ /// The suggested checkpoint to include to connect the two chains.
+ pub try_include_height: u32,
+}
+
+impl core::fmt::Display for CannotConnectError {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ write!(
+ f,
+ "introduced chain cannot connect with the original chain, try include height {}",
+ self.try_include_height,
+ )
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for CannotConnectError {}
+
+fn merge_chains(
+ original_tip: CheckPoint,
+ update_tip: CheckPoint,
+ introduce_older_blocks: bool,
+) -> Result<ChangeSet, CannotConnectError> {
+ let mut changeset = ChangeSet::default();
+ let mut orig = original_tip.into_iter();
+ let mut update = update_tip.into_iter();
+ let mut curr_orig = None;
+ let mut curr_update = None;
+ let mut prev_orig: Option<CheckPoint> = None;
+ let mut prev_update: Option<CheckPoint> = None;
+ let mut point_of_agreement_found = false;
+ let mut prev_orig_was_invalidated = false;
+ let mut potentially_invalidated_heights = vec![];
+
+ // To find the difference between the new chain and the original we iterate over both of them
+ // from the tip backwards in tandem. We always dealing with the highest one from either chain
+ // first and move to the next highest. The crucial logic is applied when they have blocks at the
+ // same height.
+ loop {
+ if curr_orig.is_none() {
+ curr_orig = orig.next();
+ }
+ if curr_update.is_none() {
+ curr_update = update.next();
+ }
+
+ match (curr_orig.as_ref(), curr_update.as_ref()) {
+ // Update block that doesn't exist in the original chain
+ (o, Some(u)) if Some(u.height()) > o.map(|o| o.height()) => {
+ changeset.insert(u.height(), Some(u.hash()));
+ prev_update = curr_update.take();
+ }
+ // Original block that isn't in the update
+ (Some(o), u) if Some(o.height()) > u.map(|u| u.height()) => {
+ // this block might be gone if an earlier block gets invalidated
+ potentially_invalidated_heights.push(o.height());
+ prev_orig_was_invalidated = false;
+ prev_orig = curr_orig.take();
+
+ // OPTIMIZATION: we have run out of update blocks so we don't need to continue
+ // iterating becuase there's no possibility of adding anything to changeset.
+ if u.is_none() {
+ break;
+ }
+ }
+ (Some(o), Some(u)) => {
+ if o.hash() == u.hash() {
+ // We have found our point of agreement 🎉 -- we require that the previous (i.e.
+ // higher because we are iterating backwards) block in the original chain was
+ // invalidated (if it exists). This ensures that there is an unambigious point of
+ // connection to the original chain from the update chain (i.e. we know the
+ // precisely which original blocks are invalid).
+ if !prev_orig_was_invalidated && !point_of_agreement_found {
+ if let (Some(prev_orig), Some(_prev_update)) = (&prev_orig, &prev_update) {
+ return Err(CannotConnectError {
+ try_include_height: prev_orig.height(),
+ });
+ }
+ }
+ point_of_agreement_found = true;
+ prev_orig_was_invalidated = false;
+ // OPTIMIZATION 1 -- If we know that older blocks cannot be introduced without
+ // invalidation, we can break after finding the point of agreement.
+ // OPTIMIZATION 2 -- if we have the same underlying pointer at this point, we
+ // can guarantee that no older blocks are introduced.
+ if !introduce_older_blocks || Arc::as_ptr(&o.0) == Arc::as_ptr(&u.0) {
+ return Ok(changeset);
+ }
+ } else {
+ // We have an invalidation height so we set the height to the updated hash and
+ // also purge all the original chain block hashes above this block.
+ changeset.insert(u.height(), Some(u.hash()));
+ for invalidated_height in potentially_invalidated_heights.drain(..) {
+ changeset.insert(invalidated_height, None);
+ }
+ prev_orig_was_invalidated = true;
+ }
+ prev_update = curr_update.take();
+ prev_orig = curr_orig.take();
+ }
+ (None, None) => {
+ break;
+ }
+ _ => {
+ unreachable!("compiler cannot tell that everything has been covered")
+ }
+ }
+ }
+
+ // When we don't have a point of agreement you can imagine it is implicitly the
+ // genesis block so we need to do the final connectivity check which in this case
+ // just means making sure the entire original chain was invalidated.
+ if !prev_orig_was_invalidated && !point_of_agreement_found {
+ if let Some(prev_orig) = prev_orig {
+ return Err(CannotConnectError {
+ try_include_height: prev_orig.height(),
+ });
+ }
+ }
+
+ Ok(changeset)
+}
//! ```
use crate::{
- collections::*, keychain::Balance, Anchor, Append, BlockId, ChainOracle, ChainPosition,
- ForEachTxOut, FullTxOut,
+ collections::*, keychain::Balance, local_chain::LocalChain, Anchor, Append, BlockId,
+ ChainOracle, ChainPosition, ForEachTxOut, FullTxOut,
};
use alloc::vec::Vec;
use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
}
impl<A: Anchor> TxGraph<A> {
+ /// Find missing block heights of `chain`.
+ ///
+ /// This works by scanning through anchors, and seeing whether the anchor block of the anchor
+ /// exists in the [`LocalChain`].
+ pub fn missing_blocks<'a>(&'a self, chain: &'a LocalChain) -> impl Iterator<Item = u32> + 'a {
+ self.anchors
+ .iter()
+ .map(|(a, _)| a.anchor_block())
+ .filter({
+ let mut last_block = Option::<BlockId>::None;
+ move |block| {
+ if last_block.as_ref() == Some(block) {
+ false
+ } else {
+ last_block = Some(*block);
+ true
+ }
+ }
+ })
+ .filter_map(|block| match chain.heights().get(&block.height) {
+ Some(chain_hash) if *chain_hash == block.hash => None,
+ _ => Some(block.height),
+ })
+ }
+
/// Get the position of the transaction in `chain` with tip `chain_tip`.
///
/// If the given transaction of `txid` does not exist in the chain of `chain_tip`, `None` is
macro_rules! local_chain {
[ $(($height:expr, $block_hash:expr)), * ] => {{
#[allow(unused_mut)]
- bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*])
+ bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*].into_iter().collect())
}};
}
#[allow(unused_macros)]
-macro_rules! chain {
- ($([$($tt:tt)*]),*) => { chain!( checkpoints: [$([$($tt)*]),*] ) };
- (checkpoints: $($tail:tt)*) => { chain!( index: TxHeight, checkpoints: $($tail)*) };
- (index: $ind:ty, checkpoints: [ $([$height:expr, $block_hash:expr]),* ] $(,txids: [$(($txid:expr, $tx_height:expr)),*])?) => {{
+macro_rules! chain_update {
+ [ $(($height:expr, $hash:expr)), * ] => {{
#[allow(unused_mut)]
- let mut chain = bdk_chain::sparse_chain::SparseChain::<$ind>::from_checkpoints([$(($height, $block_hash).into()),*]);
-
- $(
- $(
- let _ = chain.insert_tx($txid, $tx_height).expect("should succeed");
- )*
- )?
-
- chain
+ bdk_chain::local_chain::Update {
+ tip: bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $hash).into()),*].into_iter().collect())
+ .tip()
+ .expect("must have tip"),
+ introduce_older_blocks: true,
+ }
}};
}
// Create Local chains
let local_chain = (0..150)
- .map(|i| (i as u32, h!("random")))
- .collect::<BTreeMap<u32, BlockHash>>();
+ .map(|i| (i as u32, Some(h!("random"))))
+ .collect::<BTreeMap<u32, Option<BlockHash>>>();
let local_chain = LocalChain::from(local_chain);
// Initiate IndexedTxGraph
(
*tx,
local_chain
- .blocks()
+ .heights()
.get(&height)
- .map(|&hash| BlockId { height, hash })
+ .cloned()
+ .map(|hash| BlockId { height, hash })
.map(|anchor_block| ConfirmationHeightAnchor {
anchor_block,
confirmation_height: anchor_block.height,
|height: u32,
graph: &IndexedTxGraph<ConfirmationHeightAnchor, KeychainTxOutIndex<String>>| {
let chain_tip = local_chain
- .blocks()
+ .heights()
.get(&height)
.map(|&hash| BlockId { height, hash })
- .expect("block must exist");
+ .unwrap_or_else(|| panic!("block must exist at {}", height));
let txouts = graph
.graph()
.filter_chain_txouts(
-use bdk_chain::local_chain::{
- ChangeSet, InsertBlockNotMatchingError, LocalChain, UpdateNotConnectedError,
-};
+use bdk_chain::local_chain::{CannotConnectError, ChangeSet, InsertBlockError, LocalChain, Update};
use bitcoin::BlockHash;
#[macro_use]
mod common;
-#[test]
-fn add_first_tip() {
- let chain = LocalChain::default();
- assert_eq!(
- chain.determine_changeset(&local_chain![(0, h!("A"))]),
- Ok([(0, Some(h!("A")))].into()),
- "add first tip"
- );
-}
-
-#[test]
-fn add_second_tip() {
- let chain = local_chain![(0, h!("A"))];
- assert_eq!(
- chain.determine_changeset(&local_chain![(0, h!("A")), (1, h!("B"))]),
- Ok([(1, Some(h!("B")))].into())
- );
-}
-
-#[test]
-fn two_disjoint_chains_cannot_merge() {
- let chain1 = local_chain![(0, h!("A"))];
- let chain2 = local_chain![(1, h!("B"))];
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Err(UpdateNotConnectedError(0))
- );
-}
-
-#[test]
-fn duplicate_chains_should_merge() {
- let chain1 = local_chain![(0, h!("A"))];
- let chain2 = local_chain![(0, h!("A"))];
- assert_eq!(chain1.determine_changeset(&chain2), Ok(Default::default()));
-}
-
-#[test]
-fn can_introduce_older_checkpoints() {
- let chain1 = local_chain![(2, h!("C")), (3, h!("D"))];
- let chain2 = local_chain![(1, h!("B")), (2, h!("C"))];
-
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Ok([(1, Some(h!("B")))].into())
- );
-}
-
-#[test]
-fn fix_blockhash_before_agreement_point() {
- let chain1 = local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))];
- let chain2 = local_chain![(0, h!("fix")), (1, h!("we-agree"))];
-
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Ok([(0, Some(h!("fix")))].into())
- )
-}
-
-/// B and C are in both chain and update
-/// ```
-/// | 0 | 1 | 2 | 3 | 4
-/// chain | B C
-/// update | A B C D
-/// ```
-/// This should succeed with the point of agreement being C and A should be added in addition.
-#[test]
-fn two_points_of_agreement() {
- let chain1 = local_chain![(1, h!("B")), (2, h!("C"))];
- let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))];
-
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Ok([(0, Some(h!("A"))), (3, Some(h!("D")))].into()),
- );
-}
-
-/// Update and chain does not connect:
-/// ```
-/// | 0 | 1 | 2 | 3 | 4
-/// chain | B C
-/// update | A B D
-/// ```
-/// This should fail as we cannot figure out whether C & D are on the same chain
-#[test]
-fn update_and_chain_does_not_connect() {
- let chain1 = local_chain![(1, h!("B")), (2, h!("C"))];
- let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D"))];
-
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Err(UpdateNotConnectedError(2)),
- );
+#[derive(Debug)]
+struct TestLocalChain<'a> {
+ name: &'static str,
+ chain: LocalChain,
+ update: Update,
+ exp: ExpectedResult<'a>,
}
-/// Transient invalidation:
-/// ```
-/// | 0 | 1 | 2 | 3 | 4 | 5
-/// chain | A B C E
-/// update | A B' C' D
-/// ```
-/// This should succeed and invalidate B,C and E with point of agreement being A.
-#[test]
-fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation() {
- let chain1 = local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))];
- let chain2 = local_chain![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))];
-
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Ok([
- (2, Some(h!("B'"))),
- (3, Some(h!("C'"))),
- (4, Some(h!("D"))),
- (5, None),
- ]
- .into())
- );
+#[derive(Debug, PartialEq)]
+enum ExpectedResult<'a> {
+ Ok {
+ changeset: &'a [(u32, Option<BlockHash>)],
+ init_changeset: &'a [(u32, Option<BlockHash>)],
+ },
+ Err(CannotConnectError),
}
-/// Transient invalidation:
-/// ```
-/// | 0 | 1 | 2 | 3 | 4
-/// chain | B C E
-/// update | B' C' D
-/// ```
-///
-/// This should succeed and invalidate B, C and E with no point of agreement
-#[test]
-fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation_no_point_of_agreement() {
- let chain1 = local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))];
- let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))];
+impl<'a> TestLocalChain<'a> {
+ fn run(mut self) {
+ println!("[TestLocalChain] test: {}", self.name);
+ let got_changeset = match self.chain.apply_update(self.update) {
+ Ok(changeset) => changeset,
+ Err(got_err) => {
+ assert_eq!(
+ ExpectedResult::Err(got_err),
+ self.exp,
+ "{}: unexpected error",
+ self.name
+ );
+ return;
+ }
+ };
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Ok([
- (1, Some(h!("B'"))),
- (2, Some(h!("C'"))),
- (3, Some(h!("D"))),
- (4, None)
- ]
- .into())
- )
+ match self.exp {
+ ExpectedResult::Ok {
+ changeset,
+ init_changeset,
+ } => {
+ assert_eq!(
+ got_changeset,
+ changeset.iter().cloned().collect(),
+ "{}: unexpected changeset",
+ self.name
+ );
+ assert_eq!(
+ self.chain.initial_changeset(),
+ init_changeset.iter().cloned().collect(),
+ "{}: unexpected initial changeset",
+ self.name
+ );
+ }
+ ExpectedResult::Err(err) => panic!(
+ "{}: expected error ({}), got non-error result: {:?}",
+ self.name, err, got_changeset
+ ),
+ }
+ }
}
-/// Transient invalidation:
-/// ```
-/// | 0 | 1 | 2 | 3 | 4
-/// chain | A B C E
-/// update | B' C' D
-/// ```
-///
-/// This should fail since although it tells us that B and C are invalid it doesn't tell us whether
-/// A was invalid.
#[test]
-fn invalidation_but_no_connection() {
- let chain1 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))];
- let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))];
-
- assert_eq!(
- chain1.determine_changeset(&chain2),
- Err(UpdateNotConnectedError(0))
- )
+fn update_local_chain() {
+ [
+ TestLocalChain {
+ name: "add first tip",
+ chain: local_chain![],
+ update: chain_update![(0, h!("A"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(0, Some(h!("A")))],
+ init_changeset: &[(0, Some(h!("A")))],
+ },
+ },
+ TestLocalChain {
+ name: "add second tip",
+ chain: local_chain![(0, h!("A"))],
+ update: chain_update![(0, h!("A")), (1, h!("B"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(1, Some(h!("B")))],
+ init_changeset: &[(0, Some(h!("A"))), (1, Some(h!("B")))],
+ },
+ },
+ TestLocalChain {
+ name: "two disjoint chains cannot merge",
+ chain: local_chain![(0, h!("A"))],
+ update: chain_update![(1, h!("B"))],
+ exp: ExpectedResult::Err(CannotConnectError {
+ try_include_height: 0,
+ }),
+ },
+ TestLocalChain {
+ name: "two disjoint chains cannot merge (existing chain longer)",
+ chain: local_chain![(1, h!("A"))],
+ update: chain_update![(0, h!("B"))],
+ exp: ExpectedResult::Err(CannotConnectError {
+ try_include_height: 1,
+ }),
+ },
+ TestLocalChain {
+ name: "duplicate chains should merge",
+ chain: local_chain![(0, h!("A"))],
+ update: chain_update![(0, h!("A"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[],
+ init_changeset: &[(0, Some(h!("A")))],
+ },
+ },
+ // Introduce an older checkpoint (B)
+ // | 0 | 1 | 2 | 3
+ // chain | C D
+ // update | B C
+ TestLocalChain {
+ name: "can introduce older checkpoint",
+ chain: local_chain![(2, h!("C")), (3, h!("D"))],
+ update: chain_update![(1, h!("B")), (2, h!("C"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(1, Some(h!("B")))],
+ init_changeset: &[(1, Some(h!("B"))), (2, Some(h!("C"))), (3, Some(h!("D")))],
+ },
+ },
+ // Introduce an older checkpoint (A) that is not directly behind PoA
+ // | 1 | 2 | 3
+ // chain | B C
+ // update | A C
+ TestLocalChain {
+ name: "can introduce older checkpoint 2",
+ chain: local_chain![(3, h!("B")), (4, h!("C"))],
+ update: chain_update![(2, h!("A")), (4, h!("C"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(2, Some(h!("A")))],
+ init_changeset: &[(2, Some(h!("A"))), (3, Some(h!("B"))), (4, Some(h!("C")))],
+ }
+ },
+ // Introduce an older checkpoint (B) that is not the oldest checkpoint
+ // | 1 | 2 | 3
+ // chain | A C
+ // update | B C
+ TestLocalChain {
+ name: "can introduce older checkpoint 3",
+ chain: local_chain![(1, h!("A")), (3, h!("C"))],
+ update: chain_update![(2, h!("B")), (3, h!("C"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(2, Some(h!("B")))],
+ init_changeset: &[(1, Some(h!("A"))), (2, Some(h!("B"))), (3, Some(h!("C")))],
+ }
+ },
+ // Introduce two older checkpoints below the PoA
+ // | 1 | 2 | 3
+ // chain | C
+ // update | A B C
+ TestLocalChain {
+ name: "introduce two older checkpoints below PoA",
+ chain: local_chain![(3, h!("C"))],
+ update: chain_update![(1, h!("A")), (2, h!("B")), (3, h!("C"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(1, Some(h!("A"))), (2, Some(h!("B")))],
+ init_changeset: &[(1, Some(h!("A"))), (2, Some(h!("B"))), (3, Some(h!("C")))],
+ },
+ },
+ TestLocalChain {
+ name: "fix blockhash before agreement point",
+ chain: local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))],
+ update: chain_update![(0, h!("fix")), (1, h!("we-agree"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(0, Some(h!("fix")))],
+ init_changeset: &[(0, Some(h!("fix"))), (1, Some(h!("we-agree")))],
+ },
+ },
+ // B and C are in both chain and update
+ // | 0 | 1 | 2 | 3 | 4
+ // chain | B C
+ // update | A B C D
+ // This should succeed with the point of agreement being C and A should be added in addition.
+ TestLocalChain {
+ name: "two points of agreement",
+ chain: local_chain![(1, h!("B")), (2, h!("C"))],
+ update: chain_update![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[(0, Some(h!("A"))), (3, Some(h!("D")))],
+ init_changeset: &[
+ (0, Some(h!("A"))),
+ (1, Some(h!("B"))),
+ (2, Some(h!("C"))),
+ (3, Some(h!("D"))),
+ ],
+ },
+ },
+ // Update and chain does not connect:
+ // | 0 | 1 | 2 | 3 | 4
+ // chain | B C
+ // update | A B D
+ // This should fail as we cannot figure out whether C & D are on the same chain
+ TestLocalChain {
+ name: "update and chain does not connect",
+ chain: local_chain![(1, h!("B")), (2, h!("C"))],
+ update: chain_update![(0, h!("A")), (1, h!("B")), (3, h!("D"))],
+ exp: ExpectedResult::Err(CannotConnectError {
+ try_include_height: 2,
+ }),
+ },
+ // Transient invalidation:
+ // | 0 | 1 | 2 | 3 | 4 | 5
+ // chain | A B C E
+ // update | A B' C' D
+ // This should succeed and invalidate B,C and E with point of agreement being A.
+ TestLocalChain {
+ name: "transitive invalidation applies to checkpoints higher than invalidation",
+ chain: local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))],
+ update: chain_update![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[
+ (2, Some(h!("B'"))),
+ (3, Some(h!("C'"))),
+ (4, Some(h!("D"))),
+ (5, None),
+ ],
+ init_changeset: &[
+ (0, Some(h!("A"))),
+ (2, Some(h!("B'"))),
+ (3, Some(h!("C'"))),
+ (4, Some(h!("D"))),
+ ],
+ },
+ },
+ // Transient invalidation:
+ // | 0 | 1 | 2 | 3 | 4
+ // chain | B C E
+ // update | B' C' D
+ // This should succeed and invalidate B, C and E with no point of agreement
+ TestLocalChain {
+ name: "transitive invalidation applies to checkpoints higher than invalidation no point of agreement",
+ chain: local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))],
+ update: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[
+ (1, Some(h!("B'"))),
+ (2, Some(h!("C'"))),
+ (3, Some(h!("D"))),
+ (4, None)
+ ],
+ init_changeset: &[
+ (1, Some(h!("B'"))),
+ (2, Some(h!("C'"))),
+ (3, Some(h!("D"))),
+ ],
+ },
+ },
+ // Transient invalidation:
+ // | 0 | 1 | 2 | 3 | 4
+ // chain | A B C E
+ // update | B' C' D
+ // This should fail since although it tells us that B and C are invalid it doesn't tell us whether
+ // A was invalid.
+ TestLocalChain {
+ name: "invalidation but no connection",
+ chain: local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))],
+ update: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))],
+ exp: ExpectedResult::Err(CannotConnectError { try_include_height: 0 }),
+ },
+ // Introduce blocks between two points of agreement
+ // | 0 | 1 | 2 | 3 | 4 | 5
+ // chain | A B D E
+ // update | A C E F
+ TestLocalChain {
+ name: "introduce blocks between two points of agreement",
+ chain: local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D")), (4, h!("E"))],
+ update: chain_update![(0, h!("A")), (2, h!("C")), (4, h!("E")), (5, h!("F"))],
+ exp: ExpectedResult::Ok {
+ changeset: &[
+ (2, Some(h!("C"))),
+ (5, Some(h!("F"))),
+ ],
+ init_changeset: &[
+ (0, Some(h!("A"))),
+ (1, Some(h!("B"))),
+ (2, Some(h!("C"))),
+ (3, Some(h!("D"))),
+ (4, Some(h!("E"))),
+ (5, Some(h!("F"))),
+ ],
+ },
+ },
+ ]
+ .into_iter()
+ .for_each(TestLocalChain::run);
}
#[test]
-fn insert_block() {
+fn local_chain_insert_block() {
struct TestCase {
original: LocalChain,
insert: (u32, BlockHash),
- expected_result: Result<ChangeSet, InsertBlockNotMatchingError>,
+ expected_result: Result<ChangeSet, InsertBlockError>,
expected_final: LocalChain,
}
TestCase {
original: local_chain![(2, h!("K"))],
insert: (2, h!("J")),
- expected_result: Err(InsertBlockNotMatchingError {
+ expected_result: Err(InsertBlockError {
height: 2,
original_hash: h!("K"),
update_hash: h!("J"),
let _ = graph.insert_anchor(
tx.txid(),
ConfirmationHeightAnchor {
- anchor_block: tip,
+ anchor_block: tip.block_id(),
confirmation_height: *ht,
},
);
// Assert that confirmed spends are returned correctly.
assert_eq!(
- graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 0)),
+ graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 0)),
Some((
ChainPosition::Confirmed(&ConfirmationHeightAnchor {
- anchor_block: tip,
+ anchor_block: tip.block_id(),
confirmation_height: 98
}),
tx_1.txid(),
// Check if chain position is returned correctly.
assert_eq!(
- graph.get_chain_position(&local_chain, tip, tx_0.txid()),
+ graph.get_chain_position(&local_chain, tip.block_id(), tx_0.txid()),
// Some(ObservedAs::Confirmed(&local_chain.get_block(95).expect("block expected"))),
Some(ChainPosition::Confirmed(&ConfirmationHeightAnchor {
- anchor_block: tip,
+ anchor_block: tip.block_id(),
confirmation_height: 95
}))
);
// Even if unconfirmed tx has a last_seen of 0, it can still be part of a chain spend.
assert_eq!(
- graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)),
+ graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)),
Some((ChainPosition::Unconfirmed(0), tx_2.txid())),
);
// Check chain spend returned correctly.
assert_eq!(
graph
- .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1))
+ .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1))
.unwrap(),
(ChainPosition::Unconfirmed(1234567), tx_2.txid())
);
// Because this tx conflicts with an already confirmed transaction, chain position should return none.
assert!(graph
- .get_chain_position(&local_chain, tip, tx_1_conflict.txid())
+ .get_chain_position(&local_chain, tip.block_id(), tx_1_conflict.txid())
.is_none());
// Another conflicting tx that conflicts with tx_2.
// This should return a valid observation with correct last seen.
assert_eq!(
graph
- .get_chain_position(&local_chain, tip, tx_2_conflict.txid())
+ .get_chain_position(&local_chain, tip.block_id(), tx_2_conflict.txid())
.expect("position expected"),
ChainPosition::Unconfirmed(1234568)
);
// Chain_spend now catches the new transaction as the spend.
assert_eq!(
graph
- .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1))
+ .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1))
.expect("expect observation"),
(ChainPosition::Unconfirmed(1234568), tx_2_conflict.txid())
);
// Chain position of the `tx_2` is now none, as it is older than `tx_2_conflict`
assert!(graph
- .get_chain_position(&local_chain, tip, tx_2.txid())
+ .get_chain_position(&local_chain, tip.block_id(), tx_2.txid())
.is_none());
}
use bdk_chain::{
- bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
+ bitcoin::{hashes::hex::FromHex, OutPoint, Script, Transaction, Txid},
keychain::LocalUpdate,
- local_chain::LocalChain,
+ local_chain::{self, CheckPoint},
tx_graph::{self, TxGraph},
Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
};
-use electrum_client::{Client, ElectrumApi, Error};
+use electrum_client::{Client, ElectrumApi, Error, HeaderNotification};
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
fmt::Debug,
};
+/// We assume that a block of this depth and deeper cannot be reorged.
+const ASSUME_FINAL_DEPTH: u32 = 8;
+
+/// Represents an update fetched from an Electrum server, but excludes full transactions.
+///
+/// To provide a complete update to [`TxGraph`], you'll need to call [`Self::missing_full_txs`] to
+/// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to fetch
+/// the full transactions from Electrum and finalize the update.
#[derive(Debug, Clone)]
pub struct ElectrumUpdate<K, A> {
+ /// Map of [`Txid`]s to associated [`Anchor`]s.
pub graph_update: HashMap<Txid, BTreeSet<A>>,
- pub chain_update: LocalChain,
+ /// The latest chain tip, as seen by the Electrum server.
+ pub new_tip: local_chain::CheckPoint,
+ /// Last-used index update for [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
pub keychain_update: BTreeMap<K, u32>,
}
-impl<K, A> Default for ElectrumUpdate<K, A> {
- fn default() -> Self {
+impl<K, A: Anchor> ElectrumUpdate<K, A> {
+ fn new(new_tip: local_chain::CheckPoint) -> Self {
Self {
- graph_update: Default::default(),
- chain_update: Default::default(),
- keychain_update: Default::default(),
+ new_tip,
+ graph_update: HashMap::new(),
+ keychain_update: BTreeMap::new(),
}
}
-}
-impl<K, A: Anchor> ElectrumUpdate<K, A> {
+ /// Determine the full transactions that are missing from `graph`.
+ ///
+ /// Refer to [`ElectrumUpdate`].
pub fn missing_full_txs<A2>(&self, graph: &TxGraph<A2>) -> Vec<Txid> {
self.graph_update
.keys()
.collect()
}
+ /// Finalizes update with `missing` txids to fetch from `client`.
+ ///
+ /// Refer to [`ElectrumUpdate`].
pub fn finalize(
self,
client: &Client,
Ok(LocalUpdate {
keychain: self.keychain_update,
graph: graph_update,
- chain: self.chain_update,
+ chain: local_chain::Update {
+ tip: self.new_tip,
+ introduce_older_blocks: true,
+ },
})
}
}
missing: Vec<Txid>,
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
let update = self.finalize(client, seen_at, missing)?;
+ // client.batch_transaction_get(txid)
let relevant_heights = {
let mut visited_heights = HashSet::new();
}
}
+/// Trait to extend [`Client`] functionality.
pub trait ElectrumExt<A> {
- fn get_tip(&self) -> Result<(u32, BlockHash), Error>;
-
+ /// Scan the blockchain (via electrum) for the data specified and returns a [`ElectrumUpdate`].
+ ///
+ /// - `prev_tip`: the most recent blockchain tip present locally
+ /// - `keychain_spks`: keychains that we want to scan transactions for
+ /// - `txids`: transactions for which we want updated [`Anchor`]s
+ /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+ /// want to included in the update
+ ///
+ /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+ /// transactions. `batch_size` specifies the max number of script pubkeys to request for in a
+ /// single batch request.
fn scan<K: Ord + Clone>(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
+ prev_tip: Option<CheckPoint>,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
batch_size: usize,
) -> Result<ElectrumUpdate<K, A>, Error>;
+ /// Convenience method to call [`scan`] without requiring a keychain.
+ ///
+ /// [`scan`]: ElectrumExt::scan
fn scan_without_keychain(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
+ prev_tip: Option<CheckPoint>,
misc_spks: impl IntoIterator<Item = Script>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
.map(|(i, spk)| (i as u32, spk));
self.scan(
- local_chain,
+ prev_tip,
[((), spk_iter)].into(),
txids,
outpoints,
}
impl ElectrumExt<ConfirmationHeightAnchor> for Client {
- fn get_tip(&self) -> Result<(u32, BlockHash), Error> {
- // TODO: unsubscribe when added to the client, or is there a better call to use here?
- self.block_headers_subscribe()
- .map(|data| (data.height as u32, data.header.block_hash()))
- }
-
fn scan<K: Ord + Clone>(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
+ prev_tip: Option<CheckPoint>,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
let outpoints = outpoints.into_iter().collect::<Vec<_>>();
let update = loop {
- let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor> {
- chain_update: prepare_chain_update(self, local_chain)?,
- ..Default::default()
- };
- let anchor_block = update
- .chain_update
- .tip()
- .expect("must have atleast one block");
+ let (tip, _) = construct_update_tip(self, prev_tip.clone())?;
+ let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor>::new(tip.clone());
+ let cps = update
+ .new_tip
+ .iter()
+ .take(10)
+ .map(|cp| (cp.height(), cp))
+ .collect::<BTreeMap<u32, CheckPoint>>();
if !request_spks.is_empty() {
if !scanned_spks.is_empty() {
scanned_spks.append(&mut populate_with_spks(
self,
- anchor_block,
+ &cps,
&mut update,
&mut scanned_spks
.iter()
scanned_spks.extend(
populate_with_spks(
self,
- anchor_block,
+ &cps,
&mut update,
keychain_spks,
stop_gap,
}
}
- populate_with_txids(self, anchor_block, &mut update, &mut txids.iter().cloned())?;
+ populate_with_txids(self, &cps, &mut update, &mut txids.iter().cloned())?;
- let _txs = populate_with_outpoints(
- self,
- anchor_block,
- &mut update,
- &mut outpoints.iter().cloned(),
- )?;
+ let _txs =
+ populate_with_outpoints(self, &cps, &mut update, &mut outpoints.iter().cloned())?;
// check for reorgs during scan process
- let server_blockhash = self
- .block_header(anchor_block.height as usize)?
- .block_hash();
- if anchor_block.hash != server_blockhash {
+ let server_blockhash = self.block_header(tip.height() as usize)?.block_hash();
+ if tip.hash() != server_blockhash {
continue; // reorg
}
}
}
-/// Prepare an update "template" based on the checkpoints of the `local_chain`.
-fn prepare_chain_update(
+/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`.
+fn construct_update_tip(
client: &Client,
- local_chain: &BTreeMap<u32, BlockHash>,
-) -> Result<LocalChain, Error> {
- let mut update = LocalChain::default();
-
- // Find the local chain block that is still there so our update can connect to the local chain.
- for (&existing_height, &existing_hash) in local_chain.iter().rev() {
- // TODO: a batch request may be safer, as a reorg that happens when we are obtaining
- // `block_header`s will result in inconsistencies
- let current_hash = client.block_header(existing_height as usize)?.block_hash();
- let _ = update
- .insert_block(BlockId {
- height: existing_height,
- hash: current_hash,
- })
- .expect("This never errors because we are working with a fresh chain");
-
- if current_hash == existing_hash {
- break;
+ prev_tip: Option<CheckPoint>,
+) -> Result<(CheckPoint, Option<u32>), Error> {
+ let HeaderNotification { height, .. } = client.block_headers_subscribe()?;
+ let new_tip_height = height as u32;
+
+ // If electrum returns a tip height that is lower than our previous tip, then checkpoints do
+ // not need updating. We just return the previous tip and use that as the point of agreement.
+ if let Some(prev_tip) = prev_tip.as_ref() {
+ if new_tip_height < prev_tip.height() {
+ return Ok((prev_tip.clone(), Some(prev_tip.height())));
}
}
- // Insert the new tip so new transactions will be accepted into the sparsechain.
- let tip = {
- let (height, hash) = crate::get_tip(client)?;
- BlockId { height, hash }
+ // Atomically fetch the latest `ASSUME_FINAL_DEPTH` count of blocks from Electrum. We use this
+ // to construct our checkpoint update.
+ let mut new_blocks = {
+ let start_height = new_tip_height.saturating_sub(ASSUME_FINAL_DEPTH);
+ let hashes = client
+ .block_headers(start_height as _, ASSUME_FINAL_DEPTH as _)?
+ .headers
+ .into_iter()
+ .map(|h| h.block_hash());
+ (start_height..).zip(hashes).collect::<BTreeMap<u32, _>>()
};
- if update.insert_block(tip).is_err() {
- // There has been a re-org before we even begin scanning addresses.
- // Just recursively call (this should never happen).
- return prepare_chain_update(client, local_chain);
- }
- Ok(update)
+ // Find the "point of agreement" (if any).
+ let agreement_cp = {
+ let mut agreement_cp = Option::<CheckPoint>::None;
+ for cp in prev_tip.iter().flat_map(CheckPoint::iter) {
+ let cp_block = cp.block_id();
+ let hash = match new_blocks.get(&cp_block.height) {
+ Some(&hash) => hash,
+ None => {
+ assert!(
+ new_tip_height >= cp_block.height,
+ "already checked that electrum's tip cannot be smaller"
+ );
+ let hash = client.block_header(cp_block.height as _)?.block_hash();
+ new_blocks.insert(cp_block.height, hash);
+ hash
+ }
+ };
+ if hash == cp_block.hash {
+ agreement_cp = Some(cp);
+ break;
+ }
+ }
+ agreement_cp
+ };
+
+ let agreement_height = agreement_cp.as_ref().map(CheckPoint::height);
+
+ let new_tip = new_blocks
+ .into_iter()
+ // Prune `new_blocks` to only include blocks that are actually new.
+ .filter(|(height, _)| Some(*height) > agreement_height)
+ .map(|(height, hash)| BlockId { height, hash })
+ .fold(agreement_cp, |prev_cp, block| {
+ Some(match prev_cp {
+ Some(cp) => cp.push(block).expect("must extend checkpoint"),
+ None => CheckPoint::new(block),
+ })
+ })
+ .expect("must have at least one checkpoint");
+
+ Ok((new_tip, agreement_height))
}
+/// A [tx status] comprises of a concatenation of `tx_hash:height:`s. We transform a single one of
+/// these concatenations into a [`ConfirmationHeightAnchor`] if possible.
+///
+/// We use the lowest possible checkpoint as the anchor block (from `cps`). If an anchor block
+/// cannot be found, or the transaction is unconfirmed, [`None`] is returned.
+///
+/// [tx status](https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status)
fn determine_tx_anchor(
- anchor_block: BlockId,
+ cps: &BTreeMap<u32, CheckPoint>,
raw_height: i32,
txid: Txid,
) -> Option<ConfirmationHeightAnchor> {
== Txid::from_hex("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b")
.expect("must deserialize genesis coinbase txid")
{
+ let anchor_block = cps.values().next()?.block_id();
return Some(ConfirmationHeightAnchor {
anchor_block,
confirmation_height: 0,
}
h => {
let h = h as u32;
+ let anchor_block = cps.range(h..).next().map(|(_, cp)| cp.block_id())?;
if h > anchor_block.height {
None
} else {
fn populate_with_outpoints<K>(
client: &Client,
- anchor_block: BlockId,
+ cps: &BTreeMap<u32, CheckPoint>,
update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
outpoints: &mut impl Iterator<Item = OutPoint>,
) -> Result<HashMap<Txid, Transaction>, Error> {
}
};
- let anchor = determine_tx_anchor(anchor_block, res.height, res.tx_hash);
+ let anchor = determine_tx_anchor(cps, res.height, res.tx_hash);
let tx_entry = update.graph_update.entry(res.tx_hash).or_default();
if let Some(anchor) = anchor {
fn populate_with_txids<K>(
client: &Client,
- anchor_block: BlockId,
+ cps: &BTreeMap<u32, CheckPoint>,
update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
txids: &mut impl Iterator<Item = Txid>,
) -> Result<(), Error> {
.into_iter()
.find(|r| r.tx_hash == txid)
{
- Some(r) => determine_tx_anchor(anchor_block, r.height, txid),
+ Some(r) => determine_tx_anchor(cps, r.height, txid),
None => continue,
};
fn populate_with_spks<K, I: Ord + Clone>(
client: &Client,
- anchor_block: BlockId,
+ cps: &BTreeMap<u32, CheckPoint>,
update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
spks: &mut impl Iterator<Item = (I, Script)>,
stop_gap: usize,
for tx in spk_history {
let tx_entry = update.graph_update.entry(tx.tx_hash).or_default();
- if let Some(anchor) = determine_tx_anchor(anchor_block, tx.height, tx.tx_hash) {
+ if let Some(anchor) = determine_tx_anchor(cps, tx.height, tx.tx_hash) {
tx_entry.insert(anchor);
}
}
//!
//! Refer to [`bdk_electrum_example`] for a complete example.
//!
-//! [`ElectrumClient::scan`]: ElectrumClient::scan
+//! [`ElectrumClient::scan`]: electrum_client::ElectrumClient::scan
//! [`missing_full_txs`]: ElectrumUpdate::missing_full_txs
-//! [`batch_transaction_get`]: ElectrumApi::batch_transaction_get
+//! [`batch_transaction_get`]: electrum_client::ElectrumApi::batch_transaction_get
//! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example
-use bdk_chain::bitcoin::BlockHash;
-use electrum_client::{Client, ElectrumApi, Error};
+#![warn(missing_docs)]
+
mod electrum_ext;
pub use bdk_chain;
pub use electrum_client;
pub use electrum_ext::*;
-
-fn get_tip(client: &Client) -> Result<(u32, BlockHash), Error> {
- // TODO: unsubscribe when added to the client, or is there a better call to use here?
- client
- .block_headers_subscribe()
- .map(|data| (data.height as u32, data.header.block_hash()))
-}
use async_trait::async_trait;
+use bdk_chain::collections::btree_map;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, Script, Txid},
- collections::BTreeMap,
- keychain::LocalUpdate,
- BlockId, ConfirmationTimeAnchor,
+ collections::{BTreeMap, BTreeSet},
+ local_chain::{self, CheckPoint},
+ BlockId, ConfirmationTimeAnchor, TxGraph,
};
-use esplora_client::{Error, OutputStatus, TxStatus};
+use esplora_client::{Error, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};
-use crate::map_confirmation_time_anchor;
+use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
-/// Trait to extend [`esplora_client::AsyncClient`] functionality.
+/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
///
-/// This is the async version of [`EsploraExt`]. Refer to
-/// [crate-level documentation] for more.
+/// Refer to [crate-level documentation] for more.
///
-/// [`EsploraExt`]: crate::EsploraExt
/// [crate-level documentation]: crate
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait EsploraAsyncExt {
- /// Scan the blockchain (via esplora) for the data specified and returns a
- /// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
+ /// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
///
- /// - `local_chain`: the most recent block hashes present locally
- /// - `keychain_spks`: keychains that we want to scan transactions for
- /// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
- /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
- /// want to included in the update
+ /// * `prev_tip` is the previous tip of [`LocalChain::tip`].
+ /// * `get_heights` is the block heights that we are interested in fetching from Esplora.
+ ///
+ /// The result of this method can be applied to [`LocalChain::apply_update`].
+ ///
+ /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
+ /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
+ /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
+ #[allow(clippy::result_large_err)]
+ async fn update_local_chain(
+ &self,
+ local_tip: Option<CheckPoint>,
+ request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
+ ) -> Result<local_chain::Update, Error>;
+
+ /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
+ /// indices.
+ ///
+ /// * `keychain_spks`: keychains that we want to scan transactions for
+ /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
+ /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
+ /// want to include in the update
///
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
/// parallel.
- #[allow(clippy::result_large_err)] // FIXME
- async fn scan<K: Ord + Clone + Send>(
+ #[allow(clippy::result_large_err)]
+ async fn update_tx_graph<K: Ord + Clone + Send>(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
+ ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
- /// Convenience method to call [`scan`] without requiring a keychain.
+ /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
///
- /// [`scan`]: EsploraAsyncExt::scan
- #[allow(clippy::result_large_err)] // FIXME
- async fn scan_without_keychain(
+ /// [`update_tx_graph`]: EsploraAsyncExt::update_tx_graph
+ #[allow(clippy::result_large_err)]
+ async fn update_tx_graph_without_keychain(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = Script> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
parallel_requests: usize,
- ) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
- self.scan(
- local_chain,
+ ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
+ self.update_tx_graph(
[(
(),
misc_spks
parallel_requests,
)
.await
+ .map(|(g, _)| g)
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EsploraAsyncExt for esplora_client::AsyncClient {
- #[allow(clippy::result_large_err)] // FIXME
- async fn scan<K: Ord + Clone + Send>(
+ async fn update_local_chain(
+ &self,
+ local_tip: Option<CheckPoint>,
+ request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
+ ) -> Result<local_chain::Update, Error> {
+ let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
+ let new_tip_height = self.get_height().await?;
+
+ // atomically fetch blocks from esplora
+ let mut fetched_blocks = {
+ let heights = (0..=new_tip_height).rev();
+ let hashes = self
+ .get_blocks(Some(new_tip_height))
+ .await?
+ .into_iter()
+ .map(|b| b.id);
+ heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
+ };
+
+ // fetch heights that the caller is interested in
+ for height in request_heights {
+ // do not fetch blocks higher than remote tip
+ if height > new_tip_height {
+ continue;
+ }
+ // only fetch what is missing
+ if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
+ let hash = self.get_block_hash(height).await?;
+ entry.insert(hash);
+ }
+ }
+
+ // find the earliest point of agreement between local chain and fetched chain
+ let earliest_agreement_cp = {
+ let mut earliest_agreement_cp = Option::<CheckPoint>::None;
+
+ if let Some(local_tip) = local_tip {
+ let local_tip_height = local_tip.height();
+ for local_cp in local_tip.iter() {
+ let local_block = local_cp.block_id();
+
+ // the updated hash (block hash at this height after the update), can either be:
+ // 1. a block that already existed in `fetched_blocks`
+ // 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
+ // 3. otherwise we can freshly fetch the block from remote, which is safe as it
+ // is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
+ // remote tip
+ let updated_hash = match fetched_blocks.entry(local_block.height) {
+ btree_map::Entry::Occupied(entry) => *entry.get(),
+ btree_map::Entry::Vacant(entry) => *entry.insert(
+ if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
+ local_block.hash
+ } else {
+ self.get_block_hash(local_block.height).await?
+ },
+ ),
+ };
+
+ // since we may introduce blocks below the point of agreement, we cannot break
+ // here unconditionally - we only break if we guarantee there are no new heights
+ // below our current local checkpoint
+ if local_block.hash == updated_hash {
+ earliest_agreement_cp = Some(local_cp);
+
+ let first_new_height = *fetched_blocks
+ .keys()
+ .next()
+ .expect("must have atleast one new block");
+ if first_new_height >= local_block.height {
+ break;
+ }
+ }
+ }
+ }
+
+ earliest_agreement_cp
+ };
+
+ let tip = {
+ // first checkpoint to use for the update chain
+ let first_cp = match earliest_agreement_cp {
+ Some(cp) => cp,
+ None => {
+ let (&height, &hash) = fetched_blocks
+ .iter()
+ .next()
+ .expect("must have atleast one new block");
+ CheckPoint::new(BlockId { height, hash })
+ }
+ };
+ // transform fetched chain into the update chain
+ fetched_blocks
+ // we exclude anything at or below the first cp of the update chain otherwise
+ // building the chain will fail
+ .split_off(&(first_cp.height() + 1))
+ .into_iter()
+ .map(|(height, hash)| BlockId { height, hash })
+ .fold(first_cp, |prev_cp, block| {
+ prev_cp.push(block).expect("must extend checkpoint")
+ })
+ };
+
+ Ok(local_chain::Update {
+ tip,
+ introduce_older_blocks: true,
+ })
+ }
+
+ async fn update_tx_graph<K: Ord + Clone + Send>(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
+ ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
let parallel_requests = Ord::max(parallel_requests, 1);
-
- let (mut update, tip_at_start) = loop {
- let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
-
- for (&height, &original_hash) in local_chain.iter().rev() {
- let update_block_id = BlockId {
- height,
- hash: self.get_block_hash(height).await?,
- };
- let _ = update
- .chain
- .insert_block(update_block_id)
- .expect("cannot repeat height here");
- if update_block_id.hash == original_hash {
- break;
- }
- }
-
- let tip_at_start = BlockId {
- height: self.get_height().await?,
- hash: self.get_tip_hash().await?,
- };
-
- if update.chain.insert_block(tip_at_start).is_ok() {
- break (update, tip_at_start);
- }
- };
+ let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
+ let mut last_active_indexes = BTreeMap::<K, u32>::new();
for (keychain, spks) in keychain_spks {
let mut spks = spks.into_iter();
- let mut last_active_index = None;
- let mut empty_scripts = 0;
- type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
loop {
- let futures = (0..parallel_requests)
- .filter_map(|_| {
- let (index, script) = spks.next()?;
+ let handles = spks
+ .by_ref()
+ .take(parallel_requests)
+ .map(|(spk_index, spk)| {
let client = self.clone();
- Some(async move {
- let mut related_txs = client.scripthash_txs(&script, None).await?;
-
- let n_confirmed =
- related_txs.iter().filter(|tx| tx.status.confirmed).count();
- // esplora pages on 25 confirmed transactions. If there are 25 or more we
- // keep requesting to see if there's more.
- if n_confirmed >= 25 {
- loop {
- let new_related_txs = client
- .scripthash_txs(
- &script,
- Some(related_txs.last().unwrap().txid),
- )
- .await?;
- let n = new_related_txs.len();
- related_txs.extend(new_related_txs);
- // we've reached the end
- if n < 25 {
- break;
- }
+ async move {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen).await?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Result::<_, Error>::Ok((spk_index, spk_txs));
}
}
-
- Result::<_, esplora_client::Error>::Ok((index, related_txs))
- })
+ }
})
.collect::<FuturesOrdered<_>>();
- let n_futures = futures.len();
+ if handles.is_empty() {
+ break;
+ }
- for (index, related_txs) in futures.try_collect::<Vec<IndexWithTxs>>().await? {
- if related_txs.is_empty() {
- empty_scripts += 1;
- } else {
+ for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
+ last_index = Some(index);
+ if !txs.is_empty() {
last_active_index = Some(index);
- empty_scripts = 0;
}
- for tx in related_txs {
- let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
-
- let _ = update.graph.insert_tx(tx.to_tx());
- if let Some(anchor) = anchor {
- let _ = update.graph.insert_anchor(tx.txid, anchor);
+ for tx in txs {
+ let _ = graph.insert_tx(tx.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx.status) {
+ let _ = graph.insert_anchor(tx.txid, anchor);
}
}
}
- if n_futures == 0 || empty_scripts >= stop_gap {
+ if last_index > last_active_index.map(|i| i + stop_gap as u32) {
break;
}
}
if let Some(last_active_index) = last_active_index {
- update.keychain.insert(keychain, last_active_index);
+ last_active_indexes.insert(keychain, last_active_index);
}
}
- for txid in txids.into_iter() {
- if update.graph.get_tx(txid).is_none() {
- match self.get_tx(&txid).await? {
- Some(tx) => {
- let _ = update.graph.insert_tx(tx);
- }
- None => continue,
- }
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .filter(|&txid| graph.get_tx(txid).is_none())
+ .map(|txid| {
+ let client = self.clone();
+ async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
+ })
+ .collect::<FuturesOrdered<_>>();
+ // .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
+
+ if handles.is_empty() {
+ break;
}
- match self.get_tx_status(&txid).await? {
- tx_status if tx_status.confirmed => {
- if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
- let _ = update.graph.insert_anchor(txid, anchor);
- }
+
+ for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(txid, anchor);
}
- _ => continue,
}
}
for op in outpoints.into_iter() {
- let mut op_txs = Vec::with_capacity(2);
- if let (
- Some(tx),
- tx_status @ TxStatus {
- confirmed: true, ..
- },
- ) = (
- self.get_tx(&op.txid).await?,
- self.get_tx_status(&op.txid).await?,
- ) {
- op_txs.push((tx, tx_status));
- if let Some(OutputStatus {
- txid: Some(txid),
- status: Some(spend_status),
- ..
- }) = self.get_output_status(&op.txid, op.vout as _).await?
- {
- if let Some(spend_tx) = self.get_tx(&txid).await? {
- op_txs.push((spend_tx, spend_status));
- }
+ if graph.get_tx(op.txid).is_none() {
+ if let Some(tx) = self.get_tx(&op.txid).await? {
+ let _ = graph.insert_tx(tx);
+ }
+ let status = self.get_tx_status(&op.txid).await?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(op.txid, anchor);
}
}
- for (tx, status) in op_txs {
- let txid = tx.txid();
- let anchor = map_confirmation_time_anchor(&status, tip_at_start);
-
- let _ = update.graph.insert_tx(tx);
- if let Some(anchor) = anchor {
- let _ = update.graph.insert_anchor(txid, anchor);
+ if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
+ if let Some(txid) = op_status.txid {
+ if graph.get_tx(txid).is_none() {
+ if let Some(tx) = self.get_tx(&txid).await? {
+ let _ = graph.insert_tx(tx);
+ }
+ let status = self.get_tx_status(&txid).await?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(txid, anchor);
+ }
+ }
}
}
}
- if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? {
- // A reorg occurred, so let's find out where all the txids we found are now in the chain
- let txids_found = update
- .graph
- .full_txs()
- .map(|tx_node| tx_node.txid)
- .collect::<Vec<_>>();
- update.chain = EsploraAsyncExt::scan_without_keychain(
- self,
- local_chain,
- [],
- txids_found,
- [],
- parallel_requests,
- )
- .await?
- .chain;
- }
-
- Ok(update)
+ Ok((graph, last_active_indexes))
}
}
-use bdk_chain::bitcoin::{BlockHash, OutPoint, Script, Txid};
-use bdk_chain::collections::BTreeMap;
-use bdk_chain::BlockId;
-use bdk_chain::{keychain::LocalUpdate, ConfirmationTimeAnchor};
-use esplora_client::{Error, OutputStatus, TxStatus};
+use std::thread::JoinHandle;
-use crate::map_confirmation_time_anchor;
+use bdk_chain::bitcoin::{OutPoint, Txid};
+use bdk_chain::collections::btree_map;
+use bdk_chain::collections::{BTreeMap, BTreeSet};
+use bdk_chain::{
+ bitcoin::{BlockHash, Script},
+ local_chain::{self, CheckPoint},
+};
+use bdk_chain::{BlockId, ConfirmationTimeAnchor, TxGraph};
+use esplora_client::{Error, TxStatus};
-/// Trait to extend [`esplora_client::BlockingClient`] functionality.
+use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
+
+/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
///
/// Refer to [crate-level documentation] for more.
///
/// [crate-level documentation]: crate
pub trait EsploraExt {
- /// Scan the blockchain (via esplora) for the data specified and returns a
- /// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
+ /// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
+ ///
+ /// * `prev_tip` is the previous tip of [`LocalChain::tip`].
+ /// * `get_heights` is the block heights that we are interested in fetching from Esplora.
+ ///
+ /// The result of this method can be applied to [`LocalChain::apply_update`].
+ ///
+ /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
+ /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
+ /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
+ #[allow(clippy::result_large_err)]
+ fn update_local_chain(
+ &self,
+ local_tip: Option<CheckPoint>,
+ request_heights: impl IntoIterator<Item = u32>,
+ ) -> Result<local_chain::Update, Error>;
+
+ /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
+ /// indices.
///
- /// - `local_chain`: the most recent block hashes present locally
- /// - `keychain_spks`: keychains that we want to scan transactions for
- /// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
- /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
- /// want to included in the update
+ /// * `keychain_spks`: keychains that we want to scan transactions for
+ /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
+ /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
+ /// want to include in the update
///
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
/// parallel.
- #[allow(clippy::result_large_err)] // FIXME
- fn scan<K: Ord + Clone>(
+ #[allow(clippy::result_large_err)]
+ fn update_tx_graph<K: Ord + Clone>(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
stop_gap: usize,
parallel_requests: usize,
- ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
+ ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
- /// Convenience method to call [`scan`] without requiring a keychain.
+ /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
///
- /// [`scan`]: EsploraExt::scan
- #[allow(clippy::result_large_err)] // FIXME
- fn scan_without_keychain(
+ /// [`update_tx_graph`]: EsploraExt::update_tx_graph
+ #[allow(clippy::result_large_err)]
+ fn update_tx_graph_without_keychain(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
misc_spks: impl IntoIterator<Item = Script>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
- ) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
- self.scan(
- local_chain,
+ ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
+ self.update_tx_graph(
[(
(),
misc_spks
usize::MAX,
parallel_requests,
)
+ .map(|(g, _)| g)
}
}
impl EsploraExt for esplora_client::BlockingClient {
- fn scan<K: Ord + Clone>(
+ fn update_local_chain(
&self,
- local_chain: &BTreeMap<u32, BlockHash>,
- keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
- txids: impl IntoIterator<Item = Txid>,
- outpoints: impl IntoIterator<Item = OutPoint>,
- stop_gap: usize,
- parallel_requests: usize,
- ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
- let parallel_requests = Ord::max(parallel_requests, 1);
+ local_tip: Option<CheckPoint>,
+ request_heights: impl IntoIterator<Item = u32>,
+ ) -> Result<local_chain::Update, Error> {
+ let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
+ let new_tip_height = self.get_height()?;
- let (mut update, tip_at_start) = loop {
- let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
-
- for (&height, &original_hash) in local_chain.iter().rev() {
- let update_block_id = BlockId {
- height,
- hash: self.get_block_hash(height)?,
- };
- let _ = update
- .chain
- .insert_block(update_block_id)
- .expect("cannot repeat height here");
- if update_block_id.hash == original_hash {
- break;
- }
+ // atomically fetch blocks from esplora
+ let mut fetched_blocks = {
+ let heights = (0..=new_tip_height).rev();
+ let hashes = self
+ .get_blocks(Some(new_tip_height))?
+ .into_iter()
+ .map(|b| b.id);
+ heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
+ };
+
+ // fetch heights that the caller is interested in
+ for height in request_heights {
+ // do not fetch blocks higher than remote tip
+ if height > new_tip_height {
+ continue;
+ }
+ // only fetch what is missing
+ if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
+ let hash = self.get_block_hash(height)?;
+ entry.insert(hash);
}
+ }
- let tip_at_start = BlockId {
- height: self.get_height()?,
- hash: self.get_tip_hash()?,
- };
+ // find the earliest point of agreement between local chain and fetched chain
+ let earliest_agreement_cp = {
+ let mut earliest_agreement_cp = Option::<CheckPoint>::None;
+
+ if let Some(local_tip) = local_tip {
+ let local_tip_height = local_tip.height();
+ for local_cp in local_tip.iter() {
+ let local_block = local_cp.block_id();
+
+ // the updated hash (block hash at this height after the update), can either be:
+ // 1. a block that already existed in `fetched_blocks`
+ // 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
+ // 3. otherwise we can freshly fetch the block from remote, which is safe as it
+ // is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
+ // remote tip
+ let updated_hash = match fetched_blocks.entry(local_block.height) {
+ btree_map::Entry::Occupied(entry) => *entry.get(),
+ btree_map::Entry::Vacant(entry) => *entry.insert(
+ if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
+ local_block.hash
+ } else {
+ self.get_block_hash(local_block.height)?
+ },
+ ),
+ };
+
+ // since we may introduce blocks below the point of agreement, we cannot break
+ // here unconditionally - we only break if we guarantee there are no new heights
+ // below our current local checkpoint
+ if local_block.hash == updated_hash {
+ earliest_agreement_cp = Some(local_cp);
- if update.chain.insert_block(tip_at_start).is_ok() {
- break (update, tip_at_start);
+ let first_new_height = *fetched_blocks
+ .keys()
+ .next()
+ .expect("must have atleast one new block");
+ if first_new_height >= local_block.height {
+ break;
+ }
+ }
+ }
}
+
+ earliest_agreement_cp
};
+ let tip = {
+ // first checkpoint to use for the update chain
+ let first_cp = match earliest_agreement_cp {
+ Some(cp) => cp,
+ None => {
+ let (&height, &hash) = fetched_blocks
+ .iter()
+ .next()
+ .expect("must have atleast one new block");
+ CheckPoint::new(BlockId { height, hash })
+ }
+ };
+ // transform fetched chain into the update chain
+ fetched_blocks
+ // we exclude anything at or below the first cp of the update chain otherwise
+ // building the chain will fail
+ .split_off(&(first_cp.height() + 1))
+ .into_iter()
+ .map(|(height, hash)| BlockId { height, hash })
+ .fold(first_cp, |prev_cp, block| {
+ prev_cp.push(block).expect("must extend checkpoint")
+ })
+ };
+
+ Ok(local_chain::Update {
+ tip,
+ introduce_older_blocks: true,
+ })
+ }
+
+ fn update_tx_graph<K: Ord + Clone>(
+ &self,
+ keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+ txids: impl IntoIterator<Item = Txid>,
+ outpoints: impl IntoIterator<Item = OutPoint>,
+ stop_gap: usize,
+ parallel_requests: usize,
+ ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
+ type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+ let parallel_requests = Ord::max(parallel_requests, 1);
+ let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
+ let mut last_active_indexes = BTreeMap::<K, u32>::new();
+
for (keychain, spks) in keychain_spks {
let mut spks = spks.into_iter();
- let mut last_active_index = None;
- let mut empty_scripts = 0;
- type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+ let mut last_index = Option::<u32>::None;
+ let mut last_active_index = Option::<u32>::None;
loop {
- let handles = (0..parallel_requests)
- .filter_map(
- |_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
- let (index, script) = spks.next()?;
+ let handles = spks
+ .by_ref()
+ .take(parallel_requests)
+ .map(|(spk_index, spk)| {
+ std::thread::spawn({
let client = self.clone();
- Some(std::thread::spawn(move || {
- let mut related_txs = client.scripthash_txs(&script, None)?;
-
- let n_confirmed =
- related_txs.iter().filter(|tx| tx.status.confirmed).count();
- // esplora pages on 25 confirmed transactions. If there are 25 or more we
- // keep requesting to see if there's more.
- if n_confirmed >= 25 {
- loop {
- let new_related_txs = client.scripthash_txs(
- &script,
- Some(related_txs.last().unwrap().txid),
- )?;
- let n = new_related_txs.len();
- related_txs.extend(new_related_txs);
- // we've reached the end
- if n < 25 {
- break;
- }
+ move || -> Result<TxsOfSpkIndex, Error> {
+ let mut last_seen = None;
+ let mut spk_txs = Vec::new();
+ loop {
+ let txs = client.scripthash_txs(&spk, last_seen)?;
+ let tx_count = txs.len();
+ last_seen = txs.last().map(|tx| tx.txid);
+ spk_txs.extend(txs);
+ if tx_count < 25 {
+ break Ok((spk_index, spk_txs));
}
}
+ }
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
- Result::<_, esplora_client::Error>::Ok((index, related_txs))
- }))
- },
- )
- .collect::<Vec<_>>();
-
- let n_handles = handles.len();
+ if handles.is_empty() {
+ break;
+ }
for handle in handles {
- let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
- if related_txs.is_empty() {
- empty_scripts += 1;
- } else {
+ let (index, txs) = handle.join().expect("thread must not panic")?;
+ last_index = Some(index);
+ if !txs.is_empty() {
last_active_index = Some(index);
- empty_scripts = 0;
}
- for tx in related_txs {
- let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
-
- let _ = update.graph.insert_tx(tx.to_tx());
- if let Some(anchor) = anchor {
- let _ = update.graph.insert_anchor(tx.txid, anchor);
+ for tx in txs {
+ let _ = graph.insert_tx(tx.to_tx());
+ if let Some(anchor) = anchor_from_status(&tx.status) {
+ let _ = graph.insert_anchor(tx.txid, anchor);
}
}
}
- if n_handles == 0 || empty_scripts >= stop_gap {
+ if last_index > last_active_index.map(|i| i + stop_gap as u32) {
break;
}
}
if let Some(last_active_index) = last_active_index {
- update.keychain.insert(keychain, last_active_index);
+ last_active_indexes.insert(keychain, last_active_index);
}
}
- for txid in txids.into_iter() {
- if update.graph.get_tx(txid).is_none() {
- match self.get_tx(&txid)? {
- Some(tx) => {
- let _ = update.graph.insert_tx(tx);
- }
- None => continue,
- }
+ let mut txids = txids.into_iter();
+ loop {
+ let handles = txids
+ .by_ref()
+ .take(parallel_requests)
+ .filter(|&txid| graph.get_tx(txid).is_none())
+ .map(|txid| {
+ std::thread::spawn({
+ let client = self.clone();
+ move || client.get_tx_status(&txid).map(|s| (txid, s))
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
+
+ if handles.is_empty() {
+ break;
}
- match self.get_tx_status(&txid)? {
- tx_status @ TxStatus {
- confirmed: true, ..
- } => {
- if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
- let _ = update.graph.insert_anchor(txid, anchor);
- }
+
+ for handle in handles {
+ let (txid, status) = handle.join().expect("thread must not panic")?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(txid, anchor);
}
- _ => continue,
}
}
for op in outpoints.into_iter() {
- let mut op_txs = Vec::with_capacity(2);
- if let (
- Some(tx),
- tx_status @ TxStatus {
- confirmed: true, ..
- },
- ) = (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
- {
- op_txs.push((tx, tx_status));
- if let Some(OutputStatus {
- txid: Some(txid),
- status: Some(spend_status),
- ..
- }) = self.get_output_status(&op.txid, op.vout as _)?
- {
- if let Some(spend_tx) = self.get_tx(&txid)? {
- op_txs.push((spend_tx, spend_status));
- }
+ if graph.get_tx(op.txid).is_none() {
+ if let Some(tx) = self.get_tx(&op.txid)? {
+ let _ = graph.insert_tx(tx);
+ }
+ let status = self.get_tx_status(&op.txid)?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(op.txid, anchor);
}
}
- for (tx, status) in op_txs {
- let txid = tx.txid();
- let anchor = map_confirmation_time_anchor(&status, tip_at_start);
-
- let _ = update.graph.insert_tx(tx);
- if let Some(anchor) = anchor {
- let _ = update.graph.insert_anchor(txid, anchor);
+ if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
+ if let Some(txid) = op_status.txid {
+ if graph.get_tx(txid).is_none() {
+ if let Some(tx) = self.get_tx(&txid)? {
+ let _ = graph.insert_tx(tx);
+ }
+ let status = self.get_tx_status(&txid)?;
+ if let Some(anchor) = anchor_from_status(&status) {
+ let _ = graph.insert_anchor(txid, anchor);
+ }
+ }
}
}
}
- if tip_at_start.hash != self.get_block_hash(tip_at_start.height)? {
- // A reorg occurred, so let's find out where all the txids we found are now in the chain
- let txids_found = update
- .graph
- .full_txs()
- .map(|tx_node| tx_node.txid)
- .collect::<Vec<_>>();
- update.chain = EsploraExt::scan_without_keychain(
- self,
- local_chain,
- [],
- txids_found,
- [],
- parallel_requests,
- )?
- .chain;
- }
-
- Ok(update)
+ Ok((graph, last_active_indexes))
}
}
#[cfg(feature = "async")]
pub use async_ext::*;
-pub(crate) fn map_confirmation_time_anchor(
- tx_status: &TxStatus,
- tip_at_start: BlockId,
-) -> Option<ConfirmationTimeAnchor> {
- match (tx_status.block_time, tx_status.block_height) {
- (Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor {
- anchor_block: tip_at_start,
- confirmation_height,
- confirmation_time,
- }),
- _ => None,
+const ASSUME_FINAL_DEPTH: u32 = 15;
+
+fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeAnchor> {
+ if let TxStatus {
+ block_height: Some(height),
+ block_hash: Some(hash),
+ block_time: Some(time),
+ ..
+ } = status.clone()
+ {
+ Some(ConfirmationTimeAnchor {
+ anchor_block: BlockId { height, hash },
+ confirmation_height: height,
+ confirmation_time: time,
+ })
+ } else {
+ None
}
}
};
use bdk_chain::{
- bitcoin::{Address, BlockHash, Network, OutPoint, Txid},
+ bitcoin::{Address, Network, OutPoint, Txid},
indexed_tx_graph::{IndexedAdditions, IndexedTxGraph},
keychain::LocalChangeSet,
local_chain::LocalChain,
};
const DB_MAGIC: &[u8] = b"bdk_example_electrum";
-const DB_PATH: &str = ".bdk_electrum_example.db";
-const ASSUME_FINAL_DEPTH: usize = 10;
+const DB_PATH: &str = ".bdk_example_electrum.db";
#[derive(Subcommand, Debug, Clone)]
enum ElectrumCommands {
graph
});
- let chain = Mutex::new({
- let mut chain = LocalChain::default();
- chain.apply_changeset(init_changeset.chain_changeset);
- chain
- });
+ let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset));
let electrum_url = match args.network {
Network::Bitcoin => "ssl://electrum.blockstream.info:50002",
stop_gap,
scan_options,
} => {
- let (keychain_spks, local_chain) = {
+ let (keychain_spks, tip) = {
let graph = &*graph.lock().unwrap();
let chain = &*chain.lock().unwrap();
})
.collect::<BTreeMap<_, _>>();
- let c = chain
- .blocks()
- .iter()
- .rev()
- .take(ASSUME_FINAL_DEPTH)
- .map(|(k, v)| (*k, *v))
- .collect::<BTreeMap<u32, BlockHash>>();
-
- (keychain_spks, c)
+ let tip = chain.tip();
+ (keychain_spks, tip)
};
client
.scan(
- &local_chain,
+ tip,
keychain_spks,
core::iter::empty(),
core::iter::empty(),
// Get a short lock on the tracker to get the spks we're interested in
let graph = graph.lock().unwrap();
let chain = chain.lock().unwrap();
- let chain_tip = chain.tip().unwrap_or_default();
+ let chain_tip = chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
if !(all_spks || unused_spks || utxos || unconfirmed) {
unused_spks = true;
}));
}
- let c = chain
- .blocks()
- .iter()
- .rev()
- .take(ASSUME_FINAL_DEPTH)
- .map(|(k, v)| (*k, *v))
- .collect::<BTreeMap<u32, BlockHash>>();
+ let tip = chain.tip();
// drop lock on graph and chain
drop((graph, chain));
let update = client
- .scan_without_keychain(&c, spks, txids, outpoints, scan_options.batch_size)
+ .scan_without_keychain(tip, spks, txids, outpoints, scan_options.batch_size)
.context("scanning the blockchain")?;
ElectrumUpdate {
graph_update: update.graph_update,
- chain_update: update.chain_update,
+ new_tip: update.new_tip,
keychain_update: BTreeMap::new(),
}
}
print!("Syncing...");
let client = electrum_client::Client::new("ssl://electrum.blockstream.info:60002")?;
- let local_chain = wallet.checkpoints();
+ let prev_tip = wallet.latest_checkpoint();
let keychain_spks = wallet
.spks_of_all_keychains()
.into_iter()
})
.collect();
- let electrum_update =
- client.scan(local_chain, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?;
+ let electrum_update = client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?;
println!();
const DB_MAGIC: &str = "bdk_wallet_esplora_example";
-const SEND_AMOUNT: u64 = 5000;
-const STOP_GAP: usize = 50;
-const PARALLEL_REQUESTS: usize = 5;
+const SEND_AMOUNT: u64 = 1000;
+const STOP_GAP: usize = 5;
+const PARALLEL_REQUESTS: usize = 1;
use std::{io::Write, str::FromStr};
use bdk::{
bitcoin::{Address, Network},
+ chain::keychain::LocalUpdate,
wallet::AddressIndex,
SignOptions, Wallet,
};
let client =
esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking()?;
- let local_chain = wallet.checkpoints();
+ let prev_tip = wallet.latest_checkpoint();
let keychain_spks = wallet
.spks_of_all_keychains()
.into_iter()
(k, k_spks)
})
.collect();
- let update = client.scan(
- local_chain,
- keychain_spks,
- None,
- None,
- STOP_GAP,
- PARALLEL_REQUESTS,
- )?;
- println!();
+
+ let (update_graph, last_active_indices) =
+ client.update_tx_graph(keychain_spks, None, None, STOP_GAP, PARALLEL_REQUESTS)?;
+ let get_heights = wallet.tx_graph().missing_blocks(wallet.local_chain());
+ let chain_update = client.update_local_chain(prev_tip, get_heights)?;
+ let update = LocalUpdate {
+ keychain: last_active_indices,
+ graph: update_graph,
+ ..LocalUpdate::new(chain_update)
+ };
+
wallet.apply_update(update)?;
wallet.commit()?;
+ println!();
let balance = wallet.get_balance();
println!("Wallet balance after syncing: {} sats", balance.total());
use bdk::{
bitcoin::{Address, Network},
+ chain::keychain::LocalUpdate,
wallet::AddressIndex,
SignOptions, Wallet,
};
let client =
esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?;
- let local_chain = wallet.checkpoints();
+ let prev_tip = wallet.latest_checkpoint();
let keychain_spks = wallet
.spks_of_all_keychains()
.into_iter()
(k, k_spks)
})
.collect();
- let update = client
- .scan(
- local_chain,
- keychain_spks,
- [],
- [],
- STOP_GAP,
- PARALLEL_REQUESTS,
- )
+ let (update_graph, last_active_indices) = client
+ .update_tx_graph(keychain_spks, None, None, STOP_GAP, PARALLEL_REQUESTS)
.await?;
- println!();
+ let get_heights = wallet.tx_graph().missing_blocks(wallet.local_chain());
+ let chain_update = client.update_local_chain(prev_tip, get_heights).await?;
+ let update = LocalUpdate {
+ keychain: last_active_indices,
+ graph: update_graph,
+ ..LocalUpdate::new(chain_update)
+ };
wallet.apply_update(update)?;
wallet.commit()?;
+ println!();
let balance = wallet.get_balance();
println!("Wallet balance after syncing: {} sats", balance.total());