]> Untitled Git - bdk/commitdiff
Invert dependencies in electrum sync
authorLLFourn <lloyd.fourn@gmail.com>
Fri, 29 Oct 2021 06:41:02 +0000 (17:41 +1100)
committerLLFourn <lloyd.fourn@gmail.com>
Tue, 9 Nov 2021 22:06:49 +0000 (09:06 +1100)
Blockchain calls sync logic rather than the other way around.
Sync logic is captured in script_sync.rs.

src/blockchain/electrum.rs
src/blockchain/esplora/api.rs [new file with mode: 0644]
src/blockchain/esplora/mod.rs
src/blockchain/esplora/reqwest.rs
src/blockchain/esplora/ureq.rs
src/blockchain/mod.rs
src/blockchain/script_sync.rs [new file with mode: 0644]
src/blockchain/utils.rs
src/testutils/blockchain_tests.rs
src/wallet/utils.rs

index 53d4dabb92b40bc6f2ed66bde5b3de90df50a539..f0c2642563e6d22bd09d05d820926a841ffdad74 100644 (file)
 //! # Ok::<(), bdk::Error>(())
 //! ```
 
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 
 #[allow(unused_imports)]
 use log::{debug, error, info, trace};
 
-use bitcoin::{BlockHeader, Script, Transaction, Txid};
+use bitcoin::{Transaction, Txid};
 
 use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config};
 
-use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
+use super::script_sync::Request;
 use super::*;
-use crate::database::BatchDatabase;
+use crate::database::{BatchDatabase, Database};
 use crate::error::Error;
-use crate::FeeRate;
+use crate::{ConfirmationTime, FeeRate};
 
 /// Wrapper over an Electrum Client that implements the required blockchain traits
 ///
@@ -71,10 +71,151 @@ impl Blockchain for ElectrumBlockchain {
     fn setup<D: BatchDatabase, P: Progress>(
         &self,
         database: &mut D,
-        progress_update: P,
+        _progress_update: P,
     ) -> Result<(), Error> {
-        self.client
-            .electrum_like_setup(self.stop_gap, database, progress_update)
+        let mut request = script_sync::start(database, self.stop_gap)?;
+        let mut block_times = HashMap::<u32, u32>::new();
+        let mut txid_to_height = HashMap::<Txid, u32>::new();
+        let mut tx_cache = TxCache::new(database, &self.client);
+        let chunk_size = self.stop_gap;
+        // The electrum server has been inconsistent somehow in its responses during sync. For
+        // example, we do a batch request of transactions and the response contains less
+        // tranascations than in the request. This should never happen but we don't want to panic.
+        let electrum_goof = || Error::Generic("electrum server misbehaving".to_string());
+
+        let batch_update = loop {
+            request = match request {
+                Request::Script(script_req) => {
+                    let scripts = script_req.request().take(chunk_size);
+                    let txids_per_script: Vec<Vec<_>> = self
+                        .client
+                        .batch_script_get_history(scripts)
+                        .map_err(Error::Electrum)?
+                        .into_iter()
+                        .map(|txs| {
+                            txs.into_iter()
+                                .map(|tx| {
+                                    let tx_height = match tx.height {
+                                        none if none <= 0 => None,
+                                        height => {
+                                            txid_to_height.insert(tx.tx_hash, height as u32);
+                                            Some(height as u32)
+                                        }
+                                    };
+                                    (tx.tx_hash, tx_height)
+                                })
+                                .collect()
+                        })
+                        .collect();
+
+                    script_req.satisfy(txids_per_script)?
+                }
+
+                Request::Conftime(conftimereq) => {
+                    let needs_block_height = conftimereq
+                        .request()
+                        .filter_map(|txid| txid_to_height.get(txid).cloned())
+                        .filter(|height| block_times.get(height).is_none())
+                        .take(chunk_size)
+                        .collect::<HashSet<_>>();
+
+                    let new_block_headers =
+                        self.client.batch_block_header(needs_block_height.clone())?;
+
+                    for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
+                        block_times.insert(height, header.time);
+                    }
+
+                    let conftimes = conftimereq
+                        .request()
+                        .take(chunk_size)
+                        .map(|txid| {
+                            let confirmation_time = txid_to_height
+                                .get(txid)
+                                .map(|height| {
+                                    let timestamp =
+                                        *block_times.get(height).ok_or_else(electrum_goof)?;
+                                    Result::<_, Error>::Ok(ConfirmationTime {
+                                        height: *height,
+                                        timestamp: timestamp.into(),
+                                    })
+                                })
+                                .transpose()?;
+                            Ok(confirmation_time)
+                        })
+                        .collect::<Result<_, Error>>()?;
+
+                    conftimereq.satisfy(conftimes)?
+                }
+                Request::Tx(txreq) => {
+                    let needs_block_height = txreq
+                        .request()
+                        .filter_map(|txid| txid_to_height.get(txid).cloned())
+                        .filter(|height| block_times.get(height).is_none())
+                        .take(chunk_size)
+                        .collect::<HashSet<_>>();
+
+                    let new_block_headers =
+                        self.client.batch_block_header(needs_block_height.clone())?;
+                    for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
+                        block_times.insert(height, header.time);
+                    }
+                    let needs_full = txreq.request().take(chunk_size);
+
+                    tx_cache.save_txs(needs_full.clone())?;
+                    let full_transactions = needs_full
+                        .map(|txid| tx_cache.get(*txid).ok_or_else(electrum_goof))
+                        .collect::<Result<Vec<_>, _>>()?;
+                    let input_txs = full_transactions.iter().flat_map(|tx| {
+                        tx.input
+                            .iter()
+                            .filter(|input| !input.previous_output.is_null())
+                            .map(|input| &input.previous_output.txid)
+                    });
+                    tx_cache.save_txs(input_txs)?;
+
+                    let full_details = full_transactions
+                        .into_iter()
+                        .map(|tx| {
+                            let confirmation_time = txid_to_height
+                                .get(&tx.txid())
+                                .map(|height| {
+                                    let time = block_times.get(height).ok_or_else(electrum_goof)?;
+                                    Result::<_, Error>::Ok(ConfirmationTime {
+                                        height: *height,
+                                        timestamp: *time as u64,
+                                    })
+                                })
+                                .transpose()?;
+                            let prev_outputs = tx
+                                .input
+                                .iter()
+                                .map(|input| {
+                                    if input.previous_output.is_null() {
+                                        return Ok(None);
+                                    }
+                                    let prev_tx = tx_cache
+                                        .get(input.previous_output.txid)
+                                        .ok_or_else(electrum_goof)?;
+                                    let txout = prev_tx
+                                        .output
+                                        .get(input.previous_output.vout as usize)
+                                        .ok_or_else(electrum_goof)?;
+                                    Ok(Some(txout.clone()))
+                                })
+                                .collect::<Result<Vec<_>, Error>>()?;
+                            Ok((confirmation_time, prev_outputs, tx))
+                        })
+                        .collect::<Result<Vec<_>, Error>>()?;
+
+                    txreq.satisfy(full_details)?
+                }
+                Request::Finish(batch_update) => break batch_update,
+            }
+        };
+
+        database.commit_batch(batch_update)?;
+        Ok(())
     }
 
     fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@@ -101,43 +242,48 @@ impl Blockchain for ElectrumBlockchain {
     }
 }
 
-impl ElectrumLikeSync for Client {
-    fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>(
-        &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
-        self.batch_script_get_history(scripts)
-            .map(|v| {
-                v.into_iter()
-                    .map(|v| {
-                        v.into_iter()
-                            .map(
-                                |electrum_client::GetHistoryRes {
-                                     height, tx_hash, ..
-                                 }| ElsGetHistoryRes {
-                                    height,
-                                    tx_hash,
-                                },
-                            )
-                            .collect()
-                    })
-                    .collect()
-            })
-            .map_err(Error::Electrum)
+struct TxCache<'a, 'b, D> {
+    db: &'a D,
+    client: &'b Client,
+    cache: HashMap<Txid, Transaction>,
+}
+
+impl<'a, 'b, D: Database> TxCache<'a, 'b, D> {
+    fn new(db: &'a D, client: &'b Client) -> Self {
+        TxCache {
+            db,
+            client,
+            cache: HashMap::default(),
+        }
     }
+    fn save_txs<'c>(&mut self, txids: impl Iterator<Item = &'c Txid>) -> Result<(), Error> {
+        let mut need_fetch = vec![];
+        for txid in txids {
+            if self.cache.get(txid).is_some() {
+                continue;
+            } else if let Some(transaction) = self.db.get_raw_tx(txid)? {
+                self.cache.insert(*txid, transaction);
+            } else {
+                need_fetch.push(txid);
+            }
+        }
 
-    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>(
-        &self,
-        txids: I,
-    ) -> Result<Vec<Transaction>, Error> {
-        self.batch_transaction_get(txids).map_err(Error::Electrum)
+        if !need_fetch.is_empty() {
+            let txs = self
+                .client
+                .batch_transaction_get(need_fetch.clone())
+                .map_err(Error::Electrum)?;
+            for (tx, _txid) in txs.into_iter().zip(need_fetch) {
+                debug_assert_eq!(*_txid, tx.txid());
+                self.cache.insert(tx.txid(), tx);
+            }
+        }
+
+        Ok(())
     }
 
-    fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
-        &self,
-        heights: I,
-    ) -> Result<Vec<BlockHeader>, Error> {
-        self.batch_block_header(heights).map_err(Error::Electrum)
+    fn get(&self, txid: Txid) -> Option<Transaction> {
+        self.cache.get(&txid).map(Clone::clone)
     }
 }
 
diff --git a/src/blockchain/esplora/api.rs b/src/blockchain/esplora/api.rs
new file mode 100644 (file)
index 0000000..74c46c8
--- /dev/null
@@ -0,0 +1,117 @@
+//! structs from the esplora API
+//!
+//! see: <https://github.com/Blockstream/esplora/blob/master/API.md>
+use crate::ConfirmationTime;
+use bitcoin::{OutPoint, Script, Transaction, TxIn, TxOut, Txid};
+
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct PrevOut {
+    pub value: u64,
+    pub scriptpubkey: Script,
+}
+
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct Vin {
+    pub txid: Txid,
+    pub vout: u32,
+    // None if coinbase
+    pub prevout: Option<PrevOut>,
+    pub scriptsig: Script,
+    #[serde(deserialize_with = "deserialize_witness")]
+    pub witness: Vec<Vec<u8>>,
+    pub sequence: u32,
+    pub is_coinbase: bool,
+}
+
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct Vout {
+    pub value: u64,
+    pub scriptpubkey: Script,
+}
+
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct TxStatus {
+    pub confirmed: bool,
+    pub block_height: Option<u32>,
+    pub block_time: Option<u64>,
+}
+
+#[derive(serde::Deserialize, Clone, Debug)]
+pub struct Tx {
+    pub txid: Txid,
+    pub version: i32,
+    pub locktime: u32,
+    pub vin: Vec<Vin>,
+    pub vout: Vec<Vout>,
+    pub status: TxStatus,
+    pub fee: u64,
+}
+
+impl Tx {
+    pub fn to_tx(&self) -> Transaction {
+        Transaction {
+            version: self.version,
+            lock_time: self.locktime,
+            input: self
+                .vin
+                .iter()
+                .cloned()
+                .map(|vin| TxIn {
+                    previous_output: OutPoint {
+                        txid: vin.txid,
+                        vout: vin.vout,
+                    },
+                    script_sig: vin.scriptsig,
+                    sequence: vin.sequence,
+                    witness: vin.witness,
+                })
+                .collect(),
+            output: self
+                .vout
+                .iter()
+                .cloned()
+                .map(|vout| TxOut {
+                    value: vout.value,
+                    script_pubkey: vout.scriptpubkey,
+                })
+                .collect(),
+        }
+    }
+
+    pub fn confirmation_time(&self) -> Option<ConfirmationTime> {
+        match self.status {
+            TxStatus {
+                confirmed: true,
+                block_height: Some(height),
+                block_time: Some(timestamp),
+            } => Some(ConfirmationTime { timestamp, height }),
+            _ => None,
+        }
+    }
+
+    pub fn previous_outputs(&self) -> Vec<Option<TxOut>> {
+        self.vin
+            .iter()
+            .cloned()
+            .map(|vin| {
+                vin.prevout.map(|po| TxOut {
+                    script_pubkey: po.scriptpubkey,
+                    value: po.value,
+                })
+            })
+            .collect()
+    }
+}
+
+fn deserialize_witness<'de, D>(d: D) -> Result<Vec<Vec<u8>>, D::Error>
+where
+    D: serde::de::Deserializer<'de>,
+{
+    use crate::serde::Deserialize;
+    use bitcoin::hashes::hex::FromHex;
+    let list = Vec::<String>::deserialize(d)?;
+    list.into_iter()
+        .map(|hex_str| Vec::<u8>::from_hex(&hex_str))
+        .collect::<Result<Vec<Vec<u8>>, _>>()
+        .map_err(serde::de::Error::custom)
+}
index 921a1e62c116b81e9856efb8136b580043fe5f78..d4a217b5414d4b80e0ecd58f4b6679c0e1dccdfd 100644 (file)
@@ -21,8 +21,6 @@ use std::collections::HashMap;
 use std::fmt;
 use std::io;
 
-use serde::Deserialize;
-
 use bitcoin::consensus;
 use bitcoin::{BlockHash, Txid};
 
@@ -41,6 +39,8 @@ mod ureq;
 #[cfg(feature = "ureq")]
 pub use self::ureq::*;
 
+mod api;
+
 fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRate, Error> {
     let fee_val = estimates
         .into_iter()
@@ -56,18 +56,6 @@ fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRa
     Ok(FeeRate::from_sat_per_vb(fee_val as f32))
 }
 
-/// Data type used when fetching transaction history from Esplora.
-#[derive(Deserialize)]
-pub struct EsploraGetHistory {
-    txid: Txid,
-    status: EsploraGetHistoryStatus,
-}
-
-#[derive(Deserialize)]
-struct EsploraGetHistoryStatus {
-    block_height: Option<usize>,
-}
-
 /// Errors that can happen during a sync with [`EsploraBlockchain`]
 #[derive(Debug)]
 pub enum EsploraError {
@@ -107,10 +95,50 @@ impl fmt::Display for EsploraError {
     }
 }
 
+/// Configuration for an [`EsploraBlockchain`]
+#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
+pub struct EsploraBlockchainConfig {
+    /// Base URL of the esplora service
+    ///
+    /// eg. `https://blockstream.info/api/`
+    pub base_url: String,
+    /// Optional URL of the proxy to use to make requests to the Esplora server
+    ///
+    /// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
+    ///
+    /// Note that the format of this value and the supported protocols change slightly between the
+    /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
+    /// details check with the documentation of the two crates. Both of them are compiled with
+    /// the `socks` feature enabled.
+    ///
+    /// The proxy is ignored when targeting `wasm32`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub proxy: Option<String>,
+    /// Number of parallel requests sent to the esplora service (default: 4)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub concurrency: Option<u8>,
+    /// Stop searching addresses for transactions after finding an unused gap of this length.
+    pub stop_gap: usize,
+    /// Socket timeout.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub timeout: Option<u64>,
+}
+
+impl EsploraBlockchainConfig {
+    /// create a config with default values given the base url and stop gap
+    pub fn new(base_url: String) -> Self {
+        Self {
+            base_url,
+            proxy: None,
+            timeout: None,
+            stop_gap: 20,
+            concurrency: None,
+        }
+    }
+}
+
 impl std::error::Error for EsploraError {}
 
-#[cfg(feature = "ureq")]
-impl_error!(::ureq::Error, Ureq, EsploraError);
 #[cfg(feature = "ureq")]
 impl_error!(::ureq::Transport, UreqTransport, EsploraError);
 #[cfg(feature = "reqwest")]
@@ -127,3 +155,5 @@ crate::bdk_blockchain_tests! {
         EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), 20)
     }
 }
+
+const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
index 6e9c7aad908b86ac33cb7bac19934c6387a29e32..a6024adbc301dc4db365e311981df31a08117117 100644 (file)
@@ -21,20 +21,16 @@ use bitcoin::{BlockHeader, Script, Transaction, Txid};
 #[allow(unused_imports)]
 use log::{debug, error, info, trace};
 
-use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
-
 use ::reqwest::{Client, StatusCode};
+use futures::stream::{FuturesOrdered, TryStreamExt};
 
-use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
-use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
+use super::api::Tx;
+use crate::blockchain::esplora::EsploraError;
 use crate::blockchain::*;
 use crate::database::BatchDatabase;
 use crate::error::Error;
-use crate::wallet::utils::ChunksIterator;
 use crate::FeeRate;
 
-const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
-
 #[derive(Debug)]
 struct UrlClient {
     url: String,
@@ -70,7 +66,7 @@ impl EsploraBlockchain {
             url_client: UrlClient {
                 url: base_url.to_string(),
                 client: Client::new(),
-                concurrency: DEFAULT_CONCURRENT_REQUESTS,
+                concurrency: super::DEFAULT_CONCURRENT_REQUESTS,
             },
             stop_gap,
         }
@@ -98,11 +94,91 @@ impl Blockchain for EsploraBlockchain {
     fn setup<D: BatchDatabase, P: Progress>(
         &self,
         database: &mut D,
-        progress_update: P,
+        _progress_update: P,
     ) -> Result<(), Error> {
-        maybe_await!(self
-            .url_client
-            .electrum_like_setup(self.stop_gap, database, progress_update))
+        use crate::blockchain::script_sync::Request;
+        let mut request = script_sync::start(database, self.stop_gap)?;
+        let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
+
+        let batch_update = loop {
+            request = match request {
+                Request::Script(script_req) => {
+                    let futures: FuturesOrdered<_> = script_req
+                        .request()
+                        .take(self.url_client.concurrency as usize)
+                        .map(|script| async move {
+                            let mut related_txs: Vec<Tx> =
+                                self.url_client._scripthash_txs(script, None).await?;
+
+                            let n_confirmed =
+                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
+                            // esplora pages on 25 confirmed transactions. If there's more than
+                            // 25 we need to keep requesting.
+                            if n_confirmed >= 25 {
+                                loop {
+                                    let new_related_txs: Vec<Tx> = self
+                                        .url_client
+                                        ._scripthash_txs(
+                                            script,
+                                            Some(related_txs.last().unwrap().txid),
+                                        )
+                                        .await?;
+                                    let n = new_related_txs.len();
+                                    related_txs.extend(new_related_txs);
+                                    // we've reached the end
+                                    if n < 25 {
+                                        break;
+                                    }
+                                }
+                            }
+                            Result::<_, Error>::Ok(related_txs)
+                        })
+                        .collect();
+                    let txs_per_script: Vec<Vec<Tx>> = await_or_block!(futures.try_collect())?;
+                    let mut satisfaction = vec![];
+
+                    for txs in txs_per_script {
+                        satisfaction.push(
+                            txs.iter()
+                                .map(|tx| (tx.txid, tx.status.block_height))
+                                .collect(),
+                        );
+                        for tx in txs {
+                            tx_index.insert(tx.txid, tx);
+                        }
+                    }
+
+                    script_req.satisfy(satisfaction)?
+                }
+                Request::Conftime(conftimereq) => {
+                    let conftimes = conftimereq
+                        .request()
+                        .map(|txid| {
+                            tx_index
+                                .get(txid)
+                                .expect("must be in index")
+                                .confirmation_time()
+                        })
+                        .collect();
+                    conftimereq.satisfy(conftimes)?
+                }
+                Request::Tx(txreq) => {
+                    let full_txs = txreq
+                        .request()
+                        .map(|txid| {
+                            let tx = tx_index.get(txid).expect("must be in index");
+                            (tx.confirmation_time(), tx.previous_outputs(), tx.to_tx())
+                        })
+                        .collect();
+                    txreq.satisfy(full_txs)?
+                }
+                Request::Finish(batch_update) => break batch_update,
+            }
+        };
+
+        database.commit_batch(batch_update)?;
+
+        Ok(())
     }
 
     fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@@ -124,10 +200,6 @@ impl Blockchain for EsploraBlockchain {
 }
 
 impl UrlClient {
-    fn script_to_scripthash(script: &Script) -> String {
-        sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
-    }
-
     async fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
         let resp = self
             .client
@@ -196,71 +268,27 @@ impl UrlClient {
         Ok(req.error_for_status()?.text().await?.parse()?)
     }
 
-    async fn _script_get_history(
+    async fn _scripthash_txs(
         &self,
         script: &Script,
-    ) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
-        let mut result = Vec::new();
-        let scripthash = Self::script_to_scripthash(script);
-
-        // Add the unconfirmed transactions first
-        result.extend(
-            self.client
-                .get(&format!(
-                    "{}/scripthash/{}/txs/mempool",
-                    self.url, scripthash
-                ))
-                .send()
-                .await?
-                .error_for_status()?
-                .json::<Vec<EsploraGetHistory>>()
-                .await?
-                .into_iter()
-                .map(|x| ElsGetHistoryRes {
-                    tx_hash: x.txid,
-                    height: x.status.block_height.unwrap_or(0) as i32,
-                }),
-        );
-
-        debug!(
-            "Found {} mempool txs for {} - {:?}",
-            result.len(),
-            scripthash,
-            script
-        );
-
-        // Then go through all the pages of confirmed transactions
-        let mut last_txid = String::new();
-        loop {
-            let response = self
-                .client
-                .get(&format!(
-                    "{}/scripthash/{}/txs/chain/{}",
-                    self.url, scripthash, last_txid
-                ))
-                .send()
-                .await?
-                .error_for_status()?
-                .json::<Vec<EsploraGetHistory>>()
-                .await?;
-            let len = response.len();
-            if let Some(elem) = response.last() {
-                last_txid = elem.txid.to_hex();
-            }
-
-            debug!("... adding {} confirmed transactions", len);
-
-            result.extend(response.into_iter().map(|x| ElsGetHistoryRes {
-                tx_hash: x.txid,
-                height: x.status.block_height.unwrap_or(0) as i32,
-            }));
-
-            if len < 25 {
-                break;
-            }
-        }
-
-        Ok(result)
+        last_seen: Option<Txid>,
+    ) -> Result<Vec<Tx>, EsploraError> {
+        let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex();
+        let url = match last_seen {
+            Some(last_seen) => format!(
+                "{}/scripthash/{}/txs/chain/{}",
+                self.url, script_hash, last_seen
+            ),
+            None => format!("{}/scripthash/{}/txs", self.url, script_hash),
+        };
+        Ok(self
+            .client
+            .get(url)
+            .send()
+            .await?
+            .error_for_status()?
+            .json::<Vec<Tx>>()
+            .await?)
     }
 
     async fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
@@ -275,83 +303,8 @@ impl UrlClient {
     }
 }
 
-#[maybe_async]
-impl ElectrumLikeSync for UrlClient {
-    fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
-        &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
-        let mut results = vec![];
-        for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
-            let mut futs = FuturesOrdered::new();
-            for script in chunk {
-                futs.push(self._script_get_history(script));
-            }
-            let partial_results: Vec<Vec<ElsGetHistoryRes>> = await_or_block!(futs.try_collect())?;
-            results.extend(partial_results);
-        }
-        Ok(await_or_block!(stream::iter(results).collect()))
-    }
-
-    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
-        &self,
-        txids: I,
-    ) -> Result<Vec<Transaction>, Error> {
-        let mut results = vec![];
-        for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
-            let mut futs = FuturesOrdered::new();
-            for txid in chunk {
-                futs.push(self._get_tx_no_opt(txid));
-            }
-            let partial_results: Vec<Transaction> = await_or_block!(futs.try_collect())?;
-            results.extend(partial_results);
-        }
-        Ok(await_or_block!(stream::iter(results).collect()))
-    }
-
-    fn els_batch_block_header<I: IntoIterator<Item = u32>>(
-        &self,
-        heights: I,
-    ) -> Result<Vec<BlockHeader>, Error> {
-        let mut results = vec![];
-        for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
-            let mut futs = FuturesOrdered::new();
-            for height in chunk {
-                futs.push(self._get_header(height));
-            }
-            let partial_results: Vec<BlockHeader> = await_or_block!(futs.try_collect())?;
-            results.extend(partial_results);
-        }
-        Ok(await_or_block!(stream::iter(results).collect()))
-    }
-}
-
-/// Configuration for an [`EsploraBlockchain`]
-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
-pub struct EsploraBlockchainConfig {
-    /// Base URL of the esplora service
-    ///
-    /// eg. `https://blockstream.info/api/`
-    pub base_url: String,
-    /// Optional URL of the proxy to use to make requests to the Esplora server
-    ///
-    /// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
-    ///
-    /// Note that the format of this value and the supported protocols change slightly between the
-    /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
-    /// details check with the documentation of the two crates. Both of them are compiled with
-    /// the `socks` feature enabled.
-    ///
-    /// The proxy is ignored when targeting `wasm32`.
-    pub proxy: Option<String>,
-    /// Number of parallel requests sent to the esplora service (default: 4)
-    pub concurrency: Option<u8>,
-    /// Stop searching addresses for transactions after finding an unused gap of this length.
-    pub stop_gap: usize,
-}
-
 impl ConfigurableBlockchain for EsploraBlockchain {
-    type Config = EsploraBlockchainConfig;
+    type Config = super::EsploraBlockchainConfig;
 
     fn from_config(config: &Self::Config) -> Result<Self, Error> {
         let map_e = |e: reqwest::Error| Error::Esplora(Box::new(e.into()));
@@ -360,13 +313,19 @@ impl ConfigurableBlockchain for EsploraBlockchain {
         if let Some(concurrency) = config.concurrency {
             blockchain.url_client.concurrency = concurrency;
         }
+        let mut builder = Client::builder();
         #[cfg(not(target_arch = "wasm32"))]
         if let Some(proxy) = &config.proxy {
-            blockchain.url_client.client = Client::builder()
-                .proxy(reqwest::Proxy::all(proxy).map_err(map_e)?)
-                .build()
-                .map_err(map_e)?;
+            builder = builder.proxy(reqwest::Proxy::all(proxy).map_err(map_e)?);
         }
+
+        #[cfg(not(target_arch = "wasm32"))]
+        if let Some(timeout) = config.timeout {
+            builder = builder.timeout(core::time::Duration::from_secs(timeout));
+        }
+
+        blockchain.url_client.client = builder.build().map_err(map_e)?;
+
         Ok(blockchain)
     }
 }
index 177b773d65b54ef6930ed57f03b3ef7a1750fd09..365b3281d1fb464e583a51312656c36e4e5c0581 100644 (file)
@@ -26,14 +26,14 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
 use bitcoin::hashes::{sha256, Hash};
 use bitcoin::{BlockHeader, Script, Transaction, Txid};
 
-use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
-use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
+use super::api::Tx;
+use crate::blockchain::esplora::EsploraError;
 use crate::blockchain::*;
 use crate::database::BatchDatabase;
 use crate::error::Error;
 use crate::FeeRate;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct UrlClient {
     url: String,
     agent: Agent,
@@ -47,15 +47,7 @@ struct UrlClient {
 pub struct EsploraBlockchain {
     url_client: UrlClient,
     stop_gap: usize,
-}
-
-impl std::convert::From<UrlClient> for EsploraBlockchain {
-    fn from(url_client: UrlClient) -> Self {
-        EsploraBlockchain {
-            url_client,
-            stop_gap: 20,
-        }
-    }
+    concurrency: u8,
 }
 
 impl EsploraBlockchain {
@@ -66,6 +58,7 @@ impl EsploraBlockchain {
                 url: base_url.to_string(),
                 agent: Agent::new(),
             },
+            concurrency: super::DEFAULT_CONCURRENT_REQUESTS,
             stop_gap,
         }
     }
@@ -75,6 +68,12 @@ impl EsploraBlockchain {
         self.url_client.agent = agent;
         self
     }
+
+    /// Set the number of parallel requests the client can make.
+    pub fn with_concurrency(mut self, concurrency: u8) -> Self {
+        self.concurrency = concurrency;
+        self
+    }
 }
 
 impl Blockchain for EsploraBlockchain {
@@ -91,10 +90,94 @@ impl Blockchain for EsploraBlockchain {
     fn setup<D: BatchDatabase, P: Progress>(
         &self,
         database: &mut D,
-        progress_update: P,
+        _progress_update: P,
     ) -> Result<(), Error> {
-        self.url_client
-            .electrum_like_setup(self.stop_gap, database, progress_update)
+        use crate::blockchain::script_sync::Request;
+        let mut request = script_sync::start(database, self.stop_gap)?;
+        let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
+        let batch_update = loop {
+            request = match request {
+                Request::Script(script_req) => {
+                    let scripts = script_req
+                        .request()
+                        .take(self.concurrency as usize)
+                        .cloned();
+
+                    let handles = scripts.map(move |script| {
+                        let client = self.url_client.clone();
+                        // make each request in its own thread.
+                        std::thread::spawn(move || {
+                            let mut related_txs: Vec<Tx> = client._scripthash_txs(&script, None)?;
+
+                            let n_confirmed =
+                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
+                            // esplora pages on 25 confirmed transactions. If there's more than
+                            // 25 we need to keep requesting.
+                            if n_confirmed >= 25 {
+                                loop {
+                                    let new_related_txs: Vec<Tx> = client._scripthash_txs(
+                                        &script,
+                                        Some(related_txs.last().unwrap().txid),
+                                    )?;
+                                    let n = new_related_txs.len();
+                                    related_txs.extend(new_related_txs);
+                                    // we've reached the end
+                                    if n < 25 {
+                                        break;
+                                    }
+                                }
+                            }
+                            Result::<_, Error>::Ok(related_txs)
+                        })
+                    });
+
+                    let txs_per_script: Vec<Vec<Tx>> = handles
+                        .map(|handle| handle.join().unwrap())
+                        .collect::<Result<_, _>>()?;
+                    let mut satisfaction = vec![];
+
+                    for txs in txs_per_script {
+                        satisfaction.push(
+                            txs.iter()
+                                .map(|tx| (tx.txid, tx.status.block_height))
+                                .collect(),
+                        );
+                        for tx in txs {
+                            tx_index.insert(tx.txid, tx);
+                        }
+                    }
+
+                    script_req.satisfy(satisfaction)?
+                }
+                Request::Conftime(conftimereq) => {
+                    let conftimes = conftimereq
+                        .request()
+                        .map(|txid| {
+                            tx_index
+                                .get(txid)
+                                .expect("must be in index")
+                                .confirmation_time()
+                        })
+                        .collect();
+                    conftimereq.satisfy(conftimes)?
+                }
+                Request::Tx(txreq) => {
+                    let full_txs = txreq
+                        .request()
+                        .map(|txid| {
+                            let tx = tx_index.get(txid).expect("must be in index");
+                            (tx.confirmation_time(), tx.previous_outputs(), tx.to_tx())
+                        })
+                        .collect();
+                    txreq.satisfy(full_txs)?
+                }
+                Request::Finish(batch_update) => break batch_update,
+            }
+        };
+
+        database.commit_batch(batch_update)?;
+
+        Ok(())
     }
 
     fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@@ -117,10 +200,6 @@ impl Blockchain for EsploraBlockchain {
 }
 
 impl UrlClient {
-    fn script_to_scripthash(script: &Script) -> String {
-        sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
-    }
-
     fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
         let resp = self
             .agent
@@ -200,81 +279,6 @@ impl UrlClient {
         }
     }
 
-    fn _script_get_history(&self, script: &Script) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
-        let mut result = Vec::new();
-        let scripthash = Self::script_to_scripthash(script);
-
-        // Add the unconfirmed transactions first
-
-        let resp = self
-            .agent
-            .get(&format!(
-                "{}/scripthash/{}/txs/mempool",
-                self.url, scripthash
-            ))
-            .call();
-
-        let v = match resp {
-            Ok(resp) => {
-                let v: Vec<EsploraGetHistory> = resp.into_json()?;
-                Ok(v)
-            }
-            Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
-            Err(e) => Err(EsploraError::Ureq(e)),
-        }?;
-
-        result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
-            tx_hash: x.txid,
-            height: x.status.block_height.unwrap_or(0) as i32,
-        }));
-
-        debug!(
-            "Found {} mempool txs for {} - {:?}",
-            result.len(),
-            scripthash,
-            script
-        );
-
-        // Then go through all the pages of confirmed transactions
-        let mut last_txid = String::new();
-        loop {
-            let resp = self
-                .agent
-                .get(&format!(
-                    "{}/scripthash/{}/txs/chain/{}",
-                    self.url, scripthash, last_txid
-                ))
-                .call();
-
-            let v = match resp {
-                Ok(resp) => {
-                    let v: Vec<EsploraGetHistory> = resp.into_json()?;
-                    Ok(v)
-                }
-                Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
-                Err(e) => Err(EsploraError::Ureq(e)),
-            }?;
-
-            let len = v.len();
-            if let Some(elem) = v.last() {
-                last_txid = elem.txid.to_hex();
-            }
-
-            debug!("... adding {} confirmed transactions", len);
-
-            result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
-                tx_hash: x.txid,
-                height: x.status.block_height.unwrap_or(0) as i32,
-            }));
-
-            if len < 25 {
-                break;
-            }
-        }
-
-        Ok(result)
-    }
-
     fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
         let resp = self
             .agent
@@ -292,6 +296,22 @@ impl UrlClient {
 
         Ok(map)
     }
+
+    fn _scripthash_txs(
+        &self,
+        script: &Script,
+        last_seen: Option<Txid>,
+    ) -> Result<Vec<Tx>, EsploraError> {
+        let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex();
+        let url = match last_seen {
+            Some(last_seen) => format!(
+                "{}/scripthash/{}/txs/chain/{}",
+                self.url, script_hash, last_seen
+            ),
+            None => format!("{}/scripthash/{}/txs", self.url, script_hash),
+        };
+        Ok(self.agent.get(&url).call()?.into_json()?)
+    }
 }
 
 fn is_status_not_found(status: u16) -> bool {
@@ -315,84 +335,37 @@ fn into_bytes(resp: Response) -> Result<Vec<u8>, io::Error> {
     Ok(buf)
 }
 
-impl ElectrumLikeSync for UrlClient {
-    fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
-        &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
-        let mut results = vec![];
-        for script in scripts.into_iter() {
-            let v = self._script_get_history(script)?;
-            results.push(v);
-        }
-        Ok(results)
-    }
-
-    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
-        &self,
-        txids: I,
-    ) -> Result<Vec<Transaction>, Error> {
-        let mut results = vec![];
-        for txid in txids.into_iter() {
-            let tx = self._get_tx_no_opt(txid)?;
-            results.push(tx);
-        }
-        Ok(results)
-    }
-
-    fn els_batch_block_header<I: IntoIterator<Item = u32>>(
-        &self,
-        heights: I,
-    ) -> Result<Vec<BlockHeader>, Error> {
-        let mut results = vec![];
-        for height in heights.into_iter() {
-            let header = self._get_header(height)?;
-            results.push(header);
-        }
-        Ok(results)
-    }
-}
-
-/// Configuration for an [`EsploraBlockchain`]
-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
-pub struct EsploraBlockchainConfig {
-    /// Base URL of the esplora service eg. `https://blockstream.info/api/`
-    pub base_url: String,
-    /// Optional URL of the proxy to use to make requests to the Esplora server
-    ///
-    /// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
-    ///
-    /// Note that the format of this value and the supported protocols change slightly between the
-    /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
-    /// details check with the documentation of the two crates. Both of them are compiled with
-    /// the `socks` feature enabled.
-    ///
-    /// The proxy is ignored when targeting `wasm32`.
-    pub proxy: Option<String>,
-    /// Socket read timeout.
-    pub timeout_read: u64,
-    /// Socket write timeout.
-    pub timeout_write: u64,
-    /// Stop searching addresses for transactions after finding an unused gap of this length.
-    pub stop_gap: usize,
-}
-
 impl ConfigurableBlockchain for EsploraBlockchain {
-    type Config = EsploraBlockchainConfig;
+    type Config = super::EsploraBlockchainConfig;
 
     fn from_config(config: &Self::Config) -> Result<Self, Error> {
-        let mut agent_builder = ureq::AgentBuilder::new()
-            .timeout_read(Duration::from_secs(config.timeout_read))
-            .timeout_write(Duration::from_secs(config.timeout_write));
+        let mut agent_builder = ureq::AgentBuilder::new();
+
+        if let Some(timeout) = config.timeout {
+            agent_builder = agent_builder.timeout(Duration::from_secs(timeout));
+        }
 
         if let Some(proxy) = &config.proxy {
             agent_builder = agent_builder
                 .proxy(Proxy::new(proxy).map_err(|e| Error::Esplora(Box::new(e.into())))?);
         }
 
-        Ok(
-            EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
-                .with_agent(agent_builder.build()),
-        )
+        let mut blockchain = EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
+            .with_agent(agent_builder.build());
+
+        if let Some(concurrency) = config.concurrency {
+            blockchain = blockchain.with_concurrency(concurrency);
+        }
+
+        Ok(blockchain)
+    }
+}
+
+impl From<ureq::Error> for EsploraError {
+    fn from(e: ureq::Error) -> Self {
+        match e {
+            ureq::Error::Status(code, _) => EsploraError::HttpResponse(code),
+            e => EsploraError::Ureq(e),
+        }
     }
 }
index 4d84caea7db3c8db761141db70ad5184bacc3fa2..bbf0303df9c5f01ca4286e82e74689758a8b10f3 100644 (file)
@@ -27,9 +27,6 @@ use crate::database::BatchDatabase;
 use crate::error::Error;
 use crate::FeeRate;
 
-#[cfg(any(feature = "electrum", feature = "esplora"))]
-pub(crate) mod utils;
-
 #[cfg(any(
     feature = "electrum",
     feature = "esplora",
@@ -37,6 +34,8 @@ pub(crate) mod utils;
     feature = "rpc"
 ))]
 pub mod any;
+mod script_sync;
+
 #[cfg(any(
     feature = "electrum",
     feature = "esplora",
diff --git a/src/blockchain/script_sync.rs b/src/blockchain/script_sync.rs
new file mode 100644 (file)
index 0000000..d9c0bcc
--- /dev/null
@@ -0,0 +1,367 @@
+/*!
+This models a how a sync happens where you have a server that you send your script pubkeys to and it
+returns associated transactions i.e. electrum.
+*/
+#![allow(dead_code)]
+use crate::{
+    database::{BatchDatabase, BatchOperations, DatabaseUtils},
+    ConfirmationTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
+};
+use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
+use std::collections::{HashMap, HashSet, VecDeque};
+
+struct State<'a, D> {
+    db: &'a D,
+    last_active_index: HashMap<KeychainKind, usize>,
+    tx_needed: VecDeque<Txid>,
+    conftime_needed: VecDeque<Txid>,
+    observed_txs: Vec<TransactionDetails>,
+}
+
+/// A reqeust for on-chain information
+pub enum Request<'a, D: BatchDatabase> {
+    /// A request for transactions related to script pubkeys.
+    Script(ScriptReq<'a, D>),
+    /// A request for confirmation times for some transactions.
+    Conftime(ConftimeReq<'a, D>),
+    /// A request for full transaction details of some transactions.
+    Tx(TxReq<'a, D>),
+    /// Requests are finished here's a batch database update to reflect data gathered.
+    Finish(D::Batch),
+}
+
+/// starts a sync
+pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>, Error> {
+    use rand::seq::SliceRandom;
+    let mut keychains = vec![KeychainKind::Internal, KeychainKind::External];
+    // shuffling improve privacy, the server doesn't know my first request is from my internal or external addresses
+    keychains.shuffle(&mut rand::thread_rng());
+    let keychain = keychains.pop().unwrap();
+    let scripts_needed = db
+        .iter_script_pubkeys(Some(keychain))?
+        .into_iter()
+        .collect();
+    let state = State {
+        db,
+        last_active_index: HashMap::default(),
+        conftime_needed: VecDeque::default(),
+        observed_txs: vec![],
+        tx_needed: VecDeque::default(),
+    };
+
+    Ok(Request::Script(ScriptReq {
+        state,
+        scripts_needed,
+        script_index: 0,
+        stop_gap,
+        keychain,
+        next_keychains: keychains,
+        tx_interested: HashSet::default(),
+        tx_conftime_interested: HashSet::default(),
+    }))
+}
+
+pub struct ScriptReq<'a, D: BatchDatabase> {
+    state: State<'a, D>,
+    script_index: usize,
+    scripts_needed: VecDeque<Script>,
+    stop_gap: usize,
+    keychain: KeychainKind,
+    next_keychains: Vec<KeychainKind>,
+    tx_interested: HashSet<Txid>,
+    tx_conftime_interested: HashSet<Txid>,
+}
+
+/// The sync starts by returning script pubkeys we are interested in.
+impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
+    pub fn request(&self) -> impl Iterator<Item = &Script> + Clone {
+        self.scripts_needed.iter()
+    }
+
+    pub fn satisfy(
+        mut self,
+        // we want to know the txids assoiciated with the script and their height
+        txids: Vec<Vec<(Txid, Option<u32>)>>,
+    ) -> Result<Request<'a, D>, Error> {
+        for txid_list in txids.iter() {
+            if !txid_list.is_empty() {
+                // the address is active
+                self.state
+                    .last_active_index
+                    .insert(self.keychain, self.script_index);
+            }
+
+            for (txid, height) in txid_list {
+                // have we seen this txid already?
+                match self.state.db.get_tx(txid, true)? {
+                    Some(mut details) => {
+                        let old_height = details.confirmation_time.as_ref().map(|x| x.height);
+                        match (old_height, height) {
+                            (None, Some(_)) => {
+                                // It looks like the tx has confirmed since we last saw it -- we
+                                // need to know the confirmation time.
+                                self.tx_conftime_interested.insert(*txid);
+                            }
+                            (Some(old_height), Some(new_height)) if old_height != *new_height => {
+                                // The height of the tx has changed !? -- get the confirmation time.
+                                self.tx_conftime_interested.insert(*txid);
+                            }
+                            (Some(_), None) => {
+                                details.confirmation_time = None;
+                                self.state.observed_txs.push(details);
+                            }
+                            _ => self.state.observed_txs.push(details),
+                        }
+                    }
+                    None => {
+                        // we've never seen it let's get the whole thing
+                        self.tx_interested.insert(*txid);
+                    }
+                };
+            }
+
+            self.script_index += 1;
+        }
+
+        for _ in txids {
+            self.scripts_needed.pop_front();
+        }
+
+        let last_active_index = self
+            .state
+            .last_active_index
+            .get(&self.keychain)
+            .map(|x| x + 1)
+            .unwrap_or(0); // so no addresses active maps to 0
+
+        Ok(
+            if self.script_index > last_active_index + self.stop_gap
+                || self.scripts_needed.is_empty()
+            {
+                // we're done here -- check if we need to do the next keychain
+                if let Some(keychain) = self.next_keychains.pop() {
+                    self.keychain = keychain;
+                    self.script_index = 0;
+                    self.scripts_needed = self
+                        .state
+                        .db
+                        .iter_script_pubkeys(Some(keychain))?
+                        .into_iter()
+                        .collect();
+                    Request::Script(self)
+                } else {
+                    self.state.conftime_needed = self.tx_conftime_interested.into_iter().collect();
+                    self.state.tx_needed = self.tx_interested.into_iter().collect();
+                    Request::Conftime(ConftimeReq { state: self.state })
+                }
+            } else {
+                Request::Script(self)
+            },
+        )
+    }
+}
+
+/// Next step is to get confirmation times for those we are interested in.
+pub struct ConftimeReq<'a, D> {
+    state: State<'a, D>,
+}
+
+impl<'a, D: BatchDatabase> ConftimeReq<'a, D> {
+    pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
+        self.state.conftime_needed.iter()
+    }
+
+    pub fn satisfy(
+        mut self,
+        confirmation_times: Vec<Option<ConfirmationTime>>,
+    ) -> Result<Request<'a, D>, Error> {
+        let n = confirmation_times.len();
+        for (confirmation_time, txid) in confirmation_times
+            .into_iter()
+            .zip(self.state.conftime_needed.iter())
+        {
+            if let Some(mut tx_details) = self.state.db.get_tx(txid, true)? {
+                tx_details.confirmation_time = confirmation_time;
+                self.state.observed_txs.push(tx_details);
+            }
+        }
+
+        for _ in 0..n {
+            self.state.conftime_needed.pop_front();
+        }
+
+        if self.state.conftime_needed.is_empty() {
+            Ok(Request::Tx(TxReq { state: self.state }))
+        } else {
+            Ok(Request::Conftime(self))
+        }
+    }
+}
+
+/// Then we get full transactions
+pub struct TxReq<'a, D> {
+    state: State<'a, D>,
+}
+
+impl<'a, D: BatchDatabase> TxReq<'a, D> {
+    pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
+        self.state.tx_needed.iter()
+    }
+
+    pub fn satisfy(
+        mut self,
+        tx_details: Vec<(Option<ConfirmationTime>, Vec<Option<TxOut>>, Transaction)>,
+    ) -> Result<Request<'a, D>, Error> {
+        let tx_details: Vec<TransactionDetails> = tx_details
+            .into_iter()
+            .zip(self.state.tx_needed.iter())
+            .map(|((confirmation_time, vin, tx), txid)| {
+                assert_eq!(tx.txid(), *txid);
+                let mut sent: u64 = 0;
+                let mut received: u64 = 0;
+                let mut inputs_sum: u64 = 0;
+                let mut outputs_sum: u64 = 0;
+
+                for (txout, input) in vin.into_iter().zip(tx.input.iter()) {
+                    let txout = match txout {
+                        Some(txout) => txout,
+                        None => {
+                            // skip coinbase inputs
+                            debug_assert!(
+                                input.previous_output.is_null(),
+                                "prevout should only be missing for coinbase"
+                            );
+                            continue;
+                        }
+                    };
+
+                    inputs_sum += txout.value;
+                    if self.state.db.is_mine(&txout.script_pubkey)? {
+                        sent += txout.value;
+                    }
+                }
+
+                for out in &tx.output {
+                    outputs_sum += out.value;
+                    if self.state.db.is_mine(&out.script_pubkey)? {
+                        received += out.value;
+                    }
+                }
+                // we need to saturating sub since we want coinbase txs to map to 0 fee and
+                // this subtraction will be negative for coinbase txs.
+                let fee = inputs_sum.saturating_sub(outputs_sum);
+                Result::<_, Error>::Ok(TransactionDetails {
+                    txid: *txid,
+                    transaction: Some(tx),
+                    received,
+                    sent,
+                    confirmation_time,
+                    fee: Some(fee),
+                    verified: false,
+                })
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        for tx_detail in tx_details {
+            self.state.observed_txs.push(tx_detail);
+            self.state.tx_needed.pop_front();
+        }
+
+        if !self.state.tx_needed.is_empty() {
+            Ok(Request::Tx(self))
+        } else {
+            let existing_txs = self.state.db.iter_txs(false)?;
+            let existing_txids: HashSet<Txid> = existing_txs.iter().map(|tx| tx.txid).collect();
+            let observed_txs = make_txs_consistent(&self.state.observed_txs);
+            let observed_txids: HashSet<Txid> = observed_txs.iter().map(|tx| tx.txid).collect();
+            let txids_to_delete = existing_txids.difference(&observed_txids);
+            let mut batch = self.state.db.begin_batch();
+
+            // Delete old txs that no longer exist
+            for txid in txids_to_delete {
+                if let Some(raw_tx) = self.state.db.get_raw_tx(txid)? {
+                    for i in 0..raw_tx.output.len() {
+                        // Also delete any utxos from the txs that no longer exist.
+                        let _ = batch.del_utxo(&OutPoint {
+                            txid: *txid,
+                            vout: i as u32,
+                        })?;
+                    }
+                } else {
+                    unreachable!("we should always have the raw tx");
+                }
+                batch.del_tx(txid, true)?;
+            }
+
+            // Set every tx we observed
+            for observed_tx in &observed_txs {
+                let tx = observed_tx
+                    .transaction
+                    .as_ref()
+                    .expect("transaction will always be present here");
+                for (i, output) in tx.output.iter().enumerate() {
+                    if let Some((keychain, _)) = self
+                        .state
+                        .db
+                        .get_path_from_script_pubkey(&output.script_pubkey)?
+                    {
+                        // add utxos we own from the new transactions we've seen.
+                        batch.set_utxo(&LocalUtxo {
+                            outpoint: OutPoint {
+                                txid: observed_tx.txid,
+                                vout: i as u32,
+                            },
+                            txout: output.clone(),
+                            keychain,
+                        })?;
+                    }
+                }
+                batch.set_tx(observed_tx)?;
+            }
+
+            // we don't do this in the loop above since we may want to delete some of the utxos we
+            // just added in case there are new tranasactions that spend form each other.
+            for observed_tx in &observed_txs {
+                let tx = observed_tx
+                    .transaction
+                    .as_ref()
+                    .expect("transaction will always be present here");
+                for input in &tx.input {
+                    // Delete any spent utxos
+                    batch.del_utxo(&input.previous_output)?;
+                }
+            }
+
+            for (keychain, last_active_index) in self.state.last_active_index {
+                batch.set_last_index(keychain, last_active_index as u32)?;
+            }
+
+            Ok(Request::Finish(batch))
+        }
+    }
+}
+
+/// Remove conflicting transactions -- tie breaking them by fee.
+fn make_txs_consistent(txs: &[TransactionDetails]) -> Vec<&TransactionDetails> {
+    let mut utxo_index: HashMap<OutPoint, &TransactionDetails> = HashMap::default();
+    for tx in txs {
+        for input in &tx.transaction.as_ref().unwrap().input {
+            utxo_index
+                .entry(input.previous_output)
+                .and_modify(|existing| match (tx.fee, existing.fee) {
+                    (Some(fee), Some(existing_fee)) if fee > existing_fee => *existing = tx,
+                    (Some(_), None) => *existing = tx,
+                    _ => { /* leave it the same */ }
+                })
+                .or_insert(tx);
+        }
+    }
+
+    utxo_index
+        .into_iter()
+        .map(|(_, tx)| (tx.txid, tx))
+        .collect::<HashMap<_, _>>()
+        .into_iter()
+        .map(|(_, tx)| tx)
+        .collect()
+}
index 7385b1db8d623259bd8dc6bd3bfb1b70980b30b8..c719a9fd0531d4dcd21adcc4448b5ffac5fb3f31 100644 (file)
@@ -23,366 +23,3 @@ use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
 use crate::error::Error;
 use crate::types::{ConfirmationTime, KeychainKind, LocalUtxo, TransactionDetails};
 use crate::wallet::time::Instant;
-use crate::wallet::utils::ChunksIterator;
-
-#[derive(Debug)]
-pub struct ElsGetHistoryRes {
-    pub height: i32,
-    pub tx_hash: Txid,
-}
-
-/// Implements the synchronization logic for an Electrum-like client.
-#[maybe_async]
-pub trait ElectrumLikeSync {
-    fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>(
-        &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error>;
-
-    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>(
-        &self,
-        txids: I,
-    ) -> Result<Vec<Transaction>, Error>;
-
-    fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
-        &self,
-        heights: I,
-    ) -> Result<Vec<BlockHeader>, Error>;
-
-    // Provided methods down here...
-
-    fn electrum_like_setup<D: BatchDatabase, P: Progress>(
-        &self,
-        stop_gap: usize,
-        db: &mut D,
-        _progress_update: P,
-    ) -> Result<(), Error> {
-        // TODO: progress
-        let start = Instant::new();
-        debug!("start setup");
-
-        let chunk_size = stop_gap;
-
-        let mut history_txs_id = HashSet::new();
-        let mut txid_height = HashMap::new();
-        let mut max_indexes = HashMap::new();
-
-        let mut wallet_chains = vec![KeychainKind::Internal, KeychainKind::External];
-        // shuffling improve privacy, the server doesn't know my first request is from my internal or external addresses
-        wallet_chains.shuffle(&mut thread_rng());
-        // download history of our internal and external script_pubkeys
-        for keychain in wallet_chains.iter() {
-            let script_iter = db.iter_script_pubkeys(Some(*keychain))?.into_iter();
-
-            for (i, chunk) in ChunksIterator::new(script_iter, stop_gap).enumerate() {
-                // TODO if i == last, should create another chunk of addresses in db
-                let call_result: Vec<Vec<ElsGetHistoryRes>> =
-                    maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
-                let max_index = call_result
-                    .iter()
-                    .enumerate()
-                    .filter_map(|(i, v)| v.first().map(|_| i as u32))
-                    .max();
-                if let Some(max) = max_index {
-                    max_indexes.insert(keychain, max + (i * chunk_size) as u32);
-                }
-                let flattened: Vec<ElsGetHistoryRes> = call_result.into_iter().flatten().collect();
-                debug!("#{} of {:?} results:{}", i, keychain, flattened.len());
-                if flattened.is_empty() {
-                    // Didn't find anything in the last `stop_gap` script_pubkeys, breaking
-                    break;
-                }
-
-                for el in flattened {
-                    // el.height = -1 means unconfirmed with unconfirmed parents
-                    // el.height =  0 means unconfirmed with confirmed parents
-                    // but we treat those tx the same
-                    if el.height <= 0 {
-                        txid_height.insert(el.tx_hash, None);
-                    } else {
-                        txid_height.insert(el.tx_hash, Some(el.height as u32));
-                    }
-                    history_txs_id.insert(el.tx_hash);
-                }
-            }
-        }
-
-        // saving max indexes
-        info!("max indexes are: {:?}", max_indexes);
-        for keychain in wallet_chains.iter() {
-            if let Some(index) = max_indexes.get(keychain) {
-                db.set_last_index(*keychain, *index)?;
-            }
-        }
-
-        // get db status
-        let txs_details_in_db: HashMap<Txid, TransactionDetails> = db
-            .iter_txs(false)?
-            .into_iter()
-            .map(|tx| (tx.txid, tx))
-            .collect();
-        let txs_raw_in_db: HashMap<Txid, Transaction> = db
-            .iter_raw_txs()?
-            .into_iter()
-            .map(|tx| (tx.txid(), tx))
-            .collect();
-        let utxos_deps = utxos_deps(db, &txs_raw_in_db)?;
-
-        // download new txs and headers
-        let new_txs = maybe_await!(self.download_and_save_needed_raw_txs(
-            &history_txs_id,
-            &txs_raw_in_db,
-            chunk_size,
-            db
-        ))?;
-        let new_timestamps = maybe_await!(self.download_needed_headers(
-            &txid_height,
-            &txs_details_in_db,
-            chunk_size
-        ))?;
-
-        let mut batch = db.begin_batch();
-
-        // save any tx details not in db but in history_txs_id or with different height/timestamp
-        for txid in history_txs_id.iter() {
-            let height = txid_height.get(txid).cloned().flatten();
-            let timestamp = new_timestamps.get(txid).cloned();
-            if let Some(tx_details) = txs_details_in_db.get(txid) {
-                // check if tx height matches, otherwise updates it. timestamp is not in the if clause
-                // because we are not asking headers for confirmed tx we know about
-                if tx_details.confirmation_time.as_ref().map(|c| c.height) != height {
-                    let confirmation_time = ConfirmationTime::new(height, timestamp);
-                    let mut new_tx_details = tx_details.clone();
-                    new_tx_details.confirmation_time = confirmation_time;
-                    batch.set_tx(&new_tx_details)?;
-                }
-            } else {
-                save_transaction_details_and_utxos(
-                    txid,
-                    db,
-                    timestamp,
-                    height,
-                    &mut batch,
-                    &utxos_deps,
-                )?;
-            }
-        }
-
-        // remove any tx details in db but not in history_txs_id
-        for txid in txs_details_in_db.keys() {
-            if !history_txs_id.contains(txid) {
-                batch.del_tx(txid, false)?;
-            }
-        }
-
-        // remove any spent utxo
-        for new_tx in new_txs.iter() {
-            for input in new_tx.input.iter() {
-                batch.del_utxo(&input.previous_output)?;
-            }
-        }
-
-        db.commit_batch(batch)?;
-        info!("finish setup, elapsed {:?}ms", start.elapsed().as_millis());
-
-        Ok(())
-    }
-
-    /// download txs identified by `history_txs_id` and theirs previous outputs if not already present in db
-    fn download_and_save_needed_raw_txs<D: BatchDatabase>(
-        &self,
-        history_txs_id: &HashSet<Txid>,
-        txs_raw_in_db: &HashMap<Txid, Transaction>,
-        chunk_size: usize,
-        db: &mut D,
-    ) -> Result<Vec<Transaction>, Error> {
-        let mut txs_downloaded = vec![];
-        let txids_raw_in_db: HashSet<Txid> = txs_raw_in_db.keys().cloned().collect();
-        let txids_to_download: Vec<&Txid> = history_txs_id.difference(&txids_raw_in_db).collect();
-        if !txids_to_download.is_empty() {
-            info!("got {} txs to download", txids_to_download.len());
-            txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
-                txids_to_download,
-                chunk_size,
-                db,
-            ))?);
-            let mut prev_txids = HashSet::new();
-            let mut txids_downloaded = HashSet::new();
-            for tx in txs_downloaded.iter() {
-                txids_downloaded.insert(tx.txid());
-                // add every previous input tx, but skip coinbase
-                for input in tx.input.iter().filter(|i| !i.previous_output.is_null()) {
-                    prev_txids.insert(input.previous_output.txid);
-                }
-            }
-            let already_present: HashSet<Txid> =
-                txids_downloaded.union(&txids_raw_in_db).cloned().collect();
-            let prev_txs_to_download: Vec<&Txid> =
-                prev_txids.difference(&already_present).collect();
-            info!("{} previous txs to download", prev_txs_to_download.len());
-            txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
-                prev_txs_to_download,
-                chunk_size,
-                db,
-            ))?);
-        }
-
-        Ok(txs_downloaded)
-    }
-
-    /// download headers at heights in `txid_height` if tx details not already present, returns a map Txid -> timestamp
-    fn download_needed_headers(
-        &self,
-        txid_height: &HashMap<Txid, Option<u32>>,
-        txs_details_in_db: &HashMap<Txid, TransactionDetails>,
-        chunk_size: usize,
-    ) -> Result<HashMap<Txid, u64>, Error> {
-        let mut txid_timestamp = HashMap::new();
-        let txid_in_db_with_conf: HashSet<_> = txs_details_in_db
-            .values()
-            .filter_map(|details| details.confirmation_time.as_ref().map(|_| details.txid))
-            .collect();
-        let needed_txid_height: HashMap<&Txid, u32> = txid_height
-            .iter()
-            .filter(|(t, _)| !txid_in_db_with_conf.contains(*t))
-            .filter_map(|(t, o)| o.map(|h| (t, h)))
-            .collect();
-        let needed_heights: HashSet<u32> = needed_txid_height.values().cloned().collect();
-        if !needed_heights.is_empty() {
-            info!("{} headers to download for timestamp", needed_heights.len());
-            let mut height_timestamp: HashMap<u32, u64> = HashMap::new();
-            for chunk in ChunksIterator::new(needed_heights.into_iter(), chunk_size) {
-                let call_result: Vec<BlockHeader> =
-                    maybe_await!(self.els_batch_block_header(chunk.clone()))?;
-                height_timestamp.extend(
-                    chunk
-                        .into_iter()
-                        .zip(call_result.iter().map(|h| h.time as u64)),
-                );
-            }
-            for (txid, height) in needed_txid_height {
-                let timestamp = height_timestamp
-                    .get(&height)
-                    .ok_or_else(|| Error::Generic("timestamp missing".to_string()))?;
-                txid_timestamp.insert(*txid, *timestamp);
-            }
-        }
-
-        Ok(txid_timestamp)
-    }
-
-    fn download_and_save_in_chunks<D: BatchDatabase>(
-        &self,
-        to_download: Vec<&Txid>,
-        chunk_size: usize,
-        db: &mut D,
-    ) -> Result<Vec<Transaction>, Error> {
-        let mut txs_downloaded = vec![];
-        for chunk in ChunksIterator::new(to_download.into_iter(), chunk_size) {
-            let call_result: Vec<Transaction> =
-                maybe_await!(self.els_batch_transaction_get(chunk))?;
-            let mut batch = db.begin_batch();
-            for new_tx in call_result.iter() {
-                batch.set_raw_tx(new_tx)?;
-            }
-            db.commit_batch(batch)?;
-            txs_downloaded.extend(call_result);
-        }
-
-        Ok(txs_downloaded)
-    }
-}
-
-fn save_transaction_details_and_utxos<D: BatchDatabase>(
-    txid: &Txid,
-    db: &mut D,
-    timestamp: Option<u64>,
-    height: Option<u32>,
-    updates: &mut dyn BatchOperations,
-    utxo_deps: &HashMap<OutPoint, OutPoint>,
-) -> Result<(), Error> {
-    let tx = db.get_raw_tx(txid)?.ok_or(Error::TransactionNotFound)?;
-
-    let mut incoming: u64 = 0;
-    let mut outgoing: u64 = 0;
-
-    let mut inputs_sum: u64 = 0;
-    let mut outputs_sum: u64 = 0;
-
-    // look for our own inputs
-    for input in tx.input.iter() {
-        // skip coinbase inputs
-        if input.previous_output.is_null() {
-            continue;
-        }
-
-        // We already downloaded all previous output txs in the previous step
-        if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
-            inputs_sum += previous_output.value;
-
-            if db.is_mine(&previous_output.script_pubkey)? {
-                outgoing += previous_output.value;
-            }
-        } else {
-            // The input is not ours, but we still need to count it for the fees
-            let tx = db
-                .get_raw_tx(&input.previous_output.txid)?
-                .ok_or(Error::TransactionNotFound)?;
-            inputs_sum += tx.output[input.previous_output.vout as usize].value;
-        }
-
-        // removes conflicting UTXO if any (generated from same inputs, like for example RBF)
-        if let Some(outpoint) = utxo_deps.get(&input.previous_output) {
-            updates.del_utxo(outpoint)?;
-        }
-    }
-
-    for (i, output) in tx.output.iter().enumerate() {
-        // to compute the fees later
-        outputs_sum += output.value;
-
-        // this output is ours, we have a path to derive it
-        if let Some((keychain, _child)) = db.get_path_from_script_pubkey(&output.script_pubkey)? {
-            debug!("{} output #{} is mine, adding utxo", txid, i);
-            updates.set_utxo(&LocalUtxo {
-                outpoint: OutPoint::new(tx.txid(), i as u32),
-                txout: output.clone(),
-                keychain,
-            })?;
-
-            incoming += output.value;
-        }
-    }
-
-    let tx_details = TransactionDetails {
-        txid: tx.txid(),
-        transaction: Some(tx),
-        received: incoming,
-        sent: outgoing,
-        confirmation_time: ConfirmationTime::new(height, timestamp),
-        fee: Some(inputs_sum.saturating_sub(outputs_sum)), /* if the tx is a coinbase, fees would be negative */
-        verified: height.is_some(),
-    };
-    updates.set_tx(&tx_details)?;
-
-    Ok(())
-}
-
-/// returns utxo dependency as the inputs needed for the utxo to exist
-/// `tx_raw_in_db` must contains utxo's generating txs or errors with [crate::Error::TransactionNotFound]
-fn utxos_deps<D: BatchDatabase>(
-    db: &mut D,
-    tx_raw_in_db: &HashMap<Txid, Transaction>,
-) -> Result<HashMap<OutPoint, OutPoint>, Error> {
-    let utxos = db.iter_utxos()?;
-    let mut utxos_deps = HashMap::new();
-    for utxo in utxos {
-        let from_tx = tx_raw_in_db
-            .get(&utxo.outpoint.txid)
-            .ok_or(Error::TransactionNotFound)?;
-        for input in from_tx.input.iter() {
-            utxos_deps.insert(input.previous_output, utxo.outpoint);
-        }
-    }
-    Ok(utxos_deps)
-}
index b44a9718236a3e20715691a855b7fd0f575fe886..e2c859241d781ca8cbc7ccd041dada442d912e11 100644 (file)
@@ -602,6 +602,74 @@ macro_rules! bdk_blockchain_tests {
                 assert_eq!(wallet.list_unspent().unwrap().len(), 1, "incorrect number of unspents");
             }
 
+            /// Send two conflicting transactions to the same address twice in a row.
+            /// The coins should only be received once!
+            #[test]
+            fn test_sync_double_receive() {
+                let (wallet, descriptors, mut test_client) = init_single_sig();
+                let receiver_wallet = get_wallet_from_descriptors(&("wpkh(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW)".to_string(), None), &test_client);
+                // need to sync so rpc can start watching
+                receiver_wallet.sync(noop_progress(), None).unwrap();
+
+                test_client.receive(testutils! {
+                    @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
+                });
+
+                wallet.sync(noop_progress(), None).unwrap();
+                assert_eq!(wallet.get_balance().unwrap(), 75_000, "incorrect balance");
+                let target_addr = receiver_wallet.get_address($crate::wallet::AddressIndex::New).unwrap().address;
+
+                let tx1 = {
+                    let mut builder = wallet.build_tx();
+                    builder.add_recipient(target_addr.script_pubkey(), 49_000).enable_rbf();
+                    let (mut psbt, _details) = builder.finish().unwrap();
+                    let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
+                    assert!(finalized, "Cannot finalize transaction");
+                    psbt.extract_tx()
+                };
+
+                let tx2 = {
+                    let mut builder = wallet.build_tx();
+                    builder.add_recipient(target_addr.script_pubkey(), 49_000).enable_rbf().fee_rate(FeeRate::from_sat_per_vb(5.0));
+                    let (mut psbt, _details) = builder.finish().unwrap();
+                    let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
+                    assert!(finalized, "Cannot finalize transaction");
+                    psbt.extract_tx()
+                };
+
+                wallet.broadcast(&tx1).unwrap();
+                wallet.broadcast(&tx2).unwrap();
+
+                receiver_wallet.sync(noop_progress(), None).unwrap();
+                assert_eq!(receiver_wallet.get_balance().unwrap(), 49_000, "should have received coins once and only once");
+            }
+
+            #[test]
+            fn test_sync_many_sends_to_a_single_address() {
+                let (wallet, descriptors, mut test_client) = init_single_sig();
+
+                for _ in 0..4 {
+                    // split this up into multiple blocks so rpc doesn't get angry
+                    for _ in 0..20 {
+                        test_client.receive(testutils! {
+                            @tx ( (@external descriptors, 0) => 1_000 )
+                        });
+                    }
+                    test_client.generate(1, None);
+                }
+
+                // add some to the mempool as well.
+                for _ in 0..20 {
+                    test_client.receive(testutils! {
+                        @tx ( (@external descriptors, 0) => 1_000 )
+                    });
+                }
+
+                wallet.sync(noop_progress(), None).unwrap();
+
+                assert_eq!(wallet.get_balance().unwrap(), 100_000);
+            }
+
             #[test]
             fn test_update_confirmation_time_after_generate() {
                 let (wallet, descriptors, mut test_client) = init_single_sig();
index 311f691b98a2354c43361d02add67576c3b6945a..2b19eb80306e16f39ad2e4df9a9c27c460507b1d 100644 (file)
@@ -138,40 +138,6 @@ impl<Pk: MiniscriptKey + ToPublicKey> Satisfier<Pk> for Older {
 
 pub(crate) type SecpCtx = Secp256k1<All>;
 
-pub struct ChunksIterator<I: Iterator> {
-    iter: I,
-    size: usize,
-}
-
-#[cfg(any(feature = "electrum", feature = "esplora"))]
-impl<I: Iterator> ChunksIterator<I> {
-    pub fn new(iter: I, size: usize) -> Self {
-        ChunksIterator { iter, size }
-    }
-}
-
-impl<I: Iterator> Iterator for ChunksIterator<I> {
-    type Item = Vec<<I as std::iter::Iterator>::Item>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let mut v = Vec::new();
-        for _ in 0..self.size {
-            let e = self.iter.next();
-
-            match e {
-                None => break,
-                Some(val) => v.push(val),
-            }
-        }
-
-        if v.is_empty() {
-            return None;
-        }
-
-        Some(v)
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::{