use std::collections::{HashMap, HashSet};
use std::fmt;
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use super::*;
use crate::database::BatchDatabase;
use crate::error::Error;
+use crate::wallet::utils::ChunksIterator;
use crate::FeeRate;
+const CONCURRENT: usize = 4;
+
#[derive(Debug)]
struct UrlClient {
url: String,
scripts: I,
) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error> {
let future = async {
- Ok(stream::iter(scripts)
- .then(|script| self._script_get_history(&script))
- .try_collect()
- .await?)
+ let mut results = vec![];
+ for chunk in ChunksIterator::new(scripts.into_iter(), CONCURRENT) {
+ let mut futs = FuturesOrdered::new();
+ for script in chunk {
+ futs.push(self._script_get_history(&script));
+ }
+ let partial_results: Vec<Vec<ELSGetHistoryRes>> = futs.try_collect().await?;
+ results.extend(partial_results);
+ }
+ Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
txids: I,
) -> Result<Vec<Transaction>, Error> {
let future = async {
- Ok(stream::iter(txids)
- .then(|txid| self._get_tx_no_opt(&txid))
- .try_collect()
- .await?)
+ let mut results = vec![];
+ for chunk in ChunksIterator::new(txids.into_iter(), CONCURRENT) {
+ let mut futs = FuturesOrdered::new();
+ for txid in chunk {
+ futs.push(self._get_tx_no_opt(&txid));
+ }
+ let partial_results: Vec<Transaction> = futs.try_collect().await?;
+ results.extend(partial_results);
+ }
+ Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let future = async {
- Ok(stream::iter(heights)
- .then(|h| self._get_header(h))
- .try_collect()
- .await?)
+ let mut results = vec![];
+ for chunk in ChunksIterator::new(heights.into_iter(), CONCURRENT) {
+ let mut futs = FuturesOrdered::new();
+ for height in chunk {
+ futs.push(self._get_header(height));
+ }
+ let partial_results: Vec<BlockHeader> = futs.try_collect().await?;
+ results.extend(partial_results);
+ }
+ Ok(stream::iter(results).collect().await)
};
await_or_block!(future)