Untitled Git - bdk/commitdiff
[compact_filters] Add support for Tor
authorAlekos Filini <alekos.filini@gmail.com>
Sat, 29 Aug 2020 17:40:45 +0000 (19:40 +0200)
committerAlekos Filini <alekos.filini@gmail.com>
Sun, 30 Aug 2020 15:24:04 +0000 (17:24 +0200)
Cargo.toml
src/blockchain/compact_filters/mod.rs
src/blockchain/compact_filters/peer.rs
src/blockchain/compact_filters/store.rs
src/blockchain/compact_filters/sync.rs

index 91caadf9a747a7dd623d3ac22d65849644531c0e..8b5bea3e1625467e6b18ff0bb9e62cc27d207f8a 100644 (file)
@@ -22,6 +22,7 @@ clap = { version = "2.33", optional = true }
 base64 = { version = "^0.11", optional = true }
 async-trait = { version = "0.1", optional = true }
 rocksdb = { version = "0.14", optional = true }
+socks = { version = "0.3", optional = true }
 lazy_static = { version = "1.4", optional = true }
 
 # Platform-specific dependencies
@@ -38,7 +39,7 @@ compiler = ["clap", "miniscript/compiler"]
 default = ["key-value-db", "electrum"]
 electrum = ["electrum-client"]
 esplora = ["reqwest", "futures"]
-compact_filters = ["rocksdb", "lazy_static"]
+compact_filters = ["rocksdb", "socks", "lazy_static"]
 key-value-db = ["sled"]
 cli-utils = ["clap", "base64"]
 async-interface = ["async-trait"]
index f32b28b2805098a6cbe596198c0093ff550921b5..4e33af466a81d21d2782ce5c8809147ddb42f587 100644 (file)
@@ -1,6 +1,4 @@
 use std::collections::HashSet;
-use std::fmt;
-use std::net::ToSocketAddrs;
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
@@ -9,7 +7,7 @@ use std::sync::{Arc, Mutex};
 use log::{debug, error, info, trace};
 
 use bitcoin::network::message_blockdata::Inventory;
-use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid};
+use bitcoin::{BitcoinHash, OutPoint, Transaction, Txid};
 
 use rocksdb::{Options, SliceTransform, DB};
 
@@ -27,6 +25,8 @@ use peer::*;
 use store::*;
 use sync::*;
 
+pub use peer::{Mempool, Peer};
+
 const SYNC_HEADERS_COST: f32 = 1.0;
 const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
 const PROCESS_BLOCKS_COST: f32 = 20_000.0;
@@ -35,45 +35,45 @@ const PROCESS_BLOCKS_COST: f32 = 20_000.0;
 pub struct CompactFiltersBlockchain(Option<CompactFilters>);
 
 impl CompactFiltersBlockchain {
-    pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
-        address: A,
+    pub fn new<P: AsRef<Path>>(
+        peers: Vec<Peer>,
         storage_dir: P,
-        num_threads: usize,
-        skip_blocks: usize,
-        network: Network,
+        skip_blocks: Option<usize>,
     ) -> Result<Self, CompactFiltersError> {
         Ok(CompactFiltersBlockchain(Some(CompactFilters::new(
-            address,
+            peers,
             storage_dir,
-            num_threads,
             skip_blocks,
-            network,
         )?)))
     }
 }
 
+#[derive(Debug)]
 struct CompactFilters {
-    peer: Arc<Peer>,
-    headers: Arc<HeadersStore<Full>>,
-    skip_blocks: usize,
-    num_threads: usize,
+    peers: Vec<Arc<Peer>>,
+    headers: Arc<ChainStore<Full>>,
+    skip_blocks: Option<usize>,
 }
 
 impl CompactFilters {
-    pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
-        address: A,
+    pub fn new<P: AsRef<Path>>(
+        peers: Vec<Peer>,
         storage_dir: P,
-        num_threads: usize,
-        skip_blocks: usize,
-        network: Network,
+        skip_blocks: Option<usize>,
     ) -> Result<Self, CompactFiltersError> {
+        if peers.is_empty() {
+            return Err(CompactFiltersError::NoPeers);
+        }
+
         let mut opts = Options::default();
         opts.create_if_missing(true);
         opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));
 
+        let network = peers[0].get_network();
+
         let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]);
         let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
-        let headers = Arc::new(HeadersStore::new(db, network)?);
+        let headers = Arc::new(ChainStore::new(db, network)?);
 
         // try to recover partial snapshots
         for cf_name in &cfs {
@@ -86,8 +86,7 @@ impl CompactFilters {
         }
 
         Ok(CompactFilters {
-            peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?),
-            num_threads,
+            peers: peers.into_iter().map(Arc::new).collect(),
             headers,
             skip_blocks,
         })
@@ -191,16 +190,15 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         progress_update: P,
     ) -> Result<(), Error> {
         let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
+        let first_peer = &inner.peers[0];
 
-        let cf_sync = Arc::new(CFSync::new(
-            Arc::clone(&inner.headers),
-            inner.skip_blocks,
-            0x00,
-        )?);
+        let skip_blocks = inner.skip_blocks.unwrap_or(0);
+
+        let cf_sync = Arc::new(CFSync::new(Arc::clone(&inner.headers), skip_blocks, 0x00)?);
 
         let initial_height = inner.headers.get_height()?;
-        let total_bundles = (inner.peer.get_version().start_height as usize)
-            .checked_sub(inner.skip_blocks)
+        let total_bundles = (first_peer.get_version().start_height as usize)
+            .checked_sub(skip_blocks)
             .map(|x| x / 1000)
             .unwrap_or(0)
             + 1;
@@ -208,7 +206,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
             .checked_sub(cf_sync.pruned_bundles()?)
             .unwrap_or(0);
 
-        let headers_cost = (inner.peer.get_version().start_height as usize)
+        let headers_cost = (first_peer.get_version().start_height as usize)
             .checked_sub(initial_height)
             .unwrap_or(0) as f32
             * SYNC_HEADERS_COST;
@@ -217,7 +215,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;
 
         if let Some(snapshot) = sync::sync_headers(
-            Arc::clone(&inner.peer),
+            Arc::clone(&first_peer),
             Arc::clone(&inner.headers),
             |new_height| {
                 let local_headers_cost =
@@ -240,7 +238,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
             .unwrap_or(0);
         info!("Synced headers to height: {}", synced_height);
 
-        cf_sync.prepare_sync(Arc::clone(&inner.peer))?;
+        cf_sync.prepare_sync(Arc::clone(&first_peer))?;
 
         let all_scripts = Arc::new(
             database
@@ -254,10 +252,10 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         let synced_bundles = Arc::new(AtomicUsize::new(0));
         let progress_update = Arc::new(Mutex::new(progress_update));
 
-        let mut threads = Vec::with_capacity(inner.num_threads);
-        for _ in 0..inner.num_threads {
+        let mut threads = Vec::with_capacity(inner.peers.len());
+        for peer in &inner.peers {
             let cf_sync = Arc::clone(&cf_sync);
-            let peer = Arc::new(inner.peer.new_connection()?);
+            let peer = Arc::clone(&peer);
             let headers = Arc::clone(&inner.headers);
             let all_scripts = Arc::clone(&all_scripts);
             let last_synced_block = Arc::clone(&last_synced_block);
@@ -336,7 +334,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         }
         database.commit_batch(updates)?;
 
-        inner.peer.ask_for_mempool()?;
+        first_peer.ask_for_mempool()?;
 
         let mut internal_max_deriv = 0;
         let mut external_max_deriv = 0;
@@ -353,7 +351,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
                 )?;
             }
         }
-        for tx in inner.peer.get_mempool().iter_txs().iter() {
+        for tx in first_peer.get_mempool().iter_txs().iter() {
             inner.process_tx(
                 database,
                 tx,
@@ -392,15 +390,14 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
     fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
         let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
 
-        Ok(inner
-            .peer
+        Ok(inner.peers[0]
             .get_mempool()
             .get_tx(&Inventory::Transaction(*txid)))
     }
 
     fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
         let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
-        inner.peer.broadcast_tx(tx.clone())?;
+        inner.peers[0].broadcast_tx(tx.clone())?;
 
         Ok(())
     }
@@ -417,14 +414,6 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
     }
 }
 
-impl fmt::Debug for CompactFilters {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("CompactFilters")
-            .field("peer", &self.peer)
-            .finish()
-    }
-}
-
 #[derive(Debug)]
 pub enum CompactFiltersError {
     InvalidResponse,
@@ -433,8 +422,12 @@ pub enum CompactFiltersError {
     InvalidFilter,
     MissingBlock,
     DataCorruption,
+
+    NotConnected,
     Timeout,
 
+    NoPeers,
+
     DB(rocksdb::Error),
     IO(std::io::Error),
     BIP158(bitcoin::util::bip158::Error),
index 589328337a1ba1dae97900dbd751625aad461f7c..3312009680bac33f4172800421b9ee7190182529 100644 (file)
@@ -4,6 +4,8 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
 use std::thread;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
+use socks::{Socks5Stream, ToTargetAddr};
+
 use rand::{thread_rng, Rng};
 
 use bitcoin::consensus::Encodable;
@@ -22,6 +24,8 @@ use super::CompactFiltersError;
 
 type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
 
+pub(crate) const TIMEOUT_SECS: u64 = 10;
+
 #[derive(Debug, Default)]
 pub struct Mempool {
     txs: RwLock<HashMap<Txid, Transaction>>,
@@ -70,9 +74,33 @@ impl Peer {
         mempool: Arc<Mempool>,
         network: Network,
     ) -> Result<Self, CompactFiltersError> {
-        let connection = TcpStream::connect(address)?;
+        let stream = TcpStream::connect(address)?;
+
+        Peer::from_stream(stream, mempool, network)
+    }
+
+    pub fn new_proxy<T: ToTargetAddr, P: ToSocketAddrs>(
+        target: T,
+        proxy: P,
+        credentials: Option<(&str, &str)>,
+        mempool: Arc<Mempool>,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        let socks_stream = if let Some((username, password)) = credentials {
+            Socks5Stream::connect_with_password(proxy, target, username, password)?
+        } else {
+            Socks5Stream::connect(proxy, target)?
+        };
 
-        let writer = Arc::new(Mutex::new(connection.try_clone()?));
+        Peer::from_stream(socks_stream.into_inner(), mempool, network)
+    }
+
+    fn from_stream(
+        stream: TcpStream,
+        mempool: Arc<Mempool>,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        let writer = Arc::new(Mutex::new(stream.try_clone()?));
         let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
         let connected = Arc::new(RwLock::new(true));
 
@@ -85,7 +113,7 @@ impl Peer {
         let reader_thread = thread::spawn(move || {
             Self::reader_thread(
                 network,
-                connection,
+                stream,
                 reader_thread_responses,
                 reader_thread_writer,
                 reader_thread_mempool,
@@ -142,11 +170,6 @@ impl Peer {
         })
     }
 
-    pub fn new_connection(&self) -> Result<Self, CompactFiltersError> {
-        let socket_addr = self.writer.lock().unwrap().peer_addr()?;
-        Self::new(socket_addr, Arc::clone(&self.mempool), self.network)
-    }
-
     fn _send(
         writer: &mut TcpStream,
         magic: u32,
@@ -198,6 +221,10 @@ impl Peer {
         &self.version
     }
 
+    pub fn get_network(&self) -> Network {
+        self.network
+    }
+
     pub fn get_mempool(&self) -> Arc<Mempool> {
         Arc::clone(&self.mempool)
     }
@@ -337,7 +364,7 @@ impl CompactFiltersPeer for Peer {
         }))?;
 
         let response = self
-            .recv("cfcheckpt", Some(Duration::from_secs(10)))?
+            .recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
             .ok_or(CompactFiltersError::Timeout)?;
         let response = match response {
             NetworkMessage::CFCheckpt(response) => response,
@@ -364,7 +391,7 @@ impl CompactFiltersPeer for Peer {
         }))?;
 
         let response = self
-            .recv("cfheaders", Some(Duration::from_secs(10)))?
+            .recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
             .ok_or(CompactFiltersError::Timeout)?;
         let response = match response {
             NetworkMessage::CFHeaders(response) => response,
@@ -380,7 +407,7 @@ impl CompactFiltersPeer for Peer {
 
     fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
         let response = self
-            .recv("cfilter", Some(Duration::from_secs(10)))?
+            .recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
             .ok_or(CompactFiltersError::Timeout)?;
         let response = match response {
             NetworkMessage::CFilter(response) => response,
@@ -418,7 +445,7 @@ impl InvPeer for Peer {
             block_hash,
         )]))?;
 
-        match self.recv("block", Some(Duration::from_secs(10)))? {
+        match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
             None => Ok(None),
             Some(NetworkMessage::Block(response)) => Ok(Some(response)),
             _ => Err(CompactFiltersError::InvalidResponse),
@@ -446,7 +473,7 @@ impl InvPeer for Peer {
 
         for _ in 0..num_txs {
             let tx = self
-                .recv("tx", Some(Duration::from_secs(10)))?
+                .recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
                 .ok_or(CompactFiltersError::Timeout)?;
             let tx = match tx {
                 NetworkMessage::Tx(tx) => tx,
index 81f24d93ce5aa4386dcbec7ecd556b44e9bd3cbb..bfd570cd9691b626e2693bbc6af740d15569c417 100644 (file)
@@ -1,4 +1,5 @@
 use std::convert::TryInto;
+use std::fmt;
 use std::io::{Read, Write};
 use std::marker::PhantomData;
 use std::ops::Deref;
@@ -30,12 +31,12 @@ lazy_static! {
     static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
 }
 
-pub trait StoreType: Default {}
+pub trait StoreType: Default + fmt::Debug {}
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct Full;
 impl StoreType for Full {}
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct Snapshot;
 impl StoreType for Snapshot {}
 
@@ -226,7 +227,7 @@ impl Decodable for BundleStatus {
     }
 }
 
-pub struct HeadersStore<T: StoreType> {
+pub struct ChainStore<T: StoreType> {
     store: Arc<RwLock<DB>>,
     cf_name: String,
     min_height: usize,
@@ -234,7 +235,7 @@ pub struct HeadersStore<T: StoreType> {
     phantom: PhantomData<T>,
 }
 
-impl HeadersStore<Full> {
+impl ChainStore<Full> {
     pub fn new(store: DB, network: Network) -> Result<Self, CompactFiltersError> {
         let genesis = match network {
             Network::Bitcoin => MAINNET_GENESIS.deref(),
@@ -262,7 +263,7 @@ impl HeadersStore<Full> {
             store.write(batch)?;
         }
 
-        Ok(HeadersStore {
+        Ok(ChainStore {
             store: Arc::new(RwLock::new(store)),
             cf_name,
             min_height: 0,
@@ -301,10 +302,7 @@ impl HeadersStore<Full> {
         Ok(answer)
     }
 
-    pub fn start_snapshot(
-        &self,
-        from: usize,
-    ) -> Result<HeadersStore<Snapshot>, CompactFiltersError> {
+    pub fn start_snapshot(&self, from: usize) -> Result<ChainStore<Snapshot>, CompactFiltersError> {
         let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect();
         let new_cf_name = format!("_headers:{}", new_cf_name);
 
@@ -335,7 +333,7 @@ impl HeadersStore<Full> {
         write_store.write(batch)?;
 
         let store = Arc::clone(&self.store);
-        Ok(HeadersStore {
+        Ok(ChainStore {
             store,
             cf_name: new_cf_name,
             min_height: from,
@@ -367,7 +365,7 @@ impl HeadersStore<Full> {
         std::mem::drop(iterator);
         std::mem::drop(write_store);
 
-        let snapshot = HeadersStore {
+        let snapshot = ChainStore {
             store: Arc::clone(&self.store),
             cf_name: cf_name.into(),
             min_height,
@@ -383,7 +381,7 @@ impl HeadersStore<Full> {
 
     pub fn apply_snapshot(
         &self,
-        snaphost: HeadersStore<Snapshot>,
+        snaphost: ChainStore<Snapshot>,
     ) -> Result<(), CompactFiltersError> {
         let mut batch = WriteBatch::default();
 
@@ -523,7 +521,7 @@ impl HeadersStore<Full> {
     }
 }
 
-impl<T: StoreType> HeadersStore<T> {
+impl<T: StoreType> ChainStore<T> {
     pub fn work(&self) -> Result<Uint256, CompactFiltersError> {
         let read_store = self.store.read().unwrap();
         let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
@@ -629,6 +627,18 @@ impl<T: StoreType> HeadersStore<T> {
     }
 }
 
+impl<T: StoreType> fmt::Debug for ChainStore<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct(&format!("ChainStore<{:?}>", T::default()))
+            .field("cf_name", &self.cf_name)
+            .field("min_height", &self.min_height)
+            .field("network", &self.network)
+            .field("headers_height", &self.get_height())
+            .field("tip_hash", &self.get_tip_hash())
+            .finish()
+    }
+}
+
 pub type FilterHeaderHash = FilterHash;
 
 #[derive(Debug, Clone)]
@@ -663,7 +673,7 @@ type BundleEntry = (BundleStatus, FilterHeaderHash);
 
 impl CFStore {
     pub fn new(
-        headers_store: &HeadersStore<Full>,
+        headers_store: &ChainStore<Full>,
         filter_type: u8,
     ) -> Result<Self, CompactFiltersError> {
         let cf_store = CFStore {
index 57b65f8b40febd7aa56e84a24091436291968110..3c09f687694a1dabc749981a0294add05d520779 100644 (file)
@@ -1,5 +1,6 @@
 use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::sync::{Arc, Mutex};
+use std::time::Duration;
 
 use bitcoin::hash_types::{BlockHash, FilterHash};
 use bitcoin::network::message::NetworkMessage;
@@ -9,11 +10,12 @@ use bitcoin::util::bip158::BlockFilter;
 use super::peer::*;
 use super::store::*;
 use super::CompactFiltersError;
+use crate::error::Error;
 
 pub(crate) const BURIED_CONFIRMATIONS: usize = 100;
 
 pub struct CFSync {
-    headers_store: Arc<HeadersStore<Full>>,
+    headers_store: Arc<ChainStore<Full>>,
     cf_store: Arc<CFStore>,
     skip_blocks: usize,
     bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
@@ -21,7 +23,7 @@ pub struct CFSync {
 
 impl CFSync {
     pub fn new(
-        headers_store: Arc<HeadersStore<Full>>,
+        headers_store: Arc<ChainStore<Full>>,
         skip_blocks: usize,
         filter_type: u8,
     ) -> Result<Self, CompactFiltersError> {
@@ -72,7 +74,7 @@ impl CFSync {
     ) -> Result<(), CompactFiltersError>
     where
         F: Fn(&BlockHash, &BlockFilter) -> Result<bool, CompactFiltersError>,
-        Q: Fn(usize) -> Result<(), crate::error::Error>,
+        Q: Fn(usize) -> Result<(), Error>,
     {
         let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated
 
@@ -230,11 +232,11 @@ impl CFSync {
 
 pub fn sync_headers<F>(
     peer: Arc<Peer>,
-    store: Arc<HeadersStore<Full>>,
+    store: Arc<ChainStore<Full>>,
     sync_fn: F,
-) -> Result<Option<HeadersStore<Snapshot>>, CompactFiltersError>
+) -> Result<Option<ChainStore<Snapshot>>, CompactFiltersError>
 where
-    F: Fn(usize) -> Result<(), crate::error::Error>,
+    F: Fn(usize) -> Result<(), Error>,
 {
     let locators = store.get_locators()?;
     let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect();
@@ -244,22 +246,24 @@ where
         locators_vec,
         Default::default(),
     )))?;
-    let (mut snapshot, mut last_hash) =
-        if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
-            if headers.is_empty() {
-                return Ok(None);
-            }
+    let (mut snapshot, mut last_hash) = if let NetworkMessage::Headers(headers) = peer
+        .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))?
+        .ok_or(CompactFiltersError::Timeout)?
+    {
+        if headers.is_empty() {
+            return Ok(None);
+        }
 
-            match locators_map.get(&headers[0].prev_blockhash) {
-                None => return Err(CompactFiltersError::InvalidHeaders),
-                Some(from) => (
-                    store.start_snapshot(*from)?,
-                    headers[0].prev_blockhash.clone(),
-                ),
-            }
-        } else {
-            return Err(CompactFiltersError::InvalidResponse);
-        };
+        match locators_map.get(&headers[0].prev_blockhash) {
+            None => return Err(CompactFiltersError::InvalidHeaders),
+            Some(from) => (
+                store.start_snapshot(*from)?,
+                headers[0].prev_blockhash.clone(),
+            ),
+        }
+    } else {
+        return Err(CompactFiltersError::InvalidResponse);
+    };
 
     let mut sync_height = store.get_height()?;
     while sync_height < peer.get_version().start_height as usize {
@@ -267,7 +271,10 @@ where
             vec![last_hash],
             Default::default(),
         )))?;
-        if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
+        if let NetworkMessage::Headers(headers) = peer
+            .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))?
+            .ok_or(CompactFiltersError::Timeout)?
+        {
             let batch_len = headers.len();
             last_hash = snapshot.apply(sync_height, headers)?;