/// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
/// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
fn from_config(config: &Self::Config) -> Result<Self, Error> {
- let wallet_name = config.wallet_name.clone();
- let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
- debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
+ let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let rpc_version = client.version()?;
- let loaded_wallets = client.list_wallets()?;
- if loaded_wallets.contains(&wallet_name) {
- debug!("wallet already loaded {:?}", wallet_name);
- } else if list_wallet_dir(&client)?.contains(&wallet_name) {
- client.load_wallet(&wallet_name)?;
- debug!("wallet loaded {:?}", wallet_name);
+ info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
+
+ if client.list_wallets()?.contains(&config.wallet_name) {
+ info!("wallet already loaded: {}", config.wallet_name);
+ } else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
+ client.load_wallet(&config.wallet_name)?;
+ info!("wallet loaded: {}", config.wallet_name);
} else {
// pre-0.21 use legacy wallets
if rpc_version < 210_000 {
- client.create_wallet(&wallet_name, Some(true), None, None, None)?;
+ client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
} else {
// TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
let args = [
- Value::String(wallet_name.clone()),
+ Value::String(config.wallet_name.clone()),
Value::Bool(true),
Value::Bool(false),
Value::Null,
let _: Value = client.call("createwallet", &args)?;
}
- debug!("wallet created {:?}", wallet_name);
+ info!("wallet created: {}", config.wallet_name);
}
let is_descriptors = is_wallet_descriptor(&client)?;
// wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
- // loop through results of Core RPC method `listtransactions`
- for tx_res in CoreTxIter::new(client, 100) {
- let tx_res = tx_res?;
+ // obtain iterator of pagenated `listtransactions` RPC calls
+ const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
+ let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
+ // filter out conflicting transactions - only accept transactions that are already
+ // confirmed, or exists in mempool
+ item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
+ });
+
+ // iterate through chronological results of `listtransactions`
+ for tx_res in tx_iter {
let mut updated = false;
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
Ok(())
}
-/// Iterates through results of multiple `listtransactions` calls.
-struct CoreTxIter<'a> {
- client: &'a Client,
+/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
+/// in chronological order.
+///
+/// `page_size` cannot be less than 1 and cannot be greater than 1000.
+fn list_transactions(
+ client: &Client,
page_size: usize,
- page_index: usize,
-
- stack: Vec<ListTransactionResult>,
- done: bool,
-}
-
-impl<'a> CoreTxIter<'a> {
- fn new(client: &'a Client, mut page_size: usize) -> Self {
- if page_size > 1000 {
- page_size = 1000;
- }
-
- Self {
- client,
- page_size,
- page_index: 0,
- stack: Vec::with_capacity(page_size),
- done: false,
- }
- }
-
- /// We want to filter out conflicting transactions.
- /// Only accept transactions that are already confirmed, or existing in mempool.
- fn keep_tx(&self, item: &ListTransactionResult) -> bool {
- item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
+) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
+ if !(1..=1000).contains(&page_size) {
+ return Err(Error::Generic(format!(
+ "Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
+ page_size
+ )));
}
-}
-
-impl<'a> Iterator for CoreTxIter<'a> {
- type Item = Result<ListTransactionResult, Error>;
-
- fn next(&mut self) -> Option<Self::Item> {
- loop {
- if self.done {
- return None;
- }
-
- if let Some(item) = self.stack.pop() {
- if self.keep_tx(&item) {
- return Some(Ok(item));
- }
- }
- let res = self
- .client
- .list_transactions(
- None,
- Some(self.page_size),
- Some(self.page_size * self.page_index),
- Some(true),
- )
- .map_err(Error::Rpc);
-
- self.page_index += 1;
-
- let list = match res {
- Ok(list) => list,
- Err(err) => {
- self.done = true;
- return Some(Err(err));
- }
- };
-
- if list.is_empty() {
- self.done = true;
- return None;
+ // `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
+ let mut got_err = false;
+
+ // obtain results in batches (of `page_size`)
+ let nested_list = (0_usize..)
+ .map(|page_index| {
+ client.list_transactions(
+ None,
+ Some(page_size),
+ Some(page_size * page_index),
+ Some(true),
+ )
+ })
+ // take until returned rpc call is empty or until error
+ // TODO: replace with the following when MSRV is 1.57.0:
+ // `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
+ .take_while(|res| {
+ if got_err || matches!(res, Ok(list) if list.is_empty()) {
+ // break if last iteration was an error, or if the current result is empty
+ false
+ } else {
+ // record whether result is error or not
+ got_err = res.is_err();
+ // continue on non-empty result or first error
+ true
}
+ })
+ .collect::<Result<Vec<_>, _>>()
+ .map_err(Error::Rpc)?;
- self.stack = list;
- }
- }
+ // reverse here to have txs in chronological order
+ Ok(nested_list.into_iter().rev().flatten())
}
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
mod test {
use super::*;
- use crate::testutils::blockchain_tests::TestClient;
+ use crate::{
+ descriptor::{into_wallet_descriptor_checked, AsDerived},
+ testutils::blockchain_tests::TestClient,
+ wallet::utils::SecpCtx,
+ };
- use bitcoin::Network;
+ use bitcoin::{Address, Network};
use bitcoincore_rpc::RpcApi;
+ use log::LevelFilter;
+ use miniscript::DescriptorTrait;
crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
"prefix-bbbbbb"
);
}
+
+ /// This test ensures that [list_transactions] always iterates through transactions in
+ /// chronological order, independent of the `page_size`.
+ #[test]
+ fn test_list_transactions() {
+ let _ = env_logger::builder()
+ .filter_level(LevelFilter::Info)
+ .default_format()
+ .try_init();
+
+ const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
+ const AMOUNT_PER_TX: u64 = 10_000;
+ const TX_COUNT: u32 = 50;
+
+ let secp = SecpCtx::default();
+ let network = Network::Regtest;
+ let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();
+
+ let (mut test_client, factory) = get_factory();
+ let bc = factory.build("itertest", None).unwrap();
+
+ // generate scripts (1 tx per script)
+ let scripts = (0..TX_COUNT)
+ .map(|index| desc.as_derived(index, &secp).script_pubkey())
+ .collect::<Vec<_>>();
+
+ // import scripts and wait
+ if bc.is_descriptors {
+ import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
+ } else {
+ import_multi(&bc.client, 0, scripts.iter()).unwrap();
+ }
+ await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();
+
+ // create and broadcast txs
+ let expected_txids = scripts
+ .iter()
+ .map(|script| {
+ let addr = Address::from_script(script, network).unwrap();
+ let txid =
+ test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
+ test_client.generate(1, None);
+ txid
+ })
+ .collect::<Vec<_>>();
+
+ // iterate through different page sizes - should always return txs in chronological order
+ [1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| {
+ println!("trying with page_size: {}", page_size);
+
+ let txids = list_transactions(&bc.client, *page_size)
+ .unwrap()
+ .map(|res| res.info.txid)
+ .collect::<Vec<_>>();
+
+ assert_eq!(txids.len(), expected_txids.len());
+ assert_eq!(txids, expected_txids);
+ });
+ }
}