//! },
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
-//! skip_blocks: None,
+//! sync_params: None,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```
-use crate::bitcoin::consensus::deserialize;
use crate::bitcoin::hashes::hex::ToHex;
-use crate::bitcoin::{Address, Network, OutPoint, Transaction, TxOut, Txid};
+use crate::bitcoin::{Network, OutPoint, Transaction, TxOut, Txid};
use crate::blockchain::*;
-use crate::database::{BatchDatabase, DatabaseUtils};
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::descriptor::get_checksum;
+use crate::error::MissingCachedScripts;
use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
+use crate::bitcoin::Script;
use bitcoincore_rpc::json::{
- GetAddressInfoResultLabel, ImportMultiOptions, ImportMultiRequest,
- ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
+ GetTransactionResult, GetTransactionResultDetailCategory, ImportMultiOptions,
+ ImportMultiRequest, ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
+ ListTransactionResult, ScanningDetails,
};
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::Auth as RpcAuth;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
-use std::str::FromStr;
+use std::thread;
+use std::time::Duration;
/// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
#[derive(Debug)]
is_descriptors: bool,
/// Blockchain capabilities, cached here at startup
capabilities: HashSet<Capability>,
- /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
- skip_blocks: Option<u32>,
-
- /// This is a fixed Address used as a hack key to store information on the node
- _storage_address: Address,
+ /// Sync parameters.
+ sync_params: RpcSyncParams,
}
/// RpcBlockchain configuration options
pub network: Network,
/// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this
pub wallet_name: String,
- /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
- pub skip_blocks: Option<u32>,
+ /// Sync parameters
+ pub sync_params: Option<RpcSyncParams>,
+}
+
+/// Sync parameters for Bitcoin Core RPC.
+///
+/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with
+/// `scriptPubKey`s imported into the Bitcoin Core wallet. These parameters determine how the
+/// `importdescriptors`/`importmulti` RPC calls are made.
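+///
+/// A minimal sketch of overriding the defaults (field values here are illustrative, and this
+/// assumes the type is re-exported at `bdk::blockchain::rpc`):
+///
+/// ```
+/// # use bdk::blockchain::rpc::RpcSyncParams;
+/// let params = RpcSyncParams {
+///     start_script_count: 200, // import at least 200 scripts on first sync
+///     start_time: 0,           // rescan from genesis
+///     poll_rate_sec: 3,        // poll `getwalletinfo` every 3 seconds
+/// };
+/// ```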
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+pub struct RpcSyncParams {
+ /// The minimum number of scripts to scan for on initial sync.
+ pub start_script_count: usize,
+    /// Unix timestamp (in seconds) from which the initial sync should start scanning (0 to start from genesis).
+ pub start_time: u64,
+ /// RPC poll rate (in seconds) to get state updates.
+ pub poll_rate_sec: u64,
+}
+
+impl Default for RpcSyncParams {
+ fn default() -> Self {
+ Self {
+ start_script_count: 100,
+ start_time: 0,
+ poll_rate_sec: 3,
+ }
+ }
}
/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
}
}
-impl RpcBlockchain {
- fn get_node_synced_height(&self) -> Result<u32, Error> {
- let info = self.client.get_address_info(&self._storage_address)?;
- if let Some(GetAddressInfoResultLabel::Simple(label)) = info.labels.first() {
- Ok(label
- .parse::<u32>()
- .unwrap_or_else(|_| self.skip_blocks.unwrap_or(0)))
- } else {
- Ok(self.skip_blocks.unwrap_or(0))
- }
- }
-
- /// Set the synced height in the core node by using a label of a fixed address so that
- /// another client with the same descriptor doesn't rescan the blockchain
- fn set_node_synced_height(&self, height: u32) -> Result<(), Error> {
- Ok(self
- .client
- .set_label(&self._storage_address, &height.to_string())?)
- }
-}
-
impl Blockchain for RpcBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
self.capabilities.clone()
impl WalletSync for RpcBlockchain {
fn wallet_setup<D: BatchDatabase>(
- &self,
- database: &mut D,
- progress_update: Box<dyn Progress>,
- ) -> Result<(), Error> {
- let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?;
- scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?);
- debug!(
- "importing {} script_pubkeys (some maybe already imported)",
- scripts_pubkeys.len()
- );
-
- if self.is_descriptors {
- // Core still doesn't support complex descriptors like BDK, but when the wallet type is
- // "descriptors" we should import individual addresses using `importdescriptors` rather
- // than `importmulti`, using the `raw()` descriptor which allows us to specify an
- // arbitrary script
- let requests = Value::Array(
- scripts_pubkeys
- .iter()
- .map(|s| {
- let desc = format!("raw({})", s.to_hex());
- json!({
- "timestamp": "now",
- "desc": format!("{}#{}", desc, get_checksum(&desc).unwrap()),
- })
- })
- .collect(),
- );
-
- let res: Vec<Value> = self.client.call("importdescriptors", &[requests])?;
- res.into_iter()
- .map(|v| match v["success"].as_bool() {
- Some(true) => Ok(()),
- Some(false) => Err(Error::Generic(
- v["error"]["message"]
- .as_str()
- .unwrap_or("Unknown error")
- .to_string(),
- )),
- _ => Err(Error::Generic("Unexpected response from Core".to_string())),
- })
- .collect::<Result<Vec<_>, _>>()?;
- } else {
- let requests: Vec<_> = scripts_pubkeys
- .iter()
- .map(|s| ImportMultiRequest {
- timestamp: ImportMultiRescanSince::Timestamp(0),
- script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(s)),
- watchonly: Some(true),
- ..Default::default()
- })
- .collect();
- let options = ImportMultiOptions {
- rescan: Some(false),
- };
- self.client.import_multi(&requests, Some(&options))?;
- }
-
- loop {
- let current_height = self.get_height()?;
-
- // min because block invalidate may cause height to go down
- let node_synced = self.get_node_synced_height()?.min(current_height);
-
- let sync_up_to = node_synced.saturating_add(10_000).min(current_height);
-
- debug!("rescan_blockchain from:{} to:{}", node_synced, sync_up_to);
- self.client
- .rescan_blockchain(Some(node_synced as usize), Some(sync_up_to as usize))?;
- progress_update.update((sync_up_to as f32) / (current_height as f32), None)?;
-
- self.set_node_synced_height(sync_up_to)?;
-
- if sync_up_to == current_height {
- break;
- }
- }
-
- self.wallet_sync(database, progress_update)
- }
-
- fn wallet_sync<D: BatchDatabase>(
&self,
db: &mut D,
- _progress_update: Box<dyn Progress>,
+ progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
- let mut indexes = HashMap::new();
- for keykind in &[KeychainKind::External, KeychainKind::Internal] {
- indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0));
+ let db_scripts = db.iter_script_pubkeys(None)?;
+
+        // this is a hack to check whether the scripts come from a derivable descriptor;
+        // we assume that, for non-derivable descriptors, the initial script count is always 1
+ let is_derivable = db_scripts.len() > 1;
+
+ // ensure db scripts meet start script count requirements
+ if is_derivable && db_scripts.len() < self.sync_params.start_script_count {
+ return Err(Error::MissingCachedScripts(MissingCachedScripts {
+ last_count: db_scripts.len(),
+ missing_count: self.sync_params.start_script_count - db_scripts.len(),
+ }));
}
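+        // note: callers can recover from `MissingCachedScripts` by caching additional
+        // scriptPubKeys in the database (e.g. by deriving more addresses) and retrying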
- let mut known_txs: HashMap<_, _> = db
- .iter_txs(true)?
- .into_iter()
- .map(|tx| (tx.txid, tx))
- .collect();
- let known_utxos: HashSet<_> = db.iter_utxos()?.into_iter().collect();
-
- //TODO list_since_blocks would be more efficient
- let current_utxo = self
- .client
- .list_unspent(Some(0), None, None, Some(true), None)?;
- debug!("current_utxo len {}", current_utxo.len());
+ // this tells Core wallet where to sync from for imported scripts
+ let start_epoch = db
+ .get_sync_time()?
+ .map_or(self.sync_params.start_time, |st| st.block_time.timestamp);
- //TODO supported up to 1_000 txs, should use since_blocks or do paging
- let list_txs = self
- .client
- .list_transactions(None, Some(1_000), None, Some(true))?;
- let mut list_txs_ids = HashSet::new();
-
- for tx_result in list_txs.iter().filter(|t| {
- // list_txs returns all conflicting txs, we want to
- // filter out replaced tx => unconfirmed and not in the mempool
- t.info.confirmations > 0 || self.client.get_mempool_entry(&t.info.txid).is_ok()
- }) {
- let txid = tx_result.info.txid;
- list_txs_ids.insert(txid);
- if let Some(mut known_tx) = known_txs.get_mut(&txid) {
- let confirmation_time =
- BlockTime::new(tx_result.info.blockheight, tx_result.info.blocktime);
- if confirmation_time != known_tx.confirmation_time {
- // reorg may change tx height
- debug!(
- "updating tx({}) confirmation time to: {:?}",
- txid, confirmation_time
- );
- known_tx.confirmation_time = confirmation_time;
- db.set_tx(known_tx)?;
- }
- } else {
- //TODO check there is already the raw tx in db?
- let tx_result = self.client.get_transaction(&txid, Some(true))?;
- let tx: Transaction = deserialize(&tx_result.hex)?;
- let mut received = 0u64;
- let mut sent = 0u64;
- for output in tx.output.iter() {
- if let Ok(Some((kind, index))) =
- db.get_path_from_script_pubkey(&output.script_pubkey)
- {
- if index > *indexes.get(&kind).unwrap() {
- indexes.insert(kind, index);
- }
- received += output.value;
- }
- }
-
- for input in tx.input.iter() {
- if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
- if db.is_mine(&previous_output.script_pubkey)? {
- sent += previous_output.value;
- }
- }
- }
-
- let td = TransactionDetails {
- transaction: Some(tx),
- txid: tx_result.info.txid,
- confirmation_time: BlockTime::new(
- tx_result.info.blockheight,
- tx_result.info.blocktime,
- ),
- received,
- sent,
- fee: tx_result.fee.map(|f| f.as_sat().unsigned_abs()),
- };
- debug!(
- "saving tx: {} tx_result.fee:{:?} td.fees:{:?}",
- td.txid, tx_result.fee, td.fee
- );
- db.set_tx(&td)?;
- }
- }
-
- for known_txid in known_txs.keys() {
- if !list_txs_ids.contains(known_txid) {
- debug!("removing tx: {}", known_txid);
- db.del_tx(known_txid, false)?;
- }
+ // import all scripts from db into Core wallet
+ if self.is_descriptors {
+ import_descriptors(&self.client, start_epoch, db_scripts.iter())?;
+ } else {
+ import_multi(&self.client, start_epoch, db_scripts.iter())?;
}
- // Filter out trasactions that are for script pubkeys that aren't in this wallet.
- let current_utxos = current_utxo
- .into_iter()
- .filter_map(
- |u| match db.get_path_from_script_pubkey(&u.script_pub_key) {
- Err(e) => Some(Err(e)),
- Ok(None) => None,
- Ok(Some(path)) => Some(Ok(LocalUtxo {
- outpoint: OutPoint::new(u.txid, u.vout),
- keychain: path.0,
- txout: TxOut {
- value: u.amount.as_sat(),
- script_pubkey: u.script_pub_key,
- },
- is_spent: false,
- })),
- },
- )
- .collect::<Result<HashSet<_>, Error>>()?;
+ // await sync (TODO: Maybe make this async)
+ await_wallet_scan(
+ &self.client,
+ self.sync_params.poll_rate_sec,
+ &*progress_update,
+ )?;
- let spent: HashSet<_> = known_utxos.difference(¤t_utxos).collect();
- for utxo in spent {
- debug!("setting as spent utxo: {:?}", utxo);
- let mut spent_utxo = utxo.clone();
- spent_utxo.is_spent = true;
- db.set_utxo(&spent_utxo)?;
- }
- let received: HashSet<_> = current_utxos.difference(&known_utxos).collect();
- for utxo in received {
- debug!("adding utxo: {:?}", utxo);
- db.set_utxo(utxo)?;
- }
+ // begin db batch updates
+ let mut db_batch = db.begin_batch();
- for (keykind, index) in indexes {
- debug!("{:?} max {}", keykind, index);
- db.set_last_index(keykind, index)?;
- }
+ // update batch: obtain db state then update state with core txids
+ DbState::from_db(db)?
+ .update_state(&self.client, db)?
+ .update_batch::<D>(&mut db_batch)?;
- Ok(())
+ // apply batch updates to db
+ db.commit_batch(db_batch)
}
}
}
}
- // this is just a fixed address used only to store a label containing the synced height in the node
- let mut storage_address =
- Address::from_str("bc1qst0rewf0wm4kw6qn6kv0e5tc56nkf9yhcxlhqv").unwrap();
- storage_address.network = network;
-
Ok(RpcBlockchain {
client,
capabilities,
is_descriptors,
- _storage_address: storage_address,
- skip_blocks: config.skip_blocks,
+ sync_params: config.sync_params.clone().unwrap_or_default(),
})
}
}
Ok(result.wallets.into_iter().map(|n| n.name).collect())
}
+/// Represents the state of the [`crate::database::Database`].
+struct DbState {
+ txs: HashMap<Txid, TransactionDetails>,
+ utxos: HashSet<LocalUtxo>,
+ last_indexes: HashMap<KeychainKind, u32>,
+
+ // "deltas" to apply to database
+ retained_txs: HashSet<Txid>, // txs to retain (everything else should be deleted)
+ updated_txs: HashSet<Txid>, // txs to update
+ updated_utxos: HashSet<LocalUtxo>, // utxos to update
+ updated_last_indexes: HashSet<KeychainKind>,
+}
+
+impl DbState {
+ /// Obtain [DbState] from [crate::database::Database].
+ fn from_db<D: BatchDatabase>(db: &D) -> Result<Self, Error> {
+ let txs = db
+ .iter_txs(true)?
+ .into_iter()
+ .map(|tx| (tx.txid, tx))
+ .collect::<HashMap<_, _>>();
+ let utxos = db.iter_utxos()?.into_iter().collect::<HashSet<_>>();
+ let last_indexes = [KeychainKind::External, KeychainKind::Internal]
+ .iter()
+ .filter_map(|keychain| {
+ db.get_last_index(*keychain)
+ .map(|v| v.map(|i| (*keychain, i)))
+ .transpose()
+ })
+ .collect::<Result<HashMap<_, _>, Error>>()?;
+
+ let retained_txs = HashSet::with_capacity(txs.len());
+ let updated_txs = HashSet::with_capacity(txs.len());
+ let updated_utxos = HashSet::with_capacity(utxos.len());
+ let updated_last_indexes = HashSet::with_capacity(last_indexes.len());
+
+ Ok(Self {
+ txs,
+ utxos,
+ last_indexes,
+ retained_txs,
+ updated_txs,
+ updated_utxos,
+ updated_last_indexes,
+ })
+ }
+
+ /// Update [DbState] with Core wallet state
+ fn update_state<D>(&mut self, client: &Client, db: &D) -> Result<&mut Self, Error>
+ where
+ D: BatchDatabase,
+ {
+ let tx_iter = CoreTxIter::new(client, 10);
+
+ for tx_res in tx_iter {
+ let tx_res = tx_res?;
+
+ let mut updated = false;
+
+ let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
+ updated = true;
+ TransactionDetails {
+ txid: tx_res.info.txid,
+ ..Default::default()
+ }
+ });
+
+ // update raw tx (if needed)
+ let raw_tx =
+ match &db_tx.transaction {
+ Some(raw_tx) => raw_tx,
+ None => {
+ updated = true;
+ db_tx.transaction.insert(client.get_raw_transaction(
+ &tx_res.info.txid,
+ tx_res.info.blockhash.as_ref(),
+ )?)
+ }
+ };
+
+ // update fee (if needed)
+ if let (None, Some(new_fee)) = (db_tx.fee, tx_res.detail.fee) {
+ updated = true;
+ db_tx.fee = Some(new_fee.as_sat().unsigned_abs());
+ }
+
+ // update confirmation time (if needed)
+ let conf_time = BlockTime::new(tx_res.info.blockheight, tx_res.info.blocktime);
+ if db_tx.confirmation_time != conf_time {
+ updated = true;
+ db_tx.confirmation_time = conf_time;
+ }
+
+ // update received (if needed)
+ let received = Self::_received_from_raw_tx(db, raw_tx)?;
+ if db_tx.received != received {
+ updated = true;
+ db_tx.received = received;
+ }
+
+ // check if tx has an immature coinbase output (add to updated UTXOs)
+ // this is required because `listunspent` does not include immature coinbase outputs
+ if tx_res.detail.category == GetTransactionResultDetailCategory::Immature {
+                let vout = tx_res.detail.vout;
+                let txout = raw_tx.output.get(vout as usize).cloned().ok_or_else(|| {
+                    Error::Generic(format!(
+                        "Core RPC returned detail with invalid vout '{}' for tx '{}'",
+                        vout, tx_res.info.txid,
+                    ))
+                })?;
+
+                if let Some((keychain, _)) = db.get_path_from_script_pubkey(&txout.script_pubkey)? {
+                    let utxo = LocalUtxo {
+                        outpoint: OutPoint::new(tx_res.info.txid, vout),
+                        txout,
+                        keychain,
+                        is_spent: false,
+                    };
+                    self.updated_utxos.insert(utxo);
+                }
+ }
+
+ // update tx deltas
+ self.retained_txs.insert(tx_res.info.txid);
+ if updated {
+ self.updated_txs.insert(tx_res.info.txid);
+ }
+ }
+
+ // update sent from tx inputs
+ let sent_updates = self
+ .txs
+ .values()
+ .filter_map(|db_tx| {
+ let txid = self.retained_txs.get(&db_tx.txid)?;
+ self._sent_from_raw_tx(db, db_tx.transaction.as_ref()?)
+ .map(|sent| {
+ if db_tx.sent != sent {
+ Some((*txid, sent))
+ } else {
+ None
+ }
+ })
+ .transpose()
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+
+        // record sent updates
+ sent_updates.into_iter().for_each(|(txid, sent)| {
+ self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent);
+ self.updated_txs.insert(txid);
+ });
+
+ // obtain UTXOs from Core wallet
+ let core_utxos = client
+ .list_unspent(Some(0), None, None, Some(true), None)?
+ .into_iter()
+ .filter_map(|utxo_res| {
+ db.get_path_from_script_pubkey(&utxo_res.script_pub_key)
+ .transpose()
+ .map(|v| {
+ v.map(|(keychain, index)| {
+ // update last index if needed
+ self._update_last_index(keychain, index);
+
+ LocalUtxo {
+ outpoint: OutPoint::new(utxo_res.txid, utxo_res.vout),
+ keychain,
+ txout: TxOut {
+ value: utxo_res.amount.as_sat(),
+ script_pubkey: utxo_res.script_pub_key,
+ },
+ is_spent: false,
+ }
+ })
+ })
+ })
+ .collect::<Result<HashSet<_>, Error>>()?;
+
+ // mark "spent utxos" to be updated in database
+ let spent_utxos = self.utxos.difference(&core_utxos).cloned().map(|mut utxo| {
+ utxo.is_spent = true;
+ utxo
+ });
+
+ // mark new utxos to be added in database
+ let new_utxos = core_utxos.difference(&self.utxos).cloned();
+
+        // add to updated utxos (extend, so immature coinbase outputs recorded above are kept)
+        self.updated_utxos.extend(spent_utxos.chain(new_utxos));
+
+ Ok(self)
+ }
+
+ /// We want to filter out conflicting transactions.
+ /// Only accept transactions that are already confirmed, or existing in mempool.
+ fn _filter_tx(client: &Client, res: GetTransactionResult) -> Option<GetTransactionResult> {
+ if res.info.confirmations > 0 || client.get_mempool_entry(&res.info.txid).is_ok() {
+ Some(res)
+ } else {
+ debug!("tx filtered: {}", res.info.txid);
+ None
+ }
+ }
+
+ /// Calculates received amount from raw tx.
+ fn _received_from_raw_tx<D: BatchDatabase>(db: &D, raw_tx: &Transaction) -> Result<u64, Error> {
+ raw_tx.output.iter().try_fold(0_u64, |recv, txo| {
+ let v = if db.is_mine(&txo.script_pubkey)? {
+ txo.value
+ } else {
+ 0
+ };
+ Ok(recv + v)
+ })
+ }
+
+ /// Calculates sent from raw tx.
+ fn _sent_from_raw_tx<D: BatchDatabase>(
+ &self,
+ db: &D,
+ raw_tx: &Transaction,
+ ) -> Result<u64, Error> {
+ raw_tx.input.iter().try_fold(0_u64, |sent, txin| {
+ let v = match self._previous_output(&txin.previous_output) {
+ Some(prev_txo) => {
+ if db.is_mine(&prev_txo.script_pubkey)? {
+ prev_txo.value
+ } else {
+ 0
+ }
+ }
+ None => 0_u64,
+ };
+ Ok(sent + v)
+ })
+ }
+
+ fn _previous_output(&self, outpoint: &OutPoint) -> Option<&TxOut> {
+ let prev_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?;
+ prev_tx.output.get(outpoint.vout as usize)
+ }
+
+ fn _update_last_index(&mut self, keychain: KeychainKind, index: u32) {
+ let mut updated = false;
+
+ self.last_indexes
+ .entry(keychain)
+ .and_modify(|last| {
+ if *last < index {
+ updated = true;
+ *last = index;
+ }
+ })
+ .or_insert_with(|| {
+ updated = true;
+ index
+ });
+
+ if updated {
+ self.updated_last_indexes.insert(keychain);
+ }
+ }
+
+ /// Prepare db batch operations.
+ fn update_batch<D: BatchDatabase>(&self, batch: &mut D::Batch) -> Result<(), Error> {
+ // delete stale txs from db
+ // stale = not retained
+ self.txs
+ .keys()
+ .filter(|&txid| !self.retained_txs.contains(txid))
+ .try_for_each(|txid| batch.del_tx(txid, false).map(|_| ()))?;
+
+ // update txs
+ self.updated_txs
+ .iter()
+ .filter_map(|txid| self.txs.get(txid))
+ .try_for_each(|txd| batch.set_tx(txd))?;
+
+ // update utxos
+ self.updated_utxos
+ .iter()
+ .try_for_each(|utxo| batch.set_utxo(utxo))?;
+
+ // update last indexes
+ self.updated_last_indexes
+ .iter()
+ .map(|keychain| self.last_indexes.get_key_value(keychain).unwrap())
+ .try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
+
+ Ok(())
+ }
+}
+
+fn import_descriptors<'a, S>(
+ client: &Client,
+ start_epoch: u64,
+ scripts_iter: S,
+) -> Result<(), Error>
+where
+ S: Iterator<Item = &'a Script>,
+{
+ let requests = Value::Array(
+ scripts_iter
+ .map(|script| {
+ let desc = descriptor_from_script_pubkey(script);
+ json!({ "timestamp": start_epoch, "desc": desc })
+ })
+ .collect(),
+ );
+ for v in client.call::<Vec<Value>>("importdescriptors", &[requests])? {
+ match v["success"].as_bool() {
+ Some(true) => continue,
+ Some(false) => {
+ return Err(Error::Generic(
+ v["error"]["message"]
+ .as_str()
+ .map_or("unknown error".into(), ToString::to_string),
+ ))
+ }
+            _ => return Err(Error::Generic("Unexpected response from Core".to_string())),
+ }
+ }
+ Ok(())
+}
+
+fn import_multi<'a, S>(client: &Client, start_epoch: u64, scripts_iter: S) -> Result<(), Error>
+where
+ S: Iterator<Item = &'a Script>,
+{
+ let requests = scripts_iter
+ .map(|script| ImportMultiRequest {
+ timestamp: ImportMultiRescanSince::Timestamp(start_epoch),
+ script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(script)),
+ watchonly: Some(true),
+ ..Default::default()
+ })
+ .collect::<Vec<_>>();
+ let options = ImportMultiOptions { rescan: Some(true) };
+ for v in client.import_multi(&requests, Some(&options))? {
+ if let Some(err) = v.error {
+ return Err(Error::Generic(format!(
+ "{} (code: {})",
+ err.message, err.code
+ )));
+ }
+ }
+ Ok(())
+}
+
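+/// Iterator over the Core wallet's transaction history.
+///
+/// Pages through `listtransactions` (`page_size` entries per RPC call) and yields only
+/// transactions that are either confirmed or still in the mempool, skipping conflicted ones.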
+struct CoreTxIter<'a> {
+ client: &'a Client,
+ page_size: usize,
+ page_index: usize,
+
+ stack: Vec<ListTransactionResult>,
+ done: bool,
+}
+
+impl<'a> CoreTxIter<'a> {
+ fn new(client: &'a Client, page_size: usize) -> Self {
+ Self {
+ client,
+ page_size,
+ page_index: 0,
+ stack: Vec::with_capacity(page_size),
+ done: false,
+ }
+ }
+
+ /// We want to filter out conflicting transactions.
+ /// Only accept transactions that are already confirmed, or existing in mempool.
+ fn tx_ok(&self, item: &ListTransactionResult) -> bool {
+ item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
+ }
+}
+
+impl<'a> Iterator for CoreTxIter<'a> {
+ type Item = Result<ListTransactionResult, Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ if self.done {
+ return None;
+ }
+
+            if let Some(item) = self.stack.pop() {
+                if self.tx_ok(&item) {
+                    return Some(Ok(item));
+                }
+                // conflicted tx filtered out: try the next stacked item before
+                // fetching a new page (otherwise the rest of the page is lost)
+                continue;
+            }
+
+ let res = self
+ .client
+ .list_transactions(
+ None,
+ Some(self.page_size),
+ Some(self.page_size * self.page_index),
+ Some(true),
+ )
+ .map_err(Error::Rpc);
+
+ self.page_index += 1;
+
+ let list = match res {
+ Ok(list) => list,
+ Err(err) => {
+ self.done = true;
+ return Some(Err(err));
+ }
+ };
+
+ if list.is_empty() {
+ self.done = true;
+ return None;
+ }
+
+ self.stack = list;
+ }
+ }
+}
+
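+/// Fetch the `scanning` field of `getwalletinfo` by calling the RPC directly and
+/// deserializing only that field.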
+fn get_scanning_details(client: &Client) -> Result<ScanningDetails, Error> {
+ #[derive(Deserialize)]
+ struct CallResult {
+ scanning: ScanningDetails,
+ }
+ let result: CallResult = client.call("getwalletinfo", &[])?;
+ Ok(result.scanning)
+}
+
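+/// Poll `getwalletinfo` every `poll_rate_sec` seconds until the wallet reports that it is no
+/// longer scanning, forwarding scan progress to `progress_update` along the way.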
+fn await_wallet_scan(
+ client: &Client,
+ poll_rate_sec: u64,
+ progress_update: &dyn Progress,
+) -> Result<(), Error> {
+ let dur = Duration::from_secs(poll_rate_sec);
+ loop {
+ match get_scanning_details(client)? {
+ ScanningDetails::Scanning { duration, progress } => {
+                debug!("scanning: duration={}, progress={}", duration, progress);
+                progress_update
+                    .update(progress, Some(format!("{} seconds elapsed", duration)))?;
+                thread::sleep(dur);
+            }
+            ScanningDetails::NotScanning(_) => {
+                progress_update.update(1.0, None)?;
+                debug!("scanning: done!");
+ return Ok(());
+ }
+ };
+ }
+}
+
/// Returns whether a wallet is legacy or descriptors by calling `getwalletinfo`.
///
/// This API is mapped by bitcoincore_rpc, but it doesn't have the fields we need (either
Ok(result.descriptors.unwrap_or(false))
}
+fn descriptor_from_script_pubkey(script: &Script) -> String {
+ let desc = format!("raw({})", script.to_hex());
+ format!("{}#{}", desc, get_checksum(&desc).unwrap())
+}
+
/// Factory of [`RpcBlockchain`] instances, implements [`BlockchainFactory`]
///
/// Internally caches the node url and authentication params and allows getting many different [`RpcBlockchain`]
/// network: Network::Testnet,
/// wallet_name_prefix: Some("prefix-".to_string()),
/// default_skip_blocks: 100_000,
+/// sync_params: None,
/// };
/// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?;
/// # Ok(())
pub wallet_name_prefix: Option<String>,
-    /// Default number of blocks to skip which will be inherited by blockchain unless overridden
+    /// Default number of blocks to skip, inherited by the blockchain unless overridden
+    /// (no longer used by the RPC backend, which now syncs according to [`RpcSyncParams`])
    pub default_skip_blocks: u32,
+ /// Sync parameters
+ pub sync_params: Option<RpcSyncParams>,
}
impl BlockchainFactory for RpcBlockchainFactory {
fn build(
&self,
checksum: &str,
- override_skip_blocks: Option<u32>,
+ _override_skip_blocks: Option<u32>,
) -> Result<Self::Inner, Error> {
RpcBlockchain::from_config(&RpcConfig {
url: self.url.clone(),
self.wallet_name_prefix.as_ref().unwrap_or(&String::new()),
checksum
),
- skip_blocks: Some(override_skip_blocks.unwrap_or(self.default_skip_blocks)),
+ sync_params: self.sync_params.clone(),
})
}
}
auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
- skip_blocks: None,
+ sync_params: None,
};
RpcBlockchain::from_config(&config).unwrap()
}
network: Network::Regtest,
wallet_name_prefix: Some("prefix-".into()),
default_skip_blocks: 0,
+ sync_params: None,
};
(test_client, factory)
let (_test_client, factory) = get_factory();
let a = factory.build("aaaaaa", None).unwrap();
- assert_eq!(a.skip_blocks, Some(0));
assert_eq!(
a.client
.get_wallet_info()
);
let b = factory.build("bbbbbb", Some(100)).unwrap();
- assert_eq!(b.skip_blocks, Some(100));
assert_eq!(
b.client
.get_wallet_info()