]> Untitled Git - bdk/commitdiff
Compact Filters blockchain implementation
authorAlekos Filini <alekos.filini@gmail.com>
Tue, 25 Aug 2020 14:07:26 +0000 (16:07 +0200)
committerAlekos Filini <alekos.filini@gmail.com>
Sun, 30 Aug 2020 15:23:33 +0000 (17:23 +0200)
14 files changed:
.travis.yml
Cargo.toml
src/blockchain/compact_filters/mod.rs [new file with mode: 0644]
src/blockchain/compact_filters/peer.rs [new file with mode: 0644]
src/blockchain/compact_filters/store.rs [new file with mode: 0644]
src/blockchain/compact_filters/sync.rs [new file with mode: 0644]
src/blockchain/electrum.rs
src/blockchain/esplora.rs
src/blockchain/mod.rs
src/cli.rs
src/error.rs
src/lib.rs
src/wallet/mod.rs
testutils-macros/src/lib.rs

index d3718fd2ac9ed9c306ddcb6cfd5553675d79b4f0..6a6263f19eee92ea535d6f76b7556d381a03fdcc 100644 (file)
@@ -13,6 +13,7 @@ env:
     - TARGET=x86_64-unknown-linux-gnu    FEATURES=minimal,esplora    NO_DEFAULT_FEATURES=1
     - TARGET=x86_64-unknown-linux-gnu    FEATURES=key-value-db       NO_DEFAULT_FEATURES=1
     - TARGET=x86_64-unknown-linux-gnu    FEATURES=electrum           NO_DEFAULT_FEATURES=1
+    - TARGET=x86_64-unknown-linux-gnu    FEATURES=compact_filters    NO_DEFAULT_FEATURES=1
     - TARGET=x86_64-unknown-linux-gnu    FEATURES=cli-utils,esplora  NO_DEFAULT_FEATURES=1
     - TARGET=x86_64-unknown-linux-gnu    FEATURES=compiler           NO_DEFAULT_FEATURES=1    RUN_TESTS=1 # Test the `miniscriptc` example
     - TARGET=x86_64-unknown-linux-gnu    FEATURES=test-electrum      NO_DEFAULT_FEATURES=1    RUN_TESTS=1    RUN_CORE=1
index 71af2deafbcea02b656c6ab843ea597c117e0bd6..91caadf9a747a7dd623d3ac22d65849644531c0e 100644 (file)
@@ -14,13 +14,15 @@ serde_json = { version = "^1.0" }
 rand = "^0.7"
 
 # Optional dependencies
-sled = { version = "0.31.0", optional = true }
+sled = { version = "0.34", optional = true }
 electrum-client = { version = "0.2.0-beta.1", optional = true }
 reqwest = { version = "0.10", optional = true, features = ["json"] }
 futures = { version = "0.3", optional = true }
 clap = { version = "2.33", optional = true }
 base64 = { version = "^0.11", optional = true }
 async-trait = { version = "0.1", optional = true }
+rocksdb = { version = "0.14", optional = true }
+lazy_static = { version = "1.4", optional = true }
 
 # Platform-specific dependencies
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@@ -36,6 +38,7 @@ compiler = ["clap", "miniscript/compiler"]
 default = ["key-value-db", "electrum"]
 electrum = ["electrum-client"]
 esplora = ["reqwest", "futures"]
+compact_filters = ["rocksdb", "lazy_static"]
 key-value-db = ["sled"]
 cli-utils = ["clap", "base64"]
 async-interface = ["async-trait"]
diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs
new file mode 100644 (file)
index 0000000..f32b28b
--- /dev/null
@@ -0,0 +1,465 @@
+use std::collections::HashSet;
+use std::fmt;
+use std::net::ToSocketAddrs;
+use std::path::Path;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+
+#[allow(unused_imports)]
+use log::{debug, error, info, trace};
+
+use bitcoin::network::message_blockdata::Inventory;
+use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid};
+
+use rocksdb::{Options, SliceTransform, DB};
+
+mod peer;
+mod store;
+mod sync;
+
+use super::{Blockchain, Capability, OnlineBlockchain, Progress};
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
+use crate::error::Error;
+use crate::types::{ScriptType, TransactionDetails, UTXO};
+use crate::FeeRate;
+
+use peer::*;
+use store::*;
+use sync::*;
+
+const SYNC_HEADERS_COST: f32 = 1.0;
+const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
+const PROCESS_BLOCKS_COST: f32 = 20_000.0;
+
+#[derive(Debug)]
+pub struct CompactFiltersBlockchain(Option<CompactFilters>);
+
+impl CompactFiltersBlockchain {
+    pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
+        address: A,
+        storage_dir: P,
+        num_threads: usize,
+        skip_blocks: usize,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        Ok(CompactFiltersBlockchain(Some(CompactFilters::new(
+            address,
+            storage_dir,
+            num_threads,
+            skip_blocks,
+            network,
+        )?)))
+    }
+}
+
+struct CompactFilters {
+    peer: Arc<Peer>,
+    headers: Arc<HeadersStore<Full>>,
+    skip_blocks: usize,
+    num_threads: usize,
+}
+
+impl CompactFilters {
+    pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
+        address: A,
+        storage_dir: P,
+        num_threads: usize,
+        skip_blocks: usize,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        let mut opts = Options::default();
+        opts.create_if_missing(true);
+        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));
+
+        let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]);
+        let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
+        let headers = Arc::new(HeadersStore::new(db, network)?);
+
+        // try to recover partial snapshots
+        for cf_name in &cfs {
+            if !cf_name.starts_with("_headers:") {
+                continue;
+            }
+
+            info!("Trying to recover: {:?}", cf_name);
+            headers.recover_snapshot(cf_name)?;
+        }
+
+        Ok(CompactFilters {
+            peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?),
+            num_threads,
+            headers,
+            skip_blocks,
+        })
+    }
+
+    fn process_tx<D: BatchDatabase + DatabaseUtils>(
+        &self,
+        database: &mut D,
+        tx: &Transaction,
+        height: Option<u32>,
+        timestamp: u64,
+        internal_max_deriv: &mut u32,
+        external_max_deriv: &mut u32,
+    ) -> Result<(), Error> {
+        let mut updates = database.begin_batch();
+
+        let mut incoming: u64 = 0;
+        let mut outgoing: u64 = 0;
+
+        let mut inputs_sum: u64 = 0;
+        let mut outputs_sum: u64 = 0;
+
+        // look for our own inputs
+        for (i, input) in tx.input.iter().enumerate() {
+            if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
+                inputs_sum += previous_output.value;
+
+                if database.is_mine(&previous_output.script_pubkey)? {
+                    outgoing += previous_output.value;
+
+                    debug!("{} input #{} is mine, removing from utxo", tx.txid(), i);
+                    updates.del_utxo(&input.previous_output)?;
+                }
+            }
+        }
+
+        for (i, output) in tx.output.iter().enumerate() {
+            // to compute the fees later
+            outputs_sum += output.value;
+
+            // this output is ours, we have a path to derive it
+            if let Some((script_type, child)) =
+                database.get_path_from_script_pubkey(&output.script_pubkey)?
+            {
+                debug!("{} output #{} is mine, adding utxo", tx.txid(), i);
+                updates.set_utxo(&UTXO {
+                    outpoint: OutPoint::new(tx.txid(), i as u32),
+                    txout: output.clone(),
+                    is_internal: script_type.is_internal(),
+                })?;
+                incoming += output.value;
+
+                if script_type == ScriptType::Internal && child > *internal_max_deriv {
+                    *internal_max_deriv = child;
+                } else if script_type == ScriptType::External && child > *external_max_deriv {
+                    *external_max_deriv = child;
+                }
+            }
+        }
+
+        if incoming > 0 || outgoing > 0 {
+            let tx = TransactionDetails {
+                txid: tx.txid(),
+                transaction: Some(tx.clone()),
+                received: incoming,
+                sent: outgoing,
+                height,
+                timestamp,
+                fees: inputs_sum.checked_sub(outputs_sum).unwrap_or(0),
+            };
+
+            info!("Saving tx {}", tx.txid);
+            updates.set_tx(&tx)?;
+        }
+
+        database.commit_batch(updates)?;
+
+        Ok(())
+    }
+}
+
+impl Blockchain for CompactFiltersBlockchain {
+    fn offline() -> Self {
+        CompactFiltersBlockchain(None)
+    }
+
+    fn is_online(&self) -> bool {
+        self.0.is_some()
+    }
+}
+
+impl OnlineBlockchain for CompactFiltersBlockchain {
+    fn get_capabilities(&self) -> HashSet<Capability> {
+        vec![Capability::FullHistory].into_iter().collect()
+    }
+
+    fn setup<D: BatchDatabase + DatabaseUtils, P: 'static + Progress>(
+        &self,
+        _stop_gap: Option<usize>, // TODO: move to electrum and esplora only
+        database: &mut D,
+        progress_update: P,
+    ) -> Result<(), Error> {
+        let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
+
+        let cf_sync = Arc::new(CFSync::new(
+            Arc::clone(&inner.headers),
+            inner.skip_blocks,
+            0x00,
+        )?);
+
+        let initial_height = inner.headers.get_height()?;
+        let total_bundles = (inner.peer.get_version().start_height as usize)
+            .checked_sub(inner.skip_blocks)
+            .map(|x| x / 1000)
+            .unwrap_or(0)
+            + 1;
+        let expected_bundles_to_sync = total_bundles
+            .checked_sub(cf_sync.pruned_bundles()?)
+            .unwrap_or(0);
+
+        let headers_cost = (inner.peer.get_version().start_height as usize)
+            .checked_sub(initial_height)
+            .unwrap_or(0) as f32
+            * SYNC_HEADERS_COST;
+        let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST;
+
+        let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;
+
+        if let Some(snapshot) = sync::sync_headers(
+            Arc::clone(&inner.peer),
+            Arc::clone(&inner.headers),
+            |new_height| {
+                let local_headers_cost =
+                    new_height.checked_sub(initial_height).unwrap_or(0) as f32 * SYNC_HEADERS_COST;
+                progress_update.update(
+                    local_headers_cost / total_cost * 100.0,
+                    Some(format!("Synced headers to {}", new_height)),
+                )
+            },
+        )? {
+            if snapshot.work()? > inner.headers.work()? {
+                info!("Applying snapshot with work: {}", snapshot.work()?);
+                inner.headers.apply_snapshot(snapshot)?;
+            }
+        }
+
+        let synced_height = inner.headers.get_height()?;
+        let buried_height = synced_height
+            .checked_sub(sync::BURIED_CONFIRMATIONS)
+            .unwrap_or(0);
+        info!("Synced headers to height: {}", synced_height);
+
+        cf_sync.prepare_sync(Arc::clone(&inner.peer))?;
+
+        let all_scripts = Arc::new(
+            database
+                .iter_script_pubkeys(None)?
+                .into_iter()
+                .map(|s| s.to_bytes())
+                .collect::<Vec<_>>(),
+        );
+
+        let last_synced_block = Arc::new(Mutex::new(synced_height));
+        let synced_bundles = Arc::new(AtomicUsize::new(0));
+        let progress_update = Arc::new(Mutex::new(progress_update));
+
+        let mut threads = Vec::with_capacity(inner.num_threads);
+        for _ in 0..inner.num_threads {
+            let cf_sync = Arc::clone(&cf_sync);
+            let peer = Arc::new(inner.peer.new_connection()?);
+            let headers = Arc::clone(&inner.headers);
+            let all_scripts = Arc::clone(&all_scripts);
+            let last_synced_block = Arc::clone(&last_synced_block);
+            let progress_update = Arc::clone(&progress_update);
+            let synced_bundles = Arc::clone(&synced_bundles);
+
+            let thread = std::thread::spawn(move || {
+                cf_sync.capture_thread_for_sync(
+                    peer,
+                    |block_hash, filter| {
+                        if !filter
+                            .match_any(block_hash, &mut all_scripts.iter().map(AsRef::as_ref))?
+                        {
+                            return Ok(false);
+                        }
+
+                        let block_height = headers.get_height_for(block_hash)?.unwrap_or(0);
+                        let saved_correct_block = match headers.get_full_block(block_height)? {
+                            Some(block) if &block.bitcoin_hash() == block_hash => true,
+                            _ => false,
+                        };
+
+                        if saved_correct_block {
+                            Ok(false)
+                        } else {
+                            let mut last_synced_block = last_synced_block.lock().unwrap();
+
+                            // If we download a block older than `last_synced_block`, we update it so that
+                            // we know to delete and re-process all txs starting from that height
+                            if block_height < *last_synced_block {
+                                *last_synced_block = block_height;
+                            }
+
+                            Ok(true)
+                        }
+                    },
+                    |index| {
+                        let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst);
+                        let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST;
+                        progress_update.lock().unwrap().update(
+                            (headers_cost + local_filters_cost) / total_cost * 100.0,
+                            Some(format!(
+                                "Synced filters {} - {}",
+                                index * 1000 + 1,
+                                (index + 1) * 1000
+                            )),
+                        )
+                    },
+                )
+            });
+
+            threads.push(thread);
+        }
+
+        for t in threads {
+            t.join().unwrap()?;
+        }
+
+        progress_update.lock().unwrap().update(
+            (headers_cost + filters_cost) / total_cost * 100.0,
+            Some("Processing downloaded blocks and mempool".into()),
+        )?;
+
+        // delete all txs newer than last_synced_block
+        let last_synced_block = *last_synced_block.lock().unwrap();
+        log::debug!(
+            "Dropping transactions newer than `last_synced_block` = {}",
+            last_synced_block
+        );
+        let mut updates = database.begin_batch();
+        for details in database.iter_txs(false)? {
+            match details.height {
+                Some(height) if (height as usize) < last_synced_block => continue,
+                _ => updates.del_tx(&details.txid, false)?,
+            };
+        }
+        database.commit_batch(updates)?;
+
+        inner.peer.ask_for_mempool()?;
+
+        let mut internal_max_deriv = 0;
+        let mut external_max_deriv = 0;
+
+        for (height, block) in inner.headers.iter_full_blocks()? {
+            for tx in &block.txdata {
+                inner.process_tx(
+                    database,
+                    tx,
+                    Some(height as u32),
+                    0,
+                    &mut internal_max_deriv,
+                    &mut external_max_deriv,
+                )?;
+            }
+        }
+        for tx in inner.peer.get_mempool().iter_txs().iter() {
+            inner.process_tx(
+                database,
+                tx,
+                None,
+                0,
+                &mut internal_max_deriv,
+                &mut external_max_deriv,
+            )?;
+        }
+
+        let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0);
+        let first_ext_new = external_max_deriv as u32 + 1;
+        if first_ext_new > current_ext {
+            info!("Setting external index to {}", first_ext_new);
+            database.set_last_index(ScriptType::External, first_ext_new)?;
+        }
+
+        let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0);
+        let first_int_new = internal_max_deriv + 1;
+        if first_int_new > current_int {
+            info!("Setting internal index to {}", first_int_new);
+            database.set_last_index(ScriptType::Internal, first_int_new)?;
+        }
+
+        info!("Dropping blocks until {}", buried_height);
+        inner.headers.delete_blocks_until(buried_height)?;
+
+        progress_update
+            .lock()
+            .unwrap()
+            .update(100.0, Some("Done".into()))?;
+
+        Ok(())
+    }
+
+    fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
+        let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
+
+        Ok(inner
+            .peer
+            .get_mempool()
+            .get_tx(&Inventory::Transaction(*txid)))
+    }
+
+    fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
+        let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
+        inner.peer.broadcast_tx(tx.clone())?;
+
+        Ok(())
+    }
+
+    fn get_height(&self) -> Result<u32, Error> {
+        let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
+
+        Ok(inner.headers.get_height()? as u32)
+    }
+
+    fn estimate_fee(&self, _target: usize) -> Result<FeeRate, Error> {
+        // TODO
+        Ok(FeeRate::default())
+    }
+}
+
+impl fmt::Debug for CompactFilters {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("CompactFilters")
+            .field("peer", &self.peer)
+            .finish()
+    }
+}
+
+#[derive(Debug)]
+pub enum CompactFiltersError {
+    InvalidResponse,
+    InvalidHeaders,
+    InvalidFilterHeader,
+    InvalidFilter,
+    MissingBlock,
+    DataCorruption,
+    Timeout,
+
+    DB(rocksdb::Error),
+    IO(std::io::Error),
+    BIP158(bitcoin::util::bip158::Error),
+    Time(std::time::SystemTimeError),
+
+    Global(Box<crate::error::Error>),
+}
+
+macro_rules! impl_error {
+    ( $from:ty, $to:ident ) => {
+        impl std::convert::From<$from> for CompactFiltersError {
+            fn from(err: $from) -> Self {
+                CompactFiltersError::$to(err)
+            }
+        }
+    };
+}
+
+impl_error!(rocksdb::Error, DB);
+impl_error!(std::io::Error, IO);
+impl_error!(bitcoin::util::bip158::Error, BIP158);
+impl_error!(std::time::SystemTimeError, Time);
+
+impl From<crate::error::Error> for CompactFiltersError {
+    fn from(err: crate::error::Error) -> Self {
+        CompactFiltersError::Global(Box::new(err))
+    }
+}
diff --git a/src/blockchain/compact_filters/peer.rs b/src/blockchain/compact_filters/peer.rs
new file mode 100644 (file)
index 0000000..5893283
--- /dev/null
@@ -0,0 +1,468 @@
+use std::collections::HashMap;
+use std::net::{TcpStream, ToSocketAddrs};
+use std::sync::{Arc, Condvar, Mutex, RwLock};
+use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use rand::{thread_rng, Rng};
+
+use bitcoin::consensus::Encodable;
+use bitcoin::hash_types::BlockHash;
+use bitcoin::hashes::Hash;
+use bitcoin::network::constants::ServiceFlags;
+use bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
+use bitcoin::network::message_blockdata::*;
+use bitcoin::network::message_filter::*;
+use bitcoin::network::message_network::VersionMessage;
+use bitcoin::network::stream_reader::StreamReader;
+use bitcoin::network::Address;
+use bitcoin::{Block, Network, Transaction, Txid};
+
+use super::CompactFiltersError;
+
+type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
+
+#[derive(Debug, Default)]
+pub struct Mempool {
+    txs: RwLock<HashMap<Txid, Transaction>>,
+}
+
+impl Mempool {
+    pub fn add_tx(&self, tx: Transaction) {
+        self.txs.write().unwrap().insert(tx.txid(), tx);
+    }
+
+    pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
+        let txid = match inventory {
+            Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
+            Inventory::Transaction(txid) => *txid,
+            Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()),
+        };
+        self.txs.read().unwrap().get(&txid).cloned()
+    }
+
+    pub fn has_tx(&self, txid: &Txid) -> bool {
+        self.txs.read().unwrap().contains_key(txid)
+    }
+
+    pub fn iter_txs(&self) -> Vec<Transaction> {
+        self.txs.read().unwrap().values().cloned().collect()
+    }
+}
+
+#[derive(Debug)]
+pub struct Peer {
+    writer: Arc<Mutex<TcpStream>>,
+    responses: Arc<RwLock<ResponsesMap>>,
+
+    reader_thread: thread::JoinHandle<()>,
+    connected: Arc<RwLock<bool>>,
+
+    mempool: Arc<Mempool>,
+
+    version: VersionMessage,
+    network: Network,
+}
+
+impl Peer {
+    pub fn new<A: ToSocketAddrs>(
+        address: A,
+        mempool: Arc<Mempool>,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        let connection = TcpStream::connect(address)?;
+
+        let writer = Arc::new(Mutex::new(connection.try_clone()?));
+        let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
+        let connected = Arc::new(RwLock::new(true));
+
+        let mut locked_writer = writer.lock().unwrap();
+
+        let reader_thread_responses = Arc::clone(&responses);
+        let reader_thread_writer = Arc::clone(&writer);
+        let reader_thread_mempool = Arc::clone(&mempool);
+        let reader_thread_connected = Arc::clone(&connected);
+        let reader_thread = thread::spawn(move || {
+            Self::reader_thread(
+                network,
+                connection,
+                reader_thread_responses,
+                reader_thread_writer,
+                reader_thread_mempool,
+                reader_thread_connected,
+            )
+        });
+
+        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
+        let nonce = thread_rng().gen();
+        let receiver = Address::new(&locked_writer.peer_addr()?, ServiceFlags::NONE);
+        let sender = Address {
+            services: ServiceFlags::NONE,
+            address: [0u16; 8],
+            port: 0,
+        };
+
+        Self::_send(
+            &mut locked_writer,
+            network.magic(),
+            NetworkMessage::Version(VersionMessage::new(
+                ServiceFlags::WITNESS,
+                timestamp,
+                receiver,
+                sender,
+                nonce,
+                "MagicalBitcoinWallet".into(),
+                0,
+            )),
+        )?;
+        let version = if let NetworkMessage::Version(version) =
+            Self::_recv(&responses, "version", None)?.unwrap()
+        {
+            version
+        } else {
+            return Err(CompactFiltersError::InvalidResponse);
+        };
+
+        if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None)?.unwrap() {
+            Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
+        } else {
+            return Err(CompactFiltersError::InvalidResponse);
+        }
+
+        std::mem::drop(locked_writer);
+
+        Ok(Peer {
+            writer,
+            reader_thread,
+            responses,
+            connected,
+            mempool,
+            network,
+            version,
+        })
+    }
+
+    pub fn new_connection(&self) -> Result<Self, CompactFiltersError> {
+        let socket_addr = self.writer.lock().unwrap().peer_addr()?;
+        Self::new(socket_addr, Arc::clone(&self.mempool), self.network)
+    }
+
+    fn _send(
+        writer: &mut TcpStream,
+        magic: u32,
+        payload: NetworkMessage,
+    ) -> Result<(), CompactFiltersError> {
+        log::trace!("==> {:?}", payload);
+
+        let raw_message = RawNetworkMessage { magic, payload };
+
+        raw_message
+            .consensus_encode(writer)
+            .map_err(|_| CompactFiltersError::DataCorruption)?;
+
+        Ok(())
+    }
+
+    fn _recv(
+        responses: &Arc<RwLock<ResponsesMap>>,
+        wait_for: &'static str,
+        timeout: Option<Duration>,
+    ) -> Result<Option<NetworkMessage>, CompactFiltersError> {
+        let message_resp = {
+            let mut lock = responses.write().unwrap();
+            let message_resp = lock.entry(wait_for).or_default();
+            Arc::clone(&message_resp)
+        };
+
+        let (lock, cvar) = &*message_resp;
+
+        let mut messages = lock.lock().unwrap();
+        while messages.is_empty() {
+            match timeout {
+                None => messages = cvar.wait(messages).unwrap(),
+                Some(t) => {
+                    let result = cvar.wait_timeout(messages, t).unwrap();
+                    if result.1.timed_out() {
+                        return Ok(None);
+                    }
+
+                    messages = result.0;
+                }
+            }
+        }
+
+        Ok(messages.pop())
+    }
+
+    pub fn get_version(&self) -> &VersionMessage {
+        &self.version
+    }
+
+    pub fn get_mempool(&self) -> Arc<Mempool> {
+        Arc::clone(&self.mempool)
+    }
+
+    pub fn is_connected(&self) -> bool {
+        *self.connected.read().unwrap()
+    }
+
+    pub fn reader_thread(
+        network: Network,
+        connection: TcpStream,
+        reader_thread_responses: Arc<RwLock<ResponsesMap>>,
+        reader_thread_writer: Arc<Mutex<TcpStream>>,
+        reader_thread_mempool: Arc<Mempool>,
+        reader_thread_connected: Arc<RwLock<bool>>,
+    ) {
+        macro_rules! check_disconnect {
+            ($call:expr) => {
+                match $call {
+                    Ok(good) => good,
+                    Err(e) => {
+                        log::debug!("Error {:?}", e);
+                        *reader_thread_connected.write().unwrap() = false;
+
+                        break;
+                    }
+                }
+            };
+        }
+
+        let mut reader = StreamReader::new(connection, None);
+        loop {
+            let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
+
+            let in_message = if raw_message.magic != network.magic() {
+                continue;
+            } else {
+                raw_message.payload
+            };
+
+            log::trace!("<== {:?}", in_message);
+
+            match in_message {
+                NetworkMessage::Ping(nonce) => {
+                    check_disconnect!(Self::_send(
+                        &mut reader_thread_writer.lock().unwrap(),
+                        network.magic(),
+                        NetworkMessage::Pong(nonce),
+                    ));
+
+                    continue;
+                }
+                NetworkMessage::Alert(_) => continue,
+                NetworkMessage::GetData(ref inv) => {
+                    let (found, not_found): (Vec<_>, Vec<_>) = inv
+                        .into_iter()
+                        .map(|item| (*item, reader_thread_mempool.get_tx(item)))
+                        .partition(|(_, d)| d.is_some());
+                    for (_, found_tx) in found {
+                        check_disconnect!(Self::_send(
+                            &mut reader_thread_writer.lock().unwrap(),
+                            network.magic(),
+                            NetworkMessage::Tx(found_tx.unwrap()),
+                        ));
+                    }
+
+                    if !not_found.is_empty() {
+                        check_disconnect!(Self::_send(
+                            &mut reader_thread_writer.lock().unwrap(),
+                            network.magic(),
+                            NetworkMessage::NotFound(
+                                not_found.into_iter().map(|(i, _)| i).collect(),
+                            ),
+                        ));
+                    }
+                }
+                _ => {}
+            }
+
+            let message_resp = {
+                let mut lock = reader_thread_responses.write().unwrap();
+                let message_resp = lock.entry(in_message.cmd()).or_default();
+                Arc::clone(&message_resp)
+            };
+
+            let (lock, cvar) = &*message_resp;
+            let mut messages = lock.lock().unwrap();
+            messages.push(in_message);
+            cvar.notify_all();
+        }
+    }
+
+    pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
+        let mut writer = self.writer.lock().unwrap();
+        Self::_send(&mut writer, self.network.magic(), payload)
+    }
+
+    pub fn recv(
+        &self,
+        wait_for: &'static str,
+        timeout: Option<Duration>,
+    ) -> Result<Option<NetworkMessage>, CompactFiltersError> {
+        Self::_recv(&self.responses, wait_for, timeout)
+    }
+}
+
+pub trait CompactFiltersPeer {
+    fn get_cf_checkpt(
+        &self,
+        filter_type: u8,
+        stop_hash: BlockHash,
+    ) -> Result<CFCheckpt, CompactFiltersError>;
+    fn get_cf_headers(
+        &self,
+        filter_type: u8,
+        start_height: u32,
+        stop_hash: BlockHash,
+    ) -> Result<CFHeaders, CompactFiltersError>;
+    fn get_cf_filters(
+        &self,
+        filter_type: u8,
+        start_height: u32,
+        stop_hash: BlockHash,
+    ) -> Result<(), CompactFiltersError>;
+    fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
+}
+
+impl CompactFiltersPeer for Peer {
+    fn get_cf_checkpt(
+        &self,
+        filter_type: u8,
+        stop_hash: BlockHash,
+    ) -> Result<CFCheckpt, CompactFiltersError> {
+        self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
+            filter_type,
+            stop_hash,
+        }))?;
+
+        let response = self
+            .recv("cfcheckpt", Some(Duration::from_secs(10)))?
+            .ok_or(CompactFiltersError::Timeout)?;
+        let response = match response {
+            NetworkMessage::CFCheckpt(response) => response,
+            _ => return Err(CompactFiltersError::InvalidResponse),
+        };
+
+        if response.filter_type != filter_type {
+            return Err(CompactFiltersError::InvalidResponse);
+        }
+
+        Ok(response)
+    }
+
+    fn get_cf_headers(
+        &self,
+        filter_type: u8,
+        start_height: u32,
+        stop_hash: BlockHash,
+    ) -> Result<CFHeaders, CompactFiltersError> {
+        self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
+            filter_type,
+            start_height,
+            stop_hash,
+        }))?;
+
+        let response = self
+            .recv("cfheaders", Some(Duration::from_secs(10)))?
+            .ok_or(CompactFiltersError::Timeout)?;
+        let response = match response {
+            NetworkMessage::CFHeaders(response) => response,
+            _ => return Err(CompactFiltersError::InvalidResponse),
+        };
+
+        if response.filter_type != filter_type {
+            return Err(CompactFiltersError::InvalidResponse);
+        }
+
+        Ok(response)
+    }
+
+    fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
+        let response = self
+            .recv("cfilter", Some(Duration::from_secs(10)))?
+            .ok_or(CompactFiltersError::Timeout)?;
+        let response = match response {
+            NetworkMessage::CFilter(response) => response,
+            _ => return Err(CompactFiltersError::InvalidResponse),
+        };
+
+        Ok(response)
+    }
+
+    fn get_cf_filters(
+        &self,
+        filter_type: u8,
+        start_height: u32,
+        stop_hash: BlockHash,
+    ) -> Result<(), CompactFiltersError> {
+        self.send(NetworkMessage::GetCFilters(GetCFilters {
+            filter_type,
+            start_height,
+            stop_hash,
+        }))?;
+
+        Ok(())
+    }
+}
+
+pub trait InvPeer {
+    fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
+    fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
+    fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
+}
+
+impl InvPeer for Peer {
+    fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
+        self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
+            block_hash,
+        )]))?;
+
+        match self.recv("block", Some(Duration::from_secs(10)))? {
+            None => Ok(None),
+            Some(NetworkMessage::Block(response)) => Ok(Some(response)),
+            _ => Err(CompactFiltersError::InvalidResponse),
+        }
+    }
+
+    fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
+        self.send(NetworkMessage::MemPool)?;
+        let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
+            None => return Ok(()), // empty mempool
+            Some(NetworkMessage::Inv(inv)) => inv,
+            _ => return Err(CompactFiltersError::InvalidResponse),
+        };
+
+        let getdata = inv
+            .iter()
+            .cloned()
+            .filter(|item| match item {
+                Inventory::Transaction(txid) if !self.mempool.has_tx(txid) => true,
+                _ => false,
+            })
+            .collect::<Vec<_>>();
+        let num_txs = getdata.len();
+        self.send(NetworkMessage::GetData(getdata))?;
+
+        for _ in 0..num_txs {
+            let tx = self
+                .recv("tx", Some(Duration::from_secs(10)))?
+                .ok_or(CompactFiltersError::Timeout)?;
+            let tx = match tx {
+                NetworkMessage::Tx(tx) => tx,
+                _ => return Err(CompactFiltersError::InvalidResponse),
+            };
+
+            self.mempool.add_tx(tx);
+        }
+
+        Ok(())
+    }
+
+    fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
+        self.mempool.add_tx(tx.clone());
+        self.send(NetworkMessage::Tx(tx))?;
+
+        Ok(())
+    }
+}
diff --git a/src/blockchain/compact_filters/store.rs b/src/blockchain/compact_filters/store.rs
new file mode 100644 (file)
index 0000000..81f24d9
--- /dev/null
@@ -0,0 +1,871 @@
+use std::convert::TryInto;
+use std::io::{Read, Write};
+use std::marker::PhantomData;
+use std::ops::Deref;
+use std::sync::Arc;
+use std::sync::RwLock;
+
+use rand::distributions::Alphanumeric;
+use rand::{thread_rng, Rng};
+
+use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
+
+use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable};
+use bitcoin::hash_types::FilterHash;
+use bitcoin::hashes::hex::FromHex;
+use bitcoin::hashes::{sha256d, Hash};
+use bitcoin::util::bip158::BlockFilter;
+use bitcoin::util::hash::BitcoinHash;
+use bitcoin::util::uint::Uint256;
+use bitcoin::Block;
+use bitcoin::BlockHash;
+use bitcoin::BlockHeader;
+use bitcoin::Network;
+
+use super::CompactFiltersError;
+
+lazy_static! {
+    static ref MAINNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
+    static ref TESTNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
+    static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
+}
+
+pub trait StoreType: Default {}
+
+#[derive(Default)]
+pub struct Full;
+impl StoreType for Full {}
+#[derive(Default)]
+pub struct Snapshot;
+impl StoreType for Snapshot {}
+
+pub enum StoreEntry {
+    BlockHeader(Option<usize>),
+    Block(Option<usize>),
+    BlockHeaderIndex(Option<BlockHash>),
+    CFilterTable((u8, Option<usize>)),
+}
+
+impl StoreEntry {
+    pub fn get_prefix(&self) -> Vec<u8> {
+        match self {
+            StoreEntry::BlockHeader(_) => b"z",
+            StoreEntry::Block(_) => b"x",
+            StoreEntry::BlockHeaderIndex(_) => b"i",
+            StoreEntry::CFilterTable(_) => b"t",
+        }
+        .to_vec()
+    }
+
+    pub fn get_key(&self) -> Vec<u8> {
+        let mut prefix = self.get_prefix();
+        match self {
+            StoreEntry::BlockHeader(Some(height)) => {
+                prefix.extend_from_slice(&height.to_be_bytes())
+            }
+            StoreEntry::Block(Some(height)) => prefix.extend_from_slice(&height.to_be_bytes()),
+            StoreEntry::BlockHeaderIndex(Some(hash)) => {
+                prefix.extend_from_slice(&hash.into_inner())
+            }
+            StoreEntry::CFilterTable((filter_type, bundle_index)) => {
+                prefix.push(*filter_type);
+                if let Some(bundle_index) = bundle_index {
+                    prefix.extend_from_slice(&bundle_index.to_be_bytes());
+                }
+            }
+            _ => {}
+        }
+
+        prefix
+    }
+}
+
+pub trait SerializeDb: Sized {
+    fn serialize(&self) -> Vec<u8>;
+    fn deserialize(data: &[u8]) -> Result<Self, CompactFiltersError>;
+}
+
+impl<T> SerializeDb for T
+where
+    T: Encodable + Decodable,
+{
+    fn serialize(&self) -> Vec<u8> {
+        serialize(self)
+    }
+
+    fn deserialize(data: &[u8]) -> Result<Self, CompactFiltersError> {
+        Ok(deserialize(data).map_err(|_| CompactFiltersError::DataCorruption)?)
+    }
+}
+
+impl Encodable for FilterHeader {
+    fn consensus_encode<W: Write>(
+        &self,
+        mut e: W,
+    ) -> Result<usize, bitcoin::consensus::encode::Error> {
+        let mut written = self.prev_header_hash.consensus_encode(&mut e)?;
+        written += self.filter_hash.consensus_encode(&mut e)?;
+        Ok(written)
+    }
+}
+
+impl Decodable for FilterHeader {
+    fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
+        let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?;
+        let filter_hash = FilterHash::consensus_decode(&mut d)?;
+
+        Ok(FilterHeader {
+            prev_header_hash,
+            filter_hash,
+        })
+    }
+}
+
+impl Encodable for BundleStatus {
+    fn consensus_encode<W: Write>(
+        &self,
+        mut e: W,
+    ) -> Result<usize, bitcoin::consensus::encode::Error> {
+        let mut written = 0;
+
+        match self {
+            BundleStatus::Init => {
+                written += 0x00u8.consensus_encode(&mut e)?;
+            }
+            BundleStatus::CFHeaders { cf_headers } => {
+                written += 0x01u8.consensus_encode(&mut e)?;
+                written += VarInt(cf_headers.len() as u64).consensus_encode(&mut e)?;
+                for header in cf_headers {
+                    written += header.consensus_encode(&mut e)?;
+                }
+            }
+            BundleStatus::CFilters { cf_filters } => {
+                written += 0x02u8.consensus_encode(&mut e)?;
+                written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?;
+                for filter in cf_filters {
+                    written += filter.consensus_encode(&mut e)?;
+                }
+            }
+            BundleStatus::Processed { cf_filters } => {
+                written += 0x03u8.consensus_encode(&mut e)?;
+                written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?;
+                for filter in cf_filters {
+                    written += filter.consensus_encode(&mut e)?;
+                }
+            }
+            BundleStatus::Pruned => {
+                written += 0x04u8.consensus_encode(&mut e)?;
+            }
+            BundleStatus::Tip { cf_filters } => {
+                written += 0x05u8.consensus_encode(&mut e)?;
+                written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?;
+                for filter in cf_filters {
+                    written += filter.consensus_encode(&mut e)?;
+                }
+            }
+        }
+
+        Ok(written)
+    }
+}
+
+impl Decodable for BundleStatus {
+    fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
+        let byte_type = u8::consensus_decode(&mut d)?;
+        match byte_type {
+            0x00 => Ok(BundleStatus::Init),
+            0x01 => {
+                let num = VarInt::consensus_decode(&mut d)?;
+                let num = num.0 as usize;
+
+                let mut cf_headers = Vec::with_capacity(num);
+                for _ in 0..num {
+                    cf_headers.push(FilterHeader::consensus_decode(&mut d)?);
+                }
+
+                Ok(BundleStatus::CFHeaders { cf_headers })
+            }
+            0x02 => {
+                let num = VarInt::consensus_decode(&mut d)?;
+                let num = num.0 as usize;
+
+                let mut cf_filters = Vec::with_capacity(num);
+                for _ in 0..num {
+                    cf_filters.push(Vec::<u8>::consensus_decode(&mut d)?);
+                }
+
+                Ok(BundleStatus::CFilters { cf_filters })
+            }
+            0x03 => {
+                let num = VarInt::consensus_decode(&mut d)?;
+                let num = num.0 as usize;
+
+                let mut cf_filters = Vec::with_capacity(num);
+                for _ in 0..num {
+                    cf_filters.push(Vec::<u8>::consensus_decode(&mut d)?);
+                }
+
+                Ok(BundleStatus::Processed { cf_filters })
+            }
+            0x04 => Ok(BundleStatus::Pruned),
+            0x05 => {
+                let num = VarInt::consensus_decode(&mut d)?;
+                let num = num.0 as usize;
+
+                let mut cf_filters = Vec::with_capacity(num);
+                for _ in 0..num {
+                    cf_filters.push(Vec::<u8>::consensus_decode(&mut d)?);
+                }
+
+                Ok(BundleStatus::Tip { cf_filters })
+            }
+            _ => Err(bitcoin::consensus::encode::Error::ParseFailed(
+                "Invalid byte type",
+            )),
+        }
+    }
+}
+
+pub struct HeadersStore<T: StoreType> {
+    store: Arc<RwLock<DB>>,
+    cf_name: String,
+    min_height: usize,
+    network: Network,
+    phantom: PhantomData<T>,
+}
+
+impl HeadersStore<Full> {
+    pub fn new(store: DB, network: Network) -> Result<Self, CompactFiltersError> {
+        let genesis = match network {
+            Network::Bitcoin => MAINNET_GENESIS.deref(),
+            Network::Testnet => TESTNET_GENESIS.deref(),
+            Network::Regtest => REGTEST_GENESIS.deref(),
+        };
+
+        let cf_name = "default".to_string();
+        let cf_handle = store.cf_handle(&cf_name).unwrap();
+
+        let genesis_key = StoreEntry::BlockHeader(Some(0)).get_key();
+
+        if store.get_pinned_cf(cf_handle, &genesis_key)?.is_none() {
+            let mut batch = WriteBatch::default();
+            batch.put_cf(
+                cf_handle,
+                genesis_key,
+                (genesis.header, genesis.header.work()).serialize(),
+            );
+            batch.put_cf(
+                cf_handle,
+                StoreEntry::BlockHeaderIndex(Some(genesis.bitcoin_hash())).get_key(),
+                &0usize.to_be_bytes(),
+            );
+            store.write(batch)?;
+        }
+
+        Ok(HeadersStore {
+            store: Arc::new(RwLock::new(store)),
+            cf_name,
+            min_height: 0,
+            network,
+            phantom: PhantomData,
+        })
+    }
+
+    pub fn get_locators(&self) -> Result<Vec<(BlockHash, usize)>, CompactFiltersError> {
+        let mut step = 1;
+        let mut index = self.get_height()?;
+        let mut answer = Vec::new();
+
+        let store_read = self.store.read().unwrap();
+        let cf_handle = store_read.cf_handle(&self.cf_name).unwrap();
+
+        loop {
+            if answer.len() > 10 {
+                step *= 2;
+            }
+
+            let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(
+                &store_read
+                    .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(index)).get_key())?
+                    .unwrap(),
+            )?;
+            answer.push((header.bitcoin_hash(), index));
+
+            if let Some(new_index) = index.checked_sub(step) {
+                index = new_index;
+            } else {
+                break;
+            }
+        }
+
+        Ok(answer)
+    }
+
+    pub fn start_snapshot(
+        &self,
+        from: usize,
+    ) -> Result<HeadersStore<Snapshot>, CompactFiltersError> {
+        let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect();
+        let new_cf_name = format!("_headers:{}", new_cf_name);
+
+        let mut write_store = self.store.write().unwrap();
+
+        write_store.create_cf(&new_cf_name, &Default::default())?;
+
+        let cf_handle = write_store.cf_handle(&self.cf_name).unwrap();
+        let new_cf_handle = write_store.cf_handle(&new_cf_name).unwrap();
+
+        let (header, work): (BlockHeader, Uint256) = SerializeDb::deserialize(
+            &write_store
+                .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())?
+                .ok_or(CompactFiltersError::DataCorruption)?,
+        )?;
+
+        let mut batch = WriteBatch::default();
+        batch.put_cf(
+            new_cf_handle,
+            StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(),
+            &from.to_be_bytes(),
+        );
+        batch.put_cf(
+            new_cf_handle,
+            StoreEntry::BlockHeader(Some(from)).get_key(),
+            (header, work).serialize(),
+        );
+        write_store.write(batch)?;
+
+        let store = Arc::clone(&self.store);
+        Ok(HeadersStore {
+            store,
+            cf_name: new_cf_name,
+            min_height: from,
+            network: self.network,
+            phantom: PhantomData,
+        })
+    }
+
+    pub fn recover_snapshot(&self, cf_name: &str) -> Result<(), CompactFiltersError> {
+        let mut write_store = self.store.write().unwrap();
+        let snapshot_cf_handle = write_store.cf_handle(cf_name).unwrap();
+
+        let prefix = StoreEntry::BlockHeader(None).get_key();
+        let mut iterator = write_store.prefix_iterator_cf(snapshot_cf_handle, prefix);
+
+        let min_height = match iterator
+            .next()
+            .and_then(|(k, _)| k[1..].try_into().ok())
+            .map(|bytes| usize::from_be_bytes(bytes))
+        {
+            None => {
+                std::mem::drop(iterator);
+                write_store.drop_cf(cf_name).ok();
+
+                return Ok(());
+            }
+            Some(x) => x,
+        };
+        std::mem::drop(iterator);
+        std::mem::drop(write_store);
+
+        let snapshot = HeadersStore {
+            store: Arc::clone(&self.store),
+            cf_name: cf_name.into(),
+            min_height,
+            network: self.network,
+            phantom: PhantomData,
+        };
+        if snapshot.work()? > self.work()? {
+            self.apply_snapshot(snapshot)?;
+        }
+
+        Ok(())
+    }
+
+    pub fn apply_snapshot(
+        &self,
+        snaphost: HeadersStore<Snapshot>,
+    ) -> Result<(), CompactFiltersError> {
+        let mut batch = WriteBatch::default();
+
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+        let snapshot_cf_handle = read_store.cf_handle(&snaphost.cf_name).unwrap();
+
+        let from_key = StoreEntry::BlockHeader(Some(snaphost.min_height)).get_key();
+        let to_key = StoreEntry::BlockHeader(Some(usize::MAX)).get_key();
+
+        let mut opts = ReadOptions::default();
+        opts.set_iterate_upper_bound(to_key.clone());
+
+        log::debug!("Removing items");
+        batch.delete_range_cf(cf_handle, &from_key, &to_key);
+        for (_, v) in read_store.iterator_cf_opt(
+            cf_handle,
+            opts,
+            IteratorMode::From(&from_key, Direction::Forward),
+        ) {
+            let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;
+
+            batch.delete_cf(
+                cf_handle,
+                StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(),
+            );
+        }
+
+        // Delete full blocks overriden by snapshot
+        let from_key = StoreEntry::Block(Some(snaphost.min_height)).get_key();
+        let to_key = StoreEntry::Block(Some(usize::MAX)).get_key();
+        batch.delete_range(&from_key, &to_key);
+
+        log::debug!("Copying over new items");
+        for (k, v) in read_store.iterator_cf(snapshot_cf_handle, IteratorMode::Start) {
+            batch.put_cf(cf_handle, k, v);
+        }
+
+        read_store.write(batch)?;
+
+        std::mem::drop(snapshot_cf_handle);
+        std::mem::drop(cf_handle);
+        std::mem::drop(read_store);
+
+        self.store.write().unwrap().drop_cf(&snaphost.cf_name)?;
+
+        Ok(())
+    }
+
+    pub fn get_height_for(
+        &self,
+        block_hash: &BlockHash,
+    ) -> Result<Option<usize>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+
+        let key = StoreEntry::BlockHeaderIndex(Some(block_hash.clone())).get_key();
+        let data = read_store.get_pinned_cf(cf_handle, key)?;
+        Ok(data
+            .map(|data| {
+                Ok::<_, CompactFiltersError>(usize::from_be_bytes(
+                    data.as_ref()
+                        .try_into()
+                        .map_err(|_| CompactFiltersError::DataCorruption)?,
+                ))
+            })
+            .transpose()?)
+    }
+
+    pub fn get_block_hash(&self, height: usize) -> Result<Option<BlockHash>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+
+        let key = StoreEntry::BlockHeader(Some(height)).get_key();
+        let data = read_store.get_pinned_cf(cf_handle, key)?;
+        Ok(data
+            .map(|data| {
+                let (header, _): (BlockHeader, Uint256) =
+                    deserialize(&data).map_err(|_| CompactFiltersError::DataCorruption)?;
+                Ok::<_, CompactFiltersError>(header.bitcoin_hash())
+            })
+            .transpose()?)
+    }
+
+    pub fn save_full_block(&self, block: &Block, height: usize) -> Result<(), CompactFiltersError> {
+        let key = StoreEntry::Block(Some(height)).get_key();
+        self.store.read().unwrap().put(key, block.serialize())?;
+
+        Ok(())
+    }
+
+    pub fn get_full_block(&self, height: usize) -> Result<Option<Block>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+
+        let key = StoreEntry::Block(Some(height)).get_key();
+        let opt_block = read_store.get_pinned(key)?;
+
+        Ok(opt_block
+            .map(|data| deserialize(&data))
+            .transpose()
+            .map_err(|_| CompactFiltersError::DataCorruption)?)
+    }
+
+    pub fn delete_blocks_until(&self, height: usize) -> Result<(), CompactFiltersError> {
+        let from_key = StoreEntry::Block(Some(0)).get_key();
+        let to_key = StoreEntry::Block(Some(height)).get_key();
+
+        let mut batch = WriteBatch::default();
+        batch.delete_range(&from_key, &to_key);
+
+        self.store.read().unwrap().write(batch)?;
+
+        Ok(())
+    }
+
+    pub fn iter_full_blocks(&self) -> Result<Vec<(usize, Block)>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+
+        let prefix = StoreEntry::Block(None).get_key();
+
+        let iterator = read_store.prefix_iterator(&prefix);
+        // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
+        // have the right prefix
+        iterator
+            .filter(|(k, _)| k.starts_with(&prefix))
+            .map(|(k, v)| {
+                let height: usize = usize::from_be_bytes(
+                    k[1..]
+                        .try_into()
+                        .map_err(|_| CompactFiltersError::DataCorruption)?,
+                );
+                let block = SerializeDb::deserialize(&v)?;
+
+                Ok((height, block))
+            })
+            .collect::<Result<_, _>>()
+    }
+}
+
+impl<T: StoreType> HeadersStore<T> {
+    pub fn work(&self) -> Result<Uint256, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+
+        let prefix = StoreEntry::BlockHeader(None).get_key();
+        let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
+
+        Ok(iterator
+            .last()
+            .map(|(_, v)| -> Result<_, CompactFiltersError> {
+                let (_, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;
+
+                Ok(work)
+            })
+            .transpose()?
+            .unwrap_or_default())
+    }
+
+    pub fn get_height(&self) -> Result<usize, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+
+        let prefix = StoreEntry::BlockHeader(None).get_key();
+        let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
+
+        Ok(iterator
+            .last()
+            .map(|(k, _)| -> Result<_, CompactFiltersError> {
+                let height = usize::from_be_bytes(
+                    k[1..]
+                        .try_into()
+                        .map_err(|_| CompactFiltersError::DataCorruption)?,
+                );
+
+                Ok(height)
+            })
+            .transpose()?
+            .unwrap_or_default())
+    }
+
+    pub fn get_tip_hash(&self) -> Result<Option<BlockHash>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+
+        let prefix = StoreEntry::BlockHeader(None).get_key();
+        let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
+
+        Ok(iterator
+            .last()
+            .map(|(_, v)| -> Result<_, CompactFiltersError> {
+                let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;
+
+                Ok(header.bitcoin_hash())
+            })
+            .transpose()?)
+    }
+
+    pub fn apply(
+        &mut self,
+        from: usize,
+        headers: Vec<BlockHeader>,
+    ) -> Result<BlockHash, CompactFiltersError> {
+        let mut batch = WriteBatch::default();
+
+        let read_store = self.store.read().unwrap();
+        let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
+
+        let (mut last_hash, mut accumulated_work) = read_store
+            .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())?
+            .map(|result| {
+                let (header, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&result)?;
+                Ok::<_, CompactFiltersError>((header.bitcoin_hash(), work))
+            })
+            .transpose()?
+            .ok_or(CompactFiltersError::DataCorruption)?;
+
+        for (index, header) in headers.into_iter().enumerate() {
+            if header.prev_blockhash != last_hash {
+                return Err(CompactFiltersError::InvalidHeaders);
+            }
+
+            last_hash = header.bitcoin_hash();
+            accumulated_work = accumulated_work + header.work();
+
+            let height = from + index + 1;
+            batch.put_cf(
+                cf_handle,
+                StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(),
+                &(height).to_be_bytes(),
+            );
+            batch.put_cf(
+                cf_handle,
+                StoreEntry::BlockHeader(Some(height)).get_key(),
+                (header, accumulated_work).serialize(),
+            );
+        }
+
+        std::mem::drop(cf_handle);
+        std::mem::drop(read_store);
+
+        self.store.write().unwrap().write(batch)?;
+        Ok(last_hash)
+    }
+}
+
+pub type FilterHeaderHash = FilterHash;
+
+#[derive(Debug, Clone)]
+pub struct FilterHeader {
+    prev_header_hash: FilterHeaderHash,
+    filter_hash: FilterHash,
+}
+
+impl BitcoinHash<FilterHeaderHash> for FilterHeader {
+    fn bitcoin_hash(&self) -> FilterHeaderHash {
+        let mut hash_data = self.filter_hash.into_inner().to_vec();
+        hash_data.extend_from_slice(&self.prev_header_hash);
+        sha256d::Hash::hash(&hash_data).into()
+    }
+}
+
+pub enum BundleStatus {
+    Init,
+    CFHeaders { cf_headers: Vec<FilterHeader> },
+    CFilters { cf_filters: Vec<Vec<u8>> },
+    Processed { cf_filters: Vec<Vec<u8>> },
+    Tip { cf_filters: Vec<Vec<u8>> },
+    Pruned,
+}
+
+pub struct CFStore {
+    store: Arc<RwLock<DB>>,
+    filter_type: u8,
+}
+
+type BundleEntry = (BundleStatus, FilterHeaderHash);
+
+impl CFStore {
+    pub fn new(
+        headers_store: &HeadersStore<Full>,
+        filter_type: u8,
+    ) -> Result<Self, CompactFiltersError> {
+        let cf_store = CFStore {
+            store: Arc::clone(&headers_store.store),
+            filter_type,
+        };
+
+        let genesis = match headers_store.network {
+            Network::Bitcoin => MAINNET_GENESIS.deref(),
+            Network::Testnet => TESTNET_GENESIS.deref(),
+            Network::Regtest => REGTEST_GENESIS.deref(),
+        };
+
+        let filter = BlockFilter::new_script_filter(genesis, |utxo| {
+            Err(bitcoin::util::bip158::Error::UtxoMissing(*utxo))
+        })?;
+        let first_key = StoreEntry::CFilterTable((filter_type, Some(0))).get_key();
+
+        // Add the genesis' filter
+        {
+            let read_store = cf_store.store.read().unwrap();
+            if read_store.get_pinned(&first_key)?.is_none() {
+                read_store.put(
+                    &first_key,
+                    (BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(),
+                )?;
+            }
+        }
+
+        Ok(cf_store)
+    }
+
+    pub fn get_filter_type(&self) -> u8 {
+        self.filter_type
+    }
+
+    pub fn get_bundles(&self) -> Result<Vec<BundleEntry>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+
+        let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
+        let iterator = read_store.prefix_iterator(&prefix);
+
+        // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
+        // have the right prefix
+        iterator
+            .filter(|(k, _)| k.starts_with(&prefix))
+            .map(|(_, data)| BundleEntry::deserialize(&data))
+            .collect::<Result<_, _>>()
+    }
+
+    pub fn get_checkpoints(&self) -> Result<Vec<FilterHash>, CompactFiltersError> {
+        let read_store = self.store.read().unwrap();
+
+        let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
+        let iterator = read_store.prefix_iterator(&prefix);
+
+        // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
+        // have the right prefix
+        Ok(iterator
+            .filter(|(k, _)| k.starts_with(&prefix))
+            .skip(1)
+            .map(|(_, data)| Ok::<_, CompactFiltersError>(BundleEntry::deserialize(&data)?.1))
+            .collect::<Result<_, _>>()?)
+    }
+
+    pub fn replace_checkpoints(
+        &self,
+        checkpoints: Vec<FilterHash>,
+    ) -> Result<(), CompactFiltersError> {
+        let current_checkpoints = self.get_checkpoints()?;
+
+        let mut equal_bundles = 0;
+        for (index, (our, their)) in current_checkpoints
+            .iter()
+            .zip(checkpoints.iter())
+            .enumerate()
+        {
+            equal_bundles = index;
+
+            if our != their {
+                break;
+            }
+        }
+
+        let read_store = self.store.read().unwrap();
+        let mut batch = WriteBatch::default();
+
+        for (index, filter_hash) in checkpoints.iter().enumerate().skip(equal_bundles) {
+            let key = StoreEntry::CFilterTable((self.filter_type, Some(index + 1))).get_key(); // +1 to skip the genesis' filter
+
+            if let Some((BundleStatus::Tip { .. }, _)) = read_store
+                .get_pinned(&key)?
+                .map(|data| BundleEntry::deserialize(&data))
+                .transpose()?
+            {
+                println!("Keeping bundle #{} as Tip", index);
+            } else {
+                batch.put(&key, (BundleStatus::Init, *filter_hash).serialize());
+            }
+        }
+
+        read_store.write(batch)?;
+
+        Ok(())
+    }
+
+    pub fn advance_to_cf_headers(
+        &self,
+        bundle: usize,
+        checkpoint_hash: FilterHeaderHash,
+        filter_headers: Vec<FilterHash>,
+    ) -> Result<BundleStatus, CompactFiltersError> {
+        let mut last_hash = checkpoint_hash;
+        let cf_headers = filter_headers
+            .into_iter()
+            .map(|filter_hash| {
+                let filter_header = FilterHeader {
+                    prev_header_hash: last_hash,
+                    filter_hash,
+                };
+                last_hash = filter_header.bitcoin_hash();
+
+                filter_header
+            })
+            .collect();
+
+        let read_store = self.store.read().unwrap();
+
+        let next_key = StoreEntry::CFilterTable((self.filter_type, Some(bundle + 1))).get_key(); // +1 to skip the genesis' filter
+        if let Some((_, next_checkpoint)) = read_store
+            .get_pinned(&next_key)?
+            .map(|data| BundleEntry::deserialize(&data))
+            .transpose()?
+        {
+            // check connection with the next bundle if present
+            if last_hash != next_checkpoint {
+                return Err(CompactFiltersError::InvalidFilterHeader);
+            }
+        }
+
+        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
+        let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash);
+
+        read_store.put(key, value.serialize())?;
+
+        Ok(value.0)
+    }
+
+    pub fn advance_to_cf_filters(
+        &self,
+        bundle: usize,
+        checkpoint_hash: FilterHeaderHash,
+        headers: Vec<FilterHeader>,
+        filters: Vec<(usize, Vec<u8>)>,
+    ) -> Result<BundleStatus, CompactFiltersError> {
+        let cf_filters = filters
+            .into_iter()
+            .zip(headers.iter())
+            .map(|((_, filter_content), header)| {
+                if header.filter_hash != sha256d::Hash::hash(&filter_content).into() {
+                    return Err(CompactFiltersError::InvalidFilter);
+                }
+
+                Ok::<_, CompactFiltersError>(filter_content)
+            })
+            .collect::<Result<_, _>>()?;
+
+        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
+        let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash);
+
+        let read_store = self.store.read().unwrap();
+        read_store.put(key, value.serialize())?;
+
+        Ok(value.0)
+    }
+
+    pub fn prune_filters(
+        &self,
+        bundle: usize,
+        checkpoint_hash: FilterHeaderHash,
+    ) -> Result<BundleStatus, CompactFiltersError> {
+        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
+        let value = (BundleStatus::Pruned, checkpoint_hash);
+
+        let read_store = self.store.read().unwrap();
+        read_store.put(key, value.serialize())?;
+
+        Ok(value.0)
+    }
+
+    pub fn mark_as_tip(
+        &self,
+        bundle: usize,
+        cf_filters: Vec<Vec<u8>>,
+        checkpoint_hash: FilterHeaderHash,
+    ) -> Result<BundleStatus, CompactFiltersError> {
+        let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
+        let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash);
+
+        let read_store = self.store.read().unwrap();
+        read_store.put(key, value.serialize())?;
+
+        Ok(value.0)
+    }
+}
diff --git a/src/blockchain/compact_filters/sync.rs b/src/blockchain/compact_filters/sync.rs
new file mode 100644 (file)
index 0000000..57b65f8
--- /dev/null
@@ -0,0 +1,282 @@
+use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::sync::{Arc, Mutex};
+
+use bitcoin::hash_types::{BlockHash, FilterHash};
+use bitcoin::network::message::NetworkMessage;
+use bitcoin::network::message_blockdata::GetHeadersMessage;
+use bitcoin::util::bip158::BlockFilter;
+
+use super::peer::*;
+use super::store::*;
+use super::CompactFiltersError;
+
+pub(crate) const BURIED_CONFIRMATIONS: usize = 100;
+
+pub struct CFSync {
+    headers_store: Arc<HeadersStore<Full>>,
+    cf_store: Arc<CFStore>,
+    skip_blocks: usize,
+    bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
+}
+
+impl CFSync {
+    pub fn new(
+        headers_store: Arc<HeadersStore<Full>>,
+        skip_blocks: usize,
+        filter_type: u8,
+    ) -> Result<Self, CompactFiltersError> {
+        let cf_store = Arc::new(CFStore::new(&headers_store, filter_type)?);
+
+        Ok(CFSync {
+            headers_store,
+            cf_store,
+            skip_blocks,
+            bundles: Mutex::new(VecDeque::new()),
+        })
+    }
+
+    pub fn pruned_bundles(&self) -> Result<usize, CompactFiltersError> {
+        Ok(self
+            .cf_store
+            .get_bundles()?
+            .into_iter()
+            .skip(self.skip_blocks / 1000)
+            .fold(0, |acc, (status, _)| match status {
+                BundleStatus::Pruned => acc + 1,
+                _ => acc,
+            }))
+    }
+
+    pub fn prepare_sync(&self, peer: Arc<Peer>) -> Result<(), CompactFiltersError> {
+        let mut bundles_lock = self.bundles.lock().unwrap();
+
+        let resp = peer.get_cf_checkpt(
+            self.cf_store.get_filter_type(),
+            self.headers_store.get_tip_hash()?.unwrap(),
+        )?;
+        self.cf_store.replace_checkpoints(resp.filter_headers)?;
+
+        bundles_lock.clear();
+        for (index, (status, checkpoint)) in self.cf_store.get_bundles()?.into_iter().enumerate() {
+            bundles_lock.push_back((status, checkpoint, index));
+        }
+
+        Ok(())
+    }
+
+    pub fn capture_thread_for_sync<F, Q>(
+        &self,
+        peer: Arc<Peer>,
+        process: F,
+        completed_bundle: Q,
+    ) -> Result<(), CompactFiltersError>
+    where
+        F: Fn(&BlockHash, &BlockFilter) -> Result<bool, CompactFiltersError>,
+        Q: Fn(usize) -> Result<(), crate::error::Error>,
+    {
+        let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated
+
+        loop {
+            let (mut status, checkpoint, index) = match self.bundles.lock().unwrap().pop_front() {
+                None => break,
+                Some(x) => x,
+            };
+
+            log::debug!(
+                "Processing bundle #{} - height {} to {}",
+                index,
+                index * 1000 + 1,
+                (index + 1) * 1000
+            );
+
+            let process_received_filters =
+                |expected_filters| -> Result<BTreeMap<usize, Vec<u8>>, CompactFiltersError> {
+                    let mut filters_map = BTreeMap::new();
+                    for _ in 0..expected_filters {
+                        let filter = peer.pop_cf_filter_resp()?;
+                        if filter.filter_type != self.cf_store.get_filter_type() {
+                            return Err(CompactFiltersError::InvalidResponse);
+                        }
+
+                        match self.headers_store.get_height_for(&filter.block_hash)? {
+                            Some(height) => filters_map.insert(height, filter.filter),
+                            None => return Err(CompactFiltersError::InvalidFilter),
+                        };
+                    }
+
+                    Ok(filters_map)
+                };
+
+            let start_height = index * 1000 + 1;
+            let mut already_processed = 0;
+
+            if start_height < self.skip_blocks {
+                status = self.cf_store.prune_filters(index, checkpoint)?;
+            }
+
+            let stop_height = std::cmp::min(current_height, start_height + 999);
+            let stop_hash = self.headers_store.get_block_hash(stop_height)?.unwrap();
+
+            if let BundleStatus::Init = status {
+                log::trace!("status: Init");
+
+                let resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?;
+
+                assert!(resp.previous_filter == checkpoint);
+                status =
+                    self.cf_store
+                        .advance_to_cf_headers(index, checkpoint, resp.filter_hashes)?;
+            }
+            if let BundleStatus::Tip { cf_filters } = status {
+                log::trace!("status: Tip (beginning) ");
+
+                already_processed = cf_filters.len();
+                let headers_resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?;
+
+                let cf_headers = match self.cf_store.advance_to_cf_headers(
+                    index,
+                    checkpoint,
+                    headers_resp.filter_hashes,
+                )? {
+                    BundleStatus::CFHeaders { cf_headers } => cf_headers,
+                    _ => return Err(CompactFiltersError::InvalidResponse),
+                };
+
+                peer.get_cf_filters(
+                    self.cf_store.get_filter_type(),
+                    (start_height + cf_filters.len()) as u32,
+                    stop_hash,
+                )?;
+                let expected_filters = stop_height - start_height + 1 - cf_filters.len();
+                let filters_map = process_received_filters(expected_filters)?;
+                let filters = cf_filters
+                    .into_iter()
+                    .enumerate()
+                    .chain(filters_map.into_iter())
+                    .collect();
+                status = self
+                    .cf_store
+                    .advance_to_cf_filters(index, checkpoint, cf_headers, filters)?;
+            }
+            if let BundleStatus::CFHeaders { cf_headers } = status {
+                log::trace!("status: CFHeaders");
+
+                peer.get_cf_filters(
+                    self.cf_store.get_filter_type(),
+                    start_height as u32,
+                    stop_hash,
+                )?;
+                let expected_filters = stop_height - start_height + 1;
+                let filters_map = process_received_filters(expected_filters)?;
+                status = self.cf_store.advance_to_cf_filters(
+                    index,
+                    checkpoint,
+                    cf_headers,
+                    filters_map.into_iter().collect(),
+                )?;
+            }
+            if let BundleStatus::CFilters { cf_filters } = status {
+                log::trace!("status: CFilters");
+
+                let last_sync_buried_height = (start_height + already_processed)
+                    .checked_sub(BURIED_CONFIRMATIONS)
+                    .unwrap_or(0);
+
+                for (filter_index, filter) in cf_filters.iter().enumerate() {
+                    let height = filter_index + start_height;
+
+                    // do not download blocks that were already "buried" since the last sync
+                    if height < last_sync_buried_height {
+                        continue;
+                    }
+
+                    let block_hash = self.headers_store.get_block_hash(height)?.unwrap();
+
+                    // TODO: also download random blocks?
+                    if process(&block_hash, &BlockFilter::new(&filter))? {
+                        log::debug!("Downloading block {}", block_hash);
+
+                        let block = peer
+                            .get_block(block_hash)?
+                            .ok_or(CompactFiltersError::MissingBlock)?;
+                        self.headers_store.save_full_block(&block, height)?;
+                    }
+                }
+
+                status = BundleStatus::Processed { cf_filters };
+            }
+            if let BundleStatus::Processed { cf_filters } = status {
+                log::trace!("status: Processed");
+
+                if current_height - stop_height > 1000 {
+                    status = self.cf_store.prune_filters(index, checkpoint)?;
+                } else {
+                    status = self.cf_store.mark_as_tip(index, cf_filters, checkpoint)?;
+                }
+
+                completed_bundle(index)?;
+            }
+            if let BundleStatus::Pruned = status {
+                log::trace!("status: Pruned");
+            }
+            if let BundleStatus::Tip { .. } = status {
+                log::trace!("status: Tip");
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub fn sync_headers<F>(
+    peer: Arc<Peer>,
+    store: Arc<HeadersStore<Full>>,
+    sync_fn: F,
+) -> Result<Option<HeadersStore<Snapshot>>, CompactFiltersError>
+where
+    F: Fn(usize) -> Result<(), crate::error::Error>,
+{
+    let locators = store.get_locators()?;
+    let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect();
+    let locators_map: HashMap<_, _> = locators.into_iter().collect();
+
+    peer.send(NetworkMessage::GetHeaders(GetHeadersMessage::new(
+        locators_vec,
+        Default::default(),
+    )))?;
+    let (mut snapshot, mut last_hash) =
+        if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
+            if headers.is_empty() {
+                return Ok(None);
+            }
+
+            match locators_map.get(&headers[0].prev_blockhash) {
+                None => return Err(CompactFiltersError::InvalidHeaders),
+                Some(from) => (
+                    store.start_snapshot(*from)?,
+                    headers[0].prev_blockhash.clone(),
+                ),
+            }
+        } else {
+            return Err(CompactFiltersError::InvalidResponse);
+        };
+
+    let mut sync_height = store.get_height()?;
+    while sync_height < peer.get_version().start_height as usize {
+        peer.send(NetworkMessage::GetHeaders(GetHeadersMessage::new(
+            vec![last_hash],
+            Default::default(),
+        )))?;
+        if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
+            let batch_len = headers.len();
+            last_hash = snapshot.apply(sync_height, headers)?;
+
+            sync_height += batch_len;
+            sync_fn(sync_height)?;
+        } else {
+            return Err(CompactFiltersError::InvalidResponse);
+        }
+    }
+
+    Ok(Some(snapshot))
+}
index 44331e2c7406abbadfce41b1c639d43830abfc76..c4ed4cd4394df5a7c0b8f5caef018ab211d11572 100644 (file)
@@ -40,9 +40,13 @@ impl Blockchain for ElectrumBlockchain {
 
 impl OnlineBlockchain for ElectrumBlockchain {
     fn get_capabilities(&self) -> HashSet<Capability> {
-        vec![Capability::FullHistory, Capability::GetAnyTx]
-            .into_iter()
-            .collect()
+        vec![
+            Capability::FullHistory,
+            Capability::GetAnyTx,
+            Capability::AccurateFees,
+        ]
+        .into_iter()
+        .collect()
     }
 
     fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
index 10eb5cb4fef200ab37628a0eb4257ddf3cd814f0..4214a0fa6d9ceacd82eba7ebf7f2c2d4b561d951 100644 (file)
@@ -59,9 +59,13 @@ impl Blockchain for EsploraBlockchain {
 #[maybe_async]
 impl OnlineBlockchain for EsploraBlockchain {
     fn get_capabilities(&self) -> HashSet<Capability> {
-        vec![Capability::FullHistory, Capability::GetAnyTx]
-            .into_iter()
-            .collect()
+        vec![
+            Capability::FullHistory,
+            Capability::GetAnyTx,
+            Capability::AccurateFees,
+        ]
+        .into_iter()
+        .collect()
     }
 
     fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
index da267335da9e43127b6e36987ee974a38555b249..a38f38b5a742d4ef8543f8a28c10d1e605ad7cef 100644 (file)
@@ -19,10 +19,14 @@ pub mod esplora;
 #[cfg(feature = "esplora")]
 pub use self::esplora::EsploraBlockchain;
 
+#[cfg(feature = "compact_filters")]
+pub mod compact_filters;
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum Capability {
     FullHistory,
     GetAnyTx,
+    AccurateFees,
 }
 
 pub trait Blockchain {
@@ -46,13 +50,13 @@ impl Blockchain for OfflineBlockchain {
 pub trait OnlineBlockchain: Blockchain {
     fn get_capabilities(&self) -> HashSet<Capability>;
 
-    fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
+    fn setup<D: BatchDatabase + DatabaseUtils, P: 'static + Progress>(
         &self,
         stop_gap: Option<usize>,
         database: &mut D,
         progress_update: P,
     ) -> Result<(), Error>;
-    fn sync<D: BatchDatabase + DatabaseUtils, P: Progress>(
+    fn sync<D: BatchDatabase + DatabaseUtils, P: 'static + Progress>(
         &self,
         stop_gap: Option<usize>,
         database: &mut D,
@@ -70,7 +74,7 @@ pub trait OnlineBlockchain: Blockchain {
 
 pub type ProgressData = (f32, Option<String>);
 
-pub trait Progress {
+pub trait Progress: Send {
     fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error>;
 }
 
@@ -89,6 +93,7 @@ impl Progress for Sender<ProgressData> {
     }
 }
 
+#[derive(Clone)]
 pub struct NoopProgress;
 
 pub fn noop_progress() -> NoopProgress {
@@ -100,3 +105,18 @@ impl Progress for NoopProgress {
         Ok(())
     }
 }
+
+#[derive(Clone)]
+pub struct LogProgress;
+
+pub fn log_progress() -> LogProgress {
+    LogProgress
+}
+
+impl Progress for LogProgress {
+    fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error> {
+        log::info!("Sync {:.3}%: `{}`", progress, message.unwrap_or("".into()));
+
+        Ok(())
+    }
+}
index 3da4d49e9134b0c2c2328b86ac9403e3fb2f610c..2b0c5210b7b85d8d2d4eda0a77f12dbc02a4c056 100644 (file)
@@ -11,6 +11,7 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
 use bitcoin::util::psbt::PartiallySignedTransaction;
 use bitcoin::{Address, OutPoint, Txid};
 
+use crate::blockchain::log_progress;
 use crate::error::Error;
 use crate::types::ScriptType;
 use crate::{FeeRate, TxBuilder, Wallet};
@@ -344,7 +345,7 @@ where
     if let Some(_sub_matches) = matches.subcommand_matches("get_new_address") {
         Ok(Some(format!("{}", wallet.get_new_address()?)))
     } else if let Some(_sub_matches) = matches.subcommand_matches("sync") {
-        maybe_await!(wallet.sync(None))?;
+        maybe_await!(wallet.sync(log_progress(), None))?;
         Ok(None)
     } else if let Some(_sub_matches) = matches.subcommand_matches("list_unspent") {
         let mut res = String::new();
index f7641bddbb281a71e9d01e36c992768080b468c5..77cb92c38f400bbac74d2da3ad0ef93a8a1f8b64 100644 (file)
@@ -54,6 +54,8 @@ pub enum Error {
     Electrum(electrum_client::Error),
     #[cfg(feature = "esplora")]
     Esplora(crate::blockchain::esplora::EsploraError),
+    #[cfg(feature = "compact_filters")]
+    CompactFilters(crate::blockchain::compact_filters::CompactFiltersError),
     #[cfg(feature = "key-value-db")]
     Sled(sled::Error),
 }
@@ -87,3 +89,13 @@ impl_error!(electrum_client::Error, Electrum);
 impl_error!(crate::blockchain::esplora::EsploraError, Esplora);
 #[cfg(feature = "key-value-db")]
 impl_error!(sled::Error, Sled);
+
+#[cfg(feature = "compact_filters")]
+impl From<crate::blockchain::compact_filters::CompactFiltersError> for Error {
+    fn from(other: crate::blockchain::compact_filters::CompactFiltersError) -> Self {
+        match other {
+            crate::blockchain::compact_filters::CompactFiltersError::Global(e) => *e,
+            err @ _ => Error::CompactFilters(err),
+        }
+    }
+}
index 8143052d0f6a4cbbc42740893f592bdf158b5778..f727c10996fb74e46110a66dc9aa50d436cf4b3b 100644 (file)
@@ -11,7 +11,7 @@ extern crate async_trait;
 #[macro_use]
 extern crate magical_macros;
 
-#[cfg(test)]
+#[cfg(any(test, feature = "compact_filters"))]
 #[macro_use]
 extern crate lazy_static;
 
index edc803607de6f1a84f90c579fdd48d5478a58650..24c6e1fc819cb3fafc02ba77e09b74a19f3e4a0f 100644 (file)
@@ -27,7 +27,7 @@ pub mod utils;
 use tx_builder::TxBuilder;
 use utils::{FeeRate, IsDust};
 
-use crate::blockchain::{noop_progress, Blockchain, OfflineBlockchain, OnlineBlockchain};
+use crate::blockchain::{Blockchain, OfflineBlockchain, OnlineBlockchain, Progress};
 use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
 use crate::descriptor::{get_checksum, DescriptorMeta, ExtendedDescriptor, ExtractPolicy, Policy};
 use crate::error::Error;
@@ -1015,7 +1015,11 @@ where
     }
 
     #[maybe_async]
-    pub fn sync(&self, max_address_param: Option<u32>) -> Result<(), Error> {
+    pub fn sync<P: 'static + Progress>(
+        &self,
+        progress_update: P,
+        max_address_param: Option<u32>,
+    ) -> Result<(), Error> {
         debug!("Begin sync...");
 
         let mut run_setup = false;
@@ -1057,13 +1061,13 @@ where
             maybe_await!(self.client.setup(
                 None,
                 self.database.borrow_mut().deref_mut(),
-                noop_progress(),
+                progress_update,
             ))
         } else {
             maybe_await!(self.client.sync(
                 None,
                 self.database.borrow_mut().deref_mut(),
-                noop_progress(),
+                progress_update,
             ))
         }
     }
index b70a37ec1029502131dfd90cb71be1e1ea1cd1af..0722c7cf04acd9e0def8723ae0fcf9fa39638a5b 100644 (file)
@@ -55,7 +55,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
 
                 use testutils::{TestClient, serial};
 
-                use #root_ident::blockchain::OnlineBlockchain;
+                use #root_ident::blockchain::{OnlineBlockchain, noop_progress};
                 use #root_ident::descriptor::ExtendedDescriptor;
                 use #root_ident::database::MemoryDatabase;
                 use #root_ident::types::ScriptType;
@@ -92,7 +92,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     };
                     let txid = test_client.receive(tx);
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
                     assert_eq!(wallet.list_unspent().unwrap()[0].is_internal, false);
@@ -116,7 +116,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 25) => 50_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 100_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 2);
@@ -127,14 +127,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                 fn test_sync_before_and_after_receive() {
                     let (wallet, descriptors, mut test_client) = init_single_sig();
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 0);
 
                     test_client.receive(testutils! {
                         @tx ( (@external descriptors, 0) => 50_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
@@ -149,7 +149,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000, (@external descriptors, 5) => 30_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 105_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
@@ -174,7 +174,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 5) => 25_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 75_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 2);
@@ -190,14 +190,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
                     test_client.receive(testutils! {
                         @tx ( (@external descriptors, 0) => 25_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 75_000);
                 }
 
@@ -210,7 +210,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 ) ( @replaceable true )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
@@ -224,7 +224,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
 
                     let new_txid = test_client.bump_fee(&txid);
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
@@ -246,7 +246,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 ) ( @confirmations 1 ) ( @replaceable true )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
@@ -259,7 +259,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     // Invalidate 1 block
                     test_client.invalidate(1);
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
@@ -278,7 +278,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
                     let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr, 25_000)])).unwrap();
@@ -286,7 +286,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(psbt.extract_tx()).unwrap();
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), details.received);
 
                     assert_eq!(wallet.list_transactions(false).unwrap().len(), 2);
@@ -303,7 +303,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
                     let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr, 25_000)])).unwrap();
@@ -311,12 +311,12 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     assert!(finalized, "Cannot finalize transaction");
                     let sent_txid = wallet.broadcast(psbt.extract_tx()).unwrap();
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), details.received);
 
                     // empty wallet
                     let wallet = get_wallet_from_descriptors(&descriptors);
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
 
                     let tx_map = wallet.list_transactions(false).unwrap().into_iter().map(|tx| (tx.txid, tx)).collect::<std::collections::HashMap<_, _>>();
 
@@ -340,7 +340,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 )
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
                     let mut total_sent = 0;
@@ -350,17 +350,17 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         assert!(finalized, "Cannot finalize transaction");
                         wallet.broadcast(psbt.extract_tx()).unwrap();
 
-                        wallet.sync(None).unwrap();
+                        wallet.sync(noop_progress(), None).unwrap();
 
                         total_sent += 5_000 + details.fees;
                     }
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000 - total_sent);
 
                     // empty wallet
                     let wallet = get_wallet_from_descriptors(&descriptors);
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000 - total_sent);
                 }
 
@@ -374,14 +374,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 ) (@confirmations 1)
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
                     let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 5_000)]).enable_rbf()).unwrap();
                     let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000 - details.fees - 5_000);
                     assert_eq!(wallet.get_balance().unwrap(), details.received);
 
@@ -389,7 +389,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(new_psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000 - new_details.fees - 5_000);
                     assert_eq!(wallet.get_balance().unwrap(), new_details.received);
 
@@ -406,14 +406,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000 ) (@confirmations 1)
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 50_000);
 
                     let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap();
                     let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 1_000 - details.fees);
                     assert_eq!(wallet.get_balance().unwrap(), details.received);
 
@@ -422,7 +422,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(new_psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 0);
                     assert_eq!(new_details.received, 0);
 
@@ -439,14 +439,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 75_000);
 
                     let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap();
                     let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 26_000 - details.fees);
                     assert_eq!(details.received, 1_000 - details.fees);
 
@@ -455,7 +455,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(new_psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(new_details.sent, 75_000);
                     assert_eq!(wallet.get_balance().unwrap(), new_details.received);
                 }
@@ -470,14 +470,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                         @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
                     });
 
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 75_000);
 
                     let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap();
                     let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(wallet.get_balance().unwrap(), 26_000 - details.fees);
                     assert_eq!(details.received, 1_000 - details.fees);
 
@@ -487,7 +487,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
                     let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
                     assert!(finalized, "Cannot finalize transaction");
                     wallet.broadcast(new_psbt.extract_tx()).unwrap();
-                    wallet.sync(None).unwrap();
+                    wallet.sync(noop_progress(), None).unwrap();
                     assert_eq!(new_details.sent, 75_000);
                     assert_eq!(wallet.get_balance().unwrap(), 0);
                     assert_eq!(new_details.received, 0);