]> Untitled Git - bdk/commitdiff
Implement linked-list `LocalChain` and update chain-src crates/examples
author志宇 <hello@evanlinjin.me>
Wed, 19 Jul 2023 09:42:52 +0000 (17:42 +0800)
committer志宇 <hello@evanlinjin.me>
Fri, 28 Jul 2023 03:30:16 +0000 (11:30 +0800)
This commit changes the `LocalChain` implementation to have blocks
stored as a linked-list. This allows the data-src thread to hold a
shared ref to a single checkpoint and have access to the whole history
of checkpoints without cloning or keeping a lock on `LocalChain`.

The APIs of `bdk::Wallet`, `esplora` and `electrum` are also updated to
reflect these changes. Note that the `esplora` crate is rewritten to
anchor txs in the confirmation block (using the esplora API's tx status
block_hash). This guarantees 100% consistency between anchor blocks and
their transactions (instead of anchoring txs to the latest tip).
`ExploraExt` now has separate methods for updating the `TxGraph` and
`LocalChain`.

A new method `TxGraph::missing_blocks` is introduced for finding
"floating anchors" of a `TxGraph` update (given a chain).

Additional changes:

* `test_local_chain.rs` is refactored to make test cases easier to
  write. Additional tests are also added.
* Examples are updated.
* Fix `tempfile` dev dependency of `bdk_file_store` to work with MSRV

Co-authored-by: LLFourn <lloyd.fourn@gmail.com>
18 files changed:
crates/bdk/src/wallet/mod.rs
crates/bdk/tests/wallet.rs
crates/chain/src/keychain.rs
crates/chain/src/local_chain.rs
crates/chain/src/tx_graph.rs
crates/chain/tests/common/mod.rs
crates/chain/tests/test_indexed_tx_graph.rs
crates/chain/tests/test_local_chain.rs
crates/chain/tests/test_tx_graph.rs
crates/electrum/src/electrum_ext.rs
crates/electrum/src/lib.rs
crates/esplora/src/async_ext.rs
crates/esplora/src/blocking_ext.rs
crates/esplora/src/lib.rs
example-crates/example_electrum/src/main.rs
example-crates/wallet_electrum/src/main.rs
example-crates/wallet_esplora/src/main.rs
example-crates/wallet_esplora_async/src/main.rs

index f2f717d9fea9cfa9c14ad754dd9ef8d9ec09625d..634c5c66c0f8d6ba9fe32c665e4adc86c9519eb2 100644 (file)
@@ -23,7 +23,7 @@ pub use bdk_chain::keychain::Balance;
 use bdk_chain::{
     indexed_tx_graph::IndexedAdditions,
     keychain::{KeychainTxOutIndex, LocalChangeSet, LocalUpdate},
-    local_chain::{self, LocalChain, UpdateNotConnectedError},
+    local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain},
     tx_graph::{CanonicalTx, TxGraph},
     Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut,
     IndexedTxGraph, Persist, PersistBackend,
@@ -32,8 +32,8 @@ use bitcoin::consensus::encode::serialize;
 use bitcoin::secp256k1::Secp256k1;
 use bitcoin::util::psbt;
 use bitcoin::{
-    Address, BlockHash, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script,
-    Sequence, Transaction, TxOut, Txid, Witness,
+    Address, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script, Sequence,
+    Transaction, TxOut, Txid, Witness,
 };
 use core::fmt;
 use core::ops::Deref;
@@ -245,7 +245,7 @@ impl<D> Wallet<D> {
         };
 
         let changeset = db.load_from_persistence().map_err(NewError::Persist)?;
-        chain.apply_changeset(changeset.chain_changeset);
+        chain.apply_changeset(&changeset.chain_changeset);
         indexed_graph.apply_additions(changeset.indexed_additions);
 
         let persist = Persist::new(db);
@@ -370,19 +370,19 @@ impl<D> Wallet<D> {
             .graph()
             .filter_chain_unspents(
                 &self.chain,
-                self.chain.tip().unwrap_or_default(),
+                self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
                 self.indexed_graph.index.outpoints().iter().cloned(),
             )
             .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo))
     }
 
     /// Get all the checkpoints the wallet is currently storing indexed by height.
-    pub fn checkpoints(&self) -> &BTreeMap<u32, BlockHash> {
-        self.chain.blocks()
+    pub fn checkpoints(&self) -> CheckPointIter {
+        self.chain.iter_checkpoints()
     }
 
     /// Returns the latest checkpoint.
-    pub fn latest_checkpoint(&self) -> Option<BlockId> {
+    pub fn latest_checkpoint(&self) -> Option<CheckPoint> {
         self.chain.tip()
     }
 
@@ -420,7 +420,7 @@ impl<D> Wallet<D> {
             .graph()
             .filter_chain_unspents(
                 &self.chain,
-                self.chain.tip().unwrap_or_default(),
+                self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
                 core::iter::once((spk_i, op)),
             )
             .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo))
@@ -437,7 +437,7 @@ impl<D> Wallet<D> {
         let canonical_tx = CanonicalTx {
             observed_as: graph.get_chain_position(
                 &self.chain,
-                self.chain.tip().unwrap_or_default(),
+                self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
                 txid,
             )?,
             node: graph.get_tx_node(txid)?,
@@ -460,7 +460,7 @@ impl<D> Wallet<D> {
     pub fn insert_checkpoint(
         &mut self,
         block_id: BlockId,
-    ) -> Result<bool, local_chain::InsertBlockNotMatchingError>
+    ) -> Result<bool, local_chain::InsertBlockError>
     where
         D: PersistBackend<ChangeSet>,
     {
@@ -500,17 +500,17 @@ impl<D> Wallet<D> {
                 // anchor tx to checkpoint with lowest height that is >= position's height
                 let anchor = self
                     .chain
-                    .blocks()
+                    .heights()
                     .range(height..)
                     .next()
                     .ok_or(InsertTxError::ConfirmationHeightCannotBeGreaterThanTip {
-                        tip_height: self.chain.tip().map(|b| b.height),
+                        tip_height: self.chain.tip().map(|b| b.height()),
                         tx_height: height,
                     })
-                    .map(|(&anchor_height, &anchor_hash)| ConfirmationTimeAnchor {
+                    .map(|(&anchor_height, &hash)| ConfirmationTimeAnchor {
                         anchor_block: BlockId {
                             height: anchor_height,
-                            hash: anchor_hash,
+                            hash,
                         },
                         confirmation_height: height,
                         confirmation_time: time,
@@ -531,9 +531,10 @@ impl<D> Wallet<D> {
     pub fn transactions(
         &self,
     ) -> impl Iterator<Item = CanonicalTx<'_, Transaction, ConfirmationTimeAnchor>> + '_ {
-        self.indexed_graph
-            .graph()
-            .list_chain_txs(&self.chain, self.chain.tip().unwrap_or_default())
+        self.indexed_graph.graph().list_chain_txs(
+            &self.chain,
+            self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
+        )
     }
 
     /// Return the balance, separated into available, trusted-pending, untrusted-pending and immature
@@ -541,7 +542,7 @@ impl<D> Wallet<D> {
     pub fn get_balance(&self) -> Balance {
         self.indexed_graph.graph().balance(
             &self.chain,
-            self.chain.tip().unwrap_or_default(),
+            self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(),
             self.indexed_graph.index.outpoints().iter().cloned(),
             |&(k, _), _| k == KeychainKind::Internal,
         )
@@ -715,8 +716,7 @@ impl<D> Wallet<D> {
             None => self
                 .chain
                 .tip()
-                .and_then(|cp| cp.height.into())
-                .map(|height| LockTime::from_height(height).expect("Invalid height")),
+                .map(|cp| LockTime::from_height(cp.height()).expect("Invalid height")),
             h => h,
         };
 
@@ -1030,7 +1030,7 @@ impl<D> Wallet<D> {
     ) -> Result<TxBuilder<'_, D, DefaultCoinSelectionAlgorithm, BumpFee>, Error> {
         let graph = self.indexed_graph.graph();
         let txout_index = &self.indexed_graph.index;
-        let chain_tip = self.chain.tip().unwrap_or_default();
+        let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
 
         let mut tx = graph
             .get_tx(txid)
@@ -1265,7 +1265,7 @@ impl<D> Wallet<D> {
         psbt: &mut psbt::PartiallySignedTransaction,
         sign_options: SignOptions,
     ) -> Result<bool, Error> {
-        let chain_tip = self.chain.tip().unwrap_or_default();
+        let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
 
         let tx = &psbt.unsigned_tx;
         let mut finished = true;
@@ -1288,7 +1288,7 @@ impl<D> Wallet<D> {
                 });
             let current_height = sign_options
                 .assume_height
-                .or(self.chain.tip().map(|b| b.height));
+                .or(self.chain.tip().map(|b| b.height()));
 
             debug!(
                 "Input #{} - {}, using `confirmation_height` = {:?}, `current_height` = {:?}",
@@ -1433,7 +1433,7 @@ impl<D> Wallet<D> {
         must_only_use_confirmed_tx: bool,
         current_height: Option<u32>,
     ) -> (Vec<WeightedUtxo>, Vec<WeightedUtxo>) {
-        let chain_tip = self.chain.tip().unwrap_or_default();
+        let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
         //    must_spend <- manually selected utxos
         //    may_spend  <- all other available utxos
         let mut may_spend = self.get_available_utxos();
@@ -1697,24 +1697,25 @@ impl<D> Wallet<D> {
     }
 
     /// Applies an update to the wallet and stages the changes (but does not [`commit`] them).
-    ///
-    /// This returns whether the `update` resulted in any changes.
+    /// Returns whether the `update` resulted in any changes.
     ///
     /// Usually you create an `update` by interacting with some blockchain data source and inserting
     /// transactions related to your wallet into it.
     ///
     /// [`commit`]: Self::commit
-    pub fn apply_update(&mut self, update: Update) -> Result<bool, UpdateNotConnectedError>
+    pub fn apply_update(&mut self, update: Update) -> Result<bool, CannotConnectError>
     where
         D: PersistBackend<ChangeSet>,
     {
-        let mut changeset: ChangeSet = self.chain.apply_update(update.chain)?.into();
+        let mut changeset = ChangeSet::from(self.chain.apply_update(update.chain)?);
         let (_, index_additions) = self
             .indexed_graph
             .index
             .reveal_to_target_multi(&update.keychain);
         changeset.append(ChangeSet::from(IndexedAdditions::from(index_additions)));
-        changeset.append(self.indexed_graph.apply_update(update.graph).into());
+        changeset.append(ChangeSet::from(
+            self.indexed_graph.apply_update(update.graph),
+        ));
 
         let changed = !changeset.is_empty();
         self.persist.stage(changeset);
index 282a74fcb9c4f56f3276723acb3191c7b6a49fb1..e8ded3146eafdc8fb1ab572c72a4c1a49a6ea2f4 100644 (file)
@@ -44,7 +44,10 @@ fn receive_output(wallet: &mut Wallet, value: u64, height: ConfirmationTime) ->
 
 fn receive_output_in_latest_block(wallet: &mut Wallet, value: u64) -> OutPoint {
     let height = match wallet.latest_checkpoint() {
-        Some(BlockId { height, .. }) => ConfirmationTime::Confirmed { height, time: 0 },
+        Some(cp) => ConfirmationTime::Confirmed {
+            height: cp.height(),
+            time: 0,
+        },
         None => ConfirmationTime::Unconfirmed { last_seen: 0 },
     };
     receive_output(wallet, value, height)
@@ -222,7 +225,7 @@ fn test_create_tx_fee_sniping_locktime_last_sync() {
     // If there's no current_height we're left with using the last sync height
     assert_eq!(
         psbt.unsigned_tx.lock_time.0,
-        wallet.latest_checkpoint().unwrap().height
+        wallet.latest_checkpoint().unwrap().height()
     );
 }
 
@@ -426,11 +429,7 @@ fn test_create_tx_drain_wallet_and_drain_to_and_with_recipient() {
 fn test_create_tx_drain_to_and_utxos() {
     let (mut wallet, _) = get_funded_wallet(get_test_wpkh());
     let addr = wallet.get_address(New);
-    let utxos: Vec<_> = wallet
-        .list_unspent()
-        .into_iter()
-        .map(|u| u.outpoint)
-        .collect();
+    let utxos: Vec<_> = wallet.list_unspent().map(|u| u.outpoint).collect();
     let mut builder = wallet.build_tx();
     builder
         .drain_to(addr.script_pubkey())
@@ -1482,7 +1481,7 @@ fn test_bump_fee_drain_wallet() {
         .insert_tx(
             tx.clone(),
             ConfirmationTime::Confirmed {
-                height: wallet.latest_checkpoint().unwrap().height,
+                height: wallet.latest_checkpoint().unwrap().height(),
                 time: 42_000,
             },
         )
index f9b2436f291724d274b1f9327435f3c7ac10b893..cc85df4cf91cb00b8629cc5b4265497c813f0316 100644 (file)
 //! [`SpkTxOutIndex`]: crate::SpkTxOutIndex
 
 use crate::{
-    collections::BTreeMap,
-    indexed_tx_graph::IndexedAdditions,
-    local_chain::{self, LocalChain},
-    tx_graph::TxGraph,
+    collections::BTreeMap, indexed_tx_graph::IndexedAdditions, local_chain, tx_graph::TxGraph,
     Anchor, Append,
 };
 
@@ -89,24 +86,32 @@ impl<K> AsRef<BTreeMap<K, u32>> for DerivationAdditions<K> {
     }
 }
 
-/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`]
-/// atomically.
-#[derive(Debug, Clone, PartialEq)]
+/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically.
+///
+/// [`LocalChain`]: local_chain::LocalChain
+#[derive(Debug, Clone)]
 pub struct LocalUpdate<K, A> {
     /// Last active derivation index per keychain (`K`).
     pub keychain: BTreeMap<K, u32>,
+
     /// Update for the [`TxGraph`].
     pub graph: TxGraph<A>,
+
     /// Update for the [`LocalChain`].
-    pub chain: LocalChain,
+    ///
+    /// [`LocalChain`]: local_chain::LocalChain
+    pub chain: local_chain::Update,
 }
 
-impl<K, A> Default for LocalUpdate<K, A> {
-    fn default() -> Self {
+impl<K, A> LocalUpdate<K, A> {
+    /// Construct a [`LocalUpdate`] with a given [`CheckPoint`] tip.
+    ///
+    /// [`CheckPoint`]: local_chain::CheckPoint
+    pub fn new(chain_update: local_chain::Update) -> Self {
         Self {
-            keychain: Default::default(),
-            graph: Default::default(),
-            chain: Default::default(),
+            keychain: BTreeMap::new(),
+            graph: TxGraph::default(),
+            chain: chain_update,
         }
     }
 }
@@ -126,6 +131,8 @@ impl<K, A> Default for LocalUpdate<K, A> {
 )]
 pub struct LocalChangeSet<K, A> {
     /// Changes to the [`LocalChain`].
+    ///
+    /// [`LocalChain`]: local_chain::LocalChain
     pub chain_changeset: local_chain::ChangeSet,
 
     /// Additions to [`IndexedTxGraph`].
index fe97e3f27915c26451800ccb25a3529689351623..92feac4c1607d43bd2a5bc2453601035153651cb 100644 (file)
 
 use core::convert::Infallible;
 
-use alloc::collections::BTreeMap;
+use crate::collections::BTreeMap;
+use crate::{BlockId, ChainOracle};
+use alloc::sync::Arc;
 use bitcoin::BlockHash;
 
-use crate::{BlockId, ChainOracle};
+/// A structure that represents changes to [`LocalChain`].
+pub type ChangeSet = BTreeMap<u32, Option<BlockHash>>;
+
+/// A blockchain of [`LocalChain`].
+///
+/// The in a linked-list with newer blocks pointing to older ones.
+#[derive(Debug, Clone)]
+pub struct CheckPoint(Arc<CPInner>);
+
+/// The internal contents of [`CheckPoint`].
+#[derive(Debug, Clone)]
+struct CPInner {
+    /// Block id (hash and height).
+    block: BlockId,
+    /// Previous checkpoint (if any).
+    prev: Option<Arc<CPInner>>,
+}
+
+impl CheckPoint {
+    /// Construct a new base block at the front of a linked list.
+    pub fn new(block: BlockId) -> Self {
+        Self(Arc::new(CPInner { block, prev: None }))
+    }
+
+    /// Puts another checkpoint onto the linked list representing the blockchain.
+    ///
+    /// Returns an `Err(self)` if the block you are pushing on is not at a greater height that the one you
+    /// are pushing on to.
+    pub fn push(self, block: BlockId) -> Result<Self, Self> {
+        if self.height() < block.height {
+            Ok(Self(Arc::new(CPInner {
+                block,
+                prev: Some(self.0),
+            })))
+        } else {
+            Err(self)
+        }
+    }
+
+    /// Extends the checkpoint linked list by a iterator of block ids.
+    ///
+    /// Returns an `Err(self)` if there is block which does not have a greater height than the
+    /// previous one.
+    pub fn extend_with_blocks(
+        self,
+        blocks: impl IntoIterator<Item = BlockId>,
+    ) -> Result<Self, Self> {
+        let mut curr = self.clone();
+        for block in blocks {
+            curr = curr.push(block).map_err(|_| self.clone())?;
+        }
+        Ok(curr)
+    }
+
+    /// Get the [`BlockId`] of the checkpoint.
+    pub fn block_id(&self) -> BlockId {
+        self.0.block
+    }
+
+    /// Get the height of the checkpoint.
+    pub fn height(&self) -> u32 {
+        self.0.block.height
+    }
+
+    /// Get the block hash of the checkpoint.
+    pub fn hash(&self) -> BlockHash {
+        self.0.block.hash
+    }
+
+    /// Get the previous checkpoint in the chain
+    pub fn prev(&self) -> Option<CheckPoint> {
+        self.0.prev.clone().map(CheckPoint)
+    }
+
+    /// Iterate from this checkpoint in descending height.
+    pub fn iter(&self) -> CheckPointIter {
+        self.clone().into_iter()
+    }
+}
+
+/// A structure that iterates over checkpoints backwards.
+pub struct CheckPointIter {
+    current: Option<Arc<CPInner>>,
+}
+
+impl Iterator for CheckPointIter {
+    type Item = CheckPoint;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let current = self.current.clone()?;
+        self.current = current.prev.clone();
+        Some(CheckPoint(current))
+    }
+}
+
+impl IntoIterator for CheckPoint {
+    type Item = CheckPoint;
+    type IntoIter = CheckPointIter;
+
+    fn into_iter(self) -> Self::IntoIter {
+        CheckPointIter {
+            current: Some(self.0),
+        }
+    }
+}
+
+/// Represents an update to [`LocalChain`].
+#[derive(Debug, Clone)]
+pub struct Update {
+    /// The update's new [`CheckPoint`] tip.
+    pub tip: CheckPoint,
+
+    /// Whether the update allows for introducing older blocks.
+    ///
+    /// Refer to [`LocalChain::apply_update`] for more.
+    ///
+    /// [`LocalChain::apply_update`]: crate::local_chain::LocalChain::apply_update
+    pub introduce_older_blocks: bool,
+}
 
 /// This is a local implementation of [`ChainOracle`].
-#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Default, Clone)]
 pub struct LocalChain {
-    blocks: BTreeMap<u32, BlockHash>,
+    tip: Option<CheckPoint>,
+    index: BTreeMap<u32, BlockHash>,
+}
+
+impl PartialEq for LocalChain {
+    fn eq(&self, other: &Self) -> bool {
+        self.index == other.index
+    }
+}
+
+impl From<LocalChain> for BTreeMap<u32, BlockHash> {
+    fn from(value: LocalChain) -> Self {
+        value.index
+    }
+}
+
+impl From<ChangeSet> for LocalChain {
+    fn from(value: ChangeSet) -> Self {
+        Self::from_changeset(value)
+    }
+}
+
+impl From<BTreeMap<u32, BlockHash>> for LocalChain {
+    fn from(value: BTreeMap<u32, BlockHash>) -> Self {
+        Self::from_blocks(value)
+    }
 }
 
 impl ChainOracle for LocalChain {
@@ -19,215 +164,271 @@ impl ChainOracle for LocalChain {
     fn is_block_in_chain(
         &self,
         block: BlockId,
-        static_block: BlockId,
+        chain_tip: BlockId,
     ) -> Result<Option<bool>, Self::Error> {
-        if block.height > static_block.height {
+        if block.height > chain_tip.height {
             return Ok(None);
         }
         Ok(
             match (
-                self.blocks.get(&block.height),
-                self.blocks.get(&static_block.height),
+                self.index.get(&block.height),
+                self.index.get(&chain_tip.height),
             ) {
-                (Some(&hash), Some(&static_hash)) => {
-                    Some(hash == block.hash && static_hash == static_block.hash)
-                }
+                (Some(cp), Some(tip_cp)) => Some(*cp == block.hash && *tip_cp == chain_tip.hash),
                 _ => None,
             },
         )
     }
 
     fn get_chain_tip(&self) -> Result<Option<BlockId>, Self::Error> {
-        Ok(self.tip())
-    }
-}
-
-impl AsRef<BTreeMap<u32, BlockHash>> for LocalChain {
-    fn as_ref(&self) -> &BTreeMap<u32, BlockHash> {
-        &self.blocks
-    }
-}
-
-impl From<LocalChain> for BTreeMap<u32, BlockHash> {
-    fn from(value: LocalChain) -> Self {
-        value.blocks
-    }
-}
-
-impl From<BTreeMap<u32, BlockHash>> for LocalChain {
-    fn from(value: BTreeMap<u32, BlockHash>) -> Self {
-        Self { blocks: value }
+        Ok(self.tip.as_ref().map(|tip| tip.block_id()))
     }
 }
 
 impl LocalChain {
-    /// Contruct a [`LocalChain`] from a list of [`BlockId`]s.
-    pub fn from_blocks<B>(blocks: B) -> Self
-    where
-        B: IntoIterator<Item = BlockId>,
-    {
-        Self {
-            blocks: blocks.into_iter().map(|b| (b.height, b.hash)).collect(),
-        }
-    }
+    /// Construct a [`LocalChain`] from an initial `changeset`.
+    pub fn from_changeset(changeset: ChangeSet) -> Self {
+        let mut chain = Self::default();
+        chain.apply_changeset(&changeset);
 
-    /// Get a reference to a map of block height to hash.
-    pub fn blocks(&self) -> &BTreeMap<u32, BlockHash> {
-        &self.blocks
-    }
+        #[cfg(debug_assertions)]
+        chain._check_consistency(Some(&changeset));
 
-    /// Get the chain tip.
-    pub fn tip(&self) -> Option<BlockId> {
-        self.blocks
-            .iter()
-            .last()
-            .map(|(&height, &hash)| BlockId { height, hash })
+        chain
     }
 
-    /// This is like the sparsechain's logic, expect we must guarantee that all invalidated heights
-    /// are to be re-filled.
-    pub fn determine_changeset(&self, update: &Self) -> Result<ChangeSet, UpdateNotConnectedError> {
-        let update = update.as_ref();
-        let update_tip = match update.keys().last().cloned() {
-            Some(tip) => tip,
-            None => return Ok(ChangeSet::default()),
-        };
-
-        // this is the latest height where both the update and local chain has the same block hash
-        let agreement_height = update
-            .iter()
-            .rev()
-            .find(|&(u_height, u_hash)| self.blocks.get(u_height) == Some(u_hash))
-            .map(|(&height, _)| height);
-
-        // the lower bound of the range to invalidate
-        let invalidate_lb = match agreement_height {
-            Some(height) if height == update_tip => u32::MAX,
-            Some(height) => height + 1,
-            None => 0,
+    /// Construct a [`LocalChain`] from a given `checkpoint` tip.
+    pub fn from_tip(tip: CheckPoint) -> Self {
+        let mut _self = Self {
+            tip: Some(tip),
+            ..Default::default()
         };
+        _self.reindex(0);
 
-        // the first block's height to invalidate in the local chain
-        let invalidate_from_height = self.blocks.range(invalidate_lb..).next().map(|(&h, _)| h);
+        #[cfg(debug_assertions)]
+        _self._check_consistency(None);
 
-        // the first block of height to invalidate (if any) should be represented in the update
-        if let Some(first_invalid_height) = invalidate_from_height {
-            if !update.contains_key(&first_invalid_height) {
-                return Err(UpdateNotConnectedError(first_invalid_height));
-            }
-        }
+        _self
+    }
 
-        let mut changeset: BTreeMap<u32, Option<BlockHash>> = match invalidate_from_height {
-            Some(first_invalid_height) => {
-                // the first block of height to invalidate should be represented in the update
-                if !update.contains_key(&first_invalid_height) {
-                    return Err(UpdateNotConnectedError(first_invalid_height));
+    /// Constructs a [`LocalChain`] from a [`BTreeMap`] of height to [`BlockHash`].
+    ///
+    /// The [`BTreeMap`] enforces the height order. However, the caller must ensure the blocks are
+    /// all of the same chain.
+    pub fn from_blocks(blocks: BTreeMap<u32, BlockHash>) -> Self {
+        let mut tip: Option<CheckPoint> = None;
+
+        for block in &blocks {
+            match tip {
+                Some(curr) => {
+                    tip = Some(
+                        curr.push(BlockId::from(block))
+                            .expect("BTreeMap is ordered"),
+                    )
                 }
-                self.blocks
-                    .range(first_invalid_height..)
-                    .map(|(height, _)| (*height, None))
-                    .collect()
-            }
-            None => BTreeMap::new(),
-        };
-        for (height, update_hash) in update {
-            let original_hash = self.blocks.get(height);
-            if Some(update_hash) != original_hash {
-                changeset.insert(*height, Some(*update_hash));
+                None => tip = Some(CheckPoint::new(BlockId::from(block))),
             }
         }
 
-        Ok(changeset)
+        let chain = Self { index: blocks, tip };
+
+        #[cfg(debug_assertions)]
+        chain._check_consistency(None);
+
+        chain
     }
 
-    /// Applies the given `changeset`.
-    pub fn apply_changeset(&mut self, changeset: ChangeSet) {
-        for (height, blockhash) in changeset {
-            match blockhash {
-                Some(blockhash) => self.blocks.insert(height, blockhash),
-                None => self.blocks.remove(&height),
-            };
-        }
+    /// Get the highest checkpoint.
+    pub fn tip(&self) -> Option<CheckPoint> {
+        self.tip.clone()
     }
 
-    /// Updates [`LocalChain`] with an update [`LocalChain`].
+    /// Returns whether the [`LocalChain`] is empty (has no checkpoints).
+    pub fn is_empty(&self) -> bool {
+        self.tip.is_none()
+    }
+
+    /// Updates [`Self`] with the given `update_tip`.
     ///
-    /// This is equivalent to calling [`determine_changeset`] and [`apply_changeset`] in sequence.
+    /// `introduce_older_blocks` specifies whether the `update_tip`'s history can introduce blocks
+    /// below the original chain's tip without invalidating blocks. Block-by-block syncing
+    /// mechanisms would typically create updates that builds upon the previous tip. In this case,
+    /// this paramater would be false. Script-pubkey based syncing mechanisms may not introduce
+    /// transactions in a chronological order so some updates require introducing older blocks (to
+    /// anchor older transactions). For script-pubkey based syncing, this parameter would typically
+    /// be true.
     ///
-    /// [`determine_changeset`]: Self::determine_changeset
-    /// [`apply_changeset`]: Self::apply_changeset
-    pub fn apply_update(&mut self, update: Self) -> Result<ChangeSet, UpdateNotConnectedError> {
-        let changeset = self.determine_changeset(&update)?;
-        self.apply_changeset(changeset.clone());
-        Ok(changeset)
+    /// The method returns [`ChangeSet`] on success. This represents the applied changes to
+    /// [`Self`].
+    ///
+    /// To update, the `update_tip` must *connect* with `self`. If `self` and `update_tip` has a
+    /// mutual checkpoint (same height and hash), it can connect if:
+    /// * The mutual checkpoint is the tip of `self`.
+    /// * An ancestor of `update_tip` has a height which is of the checkpoint one higher than the
+    ///         mutual checkpoint from `self`.
+    ///
+    /// Additionally:
+    /// * If `self` is empty, `update_tip` will always connect.
+    /// * If `self` only has one checkpoint, `update_tip` must have an ancestor checkpoint with the
+    ///     same height as it.
+    ///
+    /// To invalidate from a given checkpoint, `update_tip` must contain an ancestor checkpoint with
+    /// the same height but different hash.
+    ///
+    /// # Errors
+    ///
+    /// An error will occur if the update does not correctly connect with `self`.
+    ///
+    /// Refer to [module-level documentation] for more.
+    ///
+    /// [module-level documentation]: crate::local_chain
+    pub fn apply_update(&mut self, update: Update) -> Result<ChangeSet, CannotConnectError> {
+        match self.tip() {
+            Some(original_tip) => {
+                let changeset = merge_chains(
+                    original_tip,
+                    update.tip.clone(),
+                    update.introduce_older_blocks,
+                )?;
+                self.apply_changeset(&changeset);
+
+                // return early as `apply_changeset` already calls `check_consistency`
+                Ok(changeset)
+            }
+            None => {
+                *self = Self::from_tip(update.tip);
+                let changeset = self.initial_changeset();
+
+                #[cfg(debug_assertions)]
+                self._check_consistency(Some(&changeset));
+                Ok(changeset)
+            }
+        }
     }
 
-    /// Derives a [`ChangeSet`] that assumes that there are no preceding changesets.
-    ///
-    /// The changeset returned will record additions of all blocks included in [`Self`].
-    pub fn initial_changeset(&self) -> ChangeSet {
-        self.blocks
-            .iter()
-            .map(|(&height, &hash)| (height, Some(hash)))
-            .collect()
+    /// Apply the given `changeset`.
+    pub fn apply_changeset(&mut self, changeset: &ChangeSet) {
+        if let Some(start_height) = changeset.keys().next().cloned() {
+            let mut extension = BTreeMap::default();
+            let mut base: Option<CheckPoint> = None;
+            for cp in self.iter_checkpoints() {
+                if cp.height() >= start_height {
+                    extension.insert(cp.height(), cp.hash());
+                } else {
+                    base = Some(cp);
+                    break;
+                }
+            }
+
+            for (&height, &hash) in changeset {
+                match hash {
+                    Some(hash) => {
+                        extension.insert(height, hash);
+                    }
+                    None => {
+                        extension.remove(&height);
+                    }
+                };
+            }
+            let new_tip = match base {
+                Some(base) => Some(
+                    base.extend_with_blocks(extension.into_iter().map(BlockId::from))
+                        .expect("extension is strictly greater than base"),
+                ),
+                None => LocalChain::from_blocks(extension).tip(),
+            };
+            self.tip = new_tip;
+            self.reindex(start_height);
+
+            #[cfg(debug_assertions)]
+            self._check_consistency(Some(changeset));
+        }
     }
 
-    /// Insert a block of [`BlockId`] into the [`LocalChain`].
+    /// Insert a [`BlockId`].
     ///
-    /// # Error
+    /// # Errors
     ///
-    /// If the insertion height already contains a block, and the block has a different blockhash,
-    /// this will result in an [`InsertBlockNotMatchingError`].
-    pub fn insert_block(
-        &mut self,
-        block_id: BlockId,
-    ) -> Result<ChangeSet, InsertBlockNotMatchingError> {
-        let mut update = Self::from_blocks(self.tip());
-
-        if let Some(original_hash) = update.blocks.insert(block_id.height, block_id.hash) {
+    /// Replacing the block hash of an existing checkpoint will result in an error.
+    pub fn insert_block(&mut self, block_id: BlockId) -> Result<ChangeSet, InsertBlockError> {
+        if let Some(&original_hash) = self.index.get(&block_id.height) {
             if original_hash != block_id.hash {
-                return Err(InsertBlockNotMatchingError {
+                return Err(InsertBlockError {
                     height: block_id.height,
                     original_hash,
                     update_hash: block_id.hash,
                 });
+            } else {
+                return Ok(ChangeSet::default());
             }
         }
 
-        Ok(self.apply_update(update).expect("should always connect"))
+        let mut changeset = ChangeSet::default();
+        changeset.insert(block_id.height, Some(block_id.hash));
+        self.apply_changeset(&changeset);
+        Ok(changeset)
     }
-}
 
-/// This is the return value of [`determine_changeset`] and represents changes to [`LocalChain`].
-///
-/// [`determine_changeset`]: LocalChain::determine_changeset
-pub type ChangeSet = BTreeMap<u32, Option<BlockHash>>;
+    /// Reindex the heights in the chain from (and including) `from` height
+    fn reindex(&mut self, from: u32) {
+        let _ = self.index.split_off(&from);
+        for cp in self.iter_checkpoints() {
+            if cp.height() < from {
+                break;
+            }
+            self.index.insert(cp.height(), cp.hash());
+        }
+    }
 
-/// Represents an update failure of [`LocalChain`] due to the update not connecting to the original
-/// chain.
-///
-/// The update cannot be applied to the chain because the chain suffix it represents did not
-/// connect to the existing chain. This error case contains the checkpoint height to include so
-/// that the chains can connect.
-#[derive(Clone, Debug, PartialEq)]
-pub struct UpdateNotConnectedError(pub u32);
+    /// Derives an initial [`ChangeSet`], meaning that it can be applied to an empty chain to
+    /// recover the current chain.
+    pub fn initial_changeset(&self) -> ChangeSet {
+        self.index.iter().map(|(k, v)| (*k, Some(*v))).collect()
+    }
 
-impl core::fmt::Display for UpdateNotConnectedError {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        write!(
-            f,
-            "the update cannot connect with the chain, try include block at height {}",
-            self.0
-        )
+    /// Iterate over checkpoints in decending height order.
+    pub fn iter_checkpoints(&self) -> CheckPointIter {
+        CheckPointIter {
+            current: self.tip.as_ref().map(|tip| tip.0.clone()),
+        }
     }
-}
 
-#[cfg(feature = "std")]
-impl std::error::Error for UpdateNotConnectedError {}
+    /// Get a reference to the internal index mapping the height to block hash
+    pub fn heights(&self) -> &BTreeMap<u32, BlockHash> {
+        &self.index
+    }
+
+    /// Checkpoints that exist under `self.tip` and blocks indexed in `self.index` should be equal.
+    /// Additionally, if a `changeset` is provided, the changes specified in the `changeset` should
+    /// be reflected in `self.index`.
+    #[cfg(debug_assertions)]
+    fn _check_consistency(&self, changeset: Option<&ChangeSet>) {
+        debug_assert_eq!(
+            self.tip
+                .iter()
+                .flat_map(CheckPoint::iter)
+                .map(|cp| (cp.height(), cp.hash()))
+                .collect::<BTreeMap<_, _>>(),
+            self.index,
+            "checkpoint history and index must be consistent"
+        );
+
+        if let Some(changeset) = changeset {
+            for (height, exp_hash) in changeset {
+                let hash = self.index.get(height);
+                assert_eq!(
+                    hash,
+                    exp_hash.as_ref(),
+                    "changeset changes should be reflected in the internal index"
+                );
+            }
+        }
+    }
+}
 
 /// Represents a failure when trying to insert a checkpoint into [`LocalChain`].
 #[derive(Clone, Debug, PartialEq)]
-pub struct InsertBlockNotMatchingError {
+pub struct InsertBlockError {
     /// The checkpoints' height.
     pub height: u32,
     /// Original checkpoint's block hash.
@@ -236,7 +437,7 @@ pub struct InsertBlockNotMatchingError {
     pub update_hash: BlockHash,
 }
 
-impl core::fmt::Display for InsertBlockNotMatchingError {
+impl core::fmt::Display for InsertBlockError {
     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
         write!(
             f,
@@ -247,4 +448,129 @@ impl core::fmt::Display for InsertBlockNotMatchingError {
 }
 
 #[cfg(feature = "std")]
-impl std::error::Error for InsertBlockNotMatchingError {}
+impl std::error::Error for InsertBlockError {}
+
+/// Occurs when an update does not have a common checkpoint with the original chain.
+#[derive(Clone, Debug, PartialEq)]
+pub struct CannotConnectError {
+    /// The suggested checkpoint to include to connect the two chains.
+    pub try_include_height: u32,
+}
+
+impl core::fmt::Display for CannotConnectError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        write!(
+            f,
+            "introduced chain cannot connect with the original chain, try include height {}",
+            self.try_include_height,
+        )
+    }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for CannotConnectError {}
+
+fn merge_chains(
+    original_tip: CheckPoint,
+    update_tip: CheckPoint,
+    introduce_older_blocks: bool,
+) -> Result<ChangeSet, CannotConnectError> {
+    let mut changeset = ChangeSet::default();
+    let mut orig = original_tip.into_iter();
+    let mut update = update_tip.into_iter();
+    let mut curr_orig = None;
+    let mut curr_update = None;
+    let mut prev_orig: Option<CheckPoint> = None;
+    let mut prev_update: Option<CheckPoint> = None;
+    let mut point_of_agreement_found = false;
+    let mut prev_orig_was_invalidated = false;
+    let mut potentially_invalidated_heights = vec![];
+
+    // To find the difference between the new chain and the original we iterate over both of them
+    // from the tip backwards in tandem. We always dealing with the highest one from either chain
+    // first and move to the next highest. The crucial logic is applied when they have blocks at the
+    // same height.
+    loop {
+        if curr_orig.is_none() {
+            curr_orig = orig.next();
+        }
+        if curr_update.is_none() {
+            curr_update = update.next();
+        }
+
+        match (curr_orig.as_ref(), curr_update.as_ref()) {
+            // Update block that doesn't exist in the original chain
+            (o, Some(u)) if Some(u.height()) > o.map(|o| o.height()) => {
+                changeset.insert(u.height(), Some(u.hash()));
+                prev_update = curr_update.take();
+            }
+            // Original block that isn't in the update
+            (Some(o), u) if Some(o.height()) > u.map(|u| u.height()) => {
+                // this block might be gone if an earlier block gets invalidated
+                potentially_invalidated_heights.push(o.height());
+                prev_orig_was_invalidated = false;
+                prev_orig = curr_orig.take();
+
+                // OPTIMIZATION: we have run out of update blocks so we don't need to continue
+                // iterating becuase there's no possibility of adding anything to changeset.
+                if u.is_none() {
+                    break;
+                }
+            }
+            (Some(o), Some(u)) => {
+                if o.hash() == u.hash() {
+                    // We have found our point of agreement 🎉 -- we require that the previous (i.e.
+                    // higher because we are iterating backwards) block in the original chain was
+                    // invalidated (if it exists). This ensures that there is an unambigious point of
+                    // connection to the original chain from the update chain (i.e. we know the
+                    // precisely which original blocks are invalid).
+                    if !prev_orig_was_invalidated && !point_of_agreement_found {
+                        if let (Some(prev_orig), Some(_prev_update)) = (&prev_orig, &prev_update) {
+                            return Err(CannotConnectError {
+                                try_include_height: prev_orig.height(),
+                            });
+                        }
+                    }
+                    point_of_agreement_found = true;
+                    prev_orig_was_invalidated = false;
+                    // OPTIMIZATION 1 -- If we know that older blocks cannot be introduced without
+                    // invalidation, we can break after finding the point of agreement.
+                    // OPTIMIZATION 2 -- if we have the same underlying pointer at this point, we
+                    // can guarantee that no older blocks are introduced.
+                    if !introduce_older_blocks || Arc::as_ptr(&o.0) == Arc::as_ptr(&u.0) {
+                        return Ok(changeset);
+                    }
+                } else {
+                    // We have an invalidation height so we set the height to the updated hash and
+                    // also purge all the original chain block hashes above this block.
+                    changeset.insert(u.height(), Some(u.hash()));
+                    for invalidated_height in potentially_invalidated_heights.drain(..) {
+                        changeset.insert(invalidated_height, None);
+                    }
+                    prev_orig_was_invalidated = true;
+                }
+                prev_update = curr_update.take();
+                prev_orig = curr_orig.take();
+            }
+            (None, None) => {
+                break;
+            }
+            _ => {
+                unreachable!("compiler cannot tell that everything has been covered")
+            }
+        }
+    }
+
+    // When we don't have a point of agreement you can imagine it is implicitly the
+    // genesis block so we need to do the final connectivity check which in this case
+    // just means making sure the entire original chain was invalidated.
+    if !prev_orig_was_invalidated && !point_of_agreement_found {
+        if let Some(prev_orig) = prev_orig {
+            return Err(CannotConnectError {
+                try_include_height: prev_orig.height(),
+            });
+        }
+    }
+
+    Ok(changeset)
+}
index bc72cc50fd9e93f3e77dda6776ca4f2d9ae73eda..de7a5bca5c4714fa372357a9d16be10f27010f84 100644 (file)
@@ -56,8 +56,8 @@
 //! ```
 
 use crate::{
-    collections::*, keychain::Balance, Anchor, Append, BlockId, ChainOracle, ChainPosition,
-    ForEachTxOut, FullTxOut,
+    collections::*, keychain::Balance, local_chain::LocalChain, Anchor, Append, BlockId,
+    ChainOracle, ChainPosition, ForEachTxOut, FullTxOut,
 };
 use alloc::vec::Vec;
 use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
@@ -598,6 +598,31 @@ impl<A: Clone + Ord> TxGraph<A> {
 }
 
 impl<A: Anchor> TxGraph<A> {
+    /// Find missing block heights of `chain`.
+    ///
+    /// This works by scanning through anchors, and seeing whether the anchor block of the anchor
+    /// exists in the [`LocalChain`].
+    pub fn missing_blocks<'a>(&'a self, chain: &'a LocalChain) -> impl Iterator<Item = u32> + 'a {
+        self.anchors
+            .iter()
+            .map(|(a, _)| a.anchor_block())
+            .filter({
+                let mut last_block = Option::<BlockId>::None;
+                move |block| {
+                    if last_block.as_ref() == Some(block) {
+                        false
+                    } else {
+                        last_block = Some(*block);
+                        true
+                    }
+                }
+            })
+            .filter_map(|block| match chain.heights().get(&block.height) {
+                Some(chain_hash) if *chain_hash == block.hash => None,
+                _ => Some(block.height),
+            })
+    }
+
     /// Get the position of the transaction in `chain` with tip `chain_tip`.
     ///
     /// If the given transaction of `txid` does not exist in the chain of `chain_tip`, `None` is
index 7d7288bdfc4d1dd23794f286d1c6e8c0fda32a4e..a32d9c557f39c7b1cbbf697bcfbb8bb22cfd2e13 100644 (file)
@@ -9,25 +9,20 @@ macro_rules! h {
 macro_rules! local_chain {
     [ $(($height:expr, $block_hash:expr)), * ] => {{
         #[allow(unused_mut)]
-        bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*])
+        bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*].into_iter().collect())
     }};
 }
 
 #[allow(unused_macros)]
-macro_rules! chain {
-    ($([$($tt:tt)*]),*) => { chain!( checkpoints: [$([$($tt)*]),*] ) };
-    (checkpoints: $($tail:tt)*) => { chain!( index: TxHeight, checkpoints: $($tail)*) };
-    (index: $ind:ty, checkpoints: [ $([$height:expr, $block_hash:expr]),* ] $(,txids: [$(($txid:expr, $tx_height:expr)),*])?) => {{
+macro_rules! chain_update {
+    [ $(($height:expr, $hash:expr)), * ] => {{
         #[allow(unused_mut)]
-        let mut chain = bdk_chain::sparse_chain::SparseChain::<$ind>::from_checkpoints([$(($height, $block_hash).into()),*]);
-
-        $(
-            $(
-                let _ = chain.insert_tx($txid, $tx_height).expect("should succeed");
-            )*
-        )?
-
-        chain
+        bdk_chain::local_chain::Update {
+            tip: bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $hash).into()),*].into_iter().collect())
+            .tip()
+            .expect("must have tip"),
+            introduce_older_blocks: true,
+        }
     }};
 }
 
index 2ebd913c2e75041210ef3ded2221b193912064bb..16ec872654018e810d652ebf027d69cf35b0cf81 100644 (file)
@@ -109,8 +109,8 @@ fn test_list_owned_txouts() {
     // Create Local chains
 
     let local_chain = (0..150)
-        .map(|i| (i as u32, h!("random")))
-        .collect::<BTreeMap<u32, BlockHash>>();
+        .map(|i| (i as u32, Some(h!("random"))))
+        .collect::<BTreeMap<u32, Option<BlockHash>>>();
     let local_chain = LocalChain::from(local_chain);
 
     // Initiate IndexedTxGraph
@@ -212,9 +212,10 @@ fn test_list_owned_txouts() {
             (
                 *tx,
                 local_chain
-                    .blocks()
+                    .heights()
                     .get(&height)
-                    .map(|&hash| BlockId { height, hash })
+                    .cloned()
+                    .map(|hash| BlockId { height, hash })
                     .map(|anchor_block| ConfirmationHeightAnchor {
                         anchor_block,
                         confirmation_height: anchor_block.height,
@@ -231,10 +232,10 @@ fn test_list_owned_txouts() {
         |height: u32,
          graph: &IndexedTxGraph<ConfirmationHeightAnchor, KeychainTxOutIndex<String>>| {
             let chain_tip = local_chain
-                .blocks()
+                .heights()
                 .get(&height)
                 .map(|&hash| BlockId { height, hash })
-                .expect("block must exist");
+                .unwrap_or_else(|| panic!("block must exist at {}", height));
             let txouts = graph
                 .graph()
                 .filter_chain_txouts(
index 55d8af1133b66555665806c26b85d3f4c3567703..aaa2c371df0edeac1f989abe736e0eab69641ecf 100644 (file)
-use bdk_chain::local_chain::{
-    ChangeSet, InsertBlockNotMatchingError, LocalChain, UpdateNotConnectedError,
-};
+use bdk_chain::local_chain::{CannotConnectError, ChangeSet, InsertBlockError, LocalChain, Update};
 use bitcoin::BlockHash;
 
 #[macro_use]
 mod common;
 
-#[test]
-fn add_first_tip() {
-    let chain = LocalChain::default();
-    assert_eq!(
-        chain.determine_changeset(&local_chain![(0, h!("A"))]),
-        Ok([(0, Some(h!("A")))].into()),
-        "add first tip"
-    );
-}
-
-#[test]
-fn add_second_tip() {
-    let chain = local_chain![(0, h!("A"))];
-    assert_eq!(
-        chain.determine_changeset(&local_chain![(0, h!("A")), (1, h!("B"))]),
-        Ok([(1, Some(h!("B")))].into())
-    );
-}
-
-#[test]
-fn two_disjoint_chains_cannot_merge() {
-    let chain1 = local_chain![(0, h!("A"))];
-    let chain2 = local_chain![(1, h!("B"))];
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Err(UpdateNotConnectedError(0))
-    );
-}
-
-#[test]
-fn duplicate_chains_should_merge() {
-    let chain1 = local_chain![(0, h!("A"))];
-    let chain2 = local_chain![(0, h!("A"))];
-    assert_eq!(chain1.determine_changeset(&chain2), Ok(Default::default()));
-}
-
-#[test]
-fn can_introduce_older_checkpoints() {
-    let chain1 = local_chain![(2, h!("C")), (3, h!("D"))];
-    let chain2 = local_chain![(1, h!("B")), (2, h!("C"))];
-
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Ok([(1, Some(h!("B")))].into())
-    );
-}
-
-#[test]
-fn fix_blockhash_before_agreement_point() {
-    let chain1 = local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))];
-    let chain2 = local_chain![(0, h!("fix")), (1, h!("we-agree"))];
-
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Ok([(0, Some(h!("fix")))].into())
-    )
-}
-
-/// B and C are in both chain and update
-/// ```
-///        | 0 | 1 | 2 | 3 | 4
-/// chain  |     B   C
-/// update | A   B   C   D
-/// ```
-/// This should succeed with the point of agreement being C and A should be added in addition.
-#[test]
-fn two_points_of_agreement() {
-    let chain1 = local_chain![(1, h!("B")), (2, h!("C"))];
-    let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))];
-
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Ok([(0, Some(h!("A"))), (3, Some(h!("D")))].into()),
-    );
-}
-
-/// Update and chain does not connect:
-/// ```
-///        | 0 | 1 | 2 | 3 | 4
-/// chain  |     B   C
-/// update | A   B       D
-/// ```
-/// This should fail as we cannot figure out whether C & D are on the same chain
-#[test]
-fn update_and_chain_does_not_connect() {
-    let chain1 = local_chain![(1, h!("B")), (2, h!("C"))];
-    let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D"))];
-
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Err(UpdateNotConnectedError(2)),
-    );
+#[derive(Debug)]
+struct TestLocalChain<'a> {
+    name: &'static str,
+    chain: LocalChain,
+    update: Update,
+    exp: ExpectedResult<'a>,
 }
 
-/// Transient invalidation:
-/// ```
-///        | 0 | 1 | 2 | 3 | 4 | 5
-/// chain  | A       B   C       E
-/// update | A       B'  C'  D
-/// ```
-/// This should succeed and invalidate B,C and E with point of agreement being A.
-#[test]
-fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation() {
-    let chain1 = local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))];
-    let chain2 = local_chain![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))];
-
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Ok([
-            (2, Some(h!("B'"))),
-            (3, Some(h!("C'"))),
-            (4, Some(h!("D"))),
-            (5, None),
-        ]
-        .into())
-    );
+#[derive(Debug, PartialEq)]
+enum ExpectedResult<'a> {
+    Ok {
+        changeset: &'a [(u32, Option<BlockHash>)],
+        init_changeset: &'a [(u32, Option<BlockHash>)],
+    },
+    Err(CannotConnectError),
 }
 
-/// Transient invalidation:
-/// ```
-///        | 0 | 1 | 2 | 3 | 4
-/// chain  |     B   C       E
-/// update |     B'  C'  D
-/// ```
-///
-/// This should succeed and invalidate B, C and E with no point of agreement
-#[test]
-fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation_no_point_of_agreement() {
-    let chain1 = local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))];
-    let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))];
+impl<'a> TestLocalChain<'a> {
+    fn run(mut self) {
+        println!("[TestLocalChain] test: {}", self.name);
+        let got_changeset = match self.chain.apply_update(self.update) {
+            Ok(changeset) => changeset,
+            Err(got_err) => {
+                assert_eq!(
+                    ExpectedResult::Err(got_err),
+                    self.exp,
+                    "{}: unexpected error",
+                    self.name
+                );
+                return;
+            }
+        };
 
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Ok([
-            (1, Some(h!("B'"))),
-            (2, Some(h!("C'"))),
-            (3, Some(h!("D"))),
-            (4, None)
-        ]
-        .into())
-    )
+        match self.exp {
+            ExpectedResult::Ok {
+                changeset,
+                init_changeset,
+            } => {
+                assert_eq!(
+                    got_changeset,
+                    changeset.iter().cloned().collect(),
+                    "{}: unexpected changeset",
+                    self.name
+                );
+                assert_eq!(
+                    self.chain.initial_changeset(),
+                    init_changeset.iter().cloned().collect(),
+                    "{}: unexpected initial changeset",
+                    self.name
+                );
+            }
+            ExpectedResult::Err(err) => panic!(
+                "{}: expected error ({}), got non-error result: {:?}",
+                self.name, err, got_changeset
+            ),
+        }
+    }
 }
 
-/// Transient invalidation:
-/// ```
-///        | 0 | 1 | 2 | 3 | 4
-/// chain  | A   B   C       E
-/// update |     B'  C'  D
-/// ```
-///
-/// This should fail since although it tells us that B and C are invalid it doesn't tell us whether
-/// A was invalid.
 #[test]
-fn invalidation_but_no_connection() {
-    let chain1 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))];
-    let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))];
-
-    assert_eq!(
-        chain1.determine_changeset(&chain2),
-        Err(UpdateNotConnectedError(0))
-    )
+fn update_local_chain() {
+    [
+        TestLocalChain {
+            name: "add first tip",
+            chain: local_chain![],
+            update: chain_update![(0, h!("A"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(0, Some(h!("A")))],
+                init_changeset: &[(0, Some(h!("A")))],
+            },
+        },
+        TestLocalChain {
+            name: "add second tip",
+            chain: local_chain![(0, h!("A"))],
+            update: chain_update![(0, h!("A")), (1, h!("B"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(1, Some(h!("B")))],
+                init_changeset: &[(0, Some(h!("A"))), (1, Some(h!("B")))],
+            },
+        },
+        TestLocalChain {
+            name: "two disjoint chains cannot merge",
+            chain: local_chain![(0, h!("A"))],
+            update: chain_update![(1, h!("B"))],
+            exp: ExpectedResult::Err(CannotConnectError {
+                try_include_height: 0,
+            }),
+        },
+        TestLocalChain {
+            name: "two disjoint chains cannot merge (existing chain longer)",
+            chain: local_chain![(1, h!("A"))],
+            update: chain_update![(0, h!("B"))],
+            exp: ExpectedResult::Err(CannotConnectError {
+                try_include_height: 1,
+            }),
+        },
+        TestLocalChain {
+            name: "duplicate chains should merge",
+            chain: local_chain![(0, h!("A"))],
+            update: chain_update![(0, h!("A"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[],
+                init_changeset: &[(0, Some(h!("A")))],
+            },
+        },
+        // Introduce an older checkpoint (B)
+        //        | 0 | 1 | 2 | 3
+        // chain  |         C   D
+        // update |     B   C
+        TestLocalChain {
+            name: "can introduce older checkpoint",
+            chain: local_chain![(2, h!("C")), (3, h!("D"))],
+            update: chain_update![(1, h!("B")), (2, h!("C"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(1, Some(h!("B")))],
+                init_changeset: &[(1, Some(h!("B"))), (2, Some(h!("C"))), (3, Some(h!("D")))],
+            },
+        },
+        // Introduce an older checkpoint (A) that is not directly behind PoA
+        //        | 1 | 2 | 3
+        // chain  |     B   C
+        // update | A       C
+        TestLocalChain {
+            name: "can introduce older checkpoint 2",
+            chain: local_chain![(3, h!("B")), (4, h!("C"))],
+            update: chain_update![(2, h!("A")), (4, h!("C"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(2, Some(h!("A")))],
+                init_changeset: &[(2, Some(h!("A"))), (3, Some(h!("B"))), (4, Some(h!("C")))],
+            }
+        },
+        // Introduce an older checkpoint (B) that is not the oldest checkpoint
+        //        | 1 | 2 | 3
+        // chain  | A       C
+        // update |     B   C
+        TestLocalChain {
+            name: "can introduce older checkpoint 3",
+            chain: local_chain![(1, h!("A")), (3, h!("C"))],
+            update: chain_update![(2, h!("B")), (3, h!("C"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(2, Some(h!("B")))],
+                init_changeset: &[(1, Some(h!("A"))), (2, Some(h!("B"))), (3, Some(h!("C")))],
+            }
+        },
+        // Introduce two older checkpoints below the PoA
+        //        | 1 | 2 | 3
+        // chain  |         C
+        // update | A   B   C
+        TestLocalChain {
+            name: "introduce two older checkpoints below PoA",
+            chain: local_chain![(3, h!("C"))],
+            update: chain_update![(1, h!("A")), (2, h!("B")), (3, h!("C"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(1, Some(h!("A"))), (2, Some(h!("B")))],
+                init_changeset: &[(1, Some(h!("A"))), (2, Some(h!("B"))), (3, Some(h!("C")))],
+            },
+        },
+        TestLocalChain {
+            name: "fix blockhash before agreement point",
+            chain: local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))],
+            update: chain_update![(0, h!("fix")), (1, h!("we-agree"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(0, Some(h!("fix")))],
+                init_changeset: &[(0, Some(h!("fix"))), (1, Some(h!("we-agree")))],
+            },
+        },
+        // B and C are in both chain and update
+        //        | 0 | 1 | 2 | 3 | 4
+        // chain  |     B   C
+        // update | A   B   C   D
+        // This should succeed with the point of agreement being C and A should be added in addition.
+        TestLocalChain {
+            name: "two points of agreement",
+            chain: local_chain![(1, h!("B")), (2, h!("C"))],
+            update: chain_update![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[(0, Some(h!("A"))), (3, Some(h!("D")))],
+                init_changeset: &[
+                    (0, Some(h!("A"))),
+                    (1, Some(h!("B"))),
+                    (2, Some(h!("C"))),
+                    (3, Some(h!("D"))),
+                ],
+            },
+        },
+        // Update and chain does not connect:
+        //        | 0 | 1 | 2 | 3 | 4
+        // chain  |     B   C
+        // update | A   B       D
+        // This should fail as we cannot figure out whether C & D are on the same chain
+        TestLocalChain {
+            name: "update and chain does not connect",
+            chain: local_chain![(1, h!("B")), (2, h!("C"))],
+            update: chain_update![(0, h!("A")), (1, h!("B")), (3, h!("D"))],
+            exp: ExpectedResult::Err(CannotConnectError {
+                try_include_height: 2,
+            }),
+        },
+        // Transient invalidation:
+        //        | 0 | 1 | 2 | 3 | 4 | 5
+        // chain  | A       B   C       E
+        // update | A       B'  C'  D
+        // This should succeed and invalidate B,C and E with point of agreement being A.
+        TestLocalChain {
+            name: "transitive invalidation applies to checkpoints higher than invalidation",
+            chain: local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))],
+            update: chain_update![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[
+                    (2, Some(h!("B'"))),
+                    (3, Some(h!("C'"))),
+                    (4, Some(h!("D"))),
+                    (5, None),
+                ],
+                init_changeset: &[
+                    (0, Some(h!("A"))),
+                    (2, Some(h!("B'"))),
+                    (3, Some(h!("C'"))),
+                    (4, Some(h!("D"))),
+                ],
+            },
+        },
+        // Transient invalidation:
+        //        | 0 | 1 | 2 | 3 | 4
+        // chain  |     B   C       E
+        // update |     B'  C'  D
+        // This should succeed and invalidate B, C and E with no point of agreement
+        TestLocalChain {
+            name: "transitive invalidation applies to checkpoints higher than invalidation no point of agreement",
+            chain: local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))],
+            update: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[
+                    (1, Some(h!("B'"))),
+                    (2, Some(h!("C'"))),
+                    (3, Some(h!("D"))),
+                    (4, None)
+                ],
+                init_changeset: &[
+                    (1, Some(h!("B'"))),
+                    (2, Some(h!("C'"))),
+                    (3, Some(h!("D"))),
+                ],
+            },
+        },
+        // Transient invalidation:
+        //        | 0 | 1 | 2 | 3 | 4
+        // chain  | A   B   C       E
+        // update |     B'  C'  D
+        // This should fail since although it tells us that B and C are invalid it doesn't tell us whether
+        // A was invalid.
+        TestLocalChain {
+            name: "invalidation but no connection",
+            chain: local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))],
+            update: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))],
+            exp: ExpectedResult::Err(CannotConnectError { try_include_height: 0 }),
+        },
+        // Introduce blocks between two points of agreement
+        //        | 0 | 1 | 2 | 3 | 4 | 5
+        // chain  | A   B       D   E
+        // update | A       C       E   F
+        TestLocalChain {
+            name: "introduce blocks between two points of agreement",
+            chain: local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D")), (4, h!("E"))],
+            update: chain_update![(0, h!("A")), (2, h!("C")), (4, h!("E")), (5, h!("F"))],
+            exp: ExpectedResult::Ok {
+                changeset: &[
+                    (2, Some(h!("C"))),
+                    (5, Some(h!("F"))),
+                ],
+                init_changeset: &[
+                    (0, Some(h!("A"))),
+                    (1, Some(h!("B"))),
+                    (2, Some(h!("C"))),
+                    (3, Some(h!("D"))),
+                    (4, Some(h!("E"))),
+                    (5, Some(h!("F"))),
+                ],
+            },
+        },
+    ]
+    .into_iter()
+    .for_each(TestLocalChain::run);
 }
 
 #[test]
-fn insert_block() {
+fn local_chain_insert_block() {
     struct TestCase {
         original: LocalChain,
         insert: (u32, BlockHash),
-        expected_result: Result<ChangeSet, InsertBlockNotMatchingError>,
+        expected_result: Result<ChangeSet, InsertBlockError>,
         expected_final: LocalChain,
     }
 
@@ -206,7 +326,7 @@ fn insert_block() {
         TestCase {
             original: local_chain![(2, h!("K"))],
             insert: (2, h!("J")),
-            expected_result: Err(InsertBlockNotMatchingError {
+            expected_result: Err(InsertBlockError {
                 height: 2,
                 original_hash: h!("K"),
                 update_hash: h!("J"),
index c272f97aaa070adacdc24dbcf5eaec573d401946..bbffdaf31d0ca1ad8c755c8b683b2be2d5dd33ca 100644 (file)
@@ -697,7 +697,7 @@ fn test_chain_spends() {
             let _ = graph.insert_anchor(
                 tx.txid(),
                 ConfirmationHeightAnchor {
-                    anchor_block: tip,
+                    anchor_block: tip.block_id(),
                     confirmation_height: *ht,
                 },
             );
@@ -705,10 +705,10 @@ fn test_chain_spends() {
 
     // Assert that confirmed spends are returned correctly.
     assert_eq!(
-        graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 0)),
+        graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 0)),
         Some((
             ChainPosition::Confirmed(&ConfirmationHeightAnchor {
-                anchor_block: tip,
+                anchor_block: tip.block_id(),
                 confirmation_height: 98
             }),
             tx_1.txid(),
@@ -717,17 +717,17 @@ fn test_chain_spends() {
 
     // Check if chain position is returned correctly.
     assert_eq!(
-        graph.get_chain_position(&local_chain, tip, tx_0.txid()),
+        graph.get_chain_position(&local_chain, tip.block_id(), tx_0.txid()),
         // Some(ObservedAs::Confirmed(&local_chain.get_block(95).expect("block expected"))),
         Some(ChainPosition::Confirmed(&ConfirmationHeightAnchor {
-            anchor_block: tip,
+            anchor_block: tip.block_id(),
             confirmation_height: 95
         }))
     );
 
     // Even if unconfirmed tx has a last_seen of 0, it can still be part of a chain spend.
     assert_eq!(
-        graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)),
+        graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)),
         Some((ChainPosition::Unconfirmed(0), tx_2.txid())),
     );
 
@@ -737,7 +737,7 @@ fn test_chain_spends() {
     // Check chain spend returned correctly.
     assert_eq!(
         graph
-            .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1))
+            .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1))
             .unwrap(),
         (ChainPosition::Unconfirmed(1234567), tx_2.txid())
     );
@@ -754,7 +754,7 @@ fn test_chain_spends() {
 
     // Because this tx conflicts with an already confirmed transaction, chain position should return none.
     assert!(graph
-        .get_chain_position(&local_chain, tip, tx_1_conflict.txid())
+        .get_chain_position(&local_chain, tip.block_id(), tx_1_conflict.txid())
         .is_none());
 
     // Another conflicting tx that conflicts with tx_2.
@@ -773,7 +773,7 @@ fn test_chain_spends() {
     // This should return a valid observation with correct last seen.
     assert_eq!(
         graph
-            .get_chain_position(&local_chain, tip, tx_2_conflict.txid())
+            .get_chain_position(&local_chain, tip.block_id(), tx_2_conflict.txid())
             .expect("position expected"),
         ChainPosition::Unconfirmed(1234568)
     );
@@ -781,14 +781,14 @@ fn test_chain_spends() {
     // Chain_spend now catches the new transaction as the spend.
     assert_eq!(
         graph
-            .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1))
+            .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1))
             .expect("expect observation"),
         (ChainPosition::Unconfirmed(1234568), tx_2_conflict.txid())
     );
 
     // Chain position of the `tx_2` is now none, as it is older than `tx_2_conflict`
     assert!(graph
-        .get_chain_position(&local_chain, tip, tx_2.txid())
+        .get_chain_position(&local_chain, tip.block_id(), tx_2.txid())
         .is_none());
 }
 
index 1ec44d85cf1c6a29aec2f06a4b5c1aca86191f1b..62f7aa7e33e2b3f1532c86c78bff883bb31fc248 100644 (file)
@@ -1,34 +1,46 @@
 use bdk_chain::{
-    bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
+    bitcoin::{hashes::hex::FromHex, OutPoint, Script, Transaction, Txid},
     keychain::LocalUpdate,
-    local_chain::LocalChain,
+    local_chain::{self, CheckPoint},
     tx_graph::{self, TxGraph},
     Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
 };
-use electrum_client::{Client, ElectrumApi, Error};
+use electrum_client::{Client, ElectrumApi, Error, HeaderNotification};
 use std::{
     collections::{BTreeMap, BTreeSet, HashMap, HashSet},
     fmt::Debug,
 };
 
+/// We assume that a block of this depth and deeper cannot be reorged.
+const ASSUME_FINAL_DEPTH: u32 = 8;
+
+/// Represents an update fetched from an Electrum server, but excludes full transactions.
+///
+/// To provide a complete update to [`TxGraph`], you'll need to call [`Self::missing_full_txs`] to
+/// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to fetch
+/// the full transactions from Electrum and finalize the update.
 #[derive(Debug, Clone)]
 pub struct ElectrumUpdate<K, A> {
+    /// Map of [`Txid`]s to associated [`Anchor`]s.
     pub graph_update: HashMap<Txid, BTreeSet<A>>,
-    pub chain_update: LocalChain,
+    /// The latest chain tip, as seen by the Electrum server.
+    pub new_tip: local_chain::CheckPoint,
+    /// Last-used index update for [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
     pub keychain_update: BTreeMap<K, u32>,
 }
 
-impl<K, A> Default for ElectrumUpdate<K, A> {
-    fn default() -> Self {
+impl<K, A: Anchor> ElectrumUpdate<K, A> {
+    fn new(new_tip: local_chain::CheckPoint) -> Self {
         Self {
-            graph_update: Default::default(),
-            chain_update: Default::default(),
-            keychain_update: Default::default(),
+            new_tip,
+            graph_update: HashMap::new(),
+            keychain_update: BTreeMap::new(),
         }
     }
-}
 
-impl<K, A: Anchor> ElectrumUpdate<K, A> {
+    /// Determine the full transactions that are missing from `graph`.
+    ///
+    /// Refer to [`ElectrumUpdate`].
     pub fn missing_full_txs<A2>(&self, graph: &TxGraph<A2>) -> Vec<Txid> {
         self.graph_update
             .keys()
@@ -37,6 +49,9 @@ impl<K, A: Anchor> ElectrumUpdate<K, A> {
             .collect()
     }
 
+    /// Finalizes update with `missing` txids to fetch from `client`.
+    ///
+    /// Refer to [`ElectrumUpdate`].
     pub fn finalize(
         self,
         client: &Client,
@@ -56,7 +71,10 @@ impl<K, A: Anchor> ElectrumUpdate<K, A> {
         Ok(LocalUpdate {
             keychain: self.keychain_update,
             graph: graph_update,
-            chain: self.chain_update,
+            chain: local_chain::Update {
+                tip: self.new_tip,
+                introduce_older_blocks: true,
+            },
         })
     }
 }
@@ -75,6 +93,7 @@ impl<K> ElectrumUpdate<K, ConfirmationHeightAnchor> {
         missing: Vec<Txid>,
     ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
         let update = self.finalize(client, seen_at, missing)?;
+        // client.batch_transaction_get(txid)
 
         let relevant_heights = {
             let mut visited_heights = HashSet::new();
@@ -133,12 +152,22 @@ impl<K> ElectrumUpdate<K, ConfirmationHeightAnchor> {
     }
 }
 
+/// Trait to extend [`Client`] functionality.
 pub trait ElectrumExt<A> {
-    fn get_tip(&self) -> Result<(u32, BlockHash), Error>;
-
+    /// Scan the blockchain (via electrum) for the data specified and returns a [`ElectrumUpdate`].
+    ///
+    /// - `prev_tip`: the most recent blockchain tip present locally
+    /// - `keychain_spks`: keychains that we want to scan transactions for
+    /// - `txids`: transactions for which we want updated [`Anchor`]s
+    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to included in the update
+    ///
+    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
+    /// transactions. `batch_size` specifies the max number of script pubkeys to request for in a
+    /// single batch request.
     fn scan<K: Ord + Clone>(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
+        prev_tip: Option<CheckPoint>,
         keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
         txids: impl IntoIterator<Item = Txid>,
         outpoints: impl IntoIterator<Item = OutPoint>,
@@ -146,9 +175,12 @@ pub trait ElectrumExt<A> {
         batch_size: usize,
     ) -> Result<ElectrumUpdate<K, A>, Error>;
 
+    /// Convenience method to call [`scan`] without requiring a keychain.
+    ///
+    /// [`scan`]: ElectrumExt::scan
     fn scan_without_keychain(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
+        prev_tip: Option<CheckPoint>,
         misc_spks: impl IntoIterator<Item = Script>,
         txids: impl IntoIterator<Item = Txid>,
         outpoints: impl IntoIterator<Item = OutPoint>,
@@ -160,7 +192,7 @@ pub trait ElectrumExt<A> {
             .map(|(i, spk)| (i as u32, spk));
 
         self.scan(
-            local_chain,
+            prev_tip,
             [((), spk_iter)].into(),
             txids,
             outpoints,
@@ -171,15 +203,9 @@ pub trait ElectrumExt<A> {
 }
 
 impl ElectrumExt<ConfirmationHeightAnchor> for Client {
-    fn get_tip(&self) -> Result<(u32, BlockHash), Error> {
-        // TODO: unsubscribe when added to the client, or is there a better call to use here?
-        self.block_headers_subscribe()
-            .map(|data| (data.height as u32, data.header.block_hash()))
-    }
-
     fn scan<K: Ord + Clone>(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
+        prev_tip: Option<CheckPoint>,
         keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
         txids: impl IntoIterator<Item = Txid>,
         outpoints: impl IntoIterator<Item = OutPoint>,
@@ -196,20 +222,20 @@ impl ElectrumExt<ConfirmationHeightAnchor> for Client {
         let outpoints = outpoints.into_iter().collect::<Vec<_>>();
 
         let update = loop {
-            let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor> {
-                chain_update: prepare_chain_update(self, local_chain)?,
-                ..Default::default()
-            };
-            let anchor_block = update
-                .chain_update
-                .tip()
-                .expect("must have atleast one block");
+            let (tip, _) = construct_update_tip(self, prev_tip.clone())?;
+            let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor>::new(tip.clone());
+            let cps = update
+                .new_tip
+                .iter()
+                .take(10)
+                .map(|cp| (cp.height(), cp))
+                .collect::<BTreeMap<u32, CheckPoint>>();
 
             if !request_spks.is_empty() {
                 if !scanned_spks.is_empty() {
                     scanned_spks.append(&mut populate_with_spks(
                         self,
-                        anchor_block,
+                        &cps,
                         &mut update,
                         &mut scanned_spks
                             .iter()
@@ -222,7 +248,7 @@ impl ElectrumExt<ConfirmationHeightAnchor> for Client {
                     scanned_spks.extend(
                         populate_with_spks(
                             self,
-                            anchor_block,
+                            &cps,
                             &mut update,
                             keychain_spks,
                             stop_gap,
@@ -234,20 +260,14 @@ impl ElectrumExt<ConfirmationHeightAnchor> for Client {
                 }
             }
 
-            populate_with_txids(self, anchor_block, &mut update, &mut txids.iter().cloned())?;
+            populate_with_txids(self, &cps, &mut update, &mut txids.iter().cloned())?;
 
-            let _txs = populate_with_outpoints(
-                self,
-                anchor_block,
-                &mut update,
-                &mut outpoints.iter().cloned(),
-            )?;
+            let _txs =
+                populate_with_outpoints(self, &cps, &mut update, &mut outpoints.iter().cloned())?;
 
             // check for reorgs during scan process
-            let server_blockhash = self
-                .block_header(anchor_block.height as usize)?
-                .block_hash();
-            if anchor_block.hash != server_blockhash {
+            let server_blockhash = self.block_header(tip.height() as usize)?.block_hash();
+            if tip.hash() != server_blockhash {
                 continue; // reorg
             }
 
@@ -268,46 +288,86 @@ impl ElectrumExt<ConfirmationHeightAnchor> for Client {
     }
 }
 
-/// Prepare an update "template" based on the checkpoints of the `local_chain`.
-fn prepare_chain_update(
+/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`.
+fn construct_update_tip(
     client: &Client,
-    local_chain: &BTreeMap<u32, BlockHash>,
-) -> Result<LocalChain, Error> {
-    let mut update = LocalChain::default();
-
-    // Find the local chain block that is still there so our update can connect to the local chain.
-    for (&existing_height, &existing_hash) in local_chain.iter().rev() {
-        // TODO: a batch request may be safer, as a reorg that happens when we are obtaining
-        //       `block_header`s will result in inconsistencies
-        let current_hash = client.block_header(existing_height as usize)?.block_hash();
-        let _ = update
-            .insert_block(BlockId {
-                height: existing_height,
-                hash: current_hash,
-            })
-            .expect("This never errors because we are working with a fresh chain");
-
-        if current_hash == existing_hash {
-            break;
+    prev_tip: Option<CheckPoint>,
+) -> Result<(CheckPoint, Option<u32>), Error> {
+    let HeaderNotification { height, .. } = client.block_headers_subscribe()?;
+    let new_tip_height = height as u32;
+
+    // If electrum returns a tip height that is lower than our previous tip, then checkpoints do
+    // not need updating. We just return the previous tip and use that as the point of agreement.
+    if let Some(prev_tip) = prev_tip.as_ref() {
+        if new_tip_height < prev_tip.height() {
+            return Ok((prev_tip.clone(), Some(prev_tip.height())));
         }
     }
 
-    // Insert the new tip so new transactions will be accepted into the sparsechain.
-    let tip = {
-        let (height, hash) = crate::get_tip(client)?;
-        BlockId { height, hash }
+    // Atomically fetch the latest `ASSUME_FINAL_DEPTH` count of blocks from Electrum. We use this
+    // to construct our checkpoint update.
+    let mut new_blocks = {
+        let start_height = new_tip_height.saturating_sub(ASSUME_FINAL_DEPTH);
+        let hashes = client
+            .block_headers(start_height as _, ASSUME_FINAL_DEPTH as _)?
+            .headers
+            .into_iter()
+            .map(|h| h.block_hash());
+        (start_height..).zip(hashes).collect::<BTreeMap<u32, _>>()
     };
-    if update.insert_block(tip).is_err() {
-        // There has been a re-org before we even begin scanning addresses.
-        // Just recursively call (this should never happen).
-        return prepare_chain_update(client, local_chain);
-    }
 
-    Ok(update)
+    // Find the "point of agreement" (if any).
+    let agreement_cp = {
+        let mut agreement_cp = Option::<CheckPoint>::None;
+        for cp in prev_tip.iter().flat_map(CheckPoint::iter) {
+            let cp_block = cp.block_id();
+            let hash = match new_blocks.get(&cp_block.height) {
+                Some(&hash) => hash,
+                None => {
+                    assert!(
+                        new_tip_height >= cp_block.height,
+                        "already checked that electrum's tip cannot be smaller"
+                    );
+                    let hash = client.block_header(cp_block.height as _)?.block_hash();
+                    new_blocks.insert(cp_block.height, hash);
+                    hash
+                }
+            };
+            if hash == cp_block.hash {
+                agreement_cp = Some(cp);
+                break;
+            }
+        }
+        agreement_cp
+    };
+
+    let agreement_height = agreement_cp.as_ref().map(CheckPoint::height);
+
+    let new_tip = new_blocks
+        .into_iter()
+        // Prune `new_blocks` to only include blocks that are actually new.
+        .filter(|(height, _)| Some(*height) > agreement_height)
+        .map(|(height, hash)| BlockId { height, hash })
+        .fold(agreement_cp, |prev_cp, block| {
+            Some(match prev_cp {
+                Some(cp) => cp.push(block).expect("must extend checkpoint"),
+                None => CheckPoint::new(block),
+            })
+        })
+        .expect("must have at least one checkpoint");
+
+    Ok((new_tip, agreement_height))
 }
 
+/// A [tx status] comprises of a concatenation of `tx_hash:height:`s. We transform a single one of
+/// these concatenations into a [`ConfirmationHeightAnchor`] if possible.
+///
+/// We use the lowest possible checkpoint as the anchor block (from `cps`). If an anchor block
+/// cannot be found, or the transaction is unconfirmed, [`None`] is returned.
+///
+/// [tx status](https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status)
 fn determine_tx_anchor(
-    anchor_block: BlockId,
+    cps: &BTreeMap<u32, CheckPoint>,
     raw_height: i32,
     txid: Txid,
 ) -> Option<ConfirmationHeightAnchor> {
@@ -319,6 +379,7 @@ fn determine_tx_anchor(
         == Txid::from_hex("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b")
             .expect("must deserialize genesis coinbase txid")
     {
+        let anchor_block = cps.values().next()?.block_id();
         return Some(ConfirmationHeightAnchor {
             anchor_block,
             confirmation_height: 0,
@@ -331,6 +392,7 @@ fn determine_tx_anchor(
         }
         h => {
             let h = h as u32;
+            let anchor_block = cps.range(h..).next().map(|(_, cp)| cp.block_id())?;
             if h > anchor_block.height {
                 None
             } else {
@@ -345,7 +407,7 @@ fn determine_tx_anchor(
 
 fn populate_with_outpoints<K>(
     client: &Client,
-    anchor_block: BlockId,
+    cps: &BTreeMap<u32, CheckPoint>,
     update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
     outpoints: &mut impl Iterator<Item = OutPoint>,
 ) -> Result<HashMap<Txid, Transaction>, Error> {
@@ -394,7 +456,7 @@ fn populate_with_outpoints<K>(
                 }
             };
 
-            let anchor = determine_tx_anchor(anchor_block, res.height, res.tx_hash);
+            let anchor = determine_tx_anchor(cps, res.height, res.tx_hash);
 
             let tx_entry = update.graph_update.entry(res.tx_hash).or_default();
             if let Some(anchor) = anchor {
@@ -407,7 +469,7 @@ fn populate_with_outpoints<K>(
 
 fn populate_with_txids<K>(
     client: &Client,
-    anchor_block: BlockId,
+    cps: &BTreeMap<u32, CheckPoint>,
     update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
     txids: &mut impl Iterator<Item = Txid>,
 ) -> Result<(), Error> {
@@ -429,7 +491,7 @@ fn populate_with_txids<K>(
             .into_iter()
             .find(|r| r.tx_hash == txid)
         {
-            Some(r) => determine_tx_anchor(anchor_block, r.height, txid),
+            Some(r) => determine_tx_anchor(cps, r.height, txid),
             None => continue,
         };
 
@@ -443,7 +505,7 @@ fn populate_with_txids<K>(
 
 fn populate_with_spks<K, I: Ord + Clone>(
     client: &Client,
-    anchor_block: BlockId,
+    cps: &BTreeMap<u32, CheckPoint>,
     update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
     spks: &mut impl Iterator<Item = (I, Script)>,
     stop_gap: usize,
@@ -477,7 +539,7 @@ fn populate_with_spks<K, I: Ord + Clone>(
 
             for tx in spk_history {
                 let tx_entry = update.graph_update.entry(tx.tx_hash).or_default();
-                if let Some(anchor) = determine_tx_anchor(anchor_block, tx.height, tx.tx_hash) {
+                if let Some(anchor) = determine_tx_anchor(cps, tx.height, tx.tx_hash) {
                     tx_entry.insert(anchor);
                 }
             }
index 4826c6ddadcdc866d7dcbe44fe08a94f729c9890..716c4d3f70c1d23698b23711fcfdec01a54b72dc 100644 (file)
 //!
 //! Refer to [`bdk_electrum_example`] for a complete example.
 //!
-//! [`ElectrumClient::scan`]: ElectrumClient::scan
+//! [`ElectrumClient::scan`]: electrum_client::ElectrumClient::scan
 //! [`missing_full_txs`]: ElectrumUpdate::missing_full_txs
-//! [`batch_transaction_get`]: ElectrumApi::batch_transaction_get
+//! [`batch_transaction_get`]: electrum_client::ElectrumApi::batch_transaction_get
 //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example
 
-use bdk_chain::bitcoin::BlockHash;
-use electrum_client::{Client, ElectrumApi, Error};
+#![warn(missing_docs)]
+
 mod electrum_ext;
 pub use bdk_chain;
 pub use electrum_client;
 pub use electrum_ext::*;
-
-fn get_tip(client: &Client) -> Result<(u32, BlockHash), Error> {
-    // TODO: unsubscribe when added to the client, or is there a better call to use here?
-    client
-        .block_headers_subscribe()
-        .map(|data| (data.height as u32, data.header.block_hash()))
-}
index e496e415c11f175156c2571497df080d063aac89..5de02ffd0dc2d64827cc9c5889756797b5565991 100644 (file)
@@ -1,41 +1,55 @@
 use async_trait::async_trait;
+use bdk_chain::collections::btree_map;
 use bdk_chain::{
     bitcoin::{BlockHash, OutPoint, Script, Txid},
-    collections::BTreeMap,
-    keychain::LocalUpdate,
-    BlockId, ConfirmationTimeAnchor,
+    collections::{BTreeMap, BTreeSet},
+    local_chain::{self, CheckPoint},
+    BlockId, ConfirmationTimeAnchor, TxGraph,
 };
-use esplora_client::{Error, OutputStatus, TxStatus};
+use esplora_client::{Error, TxStatus};
 use futures::{stream::FuturesOrdered, TryStreamExt};
 
-use crate::map_confirmation_time_anchor;
+use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
 
-/// Trait to extend [`esplora_client::AsyncClient`] functionality.
+/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
 ///
-/// This is the async version of [`EsploraExt`]. Refer to
-/// [crate-level documentation] for more.
+/// Refer to [crate-level documentation] for more.
 ///
-/// [`EsploraExt`]: crate::EsploraExt
 /// [crate-level documentation]: crate
 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
 pub trait EsploraAsyncExt {
-    /// Scan the blockchain (via esplora) for the data specified and returns a
-    /// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
+    /// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
     ///
-    /// - `local_chain`: the most recent block hashes present locally
-    /// - `keychain_spks`: keychains that we want to scan transactions for
-    /// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
-    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
-    ///     want to included in the update
+    /// * `prev_tip` is the previous tip of [`LocalChain::tip`].
+    /// * `get_heights` is the block heights that we are interested in fetching from Esplora.
+    ///
+    /// The result of this method can be applied to [`LocalChain::apply_update`].
+    ///
+    /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
+    /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
+    /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
+    #[allow(clippy::result_large_err)]
+    async fn update_local_chain(
+        &self,
+        local_tip: Option<CheckPoint>,
+        request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
+    ) -> Result<local_chain::Update, Error>;
+
+    /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
+    /// indices.
+    ///
+    /// * `keychain_spks`: keychains that we want to scan transactions for
+    /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
+    /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to include in the update
     ///
     /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
     /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
     /// parallel.
-    #[allow(clippy::result_large_err)] // FIXME
-    async fn scan<K: Ord + Clone + Send>(
+    #[allow(clippy::result_large_err)]
+    async fn update_tx_graph<K: Ord + Clone + Send>(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
         keychain_spks: BTreeMap<
             K,
             impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
@@ -44,22 +58,20 @@ pub trait EsploraAsyncExt {
         outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
         stop_gap: usize,
         parallel_requests: usize,
-    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
+    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
 
-    /// Convenience method to call [`scan`] without requiring a keychain.
+    /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
     ///
-    /// [`scan`]: EsploraAsyncExt::scan
-    #[allow(clippy::result_large_err)] // FIXME
-    async fn scan_without_keychain(
+    /// [`update_tx_graph`]: EsploraAsyncExt::update_tx_graph
+    #[allow(clippy::result_large_err)]
+    async fn update_tx_graph_without_keychain(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
         misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = Script> + Send> + Send,
         txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
         outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
         parallel_requests: usize,
-    ) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
-        self.scan(
-            local_chain,
+    ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
+        self.update_tx_graph(
             [(
                 (),
                 misc_spks
@@ -74,16 +86,123 @@ pub trait EsploraAsyncExt {
             parallel_requests,
         )
         .await
+        .map(|(g, _)| g)
     }
 }
 
 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
 impl EsploraAsyncExt for esplora_client::AsyncClient {
-    #[allow(clippy::result_large_err)] // FIXME
-    async fn scan<K: Ord + Clone + Send>(
+    async fn update_local_chain(
+        &self,
+        local_tip: Option<CheckPoint>,
+        request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
+    ) -> Result<local_chain::Update, Error> {
+        let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
+        let new_tip_height = self.get_height().await?;
+
+        // atomically fetch blocks from esplora
+        let mut fetched_blocks = {
+            let heights = (0..=new_tip_height).rev();
+            let hashes = self
+                .get_blocks(Some(new_tip_height))
+                .await?
+                .into_iter()
+                .map(|b| b.id);
+            heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
+        };
+
+        // fetch heights that the caller is interested in
+        for height in request_heights {
+            // do not fetch blocks higher than remote tip
+            if height > new_tip_height {
+                continue;
+            }
+            // only fetch what is missing
+            if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
+                let hash = self.get_block_hash(height).await?;
+                entry.insert(hash);
+            }
+        }
+
+        // find the earliest point of agreement between local chain and fetched chain
+        let earliest_agreement_cp = {
+            let mut earliest_agreement_cp = Option::<CheckPoint>::None;
+
+            if let Some(local_tip) = local_tip {
+                let local_tip_height = local_tip.height();
+                for local_cp in local_tip.iter() {
+                    let local_block = local_cp.block_id();
+
+                    // the updated hash (block hash at this height after the update), can either be:
+                    // 1. a block that already existed in `fetched_blocks`
+                    // 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
+                    // 3. otherwise we can freshly fetch the block from remote, which is safe as it
+                    //    is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
+                    //    remote tip
+                    let updated_hash = match fetched_blocks.entry(local_block.height) {
+                        btree_map::Entry::Occupied(entry) => *entry.get(),
+                        btree_map::Entry::Vacant(entry) => *entry.insert(
+                            if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
+                                local_block.hash
+                            } else {
+                                self.get_block_hash(local_block.height).await?
+                            },
+                        ),
+                    };
+
+                    // since we may introduce blocks below the point of agreement, we cannot break
+                    // here unconditionally - we only break if we guarantee there are no new heights
+                    // below our current local checkpoint
+                    if local_block.hash == updated_hash {
+                        earliest_agreement_cp = Some(local_cp);
+
+                        let first_new_height = *fetched_blocks
+                            .keys()
+                            .next()
+                            .expect("must have atleast one new block");
+                        if first_new_height >= local_block.height {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            earliest_agreement_cp
+        };
+
+        let tip = {
+            // first checkpoint to use for the update chain
+            let first_cp = match earliest_agreement_cp {
+                Some(cp) => cp,
+                None => {
+                    let (&height, &hash) = fetched_blocks
+                        .iter()
+                        .next()
+                        .expect("must have atleast one new block");
+                    CheckPoint::new(BlockId { height, hash })
+                }
+            };
+            // transform fetched chain into the update chain
+            fetched_blocks
+                // we exclude anything at or below the first cp of the update chain otherwise
+                // building the chain will fail
+                .split_off(&(first_cp.height() + 1))
+                .into_iter()
+                .map(|(height, hash)| BlockId { height, hash })
+                .fold(first_cp, |prev_cp, block| {
+                    prev_cp.push(block).expect("must extend checkpoint")
+                })
+        };
+
+        Ok(local_chain::Update {
+            tip,
+            introduce_older_blocks: true,
+        })
+    }
+
+    async fn update_tx_graph<K: Ord + Clone + Send>(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
         keychain_spks: BTreeMap<
             K,
             impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
@@ -92,178 +211,116 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
         outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
         stop_gap: usize,
         parallel_requests: usize,
-    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
+    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
+        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
         let parallel_requests = Ord::max(parallel_requests, 1);
-
-        let (mut update, tip_at_start) = loop {
-            let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
-
-            for (&height, &original_hash) in local_chain.iter().rev() {
-                let update_block_id = BlockId {
-                    height,
-                    hash: self.get_block_hash(height).await?,
-                };
-                let _ = update
-                    .chain
-                    .insert_block(update_block_id)
-                    .expect("cannot repeat height here");
-                if update_block_id.hash == original_hash {
-                    break;
-                }
-            }
-
-            let tip_at_start = BlockId {
-                height: self.get_height().await?,
-                hash: self.get_tip_hash().await?,
-            };
-
-            if update.chain.insert_block(tip_at_start).is_ok() {
-                break (update, tip_at_start);
-            }
-        };
+        let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
+        let mut last_active_indexes = BTreeMap::<K, u32>::new();
 
         for (keychain, spks) in keychain_spks {
             let mut spks = spks.into_iter();
-            let mut last_active_index = None;
-            let mut empty_scripts = 0;
-            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+            let mut last_index = Option::<u32>::None;
+            let mut last_active_index = Option::<u32>::None;
 
             loop {
-                let futures = (0..parallel_requests)
-                    .filter_map(|_| {
-                        let (index, script) = spks.next()?;
+                let handles = spks
+                    .by_ref()
+                    .take(parallel_requests)
+                    .map(|(spk_index, spk)| {
                         let client = self.clone();
-                        Some(async move {
-                            let mut related_txs = client.scripthash_txs(&script, None).await?;
-
-                            let n_confirmed =
-                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
-                            // esplora pages on 25 confirmed transactions. If there are 25 or more we
-                            // keep requesting to see if there's more.
-                            if n_confirmed >= 25 {
-                                loop {
-                                    let new_related_txs = client
-                                        .scripthash_txs(
-                                            &script,
-                                            Some(related_txs.last().unwrap().txid),
-                                        )
-                                        .await?;
-                                    let n = new_related_txs.len();
-                                    related_txs.extend(new_related_txs);
-                                    // we've reached the end
-                                    if n < 25 {
-                                        break;
-                                    }
+                        async move {
+                            let mut last_seen = None;
+                            let mut spk_txs = Vec::new();
+                            loop {
+                                let txs = client.scripthash_txs(&spk, last_seen).await?;
+                                let tx_count = txs.len();
+                                last_seen = txs.last().map(|tx| tx.txid);
+                                spk_txs.extend(txs);
+                                if tx_count < 25 {
+                                    break Result::<_, Error>::Ok((spk_index, spk_txs));
                                 }
                             }
-
-                            Result::<_, esplora_client::Error>::Ok((index, related_txs))
-                        })
+                        }
                     })
                     .collect::<FuturesOrdered<_>>();
 
-                let n_futures = futures.len();
+                if handles.is_empty() {
+                    break;
+                }
 
-                for (index, related_txs) in futures.try_collect::<Vec<IndexWithTxs>>().await? {
-                    if related_txs.is_empty() {
-                        empty_scripts += 1;
-                    } else {
+                for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
+                    last_index = Some(index);
+                    if !txs.is_empty() {
                         last_active_index = Some(index);
-                        empty_scripts = 0;
                     }
-                    for tx in related_txs {
-                        let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
-
-                        let _ = update.graph.insert_tx(tx.to_tx());
-                        if let Some(anchor) = anchor {
-                            let _ = update.graph.insert_anchor(tx.txid, anchor);
+                    for tx in txs {
+                        let _ = graph.insert_tx(tx.to_tx());
+                        if let Some(anchor) = anchor_from_status(&tx.status) {
+                            let _ = graph.insert_anchor(tx.txid, anchor);
                         }
                     }
                 }
 
-                if n_futures == 0 || empty_scripts >= stop_gap {
+                if last_index > last_active_index.map(|i| i + stop_gap as u32) {
                     break;
                 }
             }
 
             if let Some(last_active_index) = last_active_index {
-                update.keychain.insert(keychain, last_active_index);
+                last_active_indexes.insert(keychain, last_active_index);
             }
         }
 
-        for txid in txids.into_iter() {
-            if update.graph.get_tx(txid).is_none() {
-                match self.get_tx(&txid).await? {
-                    Some(tx) => {
-                        let _ = update.graph.insert_tx(tx);
-                    }
-                    None => continue,
-                }
+        let mut txids = txids.into_iter();
+        loop {
+            let handles = txids
+                .by_ref()
+                .take(parallel_requests)
+                .filter(|&txid| graph.get_tx(txid).is_none())
+                .map(|txid| {
+                    let client = self.clone();
+                    async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
+                })
+                .collect::<FuturesOrdered<_>>();
+            // .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
+
+            if handles.is_empty() {
+                break;
             }
-            match self.get_tx_status(&txid).await? {
-                tx_status if tx_status.confirmed => {
-                    if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
-                        let _ = update.graph.insert_anchor(txid, anchor);
-                    }
+
+            for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
+                if let Some(anchor) = anchor_from_status(&status) {
+                    let _ = graph.insert_anchor(txid, anchor);
                 }
-                _ => continue,
             }
         }
 
         for op in outpoints.into_iter() {
-            let mut op_txs = Vec::with_capacity(2);
-            if let (
-                Some(tx),
-                tx_status @ TxStatus {
-                    confirmed: true, ..
-                },
-            ) = (
-                self.get_tx(&op.txid).await?,
-                self.get_tx_status(&op.txid).await?,
-            ) {
-                op_txs.push((tx, tx_status));
-                if let Some(OutputStatus {
-                    txid: Some(txid),
-                    status: Some(spend_status),
-                    ..
-                }) = self.get_output_status(&op.txid, op.vout as _).await?
-                {
-                    if let Some(spend_tx) = self.get_tx(&txid).await? {
-                        op_txs.push((spend_tx, spend_status));
-                    }
+            if graph.get_tx(op.txid).is_none() {
+                if let Some(tx) = self.get_tx(&op.txid).await? {
+                    let _ = graph.insert_tx(tx);
+                }
+                let status = self.get_tx_status(&op.txid).await?;
+                if let Some(anchor) = anchor_from_status(&status) {
+                    let _ = graph.insert_anchor(op.txid, anchor);
                 }
             }
 
-            for (tx, status) in op_txs {
-                let txid = tx.txid();
-                let anchor = map_confirmation_time_anchor(&status, tip_at_start);
-
-                let _ = update.graph.insert_tx(tx);
-                if let Some(anchor) = anchor {
-                    let _ = update.graph.insert_anchor(txid, anchor);
+            if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
+                if let Some(txid) = op_status.txid {
+                    if graph.get_tx(txid).is_none() {
+                        if let Some(tx) = self.get_tx(&txid).await? {
+                            let _ = graph.insert_tx(tx);
+                        }
+                        let status = self.get_tx_status(&txid).await?;
+                        if let Some(anchor) = anchor_from_status(&status) {
+                            let _ = graph.insert_anchor(txid, anchor);
+                        }
+                    }
                 }
             }
         }
 
-        if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? {
-            // A reorg occurred, so let's find out where all the txids we found are now in the chain
-            let txids_found = update
-                .graph
-                .full_txs()
-                .map(|tx_node| tx_node.txid)
-                .collect::<Vec<_>>();
-            update.chain = EsploraAsyncExt::scan_without_keychain(
-                self,
-                local_chain,
-                [],
-                txids_found,
-                [],
-                parallel_requests,
-            )
-            .await?
-            .chain;
-        }
-
-        Ok(update)
+        Ok((graph, last_active_indexes))
     }
 }
index 6e1c619931a5ffc7e5c3a75143696c8142fbfc1a..ce64db1bd7e8b816163a0fb131981d85d3d90d58 100644 (file)
@@ -1,54 +1,73 @@
-use bdk_chain::bitcoin::{BlockHash, OutPoint, Script, Txid};
-use bdk_chain::collections::BTreeMap;
-use bdk_chain::BlockId;
-use bdk_chain::{keychain::LocalUpdate, ConfirmationTimeAnchor};
-use esplora_client::{Error, OutputStatus, TxStatus};
+use std::thread::JoinHandle;
 
-use crate::map_confirmation_time_anchor;
+use bdk_chain::bitcoin::{OutPoint, Txid};
+use bdk_chain::collections::btree_map;
+use bdk_chain::collections::{BTreeMap, BTreeSet};
+use bdk_chain::{
+    bitcoin::{BlockHash, Script},
+    local_chain::{self, CheckPoint},
+};
+use bdk_chain::{BlockId, ConfirmationTimeAnchor, TxGraph};
+use esplora_client::{Error, TxStatus};
 
-/// Trait to extend [`esplora_client::BlockingClient`] functionality.
+use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
+
+/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
 ///
 /// Refer to [crate-level documentation] for more.
 ///
 /// [crate-level documentation]: crate
 pub trait EsploraExt {
-    /// Scan the blockchain (via esplora) for the data specified and returns a
-    /// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
+    /// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
+    ///
+    /// * `prev_tip` is the previous tip of [`LocalChain::tip`].
+    /// * `get_heights` is the block heights that we are interested in fetching from Esplora.
+    ///
+    /// The result of this method can be applied to [`LocalChain::apply_update`].
+    ///
+    /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
+    /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
+    /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
+    #[allow(clippy::result_large_err)]
+    fn update_local_chain(
+        &self,
+        local_tip: Option<CheckPoint>,
+        request_heights: impl IntoIterator<Item = u32>,
+    ) -> Result<local_chain::Update, Error>;
+
+    /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
+    /// indices.
     ///
-    /// - `local_chain`: the most recent block hashes present locally
-    /// - `keychain_spks`: keychains that we want to scan transactions for
-    /// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
-    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
-    ///     want to included in the update
+    /// * `keychain_spks`: keychains that we want to scan transactions for
+    /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
+    /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
+    ///     want to include in the update
     ///
     /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
     /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
     /// parallel.
-    #[allow(clippy::result_large_err)] // FIXME
-    fn scan<K: Ord + Clone>(
+    #[allow(clippy::result_large_err)]
+    fn update_tx_graph<K: Ord + Clone>(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
         keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
         txids: impl IntoIterator<Item = Txid>,
         outpoints: impl IntoIterator<Item = OutPoint>,
         stop_gap: usize,
         parallel_requests: usize,
-    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
+    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
 
-    /// Convenience method to call [`scan`] without requiring a keychain.
+    /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
     ///
-    /// [`scan`]: EsploraExt::scan
-    #[allow(clippy::result_large_err)] // FIXME
-    fn scan_without_keychain(
+    /// [`update_tx_graph`]: EsploraExt::update_tx_graph
+    #[allow(clippy::result_large_err)]
+    fn update_tx_graph_without_keychain(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
         misc_spks: impl IntoIterator<Item = Script>,
         txids: impl IntoIterator<Item = Txid>,
         outpoints: impl IntoIterator<Item = OutPoint>,
         parallel_requests: usize,
-    ) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
-        self.scan(
-            local_chain,
+    ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
+        self.update_tx_graph(
             [(
                 (),
                 misc_spks
@@ -62,190 +81,240 @@ pub trait EsploraExt {
             usize::MAX,
             parallel_requests,
         )
+        .map(|(g, _)| g)
     }
 }
 
 impl EsploraExt for esplora_client::BlockingClient {
-    fn scan<K: Ord + Clone>(
+    fn update_local_chain(
         &self,
-        local_chain: &BTreeMap<u32, BlockHash>,
-        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
-        txids: impl IntoIterator<Item = Txid>,
-        outpoints: impl IntoIterator<Item = OutPoint>,
-        stop_gap: usize,
-        parallel_requests: usize,
-    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
-        let parallel_requests = Ord::max(parallel_requests, 1);
+        local_tip: Option<CheckPoint>,
+        request_heights: impl IntoIterator<Item = u32>,
+    ) -> Result<local_chain::Update, Error> {
+        let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
+        let new_tip_height = self.get_height()?;
 
-        let (mut update, tip_at_start) = loop {
-            let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
-
-            for (&height, &original_hash) in local_chain.iter().rev() {
-                let update_block_id = BlockId {
-                    height,
-                    hash: self.get_block_hash(height)?,
-                };
-                let _ = update
-                    .chain
-                    .insert_block(update_block_id)
-                    .expect("cannot repeat height here");
-                if update_block_id.hash == original_hash {
-                    break;
-                }
+        // atomically fetch blocks from esplora
+        let mut fetched_blocks = {
+            let heights = (0..=new_tip_height).rev();
+            let hashes = self
+                .get_blocks(Some(new_tip_height))?
+                .into_iter()
+                .map(|b| b.id);
+            heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
+        };
+
+        // fetch heights that the caller is interested in
+        for height in request_heights {
+            // do not fetch blocks higher than remote tip
+            if height > new_tip_height {
+                continue;
+            }
+            // only fetch what is missing
+            if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
+                let hash = self.get_block_hash(height)?;
+                entry.insert(hash);
             }
+        }
 
-            let tip_at_start = BlockId {
-                height: self.get_height()?,
-                hash: self.get_tip_hash()?,
-            };
+        // find the earliest point of agreement between local chain and fetched chain
+        let earliest_agreement_cp = {
+            let mut earliest_agreement_cp = Option::<CheckPoint>::None;
+
+            if let Some(local_tip) = local_tip {
+                let local_tip_height = local_tip.height();
+                for local_cp in local_tip.iter() {
+                    let local_block = local_cp.block_id();
+
+                    // the updated hash (block hash at this height after the update), can either be:
+                    // 1. a block that already existed in `fetched_blocks`
+                    // 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
+                    // 3. otherwise we can freshly fetch the block from remote, which is safe as it
+                    //    is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
+                    //    remote tip
+                    let updated_hash = match fetched_blocks.entry(local_block.height) {
+                        btree_map::Entry::Occupied(entry) => *entry.get(),
+                        btree_map::Entry::Vacant(entry) => *entry.insert(
+                            if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
+                                local_block.hash
+                            } else {
+                                self.get_block_hash(local_block.height)?
+                            },
+                        ),
+                    };
+
+                    // since we may introduce blocks below the point of agreement, we cannot break
+                    // here unconditionally - we only break if we guarantee there are no new heights
+                    // below our current local checkpoint
+                    if local_block.hash == updated_hash {
+                        earliest_agreement_cp = Some(local_cp);
 
-            if update.chain.insert_block(tip_at_start).is_ok() {
-                break (update, tip_at_start);
+                        let first_new_height = *fetched_blocks
+                            .keys()
+                            .next()
+                            .expect("must have atleast one new block");
+                        if first_new_height >= local_block.height {
+                            break;
+                        }
+                    }
+                }
             }
+
+            earliest_agreement_cp
         };
 
+        let tip = {
+            // first checkpoint to use for the update chain
+            let first_cp = match earliest_agreement_cp {
+                Some(cp) => cp,
+                None => {
+                    let (&height, &hash) = fetched_blocks
+                        .iter()
+                        .next()
+                        .expect("must have atleast one new block");
+                    CheckPoint::new(BlockId { height, hash })
+                }
+            };
+            // transform fetched chain into the update chain
+            fetched_blocks
+                // we exclude anything at or below the first cp of the update chain otherwise
+                // building the chain will fail
+                .split_off(&(first_cp.height() + 1))
+                .into_iter()
+                .map(|(height, hash)| BlockId { height, hash })
+                .fold(first_cp, |prev_cp, block| {
+                    prev_cp.push(block).expect("must extend checkpoint")
+                })
+        };
+
+        Ok(local_chain::Update {
+            tip,
+            introduce_older_blocks: true,
+        })
+    }
+
+    fn update_tx_graph<K: Ord + Clone>(
+        &self,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        parallel_requests: usize,
+    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
+        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
+        let parallel_requests = Ord::max(parallel_requests, 1);
+        let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
+        let mut last_active_indexes = BTreeMap::<K, u32>::new();
+
         for (keychain, spks) in keychain_spks {
             let mut spks = spks.into_iter();
-            let mut last_active_index = None;
-            let mut empty_scripts = 0;
-            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
+            let mut last_index = Option::<u32>::None;
+            let mut last_active_index = Option::<u32>::None;
 
             loop {
-                let handles = (0..parallel_requests)
-                    .filter_map(
-                        |_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
-                            let (index, script) = spks.next()?;
+                let handles = spks
+                    .by_ref()
+                    .take(parallel_requests)
+                    .map(|(spk_index, spk)| {
+                        std::thread::spawn({
                             let client = self.clone();
-                            Some(std::thread::spawn(move || {
-                                let mut related_txs = client.scripthash_txs(&script, None)?;
-
-                                let n_confirmed =
-                                    related_txs.iter().filter(|tx| tx.status.confirmed).count();
-                                // esplora pages on 25 confirmed transactions. If there are 25 or more we
-                                // keep requesting to see if there's more.
-                                if n_confirmed >= 25 {
-                                    loop {
-                                        let new_related_txs = client.scripthash_txs(
-                                            &script,
-                                            Some(related_txs.last().unwrap().txid),
-                                        )?;
-                                        let n = new_related_txs.len();
-                                        related_txs.extend(new_related_txs);
-                                        // we've reached the end
-                                        if n < 25 {
-                                            break;
-                                        }
+                            move || -> Result<TxsOfSpkIndex, Error> {
+                                let mut last_seen = None;
+                                let mut spk_txs = Vec::new();
+                                loop {
+                                    let txs = client.scripthash_txs(&spk, last_seen)?;
+                                    let tx_count = txs.len();
+                                    last_seen = txs.last().map(|tx| tx.txid);
+                                    spk_txs.extend(txs);
+                                    if tx_count < 25 {
+                                        break Ok((spk_index, spk_txs));
                                     }
                                 }
+                            }
+                        })
+                    })
+                    .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
 
-                                Result::<_, esplora_client::Error>::Ok((index, related_txs))
-                            }))
-                        },
-                    )
-                    .collect::<Vec<_>>();
-
-                let n_handles = handles.len();
+                if handles.is_empty() {
+                    break;
+                }
 
                 for handle in handles {
-                    let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
-                    if related_txs.is_empty() {
-                        empty_scripts += 1;
-                    } else {
+                    let (index, txs) = handle.join().expect("thread must not panic")?;
+                    last_index = Some(index);
+                    if !txs.is_empty() {
                         last_active_index = Some(index);
-                        empty_scripts = 0;
                     }
-                    for tx in related_txs {
-                        let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
-
-                        let _ = update.graph.insert_tx(tx.to_tx());
-                        if let Some(anchor) = anchor {
-                            let _ = update.graph.insert_anchor(tx.txid, anchor);
+                    for tx in txs {
+                        let _ = graph.insert_tx(tx.to_tx());
+                        if let Some(anchor) = anchor_from_status(&tx.status) {
+                            let _ = graph.insert_anchor(tx.txid, anchor);
                         }
                     }
                 }
 
-                if n_handles == 0 || empty_scripts >= stop_gap {
+                if last_index > last_active_index.map(|i| i + stop_gap as u32) {
                     break;
                 }
             }
 
             if let Some(last_active_index) = last_active_index {
-                update.keychain.insert(keychain, last_active_index);
+                last_active_indexes.insert(keychain, last_active_index);
             }
         }
 
-        for txid in txids.into_iter() {
-            if update.graph.get_tx(txid).is_none() {
-                match self.get_tx(&txid)? {
-                    Some(tx) => {
-                        let _ = update.graph.insert_tx(tx);
-                    }
-                    None => continue,
-                }
+        let mut txids = txids.into_iter();
+        loop {
+            let handles = txids
+                .by_ref()
+                .take(parallel_requests)
+                .filter(|&txid| graph.get_tx(txid).is_none())
+                .map(|txid| {
+                    std::thread::spawn({
+                        let client = self.clone();
+                        move || client.get_tx_status(&txid).map(|s| (txid, s))
+                    })
+                })
+                .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
+
+            if handles.is_empty() {
+                break;
             }
-            match self.get_tx_status(&txid)? {
-                tx_status @ TxStatus {
-                    confirmed: true, ..
-                } => {
-                    if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
-                        let _ = update.graph.insert_anchor(txid, anchor);
-                    }
+
+            for handle in handles {
+                let (txid, status) = handle.join().expect("thread must not panic")?;
+                if let Some(anchor) = anchor_from_status(&status) {
+                    let _ = graph.insert_anchor(txid, anchor);
                 }
-                _ => continue,
             }
         }
 
         for op in outpoints.into_iter() {
-            let mut op_txs = Vec::with_capacity(2);
-            if let (
-                Some(tx),
-                tx_status @ TxStatus {
-                    confirmed: true, ..
-                },
-            ) = (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
-            {
-                op_txs.push((tx, tx_status));
-                if let Some(OutputStatus {
-                    txid: Some(txid),
-                    status: Some(spend_status),
-                    ..
-                }) = self.get_output_status(&op.txid, op.vout as _)?
-                {
-                    if let Some(spend_tx) = self.get_tx(&txid)? {
-                        op_txs.push((spend_tx, spend_status));
-                    }
+            if graph.get_tx(op.txid).is_none() {
+                if let Some(tx) = self.get_tx(&op.txid)? {
+                    let _ = graph.insert_tx(tx);
+                }
+                let status = self.get_tx_status(&op.txid)?;
+                if let Some(anchor) = anchor_from_status(&status) {
+                    let _ = graph.insert_anchor(op.txid, anchor);
                 }
             }
 
-            for (tx, status) in op_txs {
-                let txid = tx.txid();
-                let anchor = map_confirmation_time_anchor(&status, tip_at_start);
-
-                let _ = update.graph.insert_tx(tx);
-                if let Some(anchor) = anchor {
-                    let _ = update.graph.insert_anchor(txid, anchor);
+            if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
+                if let Some(txid) = op_status.txid {
+                    if graph.get_tx(txid).is_none() {
+                        if let Some(tx) = self.get_tx(&txid)? {
+                            let _ = graph.insert_tx(tx);
+                        }
+                        let status = self.get_tx_status(&txid)?;
+                        if let Some(anchor) = anchor_from_status(&status) {
+                            let _ = graph.insert_anchor(txid, anchor);
+                        }
+                    }
                 }
             }
         }
 
-        if tip_at_start.hash != self.get_block_hash(tip_at_start.height)? {
-            // A reorg occurred, so let's find out where all the txids we found are now in the chain
-            let txids_found = update
-                .graph
-                .full_txs()
-                .map(|tx_node| tx_node.txid)
-                .collect::<Vec<_>>();
-            update.chain = EsploraExt::scan_without_keychain(
-                self,
-                local_chain,
-                [],
-                txids_found,
-                [],
-                parallel_requests,
-            )?
-            .chain;
-        }
-
-        Ok(update)
+        Ok((graph, last_active_indexes))
     }
 }
index d5f8d8af6fc64fbff3a3cfd141c2cf4772bb07b0..9954ccec0222ffa4165db210e0886260a2b8dd24 100644 (file)
@@ -14,16 +14,22 @@ mod async_ext;
 #[cfg(feature = "async")]
 pub use async_ext::*;
 
-pub(crate) fn map_confirmation_time_anchor(
-    tx_status: &TxStatus,
-    tip_at_start: BlockId,
-) -> Option<ConfirmationTimeAnchor> {
-    match (tx_status.block_time, tx_status.block_height) {
-        (Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor {
-            anchor_block: tip_at_start,
-            confirmation_height,
-            confirmation_time,
-        }),
-        _ => None,
+const ASSUME_FINAL_DEPTH: u32 = 15;
+
+fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeAnchor> {
+    if let TxStatus {
+        block_height: Some(height),
+        block_hash: Some(hash),
+        block_time: Some(time),
+        ..
+    } = status.clone()
+    {
+        Some(ConfirmationTimeAnchor {
+            anchor_block: BlockId { height, hash },
+            confirmation_height: height,
+            confirmation_time: time,
+        })
+    } else {
+        None
     }
 }
index 41d39423477617884601e7328540fd7903b90197..537412f076ca4ae41e2efd4b994b555262a21307 100644 (file)
@@ -5,7 +5,7 @@ use std::{
 };
 
 use bdk_chain::{
-    bitcoin::{Address, BlockHash, Network, OutPoint, Txid},
+    bitcoin::{Address, Network, OutPoint, Txid},
     indexed_tx_graph::{IndexedAdditions, IndexedTxGraph},
     keychain::LocalChangeSet,
     local_chain::LocalChain,
@@ -22,8 +22,7 @@ use example_cli::{
 };
 
 const DB_MAGIC: &[u8] = b"bdk_example_electrum";
-const DB_PATH: &str = ".bdk_electrum_example.db";
-const ASSUME_FINAL_DEPTH: usize = 10;
+const DB_PATH: &str = ".bdk_example_electrum.db";
 
 #[derive(Subcommand, Debug, Clone)]
 enum ElectrumCommands {
@@ -73,11 +72,7 @@ fn main() -> anyhow::Result<()> {
         graph
     });
 
-    let chain = Mutex::new({
-        let mut chain = LocalChain::default();
-        chain.apply_changeset(init_changeset.chain_changeset);
-        chain
-    });
+    let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset));
 
     let electrum_url = match args.network {
         Network::Bitcoin => "ssl://electrum.blockstream.info:50002",
@@ -119,7 +114,7 @@ fn main() -> anyhow::Result<()> {
             stop_gap,
             scan_options,
         } => {
-            let (keychain_spks, local_chain) = {
+            let (keychain_spks, tip) = {
                 let graph = &*graph.lock().unwrap();
                 let chain = &*chain.lock().unwrap();
 
@@ -142,20 +137,13 @@ fn main() -> anyhow::Result<()> {
                     })
                     .collect::<BTreeMap<_, _>>();
 
-                let c = chain
-                    .blocks()
-                    .iter()
-                    .rev()
-                    .take(ASSUME_FINAL_DEPTH)
-                    .map(|(k, v)| (*k, *v))
-                    .collect::<BTreeMap<u32, BlockHash>>();
-
-                (keychain_spks, c)
+                let tip = chain.tip();
+                (keychain_spks, tip)
             };
 
             client
                 .scan(
-                    &local_chain,
+                    tip,
                     keychain_spks,
                     core::iter::empty(),
                     core::iter::empty(),
@@ -174,7 +162,7 @@ fn main() -> anyhow::Result<()> {
             // Get a short lock on the tracker to get the spks we're interested in
             let graph = graph.lock().unwrap();
             let chain = chain.lock().unwrap();
-            let chain_tip = chain.tip().unwrap_or_default();
+            let chain_tip = chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
 
             if !(all_spks || unused_spks || utxos || unconfirmed) {
                 unused_spks = true;
@@ -254,23 +242,17 @@ fn main() -> anyhow::Result<()> {
                 }));
             }
 
-            let c = chain
-                .blocks()
-                .iter()
-                .rev()
-                .take(ASSUME_FINAL_DEPTH)
-                .map(|(k, v)| (*k, *v))
-                .collect::<BTreeMap<u32, BlockHash>>();
+            let tip = chain.tip();
 
             // drop lock on graph and chain
             drop((graph, chain));
 
             let update = client
-                .scan_without_keychain(&c, spks, txids, outpoints, scan_options.batch_size)
+                .scan_without_keychain(tip, spks, txids, outpoints, scan_options.batch_size)
                 .context("scanning the blockchain")?;
             ElectrumUpdate {
                 graph_update: update.graph_update,
-                chain_update: update.chain_update,
+                new_tip: update.new_tip,
                 keychain_update: BTreeMap::new(),
             }
         }
index db80f106d13598fd898aee348790e837d4da20c7..2355a6fb0d784b1cabcb7e4f9c135c90ecab25bd 100644 (file)
@@ -35,7 +35,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     print!("Syncing...");
     let client = electrum_client::Client::new("ssl://electrum.blockstream.info:60002")?;
 
-    let local_chain = wallet.checkpoints();
+    let prev_tip = wallet.latest_checkpoint();
     let keychain_spks = wallet
         .spks_of_all_keychains()
         .into_iter()
@@ -52,8 +52,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         })
         .collect();
 
-    let electrum_update =
-        client.scan(local_chain, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?;
+    let electrum_update = client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?;
 
     println!();
 
index 119d9cbd792681c3273e78d54247a2c11d5b0f86..530aee5bac35a198a486801f4a410e33b00b7a01 100644 (file)
@@ -1,12 +1,13 @@
 const DB_MAGIC: &str = "bdk_wallet_esplora_example";
-const SEND_AMOUNT: u64 = 5000;
-const STOP_GAP: usize = 50;
-const PARALLEL_REQUESTS: usize = 5;
+const SEND_AMOUNT: u64 = 1000;
+const STOP_GAP: usize = 5;
+const PARALLEL_REQUESTS: usize = 1;
 
 use std::{io::Write, str::FromStr};
 
 use bdk::{
     bitcoin::{Address, Network},
+    chain::keychain::LocalUpdate,
     wallet::AddressIndex,
     SignOptions, Wallet,
 };
@@ -36,7 +37,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     let client =
         esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking()?;
 
-    let local_chain = wallet.checkpoints();
+    let prev_tip = wallet.latest_checkpoint();
     let keychain_spks = wallet
         .spks_of_all_keychains()
         .into_iter()
@@ -52,17 +53,20 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             (k, k_spks)
         })
         .collect();
-    let update = client.scan(
-        local_chain,
-        keychain_spks,
-        None,
-        None,
-        STOP_GAP,
-        PARALLEL_REQUESTS,
-    )?;
-    println!();
+
+    let (update_graph, last_active_indices) =
+        client.update_tx_graph(keychain_spks, None, None, STOP_GAP, PARALLEL_REQUESTS)?;
+    let get_heights = wallet.tx_graph().missing_blocks(wallet.local_chain());
+    let chain_update = client.update_local_chain(prev_tip, get_heights)?;
+    let update = LocalUpdate {
+        keychain: last_active_indices,
+        graph: update_graph,
+        ..LocalUpdate::new(chain_update)
+    };
+
     wallet.apply_update(update)?;
     wallet.commit()?;
+    println!();
 
     let balance = wallet.get_balance();
     println!("Wallet balance after syncing: {} sats", balance.total());
index 7cb218ec24f734d36552d6a184757aa161bf475a..fe1c85a227f540560bb0b21f751b9b975c36cb6f 100644 (file)
@@ -2,6 +2,7 @@ use std::{io::Write, str::FromStr};
 
 use bdk::{
     bitcoin::{Address, Network},
+    chain::keychain::LocalUpdate,
     wallet::AddressIndex,
     SignOptions, Wallet,
 };
@@ -37,7 +38,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let client =
         esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?;
 
-    let local_chain = wallet.checkpoints();
+    let prev_tip = wallet.latest_checkpoint();
     let keychain_spks = wallet
         .spks_of_all_keychains()
         .into_iter()
@@ -53,19 +54,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             (k, k_spks)
         })
         .collect();
-    let update = client
-        .scan(
-            local_chain,
-            keychain_spks,
-            [],
-            [],
-            STOP_GAP,
-            PARALLEL_REQUESTS,
-        )
+    let (update_graph, last_active_indices) = client
+        .update_tx_graph(keychain_spks, None, None, STOP_GAP, PARALLEL_REQUESTS)
         .await?;
-    println!();
+    let get_heights = wallet.tx_graph().missing_blocks(wallet.local_chain());
+    let chain_update = client.update_local_chain(prev_tip, get_heights).await?;
+    let update = LocalUpdate {
+        keychain: last_active_indices,
+        graph: update_graph,
+        ..LocalUpdate::new(chain_update)
+    };
     wallet.apply_update(update)?;
     wallet.commit()?;
+    println!();
 
     let balance = wallet.get_balance();
     println!("Wallet balance after syncing: {} sats", balance.total());