]> Untitled Git - bdk/commitdiff
Fix wallet sync not finding coins of addresses which are not cached
author志宇 <hello@evanlinjin.me>
Sun, 17 Jul 2022 13:02:19 +0000 (21:02 +0800)
committer志宇 <hello@evanlinjin.me>
Wed, 20 Jul 2022 15:08:12 +0000 (23:08 +0800)
Previously, electrum-based blockchain implementations only synced for
`scriptPubKey`s that are already cached in `Database`.

This PR introduces a feedback mechanism, that uses `stop_gap` and the
difference between "current index" and "last active index" to determine
whether we need to cache more `scriptPubKeys`.

The `WalletSync::wallet_setup` trait now may return an
`Error::MissingCachedScripts` error which contains the number of extra
`scriptPubKey`s to cache, in order to satisfy `stop_gap` for the next call.

`Wallet::sync` now calls `WalletSync` in a loop, cacheing inbetween
subsequent calls (if needed).

src/blockchain/esplora/reqwest.rs
src/blockchain/script_sync.rs
src/error.rs
src/testutils/blockchain_tests.rs
src/testutils/configurable_blockchain_tests.rs
src/wallet/mod.rs

index 0d40506082a729107d9f20b450e64c45fed76709..302e811fd3dc6f9ed7af2427e2b36c9b62b1c776 100644 (file)
@@ -213,7 +213,6 @@ impl WalletSync for EsploraBlockchain {
         };
 
         database.commit_batch(batch_update)?;
-
         Ok(())
     }
 }
index 0575273608c0d78a99cf0e6375b0ce17ffa5be12..2c4b26cef9a12a8c2360ce44ca55d0b35d858803 100644 (file)
@@ -5,6 +5,7 @@ returns associated transactions i.e. electrum.
 #![allow(dead_code)]
 use crate::{
     database::{BatchDatabase, BatchOperations, DatabaseUtils},
+    error::MissingCachedScripts,
     wallet::time::Instant,
     BlockTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
 };
@@ -34,11 +35,12 @@ pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>
     let scripts_needed = db
         .iter_script_pubkeys(Some(keychain))?
         .into_iter()
-        .collect();
+        .collect::<VecDeque<_>>();
     let state = State::new(db);
 
     Ok(Request::Script(ScriptReq {
         state,
+        initial_scripts_needed: scripts_needed.len(),
         scripts_needed,
         script_index: 0,
         stop_gap,
@@ -50,6 +52,7 @@ pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>
 pub struct ScriptReq<'a, D: BatchDatabase> {
     state: State<'a, D>,
     script_index: usize,
+    initial_scripts_needed: usize, // if this is 1, we assume the descriptor is not derivable
     scripts_needed: VecDeque<Script>,
     stop_gap: usize,
     keychain: KeychainKind,
@@ -113,43 +116,71 @@ impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
             self.script_index += 1;
         }
 
-        for _ in txids {
-            self.scripts_needed.pop_front();
-        }
+        self.scripts_needed.drain(..txids.len());
 
-        let last_active_index = self
+        // last active index: 0 => No last active
+        let last = self
             .state
             .last_active_index
             .get(&self.keychain)
-            .map(|x| x + 1)
-            .unwrap_or(0); // so no addresses active maps to 0
-
-        Ok(
-            if self.script_index > last_active_index + self.stop_gap
-                || self.scripts_needed.is_empty()
-            {
-                debug!(
-                    "finished scanning for transactions for keychain {:?} at index {}",
-                    self.keychain, last_active_index
-                );
-                // we're done here -- check if we need to do the next keychain
-                if let Some(keychain) = self.next_keychains.pop() {
-                    self.keychain = keychain;
-                    self.script_index = 0;
-                    self.scripts_needed = self
-                        .state
-                        .db
-                        .iter_script_pubkeys(Some(keychain))?
-                        .into_iter()
-                        .collect();
-                    Request::Script(self)
-                } else {
-                    Request::Tx(TxReq { state: self.state })
-                }
-            } else {
-                Request::Script(self)
-            },
-        )
+            .map(|&l| l + 1)
+            .unwrap_or(0);
+        // remaining scripts left to check
+        let remaining = self.scripts_needed.len();
+        // difference between current index and last active index
+        let current_gap = self.script_index - last;
+
+        // this is a hack to check whether the scripts are coming from a derivable descriptor
+        // we assume for non-derivable descriptors, the initial script count is always 1
+        let is_derivable = self.initial_scripts_needed > 1;
+
+        debug!(
+            "sync: last={}, remaining={}, diff={}, stop_gap={}",
+            last, remaining, current_gap, self.stop_gap
+        );
+
+        if is_derivable {
+            if remaining > 0 {
+                // we still have scriptPubKeys to do requests for
+                return Ok(Request::Script(self));
+            }
+
+            if last > 0 && current_gap < self.stop_gap {
+                // current gap is not large enough to stop, but we are unable to keep checking since
+                // we have exhausted cached scriptPubKeys, so return error
+                let err = MissingCachedScripts {
+                    last_count: self.script_index,
+                    missing_count: self.stop_gap - current_gap,
+                };
+                return Err(Error::MissingCachedScripts(err));
+            }
+
+            // we have exhausted cached scriptPubKeys and found no txs, continue
+        }
+
+        debug!(
+            "finished scanning for txs of keychain {:?} at index {:?}",
+            self.keychain, last
+        );
+
+        if let Some(keychain) = self.next_keychains.pop() {
+            // we still have another keychain to request txs with
+            let scripts_needed = self
+                .state
+                .db
+                .iter_script_pubkeys(Some(keychain))?
+                .into_iter()
+                .collect::<VecDeque<_>>();
+
+            self.keychain = keychain;
+            self.script_index = 0;
+            self.initial_scripts_needed = scripts_needed.len();
+            self.scripts_needed = scripts_needed;
+            return Ok(Request::Script(self));
+        }
+
+        // We have finished requesting txids, let's get the actual txs.
+        Ok(Request::Tx(TxReq { state: self.state }))
     }
 }
 
@@ -294,6 +325,8 @@ struct State<'a, D> {
     tx_missing_conftime: BTreeMap<Txid, TransactionDetails>,
     /// The start of the sync
     start_time: Instant,
+    /// Missing number of scripts to cache per keychain
+    missing_script_counts: HashMap<KeychainKind, usize>,
 }
 
 impl<'a, D: BatchDatabase> State<'a, D> {
@@ -305,6 +338,7 @@ impl<'a, D: BatchDatabase> State<'a, D> {
             tx_needed: BTreeSet::default(),
             tx_missing_conftime: BTreeMap::default(),
             start_time: Instant::new(),
+            missing_script_counts: HashMap::default(),
         }
     }
     fn into_db_update(self) -> Result<D::Batch, Error> {
index 15ec5713daefaaeef392e810c6bdacc565febcc2..c3f9ea15eb8a8e4472b6d51ef34878e5161083bf 100644 (file)
@@ -13,7 +13,7 @@ use std::fmt;
 
 use crate::bitcoin::Network;
 use crate::{descriptor, wallet, wallet::address_validator};
-use bitcoin::OutPoint;
+use bitcoin::{OutPoint, Txid};
 
 /// Errors that can be thrown by the [`Wallet`](crate::wallet::Wallet)
 #[derive(Debug)]
@@ -125,6 +125,10 @@ pub enum Error {
     //DifferentDescriptorStructure,
     //Uncapable(crate::blockchain::Capability),
     //MissingCachedAddresses,
+    /// [`crate::blockchain::WalletSync`] sync attempt failed due to missing scripts in cache which
+    /// are needed to satisfy `stop_gap`.
+    MissingCachedScripts(MissingCachedScripts),
+
     #[cfg(feature = "electrum")]
     /// Electrum client error
     Electrum(electrum_client::Error),
@@ -145,6 +149,16 @@ pub enum Error {
     Rusqlite(rusqlite::Error),
 }
 
+/// Represents the last failed [`crate::blockchain::WalletSync`] sync attempt in which we were short
+/// on cached `scriptPubKey`s.
+#[derive(Debug)]
+pub struct MissingCachedScripts {
+    /// Number of scripts in which txs were requested during last request.
+    pub last_count: usize,
+    /// Minimum number of scripts to cache more of in order to satisfy `stop_gap`.
+    pub missing_count: usize,
+}
+
 impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{:?}", self)
index 7c08699c67c4f2eafb54f0df8be740ec21d6249c..89e09133587c1e0a554de08a407c392aa99434e0 100644 (file)
@@ -743,7 +743,6 @@ macro_rules! bdk_blockchain_tests {
 
                 blockchain.broadcast(&tx1).expect("broadcasting first");
                 blockchain.broadcast(&tx2).expect("broadcasting replacement");
-
                 receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver");
                 assert_eq!(receiver_wallet.get_balance().expect("balance"), 49_000, "should have received coins once and only once");
             }
index a39608bb149a99d02676d3cedf2b46e0a8f72d27..b07fc9cdaed23bc1a7cfbea15a40168e35023ad5 100644 (file)
@@ -29,6 +29,8 @@ pub trait ConfigurableBlockchainTester<B: ConfigurableBlockchain>: Sized {
 
         if self.config_with_stop_gap(test_client, 0).is_some() {
             test_wallet_sync_with_stop_gaps(test_client, self);
+            test_wallet_sync_fulfills_missing_script_cache(test_client, self);
+            test_wallet_sync_self_transfer_tx(test_client, self);
         } else {
             println!(
                 "{}: Skipped tests requiring config_with_stop_gap.",
@@ -113,16 +115,21 @@ where
         } else {
             max_balance
         };
+        let details = format!(
+            "test_vector: [stop_gap: {}, actual_gap: {}, addrs_before: {}, addrs_after: {}]",
+            stop_gap, actual_gap, addrs_before, addrs_after,
+        );
+        println!("{}", details);
 
         // perform wallet sync
         wallet.sync(&blockchain, Default::default()).unwrap();
 
         let wallet_balance = wallet.get_balance().unwrap();
-
-        let details = format!(
-            "test_vector: [stop_gap: {}, actual_gap: {}, addrs_before: {}, addrs_after: {}]",
-            stop_gap, actual_gap, addrs_before, addrs_after,
+        println!(
+            "max: {}, min: {}, actual: {}",
+            max_balance, min_balance, wallet_balance
         );
+
         assert!(
             wallet_balance <= max_balance,
             "wallet balance is greater than received amount: {}",
@@ -138,3 +145,113 @@ where
         test_client.generate(1, None);
     }
 }
+
+/// With a `stop_gap` of x and every x addresses having a balance of 1000 (for y addresses),
+/// we expect `Wallet::sync` to correctly self-cache addresses, so that the resulting balance,
+/// after sync, should be y * 1000.
+fn test_wallet_sync_fulfills_missing_script_cache<T, B>(test_client: &mut TestClient, tester: &T)
+where
+    T: ConfigurableBlockchainTester<B>,
+    B: ConfigurableBlockchain,
+{
+    // wallet descriptor
+    let descriptor = "wpkh([c258d2e4/84h/1h/0h]tpubDDYkZojQFQjht8Tm4jsS3iuEmKjTiEGjG6KnuFNKKJb5A6ZUCUZKdvLdSDWofKi4ToRCwb9poe1XdqfUnP4jaJjCB2Zwv11ZLgSbnZSNecE/200/*)";
+
+    // amount in sats per tx
+    const AMOUNT_PER_TX: u64 = 1000;
+
+    // addr constants
+    const ADDR_COUNT: usize = 6;
+    const ADDR_GAP: usize = 60;
+
+    let blockchain =
+        B::from_config(&tester.config_with_stop_gap(test_client, ADDR_GAP).unwrap()).unwrap();
+
+    let wallet = Wallet::new(descriptor, None, Network::Regtest, MemoryDatabase::new()).unwrap();
+
+    let expected_balance = (0..ADDR_COUNT).fold(0_u64, |sum, i| {
+        let addr_i = i * ADDR_GAP;
+        let address = wallet.get_address(AddressIndex::Peek(addr_i as _)).unwrap();
+
+        println!(
+            "tx: {} sats => [{}] {}",
+            AMOUNT_PER_TX,
+            addr_i,
+            address.to_string()
+        );
+
+        test_client.receive(testutils! {
+            @tx ( (@addr address.address) => AMOUNT_PER_TX )
+        });
+        test_client.generate(1, None);
+
+        sum + AMOUNT_PER_TX
+    });
+    println!("expected balance: {}, syncing...", expected_balance);
+
+    // perform sync
+    wallet.sync(&blockchain, Default::default()).unwrap();
+    println!("sync done!");
+
+    let balance = wallet.get_balance().unwrap();
+    assert_eq!(balance, expected_balance);
+}
+
+/// Given a `stop_gap`, a wallet with a 2 transactions, one sending to `scriptPubKey` at derivation
+/// index of `stop_gap`, and the other spending from the same `scriptPubKey` into another
+/// `scriptPubKey` at derivation index of `stop_gap * 2`, we expect `Wallet::sync` to perform
+/// correctly, so that we detect the total balance.
+fn test_wallet_sync_self_transfer_tx<T, B>(test_client: &mut TestClient, tester: &T)
+where
+    T: ConfigurableBlockchainTester<B>,
+    B: ConfigurableBlockchain,
+{
+    const TRANSFER_AMOUNT: u64 = 10_000;
+    const STOP_GAP: usize = 75;
+
+    let descriptor = "wpkh(tprv8i8F4EhYDMquzqiecEX8SKYMXqfmmb1Sm7deoA1Hokxzn281XgTkwsd6gL8aJevLE4aJugfVf9MKMvrcRvPawGMenqMBA3bRRfp4s1V7Eg3/*)";
+
+    let blockchain =
+        B::from_config(&tester.config_with_stop_gap(test_client, STOP_GAP).unwrap()).unwrap();
+
+    let wallet = Wallet::new(descriptor, None, Network::Regtest, MemoryDatabase::new()).unwrap();
+
+    let address1 = wallet
+        .get_address(AddressIndex::Peek(STOP_GAP as _))
+        .unwrap();
+    let address2 = wallet
+        .get_address(AddressIndex::Peek((STOP_GAP * 2) as _))
+        .unwrap();
+
+    test_client.receive(testutils! {
+        @tx ( (@addr address1.address) => TRANSFER_AMOUNT )
+    });
+    test_client.generate(1, None);
+
+    wallet.sync(&blockchain, Default::default()).unwrap();
+
+    let mut builder = wallet.build_tx();
+    builder.add_recipient(address2.script_pubkey(), TRANSFER_AMOUNT / 2);
+    let (mut psbt, details) = builder.finish().unwrap();
+    assert!(wallet.sign(&mut psbt, Default::default()).unwrap());
+    blockchain.broadcast(&psbt.extract_tx()).unwrap();
+
+    test_client.generate(1, None);
+
+    // obtain what is expected
+    let fee = details.fee.unwrap();
+    let expected_balance = TRANSFER_AMOUNT - fee;
+    println!("fee={}, expected_balance={}", fee, expected_balance);
+
+    // actually test the wallet
+    wallet.sync(&blockchain, Default::default()).unwrap();
+    let balance = wallet.get_balance().unwrap();
+    assert_eq!(balance, expected_balance);
+
+    // now try with a fresh wallet
+    let fresh_wallet =
+        Wallet::new(descriptor, None, Network::Regtest, MemoryDatabase::new()).unwrap();
+    fresh_wallet.sync(&blockchain, Default::default()).unwrap();
+    let fresh_balance = fresh_wallet.get_balance().unwrap();
+    assert_eq!(fresh_balance, expected_balance);
+}
index 10c8cabe6f1effcfa67e3784e4bc7927e1c5c2b2..66366e9e4521f439c664a3b5f3190572c20d65c6 100644 (file)
@@ -1685,20 +1685,62 @@ where
     ) -> Result<(), Error> {
         debug!("Begin sync...");
 
-        let SyncOptions { progress } = sync_opts;
-        let progress = progress.unwrap_or_else(|| Box::new(NoopProgress));
+        // TODO: for the next runs, we cannot reuse the `sync_opts.progress` object due to trait
+        // restrictions
+        let mut progress_iter = sync_opts.progress.into_iter();
+        let mut new_progress = || {
+            progress_iter
+                .next()
+                .unwrap_or_else(|| Box::new(NoopProgress))
+        };
 
         let run_setup = self.ensure_addresses_cached(CACHE_ADDR_BATCH_SIZE)?;
-
         debug!("run_setup: {}", run_setup);
+
         // TODO: what if i generate an address first and cache some addresses?
         // TODO: we should sync if generating an address triggers a new batch to be stored
-        if run_setup {
-            maybe_await!(
-                blockchain.wallet_setup(self.database.borrow_mut().deref_mut(), progress,)
+
+        // We need to ensure descriptor is derivable to fullfil "missing cache", otherwise we will
+        // end up with an infinite loop
+        let is_deriveable = self.descriptor.is_deriveable()
+            && (self.change_descriptor.is_none()
+                || self.change_descriptor.as_ref().unwrap().is_deriveable());
+
+        // Restrict max rounds in case of faulty "missing cache" implementation by blockchain
+        let max_rounds = if is_deriveable { 100 } else { 1 };
+
+        for _ in 0..max_rounds {
+            let sync_res =
+                if run_setup {
+                    maybe_await!(blockchain
+                        .wallet_setup(self.database.borrow_mut().deref_mut(), new_progress()))
+                } else {
+                    maybe_await!(blockchain
+                        .wallet_sync(self.database.borrow_mut().deref_mut(), new_progress()))
+                };
+
+            // If the error is the special `MissingCachedScripts` error, we return the number of
+            // scripts we should ensure cached.
+            // On any other error, we should return the error.
+            // On no error, we say `ensure_cache` is 0.
+            let ensure_cache = sync_res.map_or_else(
+                |e| match e {
+                    Error::MissingCachedScripts(inner) => {
+                        // each call to `WalletSync` is expensive, maximize on scripts to search for
+                        let extra =
+                            std::cmp::max(inner.missing_count as u32, CACHE_ADDR_BATCH_SIZE);
+                        let last = inner.last_count as u32;
+                        Ok(extra + last)
+                    }
+                    _ => Err(e),
+                },
+                |_| Ok(0_u32),
             )?;
-        } else {
-            maybe_await!(blockchain.wallet_sync(self.database.borrow_mut().deref_mut(), progress,))?;
+
+            // cache and try again, break when there is nothing to cache
+            if !self.ensure_addresses_cached(ensure_cache)? {
+                break;
+            }
         }
 
         let sync_time = SyncTime {