]> Untitled Git - bdk/commitdiff
New `RpcBlockchain` implementation with various fixes
author志宇 <hello@evanlinjin.me>
Fri, 22 Jul 2022 23:44:39 +0000 (07:44 +0800)
committer志宇 <hello@evanlinjin.me>
Thu, 4 Aug 2022 03:27:37 +0000 (11:27 +0800)
The new implementation fixes the following:
* We can track more than 100 scriptPubKeys
* We can obtain more than 1000 transactions per sync
* `TransactionDetails` for already-synced transactions are updated when
  new scriptPubKeys are introduced (fixing the missing balance/coins
      issue of supposedly tracked scriptPubKeys)

`RpcConfig` changes:
* Introduce `RpcSyncParams`.
* Remove `RpcConfig::skip_blocks` (this is replaced by
  `RpcSyncParams::start_time`).

CHANGELOG.md
examples/rpcwallet.rs
src/blockchain/rpc.rs
src/testutils/blockchain_tests.rs

index fa369a47f3349d48d1941ef29d9f9b6358a211fb..3fda3e0601ef53d49d5acfd7fd432a40b6a550be 100644 (file)
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Add the ability to specify whether a taproot transaction should be signed using the internal key or not, using `sign_with_tap_internal_key` in `SignOptions`
 - Consolidate params `fee_amount` and `amount_needed` in `target_amount` in `CoinSelectionAlgorithm::coin_select` signature.
 - Change the meaning of the `fee_amount` field inside `CoinSelectionResult`: from now on the `fee_amount` will represent only the fees asociated with the utxos in the `selected` field of `CoinSelectionResult`.
+- New `RpcBlockchain` implementation with various fixes.
 
 ## [v0.20.0] - [v0.19.0]
 
index 3178af6bb0a9f8d8d2b72e46aa0e60669caafe48..24a55591033601a0c9d8b4fd954e2b4f90a98f3c 100644 (file)
@@ -103,7 +103,7 @@ fn main() -> Result<(), Box<dyn Error>> {
         auth: bitcoind_auth,
         network: Network::Regtest,
         wallet_name,
-        skip_blocks: None,
+        sync_params: None,
     };
 
     // Use the above configuration to create a RPC blockchain backend
index 1d0d884c09b4d8558128c987b718cdb5c6f6aba5..410e92f929df6ffcbd112b5a01523bff76b3da5a 100644 (file)
 //!     },
 //!     network: bdk::bitcoin::Network::Testnet,
 //!     wallet_name: "wallet_name".to_string(),
-//!     skip_blocks: None,
+//!     sync_params: None,
 //! };
 //! let blockchain = RpcBlockchain::from_config(&config);
 //! ```
 
-use crate::bitcoin::consensus::deserialize;
 use crate::bitcoin::hashes::hex::ToHex;
-use crate::bitcoin::{Address, Network, OutPoint, Transaction, TxOut, Txid};
+use crate::bitcoin::{Network, OutPoint, Transaction, TxOut, Txid};
 use crate::blockchain::*;
-use crate::database::{BatchDatabase, DatabaseUtils};
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
 use crate::descriptor::get_checksum;
+use crate::error::MissingCachedScripts;
 use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
+use bitcoin::Script;
 use bitcoincore_rpc::json::{
-    GetAddressInfoResultLabel, ImportMultiOptions, ImportMultiRequest,
-    ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
+    GetTransactionResult, GetTransactionResultDetailCategory, ImportMultiOptions,
+    ImportMultiRequest, ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
+    ListTransactionResult, ScanningDetails,
 };
 use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
 use bitcoincore_rpc::Auth as RpcAuth;
@@ -49,7 +51,8 @@ use log::debug;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
-use std::str::FromStr;
+use std::thread;
+use std::time::Duration;
 
 /// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
 #[derive(Debug)]
@@ -60,11 +63,8 @@ pub struct RpcBlockchain {
     is_descriptors: bool,
     /// Blockchain capabilities, cached here at startup
     capabilities: HashSet<Capability>,
-    /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
-    skip_blocks: Option<u32>,
-
-    /// This is a fixed Address used as a hack key to store information on the node
-    _storage_address: Address,
+    /// Sync parameters.
+    sync_params: RpcSyncParams,
 }
 
 /// RpcBlockchain configuration options
@@ -78,8 +78,33 @@ pub struct RpcConfig {
     pub network: Network,
     /// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this
     pub wallet_name: String,
-    /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
-    pub skip_blocks: Option<u32>,
+    /// Sync parameters
+    pub sync_params: Option<RpcSyncParams>,
+}
+
+/// Sync parameters for Bitcoin Core RPC.
+///
+/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with
+/// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used for determining
+/// how the `importdescriptors` RPC calls are to be made.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+pub struct RpcSyncParams {
+    /// The minimum number of scripts to scan for on initial sync.
+    pub start_script_count: usize,
+    /// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis).
+    pub start_time: u64,
+    /// RPC poll rate (in seconds) to get state updates.
+    pub poll_rate_sec: u64,
+}
+
+impl Default for RpcSyncParams {
+    fn default() -> Self {
+        Self {
+            start_script_count: 100,
+            start_time: 0,
+            poll_rate_sec: 3,
+        }
+    }
 }
 
 /// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
@@ -115,27 +140,6 @@ impl From<Auth> for RpcAuth {
     }
 }
 
-impl RpcBlockchain {
-    fn get_node_synced_height(&self) -> Result<u32, Error> {
-        let info = self.client.get_address_info(&self._storage_address)?;
-        if let Some(GetAddressInfoResultLabel::Simple(label)) = info.labels.first() {
-            Ok(label
-                .parse::<u32>()
-                .unwrap_or_else(|_| self.skip_blocks.unwrap_or(0)))
-        } else {
-            Ok(self.skip_blocks.unwrap_or(0))
-        }
-    }
-
-    /// Set the synced height in the core node by using a label of a fixed address so that
-    /// another client with the same descriptor doesn't rescan the blockchain
-    fn set_node_synced_height(&self, height: u32) -> Result<(), Error> {
-        Ok(self
-            .client
-            .set_label(&self._storage_address, &height.to_string())?)
-    }
-}
-
 impl Blockchain for RpcBlockchain {
     fn get_capabilities(&self) -> HashSet<Capability> {
         self.capabilities.clone()
@@ -177,225 +181,53 @@ impl GetBlockHash for RpcBlockchain {
 
 impl WalletSync for RpcBlockchain {
     fn wallet_setup<D: BatchDatabase>(
-        &self,
-        database: &mut D,
-        progress_update: Box<dyn Progress>,
-    ) -> Result<(), Error> {
-        let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?;
-        scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?);
-        debug!(
-            "importing {} script_pubkeys (some maybe already imported)",
-            scripts_pubkeys.len()
-        );
-
-        if self.is_descriptors {
-            // Core still doesn't support complex descriptors like BDK, but when the wallet type is
-            // "descriptors" we should import individual addresses using `importdescriptors` rather
-            // than `importmulti`, using the `raw()` descriptor which allows us to specify an
-            // arbitrary script
-            let requests = Value::Array(
-                scripts_pubkeys
-                    .iter()
-                    .map(|s| {
-                        let desc = format!("raw({})", s.to_hex());
-                        json!({
-                            "timestamp": "now",
-                            "desc": format!("{}#{}", desc, get_checksum(&desc).unwrap()),
-                        })
-                    })
-                    .collect(),
-            );
-
-            let res: Vec<Value> = self.client.call("importdescriptors", &[requests])?;
-            res.into_iter()
-                .map(|v| match v["success"].as_bool() {
-                    Some(true) => Ok(()),
-                    Some(false) => Err(Error::Generic(
-                        v["error"]["message"]
-                            .as_str()
-                            .unwrap_or("Unknown error")
-                            .to_string(),
-                    )),
-                    _ => Err(Error::Generic("Unexpected response from Core".to_string())),
-                })
-                .collect::<Result<Vec<_>, _>>()?;
-        } else {
-            let requests: Vec<_> = scripts_pubkeys
-                .iter()
-                .map(|s| ImportMultiRequest {
-                    timestamp: ImportMultiRescanSince::Timestamp(0),
-                    script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(s)),
-                    watchonly: Some(true),
-                    ..Default::default()
-                })
-                .collect();
-            let options = ImportMultiOptions {
-                rescan: Some(false),
-            };
-            self.client.import_multi(&requests, Some(&options))?;
-        }
-
-        loop {
-            let current_height = self.get_height()?;
-
-            // min because block invalidate may cause height to go down
-            let node_synced = self.get_node_synced_height()?.min(current_height);
-
-            let sync_up_to = node_synced.saturating_add(10_000).min(current_height);
-
-            debug!("rescan_blockchain from:{} to:{}", node_synced, sync_up_to);
-            self.client
-                .rescan_blockchain(Some(node_synced as usize), Some(sync_up_to as usize))?;
-            progress_update.update((sync_up_to as f32) / (current_height as f32), None)?;
-
-            self.set_node_synced_height(sync_up_to)?;
-
-            if sync_up_to == current_height {
-                break;
-            }
-        }
-
-        self.wallet_sync(database, progress_update)
-    }
-
-    fn wallet_sync<D: BatchDatabase>(
         &self,
         db: &mut D,
-        _progress_update: Box<dyn Progress>,
+        progress_update: Box<dyn Progress>,
     ) -> Result<(), Error> {
-        let mut indexes = HashMap::new();
-        for keykind in &[KeychainKind::External, KeychainKind::Internal] {
-            indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0));
+        let db_scripts = db.iter_script_pubkeys(None)?;
+
+        // this is a hack to check whether the scripts are coming from a derivable descriptor
+        // we assume for non-derivable descriptors, the initial script count is always 1
+        let is_derivable = db_scripts.len() > 1;
+
+        // ensure db scripts meet start script count requirements
+        if is_derivable && db_scripts.len() < self.sync_params.start_script_count {
+            return Err(Error::MissingCachedScripts(MissingCachedScripts {
+                last_count: db_scripts.len(),
+                missing_count: self.sync_params.start_script_count - db_scripts.len(),
+            }));
         }
 
-        let mut known_txs: HashMap<_, _> = db
-            .iter_txs(true)?
-            .into_iter()
-            .map(|tx| (tx.txid, tx))
-            .collect();
-        let known_utxos: HashSet<_> = db.iter_utxos()?.into_iter().collect();
-
-        //TODO list_since_blocks would be more efficient
-        let current_utxo = self
-            .client
-            .list_unspent(Some(0), None, None, Some(true), None)?;
-        debug!("current_utxo len {}", current_utxo.len());
+        // this tells Core wallet where to sync from for imported scripts
+        let start_epoch = db
+            .get_sync_time()?
+            .map_or(self.sync_params.start_time, |st| st.block_time.timestamp);
 
-        //TODO supported up to 1_000 txs, should use since_blocks or do paging
-        let list_txs = self
-            .client
-            .list_transactions(None, Some(1_000), None, Some(true))?;
-        let mut list_txs_ids = HashSet::new();
-
-        for tx_result in list_txs.iter().filter(|t| {
-            // list_txs returns all conflicting txs, we want to
-            // filter out replaced tx => unconfirmed and not in the mempool
-            t.info.confirmations > 0 || self.client.get_mempool_entry(&t.info.txid).is_ok()
-        }) {
-            let txid = tx_result.info.txid;
-            list_txs_ids.insert(txid);
-            if let Some(mut known_tx) = known_txs.get_mut(&txid) {
-                let confirmation_time =
-                    BlockTime::new(tx_result.info.blockheight, tx_result.info.blocktime);
-                if confirmation_time != known_tx.confirmation_time {
-                    // reorg may change tx height
-                    debug!(
-                        "updating tx({}) confirmation time to: {:?}",
-                        txid, confirmation_time
-                    );
-                    known_tx.confirmation_time = confirmation_time;
-                    db.set_tx(known_tx)?;
-                }
-            } else {
-                //TODO check there is already the raw tx in db?
-                let tx_result = self.client.get_transaction(&txid, Some(true))?;
-                let tx: Transaction = deserialize(&tx_result.hex)?;
-                let mut received = 0u64;
-                let mut sent = 0u64;
-                for output in tx.output.iter() {
-                    if let Ok(Some((kind, index))) =
-                        db.get_path_from_script_pubkey(&output.script_pubkey)
-                    {
-                        if index > *indexes.get(&kind).unwrap() {
-                            indexes.insert(kind, index);
-                        }
-                        received += output.value;
-                    }
-                }
-
-                for input in tx.input.iter() {
-                    if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
-                        if db.is_mine(&previous_output.script_pubkey)? {
-                            sent += previous_output.value;
-                        }
-                    }
-                }
-
-                let td = TransactionDetails {
-                    transaction: Some(tx),
-                    txid: tx_result.info.txid,
-                    confirmation_time: BlockTime::new(
-                        tx_result.info.blockheight,
-                        tx_result.info.blocktime,
-                    ),
-                    received,
-                    sent,
-                    fee: tx_result.fee.map(|f| f.as_sat().unsigned_abs()),
-                };
-                debug!(
-                    "saving tx: {} tx_result.fee:{:?} td.fees:{:?}",
-                    td.txid, tx_result.fee, td.fee
-                );
-                db.set_tx(&td)?;
-            }
-        }
-
-        for known_txid in known_txs.keys() {
-            if !list_txs_ids.contains(known_txid) {
-                debug!("removing tx: {}", known_txid);
-                db.del_tx(known_txid, false)?;
-            }
+        // import all scripts from db into Core wallet
+        if self.is_descriptors {
+            import_descriptors(&self.client, start_epoch, db_scripts.iter())?;
+        } else {
+            import_multi(&self.client, start_epoch, db_scripts.iter())?;
         }
 
-        // Filter out trasactions that are for script pubkeys that aren't in this wallet.
-        let current_utxos = current_utxo
-            .into_iter()
-            .filter_map(
-                |u| match db.get_path_from_script_pubkey(&u.script_pub_key) {
-                    Err(e) => Some(Err(e)),
-                    Ok(None) => None,
-                    Ok(Some(path)) => Some(Ok(LocalUtxo {
-                        outpoint: OutPoint::new(u.txid, u.vout),
-                        keychain: path.0,
-                        txout: TxOut {
-                            value: u.amount.as_sat(),
-                            script_pubkey: u.script_pub_key,
-                        },
-                        is_spent: false,
-                    })),
-                },
-            )
-            .collect::<Result<HashSet<_>, Error>>()?;
+        // await sync (TODO: Maybe make this async)
+        await_wallet_scan(
+            &self.client,
+            self.sync_params.poll_rate_sec,
+            &*progress_update,
+        )?;
 
-        let spent: HashSet<_> = known_utxos.difference(&current_utxos).collect();
-        for utxo in spent {
-            debug!("setting as spent utxo: {:?}", utxo);
-            let mut spent_utxo = utxo.clone();
-            spent_utxo.is_spent = true;
-            db.set_utxo(&spent_utxo)?;
-        }
-        let received: HashSet<_> = current_utxos.difference(&known_utxos).collect();
-        for utxo in received {
-            debug!("adding utxo: {:?}", utxo);
-            db.set_utxo(utxo)?;
-        }
+        // begin db batch updates
+        let mut db_batch = db.begin_batch();
 
-        for (keykind, index) in indexes {
-            debug!("{:?} max {}", keykind, index);
-            db.set_last_index(keykind, index)?;
-        }
+        // update batch: obtain db state then update state with core txids
+        DbState::from_db(db)?
+            .update_state(&self.client, db)?
+            .update_batch::<D>(&mut db_batch)?;
 
-        Ok(())
+        // apply batch updates to db
+        db.commit_batch(db_batch)
     }
 }
 
@@ -464,17 +296,11 @@ impl ConfigurableBlockchain for RpcBlockchain {
             }
         }
 
-        // this is just a fixed address used only to store a label containing the synced height in the node
-        let mut storage_address =
-            Address::from_str("bc1qst0rewf0wm4kw6qn6kv0e5tc56nkf9yhcxlhqv").unwrap();
-        storage_address.network = network;
-
         Ok(RpcBlockchain {
             client,
             capabilities,
             is_descriptors,
-            _storage_address: storage_address,
-            skip_blocks: config.skip_blocks,
+            sync_params: config.sync_params.clone().unwrap_or_default(),
         })
     }
 }
@@ -495,6 +321,461 @@ fn list_wallet_dir(client: &Client) -> Result<Vec<String>, Error> {
     Ok(result.wallets.into_iter().map(|n| n.name).collect())
 }
 
+/// Represents the state of the [`crate::database::Database`].
+struct DbState {
+    txs: HashMap<Txid, TransactionDetails>,
+    utxos: HashSet<LocalUtxo>,
+    last_indexes: HashMap<KeychainKind, u32>,
+
+    // "deltas" to apply to database
+    retained_txs: HashSet<Txid>, // txs to retain (everything else should be deleted)
+    updated_txs: HashSet<Txid>,  // txs to update
+    updated_utxos: HashSet<LocalUtxo>, // utxos to update
+    updated_last_indexes: HashSet<KeychainKind>,
+}
+
+impl DbState {
+    /// Obtain [DbState] from [crate::database::Database].
+    fn from_db<D: BatchDatabase>(db: &D) -> Result<Self, Error> {
+        let txs = db
+            .iter_txs(true)?
+            .into_iter()
+            .map(|tx| (tx.txid, tx))
+            .collect::<HashMap<_, _>>();
+        let utxos = db.iter_utxos()?.into_iter().collect::<HashSet<_>>();
+        let last_indexes = [KeychainKind::External, KeychainKind::Internal]
+            .iter()
+            .filter_map(|keychain| {
+                db.get_last_index(*keychain)
+                    .map(|v| v.map(|i| (*keychain, i)))
+                    .transpose()
+            })
+            .collect::<Result<HashMap<_, _>, Error>>()?;
+
+        let retained_txs = HashSet::with_capacity(txs.len());
+        let updated_txs = HashSet::with_capacity(txs.len());
+        let updated_utxos = HashSet::with_capacity(utxos.len());
+        let updated_last_indexes = HashSet::with_capacity(last_indexes.len());
+
+        Ok(Self {
+            txs,
+            utxos,
+            last_indexes,
+            retained_txs,
+            updated_txs,
+            updated_utxos,
+            updated_last_indexes,
+        })
+    }
+
+    /// Update [DbState] with Core wallet state
+    fn update_state<D>(&mut self, client: &Client, db: &D) -> Result<&mut Self, Error>
+    where
+        D: BatchDatabase,
+    {
+        let tx_iter = CoreTxIter::new(client, 10);
+
+        for tx_res in tx_iter {
+            let tx_res = tx_res?;
+
+            let mut updated = false;
+
+            let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
+                updated = true;
+                TransactionDetails {
+                    txid: tx_res.info.txid,
+                    ..Default::default()
+                }
+            });
+
+            // update raw tx (if needed)
+            let raw_tx =
+                match &db_tx.transaction {
+                    Some(raw_tx) => raw_tx,
+                    None => {
+                        updated = true;
+                        db_tx.transaction.insert(client.get_raw_transaction(
+                            &tx_res.info.txid,
+                            tx_res.info.blockhash.as_ref(),
+                        )?)
+                    }
+                };
+
+            // update fee (if needed)
+            if let (None, Some(new_fee)) = (db_tx.fee, tx_res.detail.fee) {
+                updated = true;
+                db_tx.fee = Some(new_fee.as_sat().unsigned_abs());
+            }
+
+            // update confirmation time (if needed)
+            let conf_time = BlockTime::new(tx_res.info.blockheight, tx_res.info.blocktime);
+            if db_tx.confirmation_time != conf_time {
+                updated = true;
+                db_tx.confirmation_time = conf_time;
+            }
+
+            // update received (if needed)
+            let received = Self::_received_from_raw_tx(db, raw_tx)?;
+            if db_tx.received != received {
+                updated = true;
+                db_tx.received = received;
+            }
+
+            // check if tx has an immature coinbase output (add to updated UTXOs)
+            // this is required because `listunspent` does not include immature coinbase outputs
+            if tx_res.detail.category == GetTransactionResultDetailCategory::Immature {
+                // let vout = tx_res.detail.vout;
+                // let txout = raw_tx.output.get(vout as usize).cloned().ok_or_else(|| {
+                //     Error::Generic(format!(
+                //         "Core RPC returned detail with invalid vout '{}' for tx '{}'",
+                //         vout, tx_res.info.txid,
+                //     ))
+                // })?;
+                // println!("got immature detail!");
+
+                // if let Some((keychain, _)) = db.get_path_from_script_pubkey(&txout.script_pubkey)? {
+                //     let utxo = LocalUtxo {
+                //         outpoint: OutPoint::new(tx_res.info.txid, d.vout),
+                //         txout,
+                //         keychain,
+                //         is_spent: false,
+                //     };
+                //     self.updated_utxos.insert(utxo);
+                // }
+            }
+
+            // update tx deltas
+            self.retained_txs.insert(tx_res.info.txid);
+            if updated {
+                self.updated_txs.insert(tx_res.info.txid);
+            }
+        }
+
+        // update sent from tx inputs
+        let sent_updates = self
+            .txs
+            .values()
+            .filter_map(|db_tx| {
+                let txid = self.retained_txs.get(&db_tx.txid)?;
+                self._sent_from_raw_tx(db, db_tx.transaction.as_ref()?)
+                    .map(|sent| {
+                        if db_tx.sent != sent {
+                            Some((*txid, sent))
+                        } else {
+                            None
+                        }
+                    })
+                    .transpose()
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        // record send updates
+        sent_updates.into_iter().for_each(|(txid, sent)| {
+            self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent);
+            self.updated_txs.insert(txid);
+        });
+
+        // obtain UTXOs from Core wallet
+        let core_utxos = client
+            .list_unspent(Some(0), None, None, Some(true), None)?
+            .into_iter()
+            .filter_map(|utxo_res| {
+                db.get_path_from_script_pubkey(&utxo_res.script_pub_key)
+                    .transpose()
+                    .map(|v| {
+                        v.map(|(keychain, index)| {
+                            // update last index if needed
+                            self._update_last_index(keychain, index);
+
+                            LocalUtxo {
+                                outpoint: OutPoint::new(utxo_res.txid, utxo_res.vout),
+                                keychain,
+                                txout: TxOut {
+                                    value: utxo_res.amount.as_sat(),
+                                    script_pubkey: utxo_res.script_pub_key,
+                                },
+                                is_spent: false,
+                            }
+                        })
+                    })
+            })
+            .collect::<Result<HashSet<_>, Error>>()?;
+
+        // mark "spent utxos" to be updated in database
+        let spent_utxos = self.utxos.difference(&core_utxos).cloned().map(|mut utxo| {
+            utxo.is_spent = true;
+            utxo
+        });
+
+        // mark new utxos to be added in database
+        let new_utxos = core_utxos.difference(&self.utxos).cloned();
+
+        // add to updated utxos
+        self.updated_utxos = spent_utxos.chain(new_utxos).collect();
+
+        Ok(self)
+    }
+
+    /// We want to filter out conflicting transactions.
+    /// Only accept transactions that are already confirmed, or existing in mempool.
+    fn _filter_tx(client: &Client, res: GetTransactionResult) -> Option<GetTransactionResult> {
+        if res.info.confirmations > 0 || client.get_mempool_entry(&res.info.txid).is_ok() {
+            Some(res)
+        } else {
+            debug!("tx filtered: {}", res.info.txid);
+            None
+        }
+    }
+
+    /// Calculates received amount from raw tx.
+    fn _received_from_raw_tx<D: BatchDatabase>(db: &D, raw_tx: &Transaction) -> Result<u64, Error> {
+        raw_tx.output.iter().try_fold(0_u64, |recv, txo| {
+            let v = if db.is_mine(&txo.script_pubkey)? {
+                txo.value
+            } else {
+                0
+            };
+            Ok(recv + v)
+        })
+    }
+
+    /// Calculates sent from raw tx.
+    fn _sent_from_raw_tx<D: BatchDatabase>(
+        &self,
+        db: &D,
+        raw_tx: &Transaction,
+    ) -> Result<u64, Error> {
+        raw_tx.input.iter().try_fold(0_u64, |sent, txin| {
+            let v = match self._previous_output(&txin.previous_output) {
+                Some(prev_txo) => {
+                    if db.is_mine(&prev_txo.script_pubkey)? {
+                        prev_txo.value
+                    } else {
+                        0
+                    }
+                }
+                None => 0_u64,
+            };
+            Ok(sent + v)
+        })
+    }
+
+    fn _previous_output(&self, outpoint: &OutPoint) -> Option<&TxOut> {
+        let prev_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?;
+        prev_tx.output.get(outpoint.vout as usize)
+    }
+
+    fn _update_last_index(&mut self, keychain: KeychainKind, index: u32) {
+        let mut updated = false;
+
+        self.last_indexes
+            .entry(keychain)
+            .and_modify(|last| {
+                if *last < index {
+                    updated = true;
+                    *last = index;
+                }
+            })
+            .or_insert_with(|| {
+                updated = true;
+                index
+            });
+
+        if updated {
+            self.updated_last_indexes.insert(keychain);
+        }
+    }
+
+    /// Prepare db batch operations.
+    fn update_batch<D: BatchDatabase>(&self, batch: &mut D::Batch) -> Result<(), Error> {
+        // delete stale txs from db
+        // stale = not retained
+        self.txs
+            .keys()
+            .filter(|&txid| !self.retained_txs.contains(txid))
+            .try_for_each(|txid| batch.del_tx(txid, false).map(|_| ()))?;
+
+        // update txs
+        self.updated_txs
+            .iter()
+            .filter_map(|txid| self.txs.get(txid))
+            .try_for_each(|txd| batch.set_tx(txd))?;
+
+        // update utxos
+        self.updated_utxos
+            .iter()
+            .try_for_each(|utxo| batch.set_utxo(utxo))?;
+
+        // update last indexes
+        self.updated_last_indexes
+            .iter()
+            .map(|keychain| self.last_indexes.get_key_value(keychain).unwrap())
+            .try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
+
+        Ok(())
+    }
+}
+
+fn import_descriptors<'a, S>(
+    client: &Client,
+    start_epoch: u64,
+    scripts_iter: S,
+) -> Result<(), Error>
+where
+    S: Iterator<Item = &'a Script>,
+{
+    let requests = Value::Array(
+        scripts_iter
+            .map(|script| {
+                let desc = descriptor_from_script_pubkey(script);
+                json!({ "timestamp": start_epoch, "desc": desc })
+            })
+            .collect(),
+    );
+    for v in client.call::<Vec<Value>>("importdescriptors", &[requests])? {
+        match v["success"].as_bool() {
+            Some(true) => continue,
+            Some(false) => {
+                return Err(Error::Generic(
+                    v["error"]["message"]
+                        .as_str()
+                        .map_or("unknown error".into(), ToString::to_string),
+                ))
+            }
+            _ => return Err(Error::Generic("Unexpected response form Core".to_string())),
+        }
+    }
+    Ok(())
+}
+
+fn import_multi<'a, S>(client: &Client, start_epoch: u64, scripts_iter: S) -> Result<(), Error>
+where
+    S: Iterator<Item = &'a Script>,
+{
+    let requests = scripts_iter
+        .map(|script| ImportMultiRequest {
+            timestamp: ImportMultiRescanSince::Timestamp(start_epoch),
+            script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(script)),
+            watchonly: Some(true),
+            ..Default::default()
+        })
+        .collect::<Vec<_>>();
+    let options = ImportMultiOptions { rescan: Some(true) };
+    for v in client.import_multi(&requests, Some(&options))? {
+        if let Some(err) = v.error {
+            return Err(Error::Generic(format!(
+                "{} (code: {})",
+                err.message, err.code
+            )));
+        }
+    }
+    Ok(())
+}
+
+struct CoreTxIter<'a> {
+    client: &'a Client,
+    page_size: usize,
+    page_index: usize,
+
+    stack: Vec<ListTransactionResult>,
+    done: bool,
+}
+
+impl<'a> CoreTxIter<'a> {
+    fn new(client: &'a Client, page_size: usize) -> Self {
+        Self {
+            client,
+            page_size,
+            page_index: 0,
+            stack: Vec::with_capacity(page_size),
+            done: false,
+        }
+    }
+
+    /// We want to filter out conflicting transactions.
+    /// Only accept transactions that are already confirmed, or existing in mempool.
+    fn tx_ok(&self, item: &ListTransactionResult) -> bool {
+        item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
+    }
+}
+
+impl<'a> Iterator for CoreTxIter<'a> {
+    type Item = Result<ListTransactionResult, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            if self.done {
+                return None;
+            }
+
+            if let Some(item) = self.stack.pop() {
+                if self.tx_ok(&item) {
+                    return Some(Ok(item));
+                }
+            }
+
+            let res = self
+                .client
+                .list_transactions(
+                    None,
+                    Some(self.page_size),
+                    Some(self.page_size * self.page_index),
+                    Some(true),
+                )
+                .map_err(Error::Rpc);
+
+            self.page_index += 1;
+
+            let list = match res {
+                Ok(list) => list,
+                Err(err) => {
+                    self.done = true;
+                    return Some(Err(err));
+                }
+            };
+
+            if list.is_empty() {
+                self.done = true;
+                return None;
+            }
+
+            self.stack = list;
+        }
+    }
+}
+
+fn get_scanning_details(client: &Client) -> Result<ScanningDetails, Error> {
+    #[derive(Deserialize)]
+    struct CallResult {
+        scanning: ScanningDetails,
+    }
+    let result: CallResult = client.call("getwalletinfo", &[])?;
+    Ok(result.scanning)
+}
+
+fn await_wallet_scan(
+    client: &Client,
+    poll_rate_sec: u64,
+    progress_update: &dyn Progress,
+) -> Result<(), Error> {
+    let dur = Duration::from_secs(poll_rate_sec);
+    loop {
+        match get_scanning_details(client)? {
+            ScanningDetails::Scanning { duration, progress } => {
+                println!("scanning: duration={}, progress={}", duration, progress);
+                progress_update
+                    .update(progress, Some(format!("elapsed for {} seconds", duration)))?;
+                thread::sleep(dur);
+            }
+            ScanningDetails::NotScanning(_) => {
+                progress_update.update(1.0, None)?;
+                println!("scanning: done!");
+                return Ok(());
+            }
+        };
+    }
+}
+
 /// Returns whether a wallet is legacy or descriptors by calling `getwalletinfo`.
 ///
 /// This API is mapped by bitcoincore_rpc, but it doesn't have the fields we need (either
@@ -509,6 +790,11 @@ fn is_wallet_descriptor(client: &Client) -> Result<bool, Error> {
     Ok(result.descriptors.unwrap_or(false))
 }
 
+fn descriptor_from_script_pubkey(script: &Script) -> String {
+    let desc = format!("raw({})", script.to_hex());
+    format!("{}#{}", desc, get_checksum(&desc).unwrap())
+}
+
 /// Factory of [`RpcBlockchain`] instances, implements [`BlockchainFactory`]
 ///
 /// Internally caches the node url and authentication params and allows getting many different [`RpcBlockchain`]
@@ -529,6 +815,7 @@ fn is_wallet_descriptor(client: &Client) -> Result<bool, Error> {
 ///     network: Network::Testnet,
 ///     wallet_name_prefix: Some("prefix-".to_string()),
 ///     default_skip_blocks: 100_000,
+///     sync_params: None,
 /// };
 /// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?;
 /// # Ok(())
@@ -546,6 +833,8 @@ pub struct RpcBlockchainFactory {
     pub wallet_name_prefix: Option<String>,
     /// Default number of blocks to skip which will be inherited by blockchain unless overridden
     pub default_skip_blocks: u32,
+    /// Sync parameters
+    pub sync_params: Option<RpcSyncParams>,
 }
 
 impl BlockchainFactory for RpcBlockchainFactory {
@@ -554,7 +843,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
     fn build(
         &self,
         checksum: &str,
-        override_skip_blocks: Option<u32>,
+        _override_skip_blocks: Option<u32>,
     ) -> Result<Self::Inner, Error> {
         RpcBlockchain::from_config(&RpcConfig {
             url: self.url.clone(),
@@ -565,7 +854,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
                 self.wallet_name_prefix.as_ref().unwrap_or(&String::new()),
                 checksum
             ),
-            skip_blocks: Some(override_skip_blocks.unwrap_or(self.default_skip_blocks)),
+            sync_params: self.sync_params.clone(),
         })
     }
 }
@@ -586,7 +875,7 @@ mod test {
                 auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
                 network: Network::Regtest,
                 wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
-                skip_blocks: None,
+                sync_params: None,
             };
             RpcBlockchain::from_config(&config).unwrap()
         }
@@ -603,6 +892,7 @@ mod test {
             network: Network::Regtest,
             wallet_name_prefix: Some("prefix-".into()),
             default_skip_blocks: 0,
+            sync_params: None,
         };
 
         (test_client, factory)
@@ -613,7 +903,6 @@ mod test {
         let (_test_client, factory) = get_factory();
 
         let a = factory.build("aaaaaa", None).unwrap();
-        assert_eq!(a.skip_blocks, Some(0));
         assert_eq!(
             a.client
                 .get_wallet_info()
@@ -623,7 +912,6 @@ mod test {
         );
 
         let b = factory.build("bbbbbb", Some(100)).unwrap();
-        assert_eq!(b.skip_blocks, Some(100));
         assert_eq!(
             b.client
                 .get_wallet_info()
index 89e09133587c1e0a554de08a407c392aa99434e0..ed73a2994a3d651d05e759eea811ac35b0d6dc87 100644 (file)
@@ -1057,6 +1057,7 @@ macro_rules! bdk_blockchain_tests {
                 let (wallet, blockchain, _, mut test_client) = init_single_sig();
 
                 let wallet_addr = wallet.get_address($crate::wallet::AddressIndex::New).unwrap().address;
+                println!("wallet addr: {}", wallet_addr);
 
                 wallet.sync(&blockchain, SyncOptions::default()).unwrap();
                 assert_eq!(wallet.get_balance().unwrap(), 0, "incorrect balance");
@@ -1070,7 +1071,6 @@ macro_rules! bdk_blockchain_tests {
                     test_client.generate(100, Some(node_addr));
                 }
 
-
                 wallet.sync(&blockchain, SyncOptions::default()).unwrap();
                 assert!(wallet.get_balance().unwrap() > 0, "incorrect balance after receiving coinbase");
             }
@@ -1267,7 +1267,7 @@ macro_rules! bdk_blockchain_tests {
                 wallet.sync(&blockchain, SyncOptions::default()).unwrap();
 
                 let _ = test_client.receive(testutils! {
-                    @tx ( (@external descriptors, 0)   => 50_000 )
+                    @tx ( (@external descriptors, 0) => 50_000 )
                 });
 
                 wallet.sync(&blockchain, SyncOptions::default()).unwrap();