Blockchain calls sync logic rather than the other way around.
Sync logic is captured in script_sync.rs.
//! # Ok::<(), bdk::Error>(())
//! ```
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
-use bitcoin::{BlockHeader, Script, Transaction, Txid};
+use bitcoin::{Transaction, Txid};
use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config};
-use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
+use super::script_sync::Request;
use super::*;
-use crate::database::BatchDatabase;
+use crate::database::{BatchDatabase, Database};
use crate::error::Error;
-use crate::FeeRate;
+use crate::{ConfirmationTime, FeeRate};
/// Wrapper over an Electrum Client that implements the required blockchain traits
///
fn setup<D: BatchDatabase, P: Progress>(
&self,
database: &mut D,
- progress_update: P,
+ _progress_update: P,
) -> Result<(), Error> {
- self.client
- .electrum_like_setup(self.stop_gap, database, progress_update)
+ let mut request = script_sync::start(database, self.stop_gap)?;
+ // height -> block timestamp, filled lazily from batch_block_header calls
+ let mut block_times = HashMap::<u32, u32>::new();
+ // txid -> confirmation height, learned during the script-history phase
+ let mut txid_to_height = HashMap::<Txid, u32>::new();
+ let mut tx_cache = TxCache::new(database, &self.client);
+ let chunk_size = self.stop_gap;
+ // The electrum server has been inconsistent somehow in its responses during sync. For
+ // example, we do a batch request of transactions and the response contains less
+ // transactions than in the request. This should never happen but we don't want to panic.
+ let electrum_goof = || Error::Generic("electrum server misbehaving".to_string());
+
+ // Drive the sync state machine: answer each request until it yields a batch update.
+ let batch_update = loop {
+ request = match request {
+ Request::Script(script_req) => {
+ let scripts = script_req.request().take(chunk_size);
+ let txids_per_script: Vec<Vec<_>> = self
+ .client
+ .batch_script_get_history(scripts)
+ .map_err(Error::Electrum)?
+ .into_iter()
+ .map(|txs| {
+ txs.into_iter()
+ .map(|tx| {
+ // Electrum reports height <= 0 for unconfirmed
+ // transactions; map those to "no height".
+ let tx_height = match tx.height {
+ none if none <= 0 => None,
+ height => {
+ txid_to_height.insert(tx.tx_hash, height as u32);
+ Some(height as u32)
+ }
+ };
+ (tx.tx_hash, tx_height)
+ })
+ .collect()
+ })
+ .collect();
+
+ script_req.satisfy(txids_per_script)?
+ }
+
+ Request::Conftime(conftimereq) => {
+ // Fetch headers only for confirmed heights whose timestamp we
+ // haven't cached yet.
+ let needs_block_height = conftimereq
+ .request()
+ .filter_map(|txid| txid_to_height.get(txid).cloned())
+ .filter(|height| block_times.get(height).is_none())
+ .take(chunk_size)
+ .collect::<HashSet<_>>();
+
+ let new_block_headers =
+ self.client.batch_block_header(needs_block_height.clone())?;
+
+ for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
+ block_times.insert(height, header.time);
+ }
+
+ let conftimes = conftimereq
+ .request()
+ .take(chunk_size)
+ .map(|txid| {
+ // `None` height (unconfirmed) stays `None`; otherwise the
+ // timestamp must already be cached, or the server misbehaved.
+ let confirmation_time = txid_to_height
+ .get(txid)
+ .map(|height| {
+ let timestamp =
+ *block_times.get(height).ok_or_else(electrum_goof)?;
+ Result::<_, Error>::Ok(ConfirmationTime {
+ height: *height,
+ timestamp: timestamp.into(),
+ })
+ })
+ .transpose()?;
+ Ok(confirmation_time)
+ })
+ .collect::<Result<_, Error>>()?;
+
+ conftimereq.satisfy(conftimes)?
+ }
+ Request::Tx(txreq) => {
+ let needs_block_height = txreq
+ .request()
+ .filter_map(|txid| txid_to_height.get(txid).cloned())
+ .filter(|height| block_times.get(height).is_none())
+ .take(chunk_size)
+ .collect::<HashSet<_>>();
+
+ let new_block_headers =
+ self.client.batch_block_header(needs_block_height.clone())?;
+ for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
+ block_times.insert(height, header.time);
+ }
+ let needs_full = txreq.request().take(chunk_size);
+
+ tx_cache.save_txs(needs_full.clone())?;
+ let full_transactions = needs_full
+ .map(|txid| tx_cache.get(*txid).ok_or_else(electrum_goof))
+ .collect::<Result<Vec<_>, _>>()?;
+ // Also cache the transactions funding each (non-coinbase) input so
+ // we can resolve the spent TxOuts below.
+ let input_txs = full_transactions.iter().flat_map(|tx| {
+ tx.input
+ .iter()
+ .filter(|input| !input.previous_output.is_null())
+ .map(|input| &input.previous_output.txid)
+ });
+ tx_cache.save_txs(input_txs)?;
+
+ let full_details = full_transactions
+ .into_iter()
+ .map(|tx| {
+ let confirmation_time = txid_to_height
+ .get(&tx.txid())
+ .map(|height| {
+ let time = block_times.get(height).ok_or_else(electrum_goof)?;
+ Result::<_, Error>::Ok(ConfirmationTime {
+ height: *height,
+ timestamp: *time as u64,
+ })
+ })
+ .transpose()?;
+ let prev_outputs = tx
+ .input
+ .iter()
+ .map(|input| {
+ // Coinbase inputs spend nothing, so no previous output.
+ if input.previous_output.is_null() {
+ return Ok(None);
+ }
+ let prev_tx = tx_cache
+ .get(input.previous_output.txid)
+ .ok_or_else(electrum_goof)?;
+ let txout = prev_tx
+ .output
+ .get(input.previous_output.vout as usize)
+ .ok_or_else(electrum_goof)?;
+ Ok(Some(txout.clone()))
+ })
+ .collect::<Result<Vec<_>, Error>>()?;
+ Ok((confirmation_time, prev_outputs, tx))
+ })
+ .collect::<Result<Vec<_>, Error>>()?;
+
+ txreq.satisfy(full_details)?
+ }
+ Request::Finish(batch_update) => break batch_update,
+ }
+ };
+
+ database.commit_batch(batch_update)?;
+ Ok(())
}
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
}
}
-impl ElectrumLikeSync for Client {
- fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>(
- &self,
- scripts: I,
- ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
- self.batch_script_get_history(scripts)
- .map(|v| {
- v.into_iter()
- .map(|v| {
- v.into_iter()
- .map(
- |electrum_client::GetHistoryRes {
- height, tx_hash, ..
- }| ElsGetHistoryRes {
- height,
- tx_hash,
- },
- )
- .collect()
- })
- .collect()
- })
- .map_err(Error::Electrum)
+/// In-memory cache of full transactions, filled first from the database and
+/// then from the electrum server for anything still missing.
+struct TxCache<'a, 'b, D> {
+ db: &'a D,
+ client: &'b Client,
+ cache: HashMap<Txid, Transaction>,
+}
+
+impl<'a, 'b, D: Database> TxCache<'a, 'b, D> {
+ fn new(db: &'a D, client: &'b Client) -> Self {
+ TxCache {
+ db,
+ client,
+ cache: HashMap::default(),
+ }
+ }
}
+ // Ensure every txid in `txids` is present in the cache: check the cache, then
+ // the database, and batch-fetch the rest from the electrum server.
+ fn save_txs<'c>(&mut self, txids: impl Iterator<Item = &'c Txid>) -> Result<(), Error> {
+ let mut need_fetch = vec![];
+ for txid in txids {
+ if self.cache.get(txid).is_some() {
+ continue;
+ } else if let Some(transaction) = self.db.get_raw_tx(txid)? {
+ self.cache.insert(*txid, transaction);
+ } else {
+ need_fetch.push(txid);
+ }
+ }
- fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>(
- &self,
- txids: I,
- ) -> Result<Vec<Transaction>, Error> {
- self.batch_transaction_get(txids).map_err(Error::Electrum)
+ if !need_fetch.is_empty() {
+ let txs = self
+ .client
+ .batch_transaction_get(need_fetch.clone())
+ .map_err(Error::Electrum)?;
+ for (tx, _txid) in txs.into_iter().zip(need_fetch) {
+ debug_assert_eq!(*_txid, tx.txid());
+ self.cache.insert(tx.txid(), tx);
+ }
+ }
+
+ Ok(())
}
- fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
- &self,
- heights: I,
- ) -> Result<Vec<BlockHeader>, Error> {
- self.batch_block_header(heights).map_err(Error::Electrum)
+ // Look up a cached transaction (cloned); `None` if it was never saved.
+ fn get(&self, txid: Txid) -> Option<Transaction> {
+ self.cache.get(&txid).map(Clone::clone)
}
}
--- /dev/null
+//! structs from the esplora API
+//!
+//! see: <https://github.com/Blockstream/esplora/blob/master/API.md>
+use crate::ConfirmationTime;
+use bitcoin::{OutPoint, Script, Transaction, TxIn, TxOut, Txid};
+
+/// The previous output (`prevout`) spent by an input, as returned by esplora.
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct PrevOut {
+ pub value: u64,
+ pub scriptpubkey: Script,
+}
+
+/// A transaction input as returned by esplora.
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct Vin {
+ pub txid: Txid,
+ pub vout: u32,
+ // None if coinbase
+ pub prevout: Option<PrevOut>,
+ pub scriptsig: Script,
+ // witness elements arrive as hex strings; decoded to raw bytes on deserialize
+ #[serde(deserialize_with = "deserialize_witness")]
+ pub witness: Vec<Vec<u8>>,
+ pub sequence: u32,
+ pub is_coinbase: bool,
+}
+
+/// A transaction output as returned by esplora.
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct Vout {
+ pub value: u64,
+ pub scriptpubkey: Script,
+}
+
+/// Confirmation status of a transaction as returned by esplora.
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct TxStatus {
+ pub confirmed: bool,
+ // expected to be Some only when `confirmed` is true (see `Tx::confirmation_time`)
+ pub block_height: Option<u32>,
+ pub block_time: Option<u64>,
+}
+
+/// A full transaction, with esplora metadata, as returned by the API.
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct Tx {
+ pub txid: Txid,
+ pub version: i32,
+ pub locktime: u32,
+ pub vin: Vec<Vin>,
+ pub vout: Vec<Vout>,
+ pub status: TxStatus,
+ pub fee: u64,
+}
+
+impl Tx {
+ /// Convert into a `bitcoin::Transaction`, dropping esplora-only metadata
+ /// (status and fee).
+ pub fn to_tx(&self) -> Transaction {
+ Transaction {
+ version: self.version,
+ lock_time: self.locktime,
+ input: self
+ .vin
+ .iter()
+ .cloned()
+ .map(|vin| TxIn {
+ previous_output: OutPoint {
+ txid: vin.txid,
+ vout: vin.vout,
+ },
+ script_sig: vin.scriptsig,
+ sequence: vin.sequence,
+ witness: vin.witness,
+ })
+ .collect(),
+ output: self
+ .vout
+ .iter()
+ .cloned()
+ .map(|vout| TxOut {
+ value: vout.value,
+ script_pubkey: vout.scriptpubkey,
+ })
+ .collect(),
+ }
+ }
+
+ /// Confirmation height and timestamp, or `None` when the status does not
+ /// carry both (i.e. the transaction is unconfirmed).
+ pub fn confirmation_time(&self) -> Option<ConfirmationTime> {
+ match self.status {
+ TxStatus {
+ confirmed: true,
+ block_height: Some(height),
+ block_time: Some(timestamp),
+ } => Some(ConfirmationTime { timestamp, height }),
+ _ => None,
+ }
+ }
+
+ /// The `TxOut` spent by each input, in input order; `None` for inputs
+ /// without a prevout (coinbase).
+ pub fn previous_outputs(&self) -> Vec<Option<TxOut>> {
+ self.vin
+ .iter()
+ .cloned()
+ .map(|vin| {
+ vin.prevout.map(|po| TxOut {
+ script_pubkey: po.scriptpubkey,
+ value: po.value,
+ })
+ })
+ .collect()
+ }
+}
+
+/// Deserialize a witness stack from the array of hex strings esplora returns.
+fn deserialize_witness<'de, D>(d: D) -> Result<Vec<Vec<u8>>, D::Error>
+where
+ D: serde::de::Deserializer<'de>,
+{
+ use crate::serde::Deserialize;
+ use bitcoin::hashes::hex::FromHex;
+ let list = Vec::<String>::deserialize(d)?;
+ list.into_iter()
+ .map(|hex_str| Vec::<u8>::from_hex(&hex_str))
+ .collect::<Result<Vec<Vec<u8>>, _>>()
+ .map_err(serde::de::Error::custom)
+}
use std::fmt;
use std::io;
-use serde::Deserialize;
-
use bitcoin::consensus;
use bitcoin::{BlockHash, Txid};
#[cfg(feature = "ureq")]
pub use self::ureq::*;
+mod api;
+
fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRate, Error> {
let fee_val = estimates
.into_iter()
Ok(FeeRate::from_sat_per_vb(fee_val as f32))
}
-/// Data type used when fetching transaction history from Esplora.
-#[derive(Deserialize)]
-pub struct EsploraGetHistory {
- txid: Txid,
- status: EsploraGetHistoryStatus,
-}
-
-#[derive(Deserialize)]
-struct EsploraGetHistoryStatus {
- block_height: Option<usize>,
-}
-
/// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)]
pub enum EsploraError {
}
}
+/// Configuration for an [`EsploraBlockchain`]
+#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
+pub struct EsploraBlockchainConfig {
+ /// Base URL of the esplora service
+ ///
+ /// eg. `https://blockstream.info/api/`
+ pub base_url: String,
+ /// Optional URL of the proxy to use to make requests to the Esplora server
+ ///
+ /// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
+ ///
+ /// Note that the format of this value and the supported protocols change slightly between the
+ /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
+ /// details check with the documentation of the two crates. Both of them are compiled with
+ /// the `socks` feature enabled.
+ ///
+ /// The proxy is ignored when targeting `wasm32`.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub proxy: Option<String>,
+ /// Number of parallel requests sent to the esplora service (default: 4)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub concurrency: Option<u8>,
+ /// Stop searching addresses for transactions after finding an unused gap of this length.
+ pub stop_gap: usize,
+ /// Socket timeout, in seconds.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub timeout: Option<u64>,
+}
+
+impl EsploraBlockchainConfig {
+ /// create a config with default values given the base url: no proxy, no
+ /// timeout, default concurrency, and a `stop_gap` of 20
+ pub fn new(base_url: String) -> Self {
+ Self {
+ base_url,
+ proxy: None,
+ timeout: None,
+ stop_gap: 20,
+ concurrency: None,
+ }
+ }
+}
+
impl std::error::Error for EsploraError {}
-#[cfg(feature = "ureq")]
-impl_error!(::ureq::Error, Ureq, EsploraError);
#[cfg(feature = "ureq")]
impl_error!(::ureq::Transport, UreqTransport, EsploraError);
#[cfg(feature = "reqwest")]
EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), 20)
}
}
+
+const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
#[allow(unused_imports)]
use log::{debug, error, info, trace};
-use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
-
use ::reqwest::{Client, StatusCode};
+use futures::stream::{FuturesOrdered, TryStreamExt};
-use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
-use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
+use super::api::Tx;
+use crate::blockchain::esplora::EsploraError;
use crate::blockchain::*;
use crate::database::BatchDatabase;
use crate::error::Error;
-use crate::wallet::utils::ChunksIterator;
use crate::FeeRate;
-const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
-
#[derive(Debug)]
struct UrlClient {
url: String,
url_client: UrlClient {
url: base_url.to_string(),
client: Client::new(),
- concurrency: DEFAULT_CONCURRENT_REQUESTS,
+ concurrency: super::DEFAULT_CONCURRENT_REQUESTS,
},
stop_gap,
}
fn setup<D: BatchDatabase, P: Progress>(
&self,
database: &mut D,
- progress_update: P,
+ _progress_update: P,
) -> Result<(), Error> {
+ use crate::blockchain::script_sync::Request;
+ let mut request = script_sync::start(database, self.stop_gap)?;
+ // Every tx seen in the script phase is kept here so the Conftime/Tx
+ // requests below can be answered from memory.
+ let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
+
+ // Drive the sync state machine: answer each request until it finishes.
+ let batch_update = loop {
+ request = match request {
+ Request::Script(script_req) => {
+ let futures: FuturesOrdered<_> = script_req
+ .request()
+ .take(self.url_client.concurrency as usize)
+ .map(|script| async move {
+ let mut related_txs: Vec<Tx> =
+ self.url_client._scripthash_txs(script, None).await?;
+
+ let n_confirmed =
+ related_txs.iter().filter(|tx| tx.status.confirmed).count();
+ // esplora pages on 25 confirmed transactions. If there's more than
+ // 25 we need to keep requesting.
+ if n_confirmed >= 25 {
+ loop {
+ let new_related_txs: Vec<Tx> = self
+ .url_client
+ ._scripthash_txs(
+ script,
+ Some(related_txs.last().unwrap().txid),
+ )
+ .await?;
+ let n = new_related_txs.len();
+ related_txs.extend(new_related_txs);
+ // we've reached the end
+ if n < 25 {
+ break;
+ }
+ }
+ }
+ Result::<_, Error>::Ok(related_txs)
+ })
+ .collect();
+ let txs_per_script: Vec<Vec<Tx>> = await_or_block!(futures.try_collect())?;
+ let mut satisfaction = vec![];
+
+ for txs in txs_per_script {
+ satisfaction.push(
+ txs.iter()
+ .map(|tx| (tx.txid, tx.status.block_height))
+ .collect(),
+ );
+ for tx in txs {
+ tx_index.insert(tx.txid, tx);
+ }
+ }
+
+ script_req.satisfy(satisfaction)?
+ }
+ Request::Conftime(conftimereq) => {
+ // Only txids discovered during the script phase can be requested,
+ // so the index lookup cannot fail.
+ let conftimes = conftimereq
+ .request()
+ .map(|txid| {
+ tx_index
+ .get(txid)
+ .expect("must be in index")
+ .confirmation_time()
+ })
+ .collect();
+ conftimereq.satisfy(conftimes)?
+ }
+ Request::Tx(txreq) => {
+ let full_txs = txreq
+ .request()
+ .map(|txid| {
+ let tx = tx_index.get(txid).expect("must be in index");
+ (tx.confirmation_time(), tx.previous_outputs(), tx.to_tx())
+ })
+ .collect();
+ txreq.satisfy(full_txs)?
+ }
+ Request::Finish(batch_update) => break batch_update,
+ }
+ };
+
+ database.commit_batch(batch_update)?;
+
+ Ok(())
}
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
}
impl UrlClient {
- fn script_to_scripthash(script: &Script) -> String {
- sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
- }
-
async fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
let resp = self
.client
Ok(req.error_for_status()?.text().await?.parse()?)
}
- async fn _script_get_history(
+ /// GET `/scripthash/<hash>/txs` for the first page (mempool plus newest
+ /// confirmed txs), or `/scripthash/<hash>/txs/chain/<last_seen>` to page
+ /// through confirmed history.
+ async fn _scripthash_txs(
&self,
script: &Script,
- ) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
- let mut result = Vec::new();
- let scripthash = Self::script_to_scripthash(script);
-
- // Add the unconfirmed transactions first
- result.extend(
- self.client
- .get(&format!(
- "{}/scripthash/{}/txs/mempool",
- self.url, scripthash
- ))
- .send()
- .await?
- .error_for_status()?
- .json::<Vec<EsploraGetHistory>>()
- .await?
- .into_iter()
- .map(|x| ElsGetHistoryRes {
- tx_hash: x.txid,
- height: x.status.block_height.unwrap_or(0) as i32,
- }),
- );
-
- debug!(
- "Found {} mempool txs for {} - {:?}",
- result.len(),
- scripthash,
- script
- );
-
- // Then go through all the pages of confirmed transactions
- let mut last_txid = String::new();
- loop {
- let response = self
- .client
- .get(&format!(
- "{}/scripthash/{}/txs/chain/{}",
- self.url, scripthash, last_txid
- ))
- .send()
- .await?
- .error_for_status()?
- .json::<Vec<EsploraGetHistory>>()
- .await?;
- let len = response.len();
- if let Some(elem) = response.last() {
- last_txid = elem.txid.to_hex();
- }
-
- debug!("... adding {} confirmed transactions", len);
-
- result.extend(response.into_iter().map(|x| ElsGetHistoryRes {
- tx_hash: x.txid,
- height: x.status.block_height.unwrap_or(0) as i32,
- }));
-
- if len < 25 {
- break;
- }
- }
-
- Ok(result)
+ last_seen: Option<Txid>,
+ ) -> Result<Vec<Tx>, EsploraError> {
+ // esplora addresses scripts by the hex of the sha256 of the scriptPubKey
+ let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex();
+ let url = match last_seen {
+ Some(last_seen) => format!(
+ "{}/scripthash/{}/txs/chain/{}",
+ self.url, script_hash, last_seen
+ ),
+ None => format!("{}/scripthash/{}/txs", self.url, script_hash),
+ };
+ Ok(self
+ .client
+ .get(url)
+ .send()
+ .await?
+ .error_for_status()?
+ .json::<Vec<Tx>>()
+ .await?)
}
async fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
}
}
-#[maybe_async]
-impl ElectrumLikeSync for UrlClient {
- fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
- &self,
- scripts: I,
- ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
- let mut results = vec![];
- for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
- let mut futs = FuturesOrdered::new();
- for script in chunk {
- futs.push(self._script_get_history(script));
- }
- let partial_results: Vec<Vec<ElsGetHistoryRes>> = await_or_block!(futs.try_collect())?;
- results.extend(partial_results);
- }
- Ok(await_or_block!(stream::iter(results).collect()))
- }
-
- fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
- &self,
- txids: I,
- ) -> Result<Vec<Transaction>, Error> {
- let mut results = vec![];
- for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
- let mut futs = FuturesOrdered::new();
- for txid in chunk {
- futs.push(self._get_tx_no_opt(txid));
- }
- let partial_results: Vec<Transaction> = await_or_block!(futs.try_collect())?;
- results.extend(partial_results);
- }
- Ok(await_or_block!(stream::iter(results).collect()))
- }
-
- fn els_batch_block_header<I: IntoIterator<Item = u32>>(
- &self,
- heights: I,
- ) -> Result<Vec<BlockHeader>, Error> {
- let mut results = vec![];
- for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
- let mut futs = FuturesOrdered::new();
- for height in chunk {
- futs.push(self._get_header(height));
- }
- let partial_results: Vec<BlockHeader> = await_or_block!(futs.try_collect())?;
- results.extend(partial_results);
- }
- Ok(await_or_block!(stream::iter(results).collect()))
- }
-}
-
-/// Configuration for an [`EsploraBlockchain`]
-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
-pub struct EsploraBlockchainConfig {
- /// Base URL of the esplora service
- ///
- /// eg. `https://blockstream.info/api/`
- pub base_url: String,
- /// Optional URL of the proxy to use to make requests to the Esplora server
- ///
- /// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
- ///
- /// Note that the format of this value and the supported protocols change slightly between the
- /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
- /// details check with the documentation of the two crates. Both of them are compiled with
- /// the `socks` feature enabled.
- ///
- /// The proxy is ignored when targeting `wasm32`.
- pub proxy: Option<String>,
- /// Number of parallel requests sent to the esplora service (default: 4)
- pub concurrency: Option<u8>,
- /// Stop searching addresses for transactions after finding an unused gap of this length.
- pub stop_gap: usize,
-}
-
impl ConfigurableBlockchain for EsploraBlockchain {
- type Config = EsploraBlockchainConfig;
+ type Config = super::EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let map_e = |e: reqwest::Error| Error::Esplora(Box::new(e.into()));
if let Some(concurrency) = config.concurrency {
blockchain.url_client.concurrency = concurrency;
}
+ // Build one reqwest client carrying both proxy and timeout settings;
+ // both are skipped when targeting wasm32.
+ let mut builder = Client::builder();
#[cfg(not(target_arch = "wasm32"))]
if let Some(proxy) = &config.proxy {
- blockchain.url_client.client = Client::builder()
- .proxy(reqwest::Proxy::all(proxy).map_err(map_e)?)
- .build()
- .map_err(map_e)?;
+ builder = builder.proxy(reqwest::Proxy::all(proxy).map_err(map_e)?);
}
+
+ #[cfg(not(target_arch = "wasm32"))]
+ if let Some(timeout) = config.timeout {
+ builder = builder.timeout(core::time::Duration::from_secs(timeout));
+ }
+
+ blockchain.url_client.client = builder.build().map_err(map_e)?;
+
Ok(blockchain)
}
}
use bitcoin::hashes::{sha256, Hash};
use bitcoin::{BlockHeader, Script, Transaction, Txid};
-use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
-use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
+use super::api::Tx;
+use crate::blockchain::esplora::EsploraError;
use crate::blockchain::*;
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
-#[derive(Debug)]
+#[derive(Debug, Clone)]
struct UrlClient {
url: String,
agent: Agent,
pub struct EsploraBlockchain {
url_client: UrlClient,
stop_gap: usize,
-}
-
-impl std::convert::From<UrlClient> for EsploraBlockchain {
- fn from(url_client: UrlClient) -> Self {
- EsploraBlockchain {
- url_client,
- stop_gap: 20,
- }
- }
+ concurrency: u8,
}
impl EsploraBlockchain {
url: base_url.to_string(),
agent: Agent::new(),
},
+ concurrency: super::DEFAULT_CONCURRENT_REQUESTS,
stop_gap,
}
}
self.url_client.agent = agent;
self
}
+
+ /// Set the number of parallel requests the client can make.
+ pub fn with_concurrency(mut self, concurrency: u8) -> Self {
+ self.concurrency = concurrency;
+ self
+ }
}
impl Blockchain for EsploraBlockchain {
fn setup<D: BatchDatabase, P: Progress>(
&self,
database: &mut D,
- progress_update: P,
+ _progress_update: P,
) -> Result<(), Error> {
- self.url_client
- .electrum_like_setup(self.stop_gap, database, progress_update)
+ use crate::blockchain::script_sync::Request;
+ let mut request = script_sync::start(database, self.stop_gap)?;
+ // Every tx seen in the script phase is kept here so the Conftime/Tx
+ // requests below can be answered from memory.
+ let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
+ let batch_update = loop {
+ request = match request {
+ Request::Script(script_req) => {
+ let scripts = script_req
+ .request()
+ .take(self.concurrency as usize)
+ .cloned();
+
+ let handles = scripts.map(move |script| {
+ let client = self.url_client.clone();
+ // make each request in its own thread.
+ std::thread::spawn(move || {
+ let mut related_txs: Vec<Tx> = client._scripthash_txs(&script, None)?;
+
+ let n_confirmed =
+ related_txs.iter().filter(|tx| tx.status.confirmed).count();
+ // esplora pages on 25 confirmed transactions. If there's more than
+ // 25 we need to keep requesting.
+ if n_confirmed >= 25 {
+ loop {
+ let new_related_txs: Vec<Tx> = client._scripthash_txs(
+ &script,
+ Some(related_txs.last().unwrap().txid),
+ )?;
+ let n = new_related_txs.len();
+ related_txs.extend(new_related_txs);
+ // we've reached the end
+ if n < 25 {
+ break;
+ }
+ }
+ }
+ Result::<_, Error>::Ok(related_txs)
+ })
+ });
+
+ // join() only errs if the worker thread panicked, so unwrap
+ // propagates the panic rather than swallowing it.
+ let txs_per_script: Vec<Vec<Tx>> = handles
+ .map(|handle| handle.join().unwrap())
+ .collect::<Result<_, _>>()?;
+ let mut satisfaction = vec![];
+
+ for txs in txs_per_script {
+ satisfaction.push(
+ txs.iter()
+ .map(|tx| (tx.txid, tx.status.block_height))
+ .collect(),
+ );
+ for tx in txs {
+ tx_index.insert(tx.txid, tx);
+ }
+ }
+
+ script_req.satisfy(satisfaction)?
+ }
+ Request::Conftime(conftimereq) => {
+ // Only txids discovered during the script phase can be requested,
+ // so the index lookup cannot fail.
+ let conftimes = conftimereq
+ .request()
+ .map(|txid| {
+ tx_index
+ .get(txid)
+ .expect("must be in index")
+ .confirmation_time()
+ })
+ .collect();
+ conftimereq.satisfy(conftimes)?
+ }
+ Request::Tx(txreq) => {
+ let full_txs = txreq
+ .request()
+ .map(|txid| {
+ let tx = tx_index.get(txid).expect("must be in index");
+ (tx.confirmation_time(), tx.previous_outputs(), tx.to_tx())
+ })
+ .collect();
+ txreq.satisfy(full_txs)?
+ }
+ Request::Finish(batch_update) => break batch_update,
+ }
+ };
+
+ database.commit_batch(batch_update)?;
+
+ Ok(())
}
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
}
impl UrlClient {
- fn script_to_scripthash(script: &Script) -> String {
- sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
- }
-
fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
let resp = self
.agent
}
}
- fn _script_get_history(&self, script: &Script) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
- let mut result = Vec::new();
- let scripthash = Self::script_to_scripthash(script);
-
- // Add the unconfirmed transactions first
-
- let resp = self
- .agent
- .get(&format!(
- "{}/scripthash/{}/txs/mempool",
- self.url, scripthash
- ))
- .call();
-
- let v = match resp {
- Ok(resp) => {
- let v: Vec<EsploraGetHistory> = resp.into_json()?;
- Ok(v)
- }
- Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
- Err(e) => Err(EsploraError::Ureq(e)),
- }?;
-
- result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
- tx_hash: x.txid,
- height: x.status.block_height.unwrap_or(0) as i32,
- }));
-
- debug!(
- "Found {} mempool txs for {} - {:?}",
- result.len(),
- scripthash,
- script
- );
-
- // Then go through all the pages of confirmed transactions
- let mut last_txid = String::new();
- loop {
- let resp = self
- .agent
- .get(&format!(
- "{}/scripthash/{}/txs/chain/{}",
- self.url, scripthash, last_txid
- ))
- .call();
-
- let v = match resp {
- Ok(resp) => {
- let v: Vec<EsploraGetHistory> = resp.into_json()?;
- Ok(v)
- }
- Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
- Err(e) => Err(EsploraError::Ureq(e)),
- }?;
-
- let len = v.len();
- if let Some(elem) = v.last() {
- last_txid = elem.txid.to_hex();
- }
-
- debug!("... adding {} confirmed transactions", len);
-
- result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
- tx_hash: x.txid,
- height: x.status.block_height.unwrap_or(0) as i32,
- }));
-
- if len < 25 {
- break;
- }
- }
-
- Ok(result)
- }
-
fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
let resp = self
.agent
Ok(map)
}
+
+ /// GET `/scripthash/<hash>/txs` for the first page, or
+ /// `/scripthash/<hash>/txs/chain/<last_seen>` to page through confirmed
+ /// history.
+ fn _scripthash_txs(
+ &self,
+ script: &Script,
+ last_seen: Option<Txid>,
+ ) -> Result<Vec<Tx>, EsploraError> {
+ // esplora addresses scripts by the hex of the sha256 of the scriptPubKey
+ let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex();
+ let url = match last_seen {
+ Some(last_seen) => format!(
+ "{}/scripthash/{}/txs/chain/{}",
+ self.url, script_hash, last_seen
+ ),
+ None => format!("{}/scripthash/{}/txs", self.url, script_hash),
+ };
+ Ok(self.agent.get(&url).call()?.into_json()?)
+ }
}
fn is_status_not_found(status: u16) -> bool {
Ok(buf)
}
-impl ElectrumLikeSync for UrlClient {
- fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
- &self,
- scripts: I,
- ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
- let mut results = vec![];
- for script in scripts.into_iter() {
- let v = self._script_get_history(script)?;
- results.push(v);
- }
- Ok(results)
- }
-
- fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
- &self,
- txids: I,
- ) -> Result<Vec<Transaction>, Error> {
- let mut results = vec![];
- for txid in txids.into_iter() {
- let tx = self._get_tx_no_opt(txid)?;
- results.push(tx);
- }
- Ok(results)
- }
-
- fn els_batch_block_header<I: IntoIterator<Item = u32>>(
- &self,
- heights: I,
- ) -> Result<Vec<BlockHeader>, Error> {
- let mut results = vec![];
- for height in heights.into_iter() {
- let header = self._get_header(height)?;
- results.push(header);
- }
- Ok(results)
- }
-}
-
-/// Configuration for an [`EsploraBlockchain`]
-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
-pub struct EsploraBlockchainConfig {
- /// Base URL of the esplora service eg. `https://blockstream.info/api/`
- pub base_url: String,
- /// Optional URL of the proxy to use to make requests to the Esplora server
- ///
- /// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
- ///
- /// Note that the format of this value and the supported protocols change slightly between the
- /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
- /// details check with the documentation of the two crates. Both of them are compiled with
- /// the `socks` feature enabled.
- ///
- /// The proxy is ignored when targeting `wasm32`.
- pub proxy: Option<String>,
- /// Socket read timeout.
- pub timeout_read: u64,
- /// Socket write timeout.
- pub timeout_write: u64,
- /// Stop searching addresses for transactions after finding an unused gap of this length.
- pub stop_gap: usize,
-}
-
impl ConfigurableBlockchain for EsploraBlockchain {
- type Config = EsploraBlockchainConfig;
+ type Config = super::EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
- let mut agent_builder = ureq::AgentBuilder::new()
- .timeout_read(Duration::from_secs(config.timeout_read))
- .timeout_write(Duration::from_secs(config.timeout_write));
+ let mut agent_builder = ureq::AgentBuilder::new();
+
+ // A single optional timeout (seconds) replaces the old separate
+ // read/write timeouts.
+ if let Some(timeout) = config.timeout {
+ agent_builder = agent_builder.timeout(Duration::from_secs(timeout));
+ }
if let Some(proxy) = &config.proxy {
agent_builder = agent_builder
.proxy(Proxy::new(proxy).map_err(|e| Error::Esplora(Box::new(e.into())))?);
}
- Ok(
- EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
- .with_agent(agent_builder.build()),
- )
+ let mut blockchain = EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
+ .with_agent(agent_builder.build());
+
+ if let Some(concurrency) = config.concurrency {
+ blockchain = blockchain.with_concurrency(concurrency);
+ }
+
+ Ok(blockchain)
+ }
+}
+
+/// Map ureq errors: HTTP status errors become `HttpResponse`, everything else
+/// is wrapped as `Ureq`.
+impl From<ureq::Error> for EsploraError {
+ fn from(e: ureq::Error) -> Self {
+ match e {
+ ureq::Error::Status(code, _) => EsploraError::HttpResponse(code),
+ e => EsploraError::Ureq(e),
+ }
}
}
use crate::error::Error;
use crate::FeeRate;
-#[cfg(any(feature = "electrum", feature = "esplora"))]
-pub(crate) mod utils;
-
#[cfg(any(
feature = "electrum",
feature = "esplora",
feature = "rpc"
))]
pub mod any;
+mod script_sync;
+
#[cfg(any(
feature = "electrum",
feature = "esplora",
--- /dev/null
+/*!
+This models how a sync happens where you have a server that you send your script pubkeys to and it
+returns associated transactions, i.e. electrum.
+*/
+#![allow(dead_code)]
+use crate::{
+ database::{BatchDatabase, BatchOperations, DatabaseUtils},
+ ConfirmationTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
+};
+use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
+use std::collections::{HashMap, HashSet, VecDeque};
+
+/// Mutable state threaded through every stage of the sync request state machine.
+struct State<'a, D> {
+    db: &'a D,
+    // Highest script index that had at least one associated tx, per keychain.
+    last_active_index: HashMap<KeychainKind, usize>,
+    // Txids whose full transaction still needs to be fetched.
+    tx_needed: VecDeque<Txid>,
+    // Txids whose confirmation time still needs to be fetched.
+    conftime_needed: VecDeque<Txid>,
+    // Transaction details gathered during this sync; used to build the final batch.
+    observed_txs: Vec<TransactionDetails>,
+}
+
+/// A request for on-chain information
+pub enum Request<'a, D: BatchDatabase> {
+    /// A request for transactions related to script pubkeys.
+    Script(ScriptReq<'a, D>),
+    /// A request for confirmation times for some transactions.
+    Conftime(ConftimeReq<'a, D>),
+    /// A request for full transaction details of some transactions.
+    Tx(TxReq<'a, D>),
+    /// Requests are finished here's a batch database update to reflect data gathered.
+    Finish(D::Batch),
+}
+
+/// Starts a sync, returning the first [`Request`] (always a script pubkey request).
+pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>, Error> {
+    use rand::seq::SliceRandom;
+    let mut keychains = vec![KeychainKind::Internal, KeychainKind::External];
+    // Shuffling improves privacy: the server can't tell whether our first request is for
+    // internal or external addresses.
+    keychains.shuffle(&mut rand::thread_rng());
+    let keychain = keychains.pop().unwrap();
+    let scripts_needed = db
+        .iter_script_pubkeys(Some(keychain))?
+        .into_iter()
+        .collect();
+    let state = State {
+        db,
+        last_active_index: HashMap::default(),
+        conftime_needed: VecDeque::default(),
+        observed_txs: vec![],
+        tx_needed: VecDeque::default(),
+    };
+
+    Ok(Request::Script(ScriptReq {
+        state,
+        scripts_needed,
+        script_index: 0,
+        stop_gap,
+        keychain,
+        next_keychains: keychains,
+        tx_interested: HashSet::default(),
+        tx_conftime_interested: HashSet::default(),
+    }))
+}
+
+/// A pending request for the transaction histories of a set of script pubkeys.
+pub struct ScriptReq<'a, D: BatchDatabase> {
+    state: State<'a, D>,
+    // Index (within the keychain) of the next script awaiting a response.
+    script_index: usize,
+    // Scripts still waiting to be queried for the current keychain.
+    scripts_needed: VecDeque<Script>,
+    stop_gap: usize,
+    // The keychain currently being scanned.
+    keychain: KeychainKind,
+    // Keychains to scan after the current one is exhausted.
+    next_keychains: Vec<KeychainKind>,
+    // Txids we've never seen: need the full transaction.
+    tx_interested: HashSet<Txid>,
+    // Txids we know but whose confirmation status changed: need the conf time.
+    tx_conftime_interested: HashSet<Txid>,
+}
+
+/// The sync starts by returning script pubkeys we are interested in.
+impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
+    pub fn request(&self) -> impl Iterator<Item = &Script> + Clone {
+        self.scripts_needed.iter()
+    }
+
+    pub fn satisfy(
+        mut self,
+        // we want to know the txids associated with the script and their height
+        txids: Vec<Vec<(Txid, Option<u32>)>>,
+    ) -> Result<Request<'a, D>, Error> {
+        for txid_list in txids.iter() {
+            if !txid_list.is_empty() {
+                // the address is active
+                self.state
+                    .last_active_index
+                    .insert(self.keychain, self.script_index);
+            }
+
+            for (txid, height) in txid_list {
+                // have we seen this txid already?
+                match self.state.db.get_tx(txid, true)? {
+                    Some(mut details) => {
+                        let old_height = details.confirmation_time.as_ref().map(|x| x.height);
+                        match (old_height, height) {
+                            (None, Some(_)) => {
+                                // It looks like the tx has confirmed since we last saw it -- we
+                                // need to know the confirmation time.
+                                self.tx_conftime_interested.insert(*txid);
+                            }
+                            (Some(old_height), Some(new_height)) if old_height != *new_height => {
+                                // The height of the tx has changed !? -- get the confirmation time.
+                                self.tx_conftime_interested.insert(*txid);
+                            }
+                            (Some(_), None) => {
+                                // The tx went from confirmed back to unconfirmed (e.g. a reorg)
+                                // -- record it without a confirmation time.
+                                details.confirmation_time = None;
+                                self.state.observed_txs.push(details);
+                            }
+                            _ => self.state.observed_txs.push(details),
+                        }
+                    }
+                    None => {
+                        // we've never seen it let's get the whole thing
+                        self.tx_interested.insert(*txid);
+                    }
+                };
+            }
+
+            self.script_index += 1;
+        }
+
+        // Remove only as many scripts as were answered -- the response may cover
+        // fewer scripts than were requested.
+        for _ in txids {
+            self.scripts_needed.pop_front();
+        }
+
+        let last_active_index = self
+            .state
+            .last_active_index
+            .get(&self.keychain)
+            .map(|x| x + 1)
+            .unwrap_or(0); // so no addresses active maps to 0
+
+        Ok(
+            // Stop once we've gone `stop_gap` scripts past the last active one, or run out.
+            if self.script_index > last_active_index + self.stop_gap
+                || self.scripts_needed.is_empty()
+            {
+                // we're done here -- check if we need to do the next keychain
+                if let Some(keychain) = self.next_keychains.pop() {
+                    self.keychain = keychain;
+                    self.script_index = 0;
+                    self.scripts_needed = self
+                        .state
+                        .db
+                        .iter_script_pubkeys(Some(keychain))?
+                        .into_iter()
+                        .collect();
+                    Request::Script(self)
+                } else {
+                    // All keychains scanned: hand the gathered txid sets to the next stages.
+                    self.state.conftime_needed = self.tx_conftime_interested.into_iter().collect();
+                    self.state.tx_needed = self.tx_interested.into_iter().collect();
+                    Request::Conftime(ConftimeReq { state: self.state })
+                }
+            } else {
+                Request::Script(self)
+            },
+        )
+    }
+}
+
+/// Next step is to get confirmation times for the transactions we are interested in.
+pub struct ConftimeReq<'a, D> {
+    // Shared sync state; `state.conftime_needed` holds the pending txids.
+    state: State<'a, D>,
+}
+
+impl<'a, D: BatchDatabase> ConftimeReq<'a, D> {
+    pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
+        self.state.conftime_needed.iter()
+    }
+
+    pub fn satisfy(
+        mut self,
+        confirmation_times: Vec<Option<ConfirmationTime>>,
+    ) -> Result<Request<'a, D>, Error> {
+        // Answers may be partial: only the first `n` pending txids are satisfied here;
+        // the rest are asked for again via another `Conftime` request.
+        let n = confirmation_times.len();
+        for (confirmation_time, txid) in confirmation_times
+            .into_iter()
+            .zip(self.state.conftime_needed.iter())
+        {
+            if let Some(mut tx_details) = self.state.db.get_tx(txid, true)? {
+                tx_details.confirmation_time = confirmation_time;
+                self.state.observed_txs.push(tx_details);
+            }
+        }
+
+        // Drop the txids that have just been answered.
+        for _ in 0..n {
+            self.state.conftime_needed.pop_front();
+        }
+
+        // Move on to fetching full transactions once all confirmation times are known.
+        if self.state.conftime_needed.is_empty() {
+            Ok(Request::Tx(TxReq { state: self.state }))
+        } else {
+            Ok(Request::Conftime(self))
+        }
+    }
+}
+
+/// Then we get full transactions for the txids we have never seen before.
+pub struct TxReq<'a, D> {
+    // Shared sync state; `state.tx_needed` holds the pending txids.
+    state: State<'a, D>,
+}
+
+impl<'a, D: BatchDatabase> TxReq<'a, D> {
+    pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
+        self.state.tx_needed.iter()
+    }
+
+    pub fn satisfy(
+        mut self,
+        // For each requested txid (in order): its confirmation time, the prevouts
+        // funding its inputs (None for coinbase inputs), and the full transaction.
+        tx_details: Vec<(Option<ConfirmationTime>, Vec<Option<TxOut>>, Transaction)>,
+    ) -> Result<Request<'a, D>, Error> {
+        let tx_details: Vec<TransactionDetails> = tx_details
+            .into_iter()
+            .zip(self.state.tx_needed.iter())
+            .map(|((confirmation_time, vin, tx), txid)| {
+                // Responses must come back in the same order as requested.
+                assert_eq!(tx.txid(), *txid);
+                let mut sent: u64 = 0;
+                let mut received: u64 = 0;
+                let mut inputs_sum: u64 = 0;
+                let mut outputs_sum: u64 = 0;
+
+                for (txout, input) in vin.into_iter().zip(tx.input.iter()) {
+                    let txout = match txout {
+                        Some(txout) => txout,
+                        None => {
+                            // skip coinbase inputs
+                            debug_assert!(
+                                input.previous_output.is_null(),
+                                "prevout should only be missing for coinbase"
+                            );
+                            continue;
+                        }
+                    };
+
+                    inputs_sum += txout.value;
+                    if self.state.db.is_mine(&txout.script_pubkey)? {
+                        sent += txout.value;
+                    }
+                }
+
+                for out in &tx.output {
+                    outputs_sum += out.value;
+                    if self.state.db.is_mine(&out.script_pubkey)? {
+                        received += out.value;
+                    }
+                }
+                // we need to saturating sub since we want coinbase txs to map to 0 fee and
+                // this subtraction will be negative for coinbase txs.
+                let fee = inputs_sum.saturating_sub(outputs_sum);
+                Result::<_, Error>::Ok(TransactionDetails {
+                    txid: *txid,
+                    transaction: Some(tx),
+                    received,
+                    sent,
+                    confirmation_time,
+                    fee: Some(fee),
+                    verified: false,
+                })
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        // Record what we got and drop the satisfied txids (responses may be partial).
+        for tx_detail in tx_details {
+            self.state.observed_txs.push(tx_detail);
+            self.state.tx_needed.pop_front();
+        }
+
+        if !self.state.tx_needed.is_empty() {
+            Ok(Request::Tx(self))
+        } else {
+            // All data gathered: reconcile with the database and build the final batch.
+            let existing_txs = self.state.db.iter_txs(false)?;
+            let existing_txids: HashSet<Txid> = existing_txs.iter().map(|tx| tx.txid).collect();
+            let observed_txs = make_txs_consistent(&self.state.observed_txs);
+            let observed_txids: HashSet<Txid> = observed_txs.iter().map(|tx| tx.txid).collect();
+            let txids_to_delete = existing_txids.difference(&observed_txids);
+            let mut batch = self.state.db.begin_batch();
+
+            // Delete old txs that no longer exist
+            for txid in txids_to_delete {
+                if let Some(raw_tx) = self.state.db.get_raw_tx(txid)? {
+                    for i in 0..raw_tx.output.len() {
+                        // Also delete any utxos from the txs that no longer exist.
+                        let _ = batch.del_utxo(&OutPoint {
+                            txid: *txid,
+                            vout: i as u32,
+                        })?;
+                    }
+                } else {
+                    unreachable!("we should always have the raw tx");
+                }
+                batch.del_tx(txid, true)?;
+            }
+
+            // Set every tx we observed
+            for observed_tx in &observed_txs {
+                let tx = observed_tx
+                    .transaction
+                    .as_ref()
+                    .expect("transaction will always be present here");
+                for (i, output) in tx.output.iter().enumerate() {
+                    if let Some((keychain, _)) = self
+                        .state
+                        .db
+                        .get_path_from_script_pubkey(&output.script_pubkey)?
+                    {
+                        // add utxos we own from the new transactions we've seen.
+                        batch.set_utxo(&LocalUtxo {
+                            outpoint: OutPoint {
+                                txid: observed_tx.txid,
+                                vout: i as u32,
+                            },
+                            txout: output.clone(),
+                            keychain,
+                        })?;
+                    }
+                }
+                batch.set_tx(observed_tx)?;
+            }
+
+            // we don't do this in the loop above since we may want to delete some of the utxos we
+            // just added in case there are new transactions that spend from each other.
+            for observed_tx in &observed_txs {
+                let tx = observed_tx
+                    .transaction
+                    .as_ref()
+                    .expect("transaction will always be present here");
+                for input in &tx.input {
+                    // Delete any spent utxos
+                    batch.del_utxo(&input.previous_output)?;
+                }
+            }
+
+            // Persist how far each keychain has been used so future syncs resume correctly.
+            for (keychain, last_active_index) in self.state.last_active_index {
+                batch.set_last_index(keychain, last_active_index as u32)?;
+            }
+
+            Ok(Request::Finish(batch))
+        }
+    }
+}
+
+/// Remove conflicting transactions -- tie breaking them by fee.
+fn make_txs_consistent(txs: &[TransactionDetails]) -> Vec<&TransactionDetails> {
+    // Map each spent outpoint to the winning (highest-fee) transaction spending it.
+    let mut utxo_index: HashMap<OutPoint, &TransactionDetails> = HashMap::default();
+    for tx in txs {
+        for input in &tx.transaction.as_ref().unwrap().input {
+            utxo_index
+                .entry(input.previous_output)
+                .and_modify(|existing| match (tx.fee, existing.fee) {
+                    (Some(fee), Some(existing_fee)) if fee > existing_fee => *existing = tx,
+                    (Some(_), None) => *existing = tx,
+                    _ => { /* leave it the same */ }
+                })
+                .or_insert(tx);
+        }
+    }
+
+    // Deduplicate by txid, since one transaction may win several outpoints.
+    utxo_index
+        .into_iter()
+        .map(|(_, tx)| (tx.txid, tx))
+        .collect::<HashMap<_, _>>()
+        .into_iter()
+        .map(|(_, tx)| tx)
+        .collect()
+}
use crate::error::Error;
use crate::types::{ConfirmationTime, KeychainKind, LocalUtxo, TransactionDetails};
use crate::wallet::time::Instant;
-use crate::wallet::utils::ChunksIterator;
-
-#[derive(Debug)]
-pub struct ElsGetHistoryRes {
- pub height: i32,
- pub tx_hash: Txid,
-}
-
-/// Implements the synchronization logic for an Electrum-like client.
-#[maybe_async]
-pub trait ElectrumLikeSync {
- fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>(
- &self,
- scripts: I,
- ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error>;
-
- fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>(
- &self,
- txids: I,
- ) -> Result<Vec<Transaction>, Error>;
-
- fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
- &self,
- heights: I,
- ) -> Result<Vec<BlockHeader>, Error>;
-
- // Provided methods down here...
-
- fn electrum_like_setup<D: BatchDatabase, P: Progress>(
- &self,
- stop_gap: usize,
- db: &mut D,
- _progress_update: P,
- ) -> Result<(), Error> {
- // TODO: progress
- let start = Instant::new();
- debug!("start setup");
-
- let chunk_size = stop_gap;
-
- let mut history_txs_id = HashSet::new();
- let mut txid_height = HashMap::new();
- let mut max_indexes = HashMap::new();
-
- let mut wallet_chains = vec![KeychainKind::Internal, KeychainKind::External];
- // shuffling improve privacy, the server doesn't know my first request is from my internal or external addresses
- wallet_chains.shuffle(&mut thread_rng());
- // download history of our internal and external script_pubkeys
- for keychain in wallet_chains.iter() {
- let script_iter = db.iter_script_pubkeys(Some(*keychain))?.into_iter();
-
- for (i, chunk) in ChunksIterator::new(script_iter, stop_gap).enumerate() {
- // TODO if i == last, should create another chunk of addresses in db
- let call_result: Vec<Vec<ElsGetHistoryRes>> =
- maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
- let max_index = call_result
- .iter()
- .enumerate()
- .filter_map(|(i, v)| v.first().map(|_| i as u32))
- .max();
- if let Some(max) = max_index {
- max_indexes.insert(keychain, max + (i * chunk_size) as u32);
- }
- let flattened: Vec<ElsGetHistoryRes> = call_result.into_iter().flatten().collect();
- debug!("#{} of {:?} results:{}", i, keychain, flattened.len());
- if flattened.is_empty() {
- // Didn't find anything in the last `stop_gap` script_pubkeys, breaking
- break;
- }
-
- for el in flattened {
- // el.height = -1 means unconfirmed with unconfirmed parents
- // el.height = 0 means unconfirmed with confirmed parents
- // but we treat those tx the same
- if el.height <= 0 {
- txid_height.insert(el.tx_hash, None);
- } else {
- txid_height.insert(el.tx_hash, Some(el.height as u32));
- }
- history_txs_id.insert(el.tx_hash);
- }
- }
- }
-
- // saving max indexes
- info!("max indexes are: {:?}", max_indexes);
- for keychain in wallet_chains.iter() {
- if let Some(index) = max_indexes.get(keychain) {
- db.set_last_index(*keychain, *index)?;
- }
- }
-
- // get db status
- let txs_details_in_db: HashMap<Txid, TransactionDetails> = db
- .iter_txs(false)?
- .into_iter()
- .map(|tx| (tx.txid, tx))
- .collect();
- let txs_raw_in_db: HashMap<Txid, Transaction> = db
- .iter_raw_txs()?
- .into_iter()
- .map(|tx| (tx.txid(), tx))
- .collect();
- let utxos_deps = utxos_deps(db, &txs_raw_in_db)?;
-
- // download new txs and headers
- let new_txs = maybe_await!(self.download_and_save_needed_raw_txs(
- &history_txs_id,
- &txs_raw_in_db,
- chunk_size,
- db
- ))?;
- let new_timestamps = maybe_await!(self.download_needed_headers(
- &txid_height,
- &txs_details_in_db,
- chunk_size
- ))?;
-
- let mut batch = db.begin_batch();
-
- // save any tx details not in db but in history_txs_id or with different height/timestamp
- for txid in history_txs_id.iter() {
- let height = txid_height.get(txid).cloned().flatten();
- let timestamp = new_timestamps.get(txid).cloned();
- if let Some(tx_details) = txs_details_in_db.get(txid) {
- // check if tx height matches, otherwise updates it. timestamp is not in the if clause
- // because we are not asking headers for confirmed tx we know about
- if tx_details.confirmation_time.as_ref().map(|c| c.height) != height {
- let confirmation_time = ConfirmationTime::new(height, timestamp);
- let mut new_tx_details = tx_details.clone();
- new_tx_details.confirmation_time = confirmation_time;
- batch.set_tx(&new_tx_details)?;
- }
- } else {
- save_transaction_details_and_utxos(
- txid,
- db,
- timestamp,
- height,
- &mut batch,
- &utxos_deps,
- )?;
- }
- }
-
- // remove any tx details in db but not in history_txs_id
- for txid in txs_details_in_db.keys() {
- if !history_txs_id.contains(txid) {
- batch.del_tx(txid, false)?;
- }
- }
-
- // remove any spent utxo
- for new_tx in new_txs.iter() {
- for input in new_tx.input.iter() {
- batch.del_utxo(&input.previous_output)?;
- }
- }
-
- db.commit_batch(batch)?;
- info!("finish setup, elapsed {:?}ms", start.elapsed().as_millis());
-
- Ok(())
- }
-
- /// download txs identified by `history_txs_id` and theirs previous outputs if not already present in db
- fn download_and_save_needed_raw_txs<D: BatchDatabase>(
- &self,
- history_txs_id: &HashSet<Txid>,
- txs_raw_in_db: &HashMap<Txid, Transaction>,
- chunk_size: usize,
- db: &mut D,
- ) -> Result<Vec<Transaction>, Error> {
- let mut txs_downloaded = vec![];
- let txids_raw_in_db: HashSet<Txid> = txs_raw_in_db.keys().cloned().collect();
- let txids_to_download: Vec<&Txid> = history_txs_id.difference(&txids_raw_in_db).collect();
- if !txids_to_download.is_empty() {
- info!("got {} txs to download", txids_to_download.len());
- txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
- txids_to_download,
- chunk_size,
- db,
- ))?);
- let mut prev_txids = HashSet::new();
- let mut txids_downloaded = HashSet::new();
- for tx in txs_downloaded.iter() {
- txids_downloaded.insert(tx.txid());
- // add every previous input tx, but skip coinbase
- for input in tx.input.iter().filter(|i| !i.previous_output.is_null()) {
- prev_txids.insert(input.previous_output.txid);
- }
- }
- let already_present: HashSet<Txid> =
- txids_downloaded.union(&txids_raw_in_db).cloned().collect();
- let prev_txs_to_download: Vec<&Txid> =
- prev_txids.difference(&already_present).collect();
- info!("{} previous txs to download", prev_txs_to_download.len());
- txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
- prev_txs_to_download,
- chunk_size,
- db,
- ))?);
- }
-
- Ok(txs_downloaded)
- }
-
- /// download headers at heights in `txid_height` if tx details not already present, returns a map Txid -> timestamp
- fn download_needed_headers(
- &self,
- txid_height: &HashMap<Txid, Option<u32>>,
- txs_details_in_db: &HashMap<Txid, TransactionDetails>,
- chunk_size: usize,
- ) -> Result<HashMap<Txid, u64>, Error> {
- let mut txid_timestamp = HashMap::new();
- let txid_in_db_with_conf: HashSet<_> = txs_details_in_db
- .values()
- .filter_map(|details| details.confirmation_time.as_ref().map(|_| details.txid))
- .collect();
- let needed_txid_height: HashMap<&Txid, u32> = txid_height
- .iter()
- .filter(|(t, _)| !txid_in_db_with_conf.contains(*t))
- .filter_map(|(t, o)| o.map(|h| (t, h)))
- .collect();
- let needed_heights: HashSet<u32> = needed_txid_height.values().cloned().collect();
- if !needed_heights.is_empty() {
- info!("{} headers to download for timestamp", needed_heights.len());
- let mut height_timestamp: HashMap<u32, u64> = HashMap::new();
- for chunk in ChunksIterator::new(needed_heights.into_iter(), chunk_size) {
- let call_result: Vec<BlockHeader> =
- maybe_await!(self.els_batch_block_header(chunk.clone()))?;
- height_timestamp.extend(
- chunk
- .into_iter()
- .zip(call_result.iter().map(|h| h.time as u64)),
- );
- }
- for (txid, height) in needed_txid_height {
- let timestamp = height_timestamp
- .get(&height)
- .ok_or_else(|| Error::Generic("timestamp missing".to_string()))?;
- txid_timestamp.insert(*txid, *timestamp);
- }
- }
-
- Ok(txid_timestamp)
- }
-
- fn download_and_save_in_chunks<D: BatchDatabase>(
- &self,
- to_download: Vec<&Txid>,
- chunk_size: usize,
- db: &mut D,
- ) -> Result<Vec<Transaction>, Error> {
- let mut txs_downloaded = vec![];
- for chunk in ChunksIterator::new(to_download.into_iter(), chunk_size) {
- let call_result: Vec<Transaction> =
- maybe_await!(self.els_batch_transaction_get(chunk))?;
- let mut batch = db.begin_batch();
- for new_tx in call_result.iter() {
- batch.set_raw_tx(new_tx)?;
- }
- db.commit_batch(batch)?;
- txs_downloaded.extend(call_result);
- }
-
- Ok(txs_downloaded)
- }
-}
-
-fn save_transaction_details_and_utxos<D: BatchDatabase>(
- txid: &Txid,
- db: &mut D,
- timestamp: Option<u64>,
- height: Option<u32>,
- updates: &mut dyn BatchOperations,
- utxo_deps: &HashMap<OutPoint, OutPoint>,
-) -> Result<(), Error> {
- let tx = db.get_raw_tx(txid)?.ok_or(Error::TransactionNotFound)?;
-
- let mut incoming: u64 = 0;
- let mut outgoing: u64 = 0;
-
- let mut inputs_sum: u64 = 0;
- let mut outputs_sum: u64 = 0;
-
- // look for our own inputs
- for input in tx.input.iter() {
- // skip coinbase inputs
- if input.previous_output.is_null() {
- continue;
- }
-
- // We already downloaded all previous output txs in the previous step
- if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
- inputs_sum += previous_output.value;
-
- if db.is_mine(&previous_output.script_pubkey)? {
- outgoing += previous_output.value;
- }
- } else {
- // The input is not ours, but we still need to count it for the fees
- let tx = db
- .get_raw_tx(&input.previous_output.txid)?
- .ok_or(Error::TransactionNotFound)?;
- inputs_sum += tx.output[input.previous_output.vout as usize].value;
- }
-
- // removes conflicting UTXO if any (generated from same inputs, like for example RBF)
- if let Some(outpoint) = utxo_deps.get(&input.previous_output) {
- updates.del_utxo(outpoint)?;
- }
- }
-
- for (i, output) in tx.output.iter().enumerate() {
- // to compute the fees later
- outputs_sum += output.value;
-
- // this output is ours, we have a path to derive it
- if let Some((keychain, _child)) = db.get_path_from_script_pubkey(&output.script_pubkey)? {
- debug!("{} output #{} is mine, adding utxo", txid, i);
- updates.set_utxo(&LocalUtxo {
- outpoint: OutPoint::new(tx.txid(), i as u32),
- txout: output.clone(),
- keychain,
- })?;
-
- incoming += output.value;
- }
- }
-
- let tx_details = TransactionDetails {
- txid: tx.txid(),
- transaction: Some(tx),
- received: incoming,
- sent: outgoing,
- confirmation_time: ConfirmationTime::new(height, timestamp),
- fee: Some(inputs_sum.saturating_sub(outputs_sum)), /* if the tx is a coinbase, fees would be negative */
- verified: height.is_some(),
- };
- updates.set_tx(&tx_details)?;
-
- Ok(())
-}
-
-/// returns utxo dependency as the inputs needed for the utxo to exist
-/// `tx_raw_in_db` must contains utxo's generating txs or errors with [crate::Error::TransactionNotFound]
-fn utxos_deps<D: BatchDatabase>(
- db: &mut D,
- tx_raw_in_db: &HashMap<Txid, Transaction>,
-) -> Result<HashMap<OutPoint, OutPoint>, Error> {
- let utxos = db.iter_utxos()?;
- let mut utxos_deps = HashMap::new();
- for utxo in utxos {
- let from_tx = tx_raw_in_db
- .get(&utxo.outpoint.txid)
- .ok_or(Error::TransactionNotFound)?;
- for input in from_tx.input.iter() {
- utxos_deps.insert(input.previous_output, utxo.outpoint);
- }
- }
- Ok(utxos_deps)
-}
assert_eq!(wallet.list_unspent().unwrap().len(), 1, "incorrect number of unspents");
}
+    /// Send two conflicting transactions to the same address twice in a row.
+    /// The coins should only be received once!
+    #[test]
+    fn test_sync_double_receive() {
+        let (wallet, descriptors, mut test_client) = init_single_sig();
+        let receiver_wallet = get_wallet_from_descriptors(&("wpkh(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW)".to_string(), None), &test_client);
+        // need to sync so rpc can start watching
+        receiver_wallet.sync(noop_progress(), None).unwrap();
+
+        test_client.receive(testutils! {
+            @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
+        });
+
+        wallet.sync(noop_progress(), None).unwrap();
+        assert_eq!(wallet.get_balance().unwrap(), 75_000, "incorrect balance");
+        let target_addr = receiver_wallet.get_address($crate::wallet::AddressIndex::New).unwrap().address;
+
+        // First payment: RBF-enabled so it can be replaced.
+        let tx1 = {
+            let mut builder = wallet.build_tx();
+            builder.add_recipient(target_addr.script_pubkey(), 49_000).enable_rbf();
+            let (mut psbt, _details) = builder.finish().unwrap();
+            let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
+            assert!(finalized, "Cannot finalize transaction");
+            psbt.extract_tx()
+        };
+
+        // Conflicting payment to the same address at a higher fee rate.
+        let tx2 = {
+            let mut builder = wallet.build_tx();
+            builder.add_recipient(target_addr.script_pubkey(), 49_000).enable_rbf().fee_rate(FeeRate::from_sat_per_vb(5.0));
+            let (mut psbt, _details) = builder.finish().unwrap();
+            let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
+            assert!(finalized, "Cannot finalize transaction");
+            psbt.extract_tx()
+        };
+
+        wallet.broadcast(&tx1).unwrap();
+        wallet.broadcast(&tx2).unwrap();
+
+        // Only one of the conflicting txs can pay out, so the balance is 49_000, not 98_000.
+        receiver_wallet.sync(noop_progress(), None).unwrap();
+        assert_eq!(receiver_wallet.get_balance().unwrap(), 49_000, "should have received coins once and only once");
+    }
+
+    /// Many separate payments to one address must all be counted:
+    /// 4 blocks x 20 txs + 20 mempool txs = 100 txs of 1_000 sats each.
+    #[test]
+    fn test_sync_many_sends_to_a_single_address() {
+        let (wallet, descriptors, mut test_client) = init_single_sig();
+
+        for _ in 0..4 {
+            // split this up into multiple blocks so rpc doesn't get angry
+            for _ in 0..20 {
+                test_client.receive(testutils! {
+                    @tx ( (@external descriptors, 0) => 1_000 )
+                });
+            }
+            test_client.generate(1, None);
+        }
+
+        // add some to the mempool as well.
+        for _ in 0..20 {
+            test_client.receive(testutils! {
+                @tx ( (@external descriptors, 0) => 1_000 )
+            });
+        }
+
+        wallet.sync(noop_progress(), None).unwrap();
+
+        // 100 * 1_000 sats
+        assert_eq!(wallet.get_balance().unwrap(), 100_000);
+    }
+
#[test]
fn test_update_confirmation_time_after_generate() {
let (wallet, descriptors, mut test_client) = init_single_sig();
pub(crate) type SecpCtx = Secp256k1<All>;
-pub struct ChunksIterator<I: Iterator> {
- iter: I,
- size: usize,
-}
-
-#[cfg(any(feature = "electrum", feature = "esplora"))]
-impl<I: Iterator> ChunksIterator<I> {
- pub fn new(iter: I, size: usize) -> Self {
- ChunksIterator { iter, size }
- }
-}
-
-impl<I: Iterator> Iterator for ChunksIterator<I> {
- type Item = Vec<<I as std::iter::Iterator>::Item>;
-
- fn next(&mut self) -> Option<Self::Item> {
- let mut v = Vec::new();
- for _ in 0..self.size {
- let e = self.iter.next();
-
- match e {
- None => break,
- Some(val) => v.push(val),
- }
- }
-
- if v.is_empty() {
- return None;
- }
-
- Some(v)
- }
-}
-
#[cfg(test)]
mod test {
use super::{