]> Untitled Git - bdk/commitdiff
Make esplora call in parallel
authorRiccardo Casatta <riccardo@casatta.it>
Tue, 3 Nov 2020 21:09:32 +0000 (22:09 +0100)
committerRiccardo Casatta <riccardo@casatta.it>
Wed, 18 Nov 2020 10:08:19 +0000 (11:08 +0100)
src/blockchain/esplora.rs

index e2da5b80dd3845dc964b8bbf0f3580d2fbea187f..d236e28f5c10f11d1dfccac38d2c738fc1864e5b 100644 (file)
@@ -38,7 +38,7 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt;
 
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
 
 #[allow(unused_imports)]
 use log::{debug, error, info, trace};
@@ -56,8 +56,11 @@ use self::utils::{ELSGetHistoryRes, ElectrumLikeSync};
 use super::*;
 use crate::database::BatchDatabase;
 use crate::error::Error;
+use crate::wallet::utils::ChunksIterator;
 use crate::FeeRate;
 
+const CONCURRENT: usize = 4;
+
 #[derive(Debug)]
 struct UrlClient {
     url: String,
@@ -301,10 +304,16 @@ impl ElectrumLikeSync for UrlClient {
         scripts: I,
     ) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error> {
         let future = async {
-            Ok(stream::iter(scripts)
-                .then(|script| self._script_get_history(&script))
-                .try_collect()
-                .await?)
+            let mut results = vec![];
+            for chunk in ChunksIterator::new(scripts.into_iter(), CONCURRENT) {
+                let mut futs = FuturesOrdered::new();
+                for script in chunk {
+                    futs.push(self._script_get_history(&script));
+                }
+                let partial_results: Vec<Vec<ELSGetHistoryRes>> = futs.try_collect().await?;
+                results.extend(partial_results);
+            }
+            Ok(stream::iter(results).collect().await)
         };
 
         await_or_block!(future)
@@ -315,10 +324,16 @@ impl ElectrumLikeSync for UrlClient {
         txids: I,
     ) -> Result<Vec<Transaction>, Error> {
         let future = async {
-            Ok(stream::iter(txids)
-                .then(|txid| self._get_tx_no_opt(&txid))
-                .try_collect()
-                .await?)
+            let mut results = vec![];
+            for chunk in ChunksIterator::new(txids.into_iter(), CONCURRENT) {
+                let mut futs = FuturesOrdered::new();
+                for txid in chunk {
+                    futs.push(self._get_tx_no_opt(&txid));
+                }
+                let partial_results: Vec<Transaction> = futs.try_collect().await?;
+                results.extend(partial_results);
+            }
+            Ok(stream::iter(results).collect().await)
         };
 
         await_or_block!(future)
@@ -329,10 +344,16 @@ impl ElectrumLikeSync for UrlClient {
         heights: I,
     ) -> Result<Vec<BlockHeader>, Error> {
         let future = async {
-            Ok(stream::iter(heights)
-                .then(|h| self._get_header(h))
-                .try_collect()
-                .await?)
+            let mut results = vec![];
+            for chunk in ChunksIterator::new(heights.into_iter(), CONCURRENT) {
+                let mut futs = FuturesOrdered::new();
+                for height in chunk {
+                    futs.push(self._get_header(height));
+                }
+                let partial_results: Vec<BlockHeader> = futs.try_collect().await?;
+                results.extend(partial_results);
+            }
+            Ok(stream::iter(results).collect().await)
         };
 
         await_or_block!(future)