]> Untitled Git - bdk/commitdiff
[sync] Improve sync
authorRiccardo Casatta <riccardo@casatta.it>
Mon, 16 Nov 2020 11:18:34 +0000 (12:18 +0100)
committerRiccardo Casatta <riccardo@casatta.it>
Tue, 17 Nov 2020 08:39:43 +0000 (09:39 +0100)
Make every request in batch, to save round trip times
Fetch timestamp of blockheader to populate timestamp field in transaction
Remove listunspent requests because we can compute it from our history

src/blockchain/electrum.rs
src/blockchain/esplora.rs
src/blockchain/mod.rs
src/blockchain/utils.rs

index 9655ebbb8396bc4278db05b22c74368bd499487f..a134158c0f277c3659c7452776d26a900d1b865e 100644 (file)
@@ -42,11 +42,11 @@ use std::collections::HashSet;
 #[allow(unused_imports)]
 use log::{debug, error, info, trace};
 
-use bitcoin::{Script, Transaction, Txid};
+use bitcoin::{BlockHeader, Script, Transaction, Txid};
 
 use electrum_client::{Client, ElectrumApi};
 
-use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync};
+use self::utils::{ELSGetHistoryRes, ElectrumLikeSync};
 use super::*;
 use crate::database::BatchDatabase;
 use crate::error::Error;
@@ -141,36 +141,18 @@ impl ElectrumLikeSync for Client {
             .map_err(Error::Electrum)
     }
 
-    fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>(
+    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
         &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ELSListUnspentRes>>, Error> {
-        self.batch_script_list_unspent(scripts)
-            .map(|v| {
-                v.into_iter()
-                    .map(|v| {
-                        v.into_iter()
-                            .map(
-                                |electrum_client::ListUnspentRes {
-                                     height,
-                                     tx_hash,
-                                     tx_pos,
-                                     ..
-                                 }| ELSListUnspentRes {
-                                    height,
-                                    tx_hash,
-                                    tx_pos,
-                                },
-                            )
-                            .collect()
-                    })
-                    .collect()
-            })
-            .map_err(Error::Electrum)
+        txids: I,
+    ) -> Result<Vec<Transaction>, Error> {
+        self.batch_transaction_get(txids).map_err(Error::Electrum)
     }
 
-    fn els_transaction_get(&self, txid: &Txid) -> Result<Transaction, Error> {
-        self.transaction_get(txid).map_err(Error::Electrum)
+    fn els_batch_block_header<I: IntoIterator<Item = u32>>(
+        &self,
+        heights: I,
+    ) -> Result<Vec<BlockHeader>, Error> {
+        self.batch_block_header(heights).map_err(Error::Electrum)
     }
 }
 
index ef649d0bfa33e3bb4b1b02ed21c6852899af5344..2b69d5237a772c87ab747ee880360173a36a803b 100644 (file)
@@ -48,15 +48,16 @@ use serde::Deserialize;
 use reqwest::{Client, StatusCode};
 
 use bitcoin::consensus::{deserialize, serialize};
-use bitcoin::hashes::hex::ToHex;
+use bitcoin::hashes::hex::{FromHex, ToHex};
 use bitcoin::hashes::{sha256, Hash};
-use bitcoin::{Script, Transaction, Txid};
+use bitcoin::{BlockHash, BlockHeader, Script, Transaction, TxMerkleNode, Txid};
 
-use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync};
+use self::utils::{ELSGetHistoryRes, ElectrumLikeSync};
 use super::*;
 use crate::database::BatchDatabase;
 use crate::error::Error;
 use crate::FeeRate;
+use std::convert::TryInto;
 
 #[derive(Debug)]
 struct UrlClient {
@@ -161,6 +162,39 @@ impl UrlClient {
         Ok(Some(deserialize(&resp.error_for_status()?.bytes().await?)?))
     }
 
+    async fn _get_tx_no_opt(&self, txid: &Txid) -> Result<Transaction, EsploraError> {
+        match self._get_tx(txid).await {
+            Ok(Some(tx)) => Ok(tx),
+            Ok(None) => Err(EsploraError::TransactionNotFound(*txid)),
+            Err(e) => Err(e),
+        }
+    }
+
+    async fn _get_header(&self, block_height: u32) -> Result<BlockHeader, EsploraError> {
+        let resp = self
+            .client
+            .get(&format!("{}/block-height/{}", self.url, block_height))
+            .send()
+            .await?;
+
+        if let StatusCode::NOT_FOUND = resp.status() {
+            return Err(EsploraError::HeaderHeightNotFound(block_height));
+        }
+        let bytes = resp.bytes().await?;
+        let hash = std::str::from_utf8(&bytes)
+            .map_err(|_| EsploraError::HeaderHeightNotFound(block_height))?;
+
+        let resp = self
+            .client
+            .get(&format!("{}/block/{}", self.url, hash))
+            .send()
+            .await?;
+
+        let esplora_header = resp.json::<EsploraHeader>().await?;
+
+        Ok(esplora_header.try_into()?)
+    }
+
     async fn _broadcast(&self, transaction: &Transaction) -> Result<(), EsploraError> {
         self.client
             .post(&format!("{}/tx", self.url))
@@ -249,31 +283,6 @@ impl UrlClient {
         Ok(result)
     }
 
-    async fn _script_list_unspent(
-        &self,
-        script: &Script,
-    ) -> Result<Vec<ELSListUnspentRes>, EsploraError> {
-        Ok(self
-            .client
-            .get(&format!(
-                "{}/scripthash/{}/utxo",
-                self.url,
-                Self::script_to_scripthash(script)
-            ))
-            .send()
-            .await?
-            .error_for_status()?
-            .json::<Vec<EsploraListUnspent>>()
-            .await?
-            .into_iter()
-            .map(|x| ELSListUnspentRes {
-                tx_hash: x.txid,
-                height: x.status.block_height.unwrap_or(0),
-                tx_pos: x.vout,
-            })
-            .collect())
-    }
-
     async fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
         Ok(self
             .client
@@ -302,13 +311,13 @@ impl ElectrumLikeSync for UrlClient {
         await_or_block!(future)
     }
 
-    fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>(
+    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
         &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ELSListUnspentRes>>, Error> {
+        txids: I,
+    ) -> Result<Vec<Transaction>, Error> {
         let future = async {
-            Ok(stream::iter(scripts)
-                .then(|script| self._script_list_unspent(&script))
+            Ok(stream::iter(txids)
+                .then(|txid| self._get_tx_no_opt(&txid))
                 .try_collect()
                 .await?)
         };
@@ -316,9 +325,18 @@ impl ElectrumLikeSync for UrlClient {
         await_or_block!(future)
     }
 
-    fn els_transaction_get(&self, txid: &Txid) -> Result<Transaction, Error> {
-        Ok(await_or_block!(self._get_tx(txid))?
-            .ok_or_else(|| EsploraError::TransactionNotFound(*txid))?)
+    fn els_batch_block_header<I: IntoIterator<Item = u32>>(
+        &self,
+        heights: I,
+    ) -> Result<Vec<BlockHeader>, Error> {
+        let future = async {
+            Ok(stream::iter(heights)
+                .then(|h| self._get_header(h))
+                .try_collect()
+                .await?)
+        };
+
+        await_or_block!(future)
     }
 }
 
@@ -333,11 +351,37 @@ struct EsploraGetHistory {
     status: EsploraGetHistoryStatus,
 }
 
-#[derive(Deserialize)]
-struct EsploraListUnspent {
-    txid: Txid,
-    vout: usize,
-    status: EsploraGetHistoryStatus,
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+pub struct EsploraHeader {
+    pub id: String,
+    pub height: u32,
+    pub version: i32,
+    pub timestamp: u32,
+    pub tx_count: u32,
+    pub size: u32,
+    pub weight: u32,
+    pub merkle_root: String,
+    pub previousblockhash: String,
+    pub nonce: u32,
+    pub bits: u32,
+    pub difficulty: u32,
+}
+
+impl TryInto<BlockHeader> for EsploraHeader {
+    type Error = EsploraError;
+
+    fn try_into(self) -> Result<BlockHeader, esplora::EsploraError> {
+        Ok(BlockHeader {
+            version: self.version,
+            prev_blockhash: BlockHash::from_hex(&self.previousblockhash)
+                .map_err(|_| EsploraError::HeaderParseFail)?,
+            merkle_root: TxMerkleNode::from_hex(&self.merkle_root)
+                .map_err(|_| EsploraError::HeaderParseFail)?,
+            time: self.timestamp,
+            bits: self.bits,
+            nonce: self.nonce,
+        })
+    }
 }
 
 /// Configuration for an [`EsploraBlockchain`]
@@ -366,6 +410,12 @@ pub enum EsploraError {
 
     /// Transaction not found
     TransactionNotFound(Txid),
+    /// Header height not found
+    HeaderHeightNotFound(u32),
+    /// Header hash not found
+    HeaderHashNotFound(BlockHash),
+    /// EsploraHeader cannot be converted in BlockHeader
+    HeaderParseFail,
 }
 
 impl fmt::Display for EsploraError {
@@ -393,3 +443,23 @@ impl From<bitcoin::consensus::encode::Error> for EsploraError {
         EsploraError::BitcoinEncoding(other)
     }
 }
+
+#[cfg(test)]
+mod test {
+    use crate::blockchain::esplora::EsploraHeader;
+    use bitcoin::hashes::hex::FromHex;
+    use bitcoin::{BlockHash, BlockHeader};
+    use std::convert::TryInto;
+
+    #[test]
+    fn test_esplora_header() {
+        let json_str = r#"{"id":"00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206","height":1,"version":1,"timestamp":1296688928,"tx_count":1,"size":190,"weight":760,"merkle_root":"f0315ffc38709d70ad5647e22048358dd3745f3ce3874223c80a7c92fab0c8ba","previousblockhash":"000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943","nonce":1924588547,"bits":486604799,"difficulty":1}"#;
+        let json: EsploraHeader = serde_json::from_str(&json_str).unwrap();
+        let header: BlockHeader = json.try_into().unwrap();
+        assert_eq!(
+            header.block_hash(),
+            BlockHash::from_hex("00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206")
+                .unwrap()
+        );
+    }
+}
index 83b6903de669d5113173bc7806f8d3058e4ee4ff..6711e51ea787e873ffc1aee99badc9104e947198 100644 (file)
@@ -40,6 +40,7 @@ use crate::database::BatchDatabase;
 use crate::error::Error;
 use crate::FeeRate;
 
+#[cfg(any(feature = "electrum", feature = "esplora"))]
 pub(crate) mod utils;
 
 #[cfg(any(feature = "electrum", feature = "esplora", feature = "compact_filters"))]
index bf8ff6cb2cdcb40f1eb9699554ef5c6f3fa093bc..c18b5fa4fd68ccf615aedb84911edbb78a1b0627 100644 (file)
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 // SOFTWARE.
 
-use std::cmp;
-use std::collections::{HashSet, VecDeque};
-use std::convert::TryFrom;
+use std::collections::{HashMap, HashSet};
 
 #[allow(unused_imports)]
 use log::{debug, error, info, trace};
 
-use bitcoin::{Address, Network, OutPoint, Script, Transaction, Txid};
+use bitcoin::{BlockHeader, OutPoint, Script, Transaction, Txid};
 
 use super::*;
 use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
 use crate::error::Error;
 use crate::types::{ScriptType, TransactionDetails, UTXO};
 use crate::wallet::utils::ChunksIterator;
+use rand::seq::SliceRandom;
+use rand::thread_rng;
+use std::time::Instant;
 
 #[derive(Debug)]
 pub struct ELSGetHistoryRes {
@@ -43,13 +44,6 @@ pub struct ELSGetHistoryRes {
     pub tx_hash: Txid,
 }
 
-#[derive(Debug)]
-pub struct ELSListUnspentRes {
-    pub height: usize,
-    pub tx_hash: Txid,
-    pub tx_pos: usize,
-}
-
 /// Implements the synchronization logic for an Electrum-like client.
 #[maybe_async]
 pub trait ElectrumLikeSync {
@@ -58,306 +52,354 @@ pub trait ElectrumLikeSync {
         scripts: I,
     ) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error>;
 
-    fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>(
+    fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
         &self,
-        scripts: I,
-    ) -> Result<Vec<Vec<ELSListUnspentRes>>, Error>;
+        txids: I,
+    ) -> Result<Vec<Transaction>, Error>;
 
-    fn els_transaction_get(&self, txid: &Txid) -> Result<Transaction, Error>;
+    fn els_batch_block_header<I: IntoIterator<Item = u32>>(
+        &self,
+        heights: I,
+    ) -> Result<Vec<BlockHeader>, Error>;
 
     // Provided methods down here...
 
     fn electrum_like_setup<D: BatchDatabase, P: Progress>(
         &self,
         stop_gap: Option<usize>,
-        database: &mut D,
+        db: &mut D,
         _progress_update: P,
     ) -> Result<(), Error> {
         // TODO: progress
+        let start = Instant::now();
+        debug!("start setup");
 
         let stop_gap = stop_gap.unwrap_or(20);
-        let batch_query_size = 20;
-
-        // check unconfirmed tx, delete so they are retrieved later
-        let mut del_batch = database.begin_batch();
-        for tx in database.iter_txs(false)? {
-            if tx.height.is_none() {
-                del_batch.del_tx(&tx.txid, false)?;
-            }
-        }
-        database.commit_batch(del_batch)?;
-
-        // maximum derivation index for a change address that we've seen during sync
-        let mut change_max_deriv = None;
-
-        let mut already_checked: HashSet<Script> = HashSet::new();
-        let mut to_check_later = VecDeque::with_capacity(batch_query_size);
-
-        // insert the first chunk
-        let mut iter_scriptpubkeys = database
-            .iter_script_pubkeys(Some(ScriptType::External))?
-            .into_iter();
-        let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
-        for item in chunk.into_iter().rev() {
-            to_check_later.push_front(item);
-        }
-
-        let mut iterating_external = true;
-        let mut index = 0;
-        let mut last_found = None;
-        while !to_check_later.is_empty() {
-            trace!("to_check_later size {}", to_check_later.len());
-
-            let until = cmp::min(to_check_later.len(), batch_query_size);
-            let chunk: Vec<Script> = to_check_later.drain(..until).collect();
-            let call_result = maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
-
-            for (script, history) in chunk.into_iter().zip(call_result.into_iter()) {
-                trace!("received history for {:?}, size {}", script, history.len());
-
-                if !history.is_empty() {
-                    last_found = Some(index);
-
-                    let mut check_later_scripts = maybe_await!(self.check_history(
-                        database,
-                        script,
-                        history,
-                        &mut change_max_deriv
-                    ))?
-                    .into_iter()
-                    .filter(|x| already_checked.insert(x.clone()))
-                    .collect();
-                    to_check_later.append(&mut check_later_scripts);
+        let chunk_size = stop_gap;
+
+        let mut history_txs_id = HashSet::new();
+        let mut txid_height = HashMap::new();
+        let mut max_indexes = HashMap::new();
+
+        let mut wallet_chains = vec![ScriptType::Internal, ScriptType::External];
+        // shuffling improve privacy, the server doesn't know my first request is from my internal or external addresses
+        wallet_chains.shuffle(&mut thread_rng());
+        // download history of our internal and external script_pubkeys
+        for script_type in wallet_chains.iter() {
+            let script_iter = db.iter_script_pubkeys(Some(*script_type))?.into_iter();
+
+            for (i, chunk) in ChunksIterator::new(script_iter, stop_gap).enumerate() {
+                // TODO if i == last, should create another chunk of addresses in db
+                let call_result: Vec<Vec<ELSGetHistoryRes>> =
+                    maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
+                let max_index = call_result
+                    .iter()
+                    .enumerate()
+                    .filter(|(_, v)| !v.is_empty())
+                    .map(|(i, _)| i as u32)
+                    .max();
+                if let Some(max) = max_index {
+                    max_indexes.insert(script_type, max + (i * chunk_size) as u32);
+                }
+                let flattened: Vec<ELSGetHistoryRes> = call_result.into_iter().flatten().collect();
+                debug!("#{} of {:?} results:{}", i, script_type, flattened.len());
+                if flattened.is_empty() {
+                    // Didn't find anything in the last `stop_gap` script_pubkeys, breaking
+                    break;
                 }
 
-                index += 1;
-            }
-
-            match iterating_external {
-                true if index - last_found.unwrap_or(0) >= stop_gap => iterating_external = false,
-                true => {
-                    trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {:?}, stop_gap = {}", index, last_found, stop_gap);
-
-                    let chunk: Vec<Script> =
-                        iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
-                    for item in chunk.into_iter().rev() {
-                        to_check_later.push_front(item);
+                for el in flattened {
+                    // el.height = -1 means unconfirmed with unconfirmed parents
+                    // el.height =  0 means unconfirmed with confirmed parents
+                    // but we threat those tx the same
+                    let height = el.height.max(0);
+                    if height == 0 {
+                        txid_height.insert(el.tx_hash, None);
+                    } else {
+                        txid_height.insert(el.tx_hash, Some(height as u32));
                     }
+                    history_txs_id.insert(el.tx_hash);
                 }
-                _ => {}
             }
         }
 
-        // check utxo
-        // TODO: try to minimize network requests and re-use scripts if possible
-        let mut batch = database.begin_batch();
-        for chunk in ChunksIterator::new(database.iter_utxos()?.into_iter(), batch_query_size) {
-            let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect();
-            let call_result = maybe_await!(self.els_batch_script_list_unspent(scripts))?;
-
-            // check which utxos are actually still unspent
-            for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) {
-                debug!(
-                    "outpoint {:?} is unspent for me, list unspent is {:?}",
-                    utxo.outpoint, list_unspent
-                );
-
-                let mut spent = true;
-                for unspent in list_unspent {
-                    let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32);
-                    if utxo.outpoint == res_outpoint {
-                        spent = false;
-                        break;
-                    }
-                }
-                if spent {
-                    info!("{} not anymore unspent, removing", utxo.outpoint);
-                    batch.del_utxo(&utxo.outpoint)?;
+        // saving max indexes
+        info!("max indexes are: {:?}", max_indexes);
+        for script_type in wallet_chains.iter() {
+            if let Some(index) = max_indexes.get(script_type) {
+                db.set_last_index(*script_type, *index)?;
+            }
+        }
+
+        // get db status
+        let txs_details_in_db: HashMap<Txid, TransactionDetails> = db
+            .iter_txs(false)?
+            .into_iter()
+            .map(|tx| (tx.txid, tx))
+            .collect();
+        let txs_raw_in_db: HashMap<Txid, Transaction> = db
+            .iter_raw_txs()?
+            .into_iter()
+            .map(|tx| (tx.txid(), tx))
+            .collect();
+        let utxos_deps = utxos_deps(db, &txs_raw_in_db)?;
+
+        // download new txs and headers
+        let new_txs = maybe_await!(self.download_and_save_needed_raw_txs(
+            &history_txs_id,
+            &txs_raw_in_db,
+            chunk_size,
+            db
+        ))?;
+        let new_timestamps = maybe_await!(self.download_needed_headers(
+            &txid_height,
+            &txs_details_in_db,
+            chunk_size
+        ))?;
+
+        let mut batch = db.begin_batch();
+
+        // save any tx details not in db but in history_txs_id or with different height/timestamp
+        for txid in history_txs_id.iter() {
+            let height = *txid_height.get(txid).unwrap_or(&None);
+            let timestamp = *new_timestamps.get(txid).unwrap_or(&0u64);
+            if let Some(tx_details) = txs_details_in_db.get(txid) {
+                // check if height matches, otherwise updates it
+                if tx_details.height != height {
+                    let mut new_tx_details = tx_details.clone();
+                    new_tx_details.height = height;
+                    new_tx_details.timestamp = timestamp;
+                    batch.set_tx(&new_tx_details)?;
                 }
+            } else {
+                save_transaction_details_and_utxos(
+                    &txid,
+                    db,
+                    timestamp,
+                    height,
+                    &mut batch,
+                    &utxos_deps,
+                )?;
             }
         }
 
-        let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0);
-        let first_ext_new = last_found.map(|x| x + 1).unwrap_or(0) as u32;
-        if first_ext_new > current_ext {
-            info!("Setting external index to {}", first_ext_new);
-            database.set_last_index(ScriptType::External, first_ext_new)?;
+        // remove any tx details in db but not in history_txs_id
+        for txid in txs_details_in_db.keys() {
+            if !history_txs_id.contains(txid) {
+                batch.del_tx(&txid, false)?;
+            }
         }
 
-        let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0);
-        let first_int_new = change_max_deriv.map(|x| x + 1).unwrap_or(0);
-        if first_int_new > current_int {
-            info!("Setting internal index to {}", first_int_new);
-            database.set_last_index(ScriptType::Internal, first_int_new)?;
+        // remove any spent utxo
+        for new_tx in new_txs.iter() {
+            for input in new_tx.input.iter() {
+                batch.del_utxo(&input.previous_output)?;
+            }
         }
 
-        database.commit_batch(batch)?;
+        db.commit_batch(batch)?;
+        info!("finish setup, elapsed {:?}ms", start.elapsed().as_millis());
 
         Ok(())
     }
 
-    fn check_tx_and_descendant<D: BatchDatabase>(
+    /// download txs identified by `history_txs_id` and theirs previous outputs if not already present in db
+    fn download_and_save_needed_raw_txs<D: BatchDatabase>(
         &self,
-        database: &mut D,
-        txid: &Txid,
-        height: Option<u32>,
-        cur_script: &Script,
-        change_max_deriv: &mut Option<u32>,
-    ) -> Result<Vec<Script>, Error> {
-        debug!(
-            "check_tx_and_descendant of {}, height: {:?}, script: {}",
-            txid, height, cur_script
-        );
-        let mut updates = database.begin_batch();
-        let tx = match database.get_tx(&txid, true)? {
-            Some(mut saved_tx) => {
-                // update the height if it's different (in case of reorg)
-                if saved_tx.height != height {
-                    info!(
-                        "updating height from {:?} to {:?} for tx {}",
-                        saved_tx.height, height, txid
-                    );
-                    saved_tx.height = height;
-                    updates.set_tx(&saved_tx)?;
+        history_txs_id: &HashSet<Txid>,
+        txs_raw_in_db: &HashMap<Txid, Transaction>,
+        chunk_size: usize,
+        db: &mut D,
+    ) -> Result<Vec<Transaction>, Error> {
+        let mut txs_downloaded = vec![];
+        let txids_raw_in_db: HashSet<Txid> = txs_raw_in_db.keys().cloned().collect();
+        let txids_to_download: Vec<&Txid> = history_txs_id.difference(&txids_raw_in_db).collect();
+        if !txids_to_download.is_empty() {
+            info!("got {} txs to download", txids_to_download.len());
+            txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
+                txids_to_download,
+                chunk_size,
+                db,
+            ))?);
+            let mut prev_txids = HashSet::new();
+            let mut txids_downloaded = HashSet::new();
+            for tx in txs_downloaded.iter() {
+                txids_downloaded.insert(tx.txid());
+                // add every previous input tx, but skip coinbase
+                for input in tx.input.iter().filter(|i| !i.previous_output.is_null()) {
+                    prev_txids.insert(input.previous_output.txid);
                 }
+            }
+            let already_present: HashSet<Txid> =
+                txids_downloaded.union(&txids_raw_in_db).cloned().collect();
+            let prev_txs_to_download: Vec<&Txid> =
+                prev_txids.difference(&already_present).collect();
+            info!("{} previous txs to download", prev_txs_to_download.len());
+            txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
+                prev_txs_to_download,
+                chunk_size,
+                db,
+            ))?);
+        }
 
-                debug!("already have {} in db, returning the cached version", txid);
+        Ok(txs_downloaded)
+    }
 
-                // unwrap since we explicitly ask for the raw_tx, if it's not present something
-                // went wrong
-                saved_tx.transaction.unwrap()
+    /// download headers at heights in `txid_height` if tx details not already present, returns a map Txid -> timestamp
+    fn download_needed_headers(
+        &self,
+        txid_height: &HashMap<Txid, Option<u32>>,
+        txs_details_in_db: &HashMap<Txid, TransactionDetails>,
+        chunk_size: usize,
+    ) -> Result<HashMap<Txid, u64>, Error> {
+        let mut txid_timestamp = HashMap::new();
+        let needed_txid_height: HashMap<&Txid, &Option<u32>> = txid_height
+            .iter()
+            .filter(|(txid, _)| txs_details_in_db.get(*txid).is_none())
+            .collect();
+        let needed_heights: HashSet<u32> =
+            needed_txid_height.iter().filter_map(|(_, b)| **b).collect();
+        if !needed_heights.is_empty() {
+            info!("{} headers to download for timestamp", needed_heights.len());
+            let mut height_timestamp: HashMap<u32, u64> = HashMap::new();
+            for chunk in ChunksIterator::new(needed_heights.into_iter(), chunk_size) {
+                let call_result: Vec<BlockHeader> =
+                    maybe_await!(self.els_batch_block_header(chunk.clone()))?;
+                let vec: Vec<(u32, u64)> = chunk
+                    .into_iter()
+                    .zip(call_result.iter().map(|h| h.time as u64))
+                    .collect();
+                height_timestamp.extend(vec);
             }
-            None => {
-                let fetched_tx = maybe_await!(self.els_transaction_get(&txid))?;
-                database.set_raw_tx(&fetched_tx)?;
-
-                fetched_tx
+            for (txid, height_opt) in needed_txid_height {
+                if let Some(height) = height_opt {
+                    let timestamp = height_timestamp
+                        .get(height)
+                        .ok_or_else(|| Error::Generic("timestamp missing".to_string()))?;
+                    txid_timestamp.insert(*txid, *timestamp);
+                }
             }
-        };
-
-        let mut incoming: u64 = 0;
-        let mut outgoing: u64 = 0;
+        }
 
-        let mut inputs_sum: u64 = 0;
-        let mut outputs_sum: u64 = 0;
+        Ok(txid_timestamp)
+    }
 
-        // look for our own inputs
-        for (i, input) in tx.input.iter().enumerate() {
-            // skip coinbase inputs
-            if input.previous_output.is_null() {
-                continue;
+    fn download_and_save_in_chunks<D: BatchDatabase>(
+        &self,
+        to_download: Vec<&Txid>,
+        chunk_size: usize,
+        db: &mut D,
+    ) -> Result<Vec<Transaction>, Error> {
+        let mut txs_downloaded = vec![];
+        for chunk in ChunksIterator::new(to_download.into_iter(), chunk_size) {
+            let call_result: Vec<Transaction> =
+                maybe_await!(self.els_batch_transaction_get(chunk))?;
+            let mut batch = db.begin_batch();
+            for new_tx in call_result.iter() {
+                batch.set_raw_tx(new_tx)?;
             }
+            db.commit_batch(batch)?;
+            txs_downloaded.extend(call_result);
+        }
 
-            // the fact that we visit addresses in a BFS fashion starting from the external addresses
-            // should ensure that this query is always consistent (i.e. when we get to call this all
-            // the transactions at a lower depth have already been indexed, so if an outpoint is ours
-            // we are guaranteed to have it in the db).
-            if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
-                inputs_sum += previous_output.value;
-
-                if database.is_mine(&previous_output.script_pubkey)? {
-                    outgoing += previous_output.value;
-
-                    debug!("{} input #{} is mine, removing from utxo", txid, i);
-                    updates.del_utxo(&input.previous_output)?;
-                }
-            } else {
-                // The input is not ours, but we still need to count it for the fees. so fetch the
-                // tx (from the database or from network) and check it
-                let tx = match database.get_tx(&input.previous_output.txid, true)? {
-                    Some(saved_tx) => saved_tx.transaction.unwrap(),
-                    None => {
-                        let fetched_tx =
-                            maybe_await!(self.els_transaction_get(&input.previous_output.txid))?;
-                        database.set_raw_tx(&fetched_tx)?;
-
-                        fetched_tx
-                    }
-                };
+        Ok(txs_downloaded)
+    }
+}
 
-                inputs_sum += tx.output[input.previous_output.vout as usize].value;
-            }
+fn save_transaction_details_and_utxos<D: BatchDatabase>(
+    txid: &Txid,
+    db: &mut D,
+    timestamp: u64,
+    height: Option<u32>,
+    updates: &mut dyn BatchOperations,
+    utxo_deps: &HashMap<OutPoint, UTXO>,
+) -> Result<(), Error> {
+    let tx = db
+        .get_raw_tx(txid)?
+        .ok_or_else(|| Error::TransactionNotFound)?;
+
+    let mut incoming: u64 = 0;
+    let mut outgoing: u64 = 0;
+
+    let mut inputs_sum: u64 = 0;
+    let mut outputs_sum: u64 = 0;
+
+    // look for our own inputs
+    for input in tx.input.iter() {
+        // skip coinbase inputs
+        if input.previous_output.is_null() {
+            continue;
         }
 
-        let mut to_check_later = vec![];
-        for (i, output) in tx.output.iter().enumerate() {
-            // to compute the fees later
-            outputs_sum += output.value;
-
-            // this output is ours, we have a path to derive it
-            if let Some((script_type, child)) =
-                database.get_path_from_script_pubkey(&output.script_pubkey)?
-            {
-                debug!("{} output #{} is mine, adding utxo", txid, i);
-                updates.set_utxo(&UTXO {
-                    outpoint: OutPoint::new(tx.txid(), i as u32),
-                    txout: output.clone(),
-                    is_internal: script_type.is_internal(),
-                })?;
-                incoming += output.value;
-
-                if output.script_pubkey != *cur_script {
-                    debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey);
-                    to_check_later.push(output.script_pubkey.clone())
-                }
+        // We already downloaded all previous output txs in the previous step
+        if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
+            inputs_sum += previous_output.value;
 
-                // derive as many change addrs as external addresses that we've seen
-                if script_type == ScriptType::Internal
-                    && (change_max_deriv.is_none() || child > change_max_deriv.unwrap_or(0))
-                {
-                    *change_max_deriv = Some(child);
-                }
+            if db.is_mine(&previous_output.script_pubkey)? {
+                outgoing += previous_output.value;
             }
+        } else {
+            // The input is not ours, but we still need to count it for the fees
+            let tx = db
+                .get_raw_tx(&input.previous_output.txid)?
+                .ok_or_else(|| Error::TransactionNotFound)?;
+            inputs_sum += tx.output[input.previous_output.vout as usize].value;
         }
 
-        let tx = TransactionDetails {
-            txid: tx.txid(),
-            transaction: Some(tx),
-            received: incoming,
-            sent: outgoing,
-            height,
-            timestamp: 0,
-            fees: inputs_sum.saturating_sub(outputs_sum), // if the tx is a coinbase, fees would be negative
-        };
-        info!("Saving tx {}", txid);
-        updates.set_tx(&tx)?;
-
-        database.commit_batch(updates)?;
-
-        Ok(to_check_later)
+        // removes conflicting UTXO if any (generated from same inputs, like for example RBF)
+        if let Some(utxo) = utxo_deps.get(&input.previous_output) {
+            updates.del_utxo(&utxo.outpoint)?;
+        }
     }
 
-    fn check_history<D: BatchDatabase>(
-        &self,
-        database: &mut D,
-        script_pubkey: Script,
-        txs: Vec<ELSGetHistoryRes>,
-        change_max_deriv: &mut Option<u32>,
-    ) -> Result<Vec<Script>, Error> {
-        let mut to_check_later = Vec::new();
-
-        debug!(
-            "history of {} script {} has {} tx",
-            Address::from_script(&script_pubkey, Network::Testnet).unwrap(),
-            script_pubkey,
-            txs.len()
-        );
-
-        for tx in txs {
-            let height: Option<u32> = match tx.height {
-                0 | -1 => None,
-                x => u32::try_from(x).ok(),
-            };
-
-            to_check_later.extend_from_slice(&maybe_await!(self.check_tx_and_descendant(
-                database,
-                &tx.tx_hash,
-                height,
-                &script_pubkey,
-                change_max_deriv,
-            ))?);
+    for (i, output) in tx.output.iter().enumerate() {
+        // to compute the fees later
+        outputs_sum += output.value;
+
+        // this output is ours, we have a path to derive it
+        if let Some((script_type, _child)) =
+            db.get_path_from_script_pubkey(&output.script_pubkey)?
+        {
+            debug!("{} output #{} is mine, adding utxo", txid, i);
+            updates.set_utxo(&UTXO {
+                outpoint: OutPoint::new(tx.txid(), i as u32),
+                txout: output.clone(),
+                is_internal: script_type.is_internal(),
+            })?;
+
+            incoming += output.value;
         }
+    }
+
+    let tx_details = TransactionDetails {
+        txid: tx.txid(),
+        transaction: Some(tx),
+        received: incoming,
+        sent: outgoing,
+        height,
+        timestamp,
+        fees: inputs_sum.saturating_sub(outputs_sum), // if the tx is a coinbase, fees would be negative
+    };
+    updates.set_tx(&tx_details)?;
+
+    Ok(())
+}
 
-        Ok(to_check_later)
+/// returns utxo dependency as the inputs needed for the utxo to exist
+/// `tx_raw_in_db` must contains utxo's generating txs or errors witt [crate::Error::TransactionNotFound]
+fn utxos_deps<D: BatchDatabase>(
+    db: &mut D,
+    tx_raw_in_db: &HashMap<Txid, Transaction>,
+) -> Result<HashMap<OutPoint, UTXO>, Error> {
+    let utxos = db.iter_utxos()?;
+    let mut utxos_deps = HashMap::new();
+    for utxo in utxos {
+        let from_tx = tx_raw_in_db
+            .get(&utxo.outpoint.txid)
+            .ok_or_else(|| Error::TransactionNotFound)?;
+        for input in from_tx.input.iter() {
+            utxos_deps.insert(input.previous_output, utxo.clone());
+        }
     }
+    Ok(utxos_deps)
 }