bdk/commitdiff
[compact_filters] Use the new rust-bitcoin API
author Alekos Filini <alekos.filini@gmail.com>
Wed, 3 Feb 2021 00:52:44 +0000 (19:52 -0500)
committer Alekos Filini <alekos.filini@gmail.com>
Fri, 5 Feb 2021 21:51:46 +0000 (16:51 -0500)
src/blockchain/compact_filters/mod.rs
src/blockchain/compact_filters/peer.rs
src/blockchain/compact_filters/store.rs
src/blockchain/compact_filters/sync.rs

diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs
index 99c7820e40f36a2a3362ba43acdf988b25acdea3..649c53dfa5316859e819e0e72e4b380c5e25cc79 100644
@@ -383,7 +383,12 @@ impl Blockchain for CompactFiltersBlockchain {
         }
         database.commit_batch(updates)?;
 
-        first_peer.ask_for_mempool()?;
+        match first_peer.ask_for_mempool() {
+            Err(CompactFiltersError::PeerBloomDisabled) => {
+                log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
+            }
+            e => e?,
+        };
 
         let mut internal_max_deriv = None;
         let mut external_max_deriv = None;
@@ -537,6 +542,8 @@ pub enum CompactFiltersError {
     NotConnected,
     /// A peer took too long to reply to one of our messages
     Timeout,
+    /// The peer doesn't advertise the [`BLOOM`](bitcoin::network::constants::ServiceFlags::BLOOM) service flag
+    PeerBloomDisabled,
 
     /// No peers have been specified
     NoPeers,
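
The hunk above stops treating a missing mempool as fatal: when the peer does not advertise BLOOM, the new `PeerBloomDisabled` variant is caught and logged as a warning, while every other outcome still propagates through `?`. A minimal, dependency-free sketch of the same match pattern, with a hypothetical `SyncError` enum standing in for `CompactFiltersError`:

// Hypothetical error enum standing in for `CompactFiltersError`.
#[derive(Debug)]
enum SyncError {
    PeerBloomDisabled,
    Io(String),
}

// Pretend the peer does not advertise the BLOOM service flag.
fn ask_for_mempool() -> Result<(), SyncError> {
    Err(SyncError::PeerBloomDisabled)
}

fn sync() -> Result<(), SyncError> {
    // Downgrade the BLOOM-disabled case to a warning; propagate anything else with `?`.
    match ask_for_mempool() {
        Err(SyncError::PeerBloomDisabled) => {
            eprintln!("Peer has BLOOM disabled, we can't ask for the mempool")
        }
        e => e?,
    };
    Ok(())
}

fn main() {
    // The sync keeps going even though the mempool request was skipped.
    assert!(sync().is_ok());
}
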
diff --git a/src/blockchain/compact_filters/peer.rs b/src/blockchain/compact_filters/peer.rs
index b028954098e765c914af482c1efe7236138170dc..62fa2cb4a476d7addea4f278ff50f1007d04cc62 100644
@@ -34,7 +34,6 @@ use rand::{thread_rng, Rng};
 
 use bitcoin::consensus::Encodable;
 use bitcoin::hash_types::BlockHash;
-use bitcoin::hashes::Hash;
 use bitcoin::network::constants::ServiceFlags;
 use bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
 use bitcoin::network::message_blockdata::*;
@@ -42,7 +41,7 @@ use bitcoin::network::message_filter::*;
 use bitcoin::network::message_network::VersionMessage;
 use bitcoin::network::stream_reader::StreamReader;
 use bitcoin::network::Address;
-use bitcoin::{Block, Network, Transaction, Txid};
+use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
 
 use super::CompactFiltersError;
 
@@ -55,37 +54,71 @@ pub(crate) const TIMEOUT_SECS: u64 = 30;
 /// It is normally shared between [`Peer`]s with the use of [`Arc`], so that transactions are not
 /// duplicated in memory.
 #[derive(Debug, Default)]
-pub struct Mempool {
-    txs: RwLock<HashMap<Txid, Transaction>>,
+pub struct Mempool(RwLock<InnerMempool>);
+
+#[derive(Debug, Default)]
+struct InnerMempool {
+    txs: HashMap<Txid, Transaction>,
+    wtxids: HashMap<Wtxid, Txid>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum TxIdentifier {
+    Wtxid(Wtxid),
+    Txid(Txid),
 }
 
 impl Mempool {
+    /// Create a new empty mempool
+    pub fn new() -> Self {
+        Self::default()
+    }
+
     /// Add a transaction to the mempool
     ///
     /// Note that this doesn't propagate the transaction to other
     /// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
     pub fn add_tx(&self, tx: Transaction) {
-        self.txs.write().unwrap().insert(tx.txid(), tx);
+        let mut guard = self.0.write().unwrap();
+
+        guard.wtxids.insert(tx.wtxid(), tx.txid());
+        guard.txs.insert(tx.txid(), tx);
     }
 
     /// Look-up a transaction in the mempool given an [`Inventory`] request
     pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
-        let txid = match inventory {
+        let identifier = match inventory {
             Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
-            Inventory::Transaction(txid) => *txid,
-            Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()),
+            Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
+            Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
+            Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
+            Inventory::Unknown { inv_type, hash } => {
+                log::warn!(
+                    "Unknown inventory request type `{}`, hash `{:?}`",
+                    inv_type,
+                    hash
+                );
+                return None;
+            }
+        };
+
+        let txid = match identifier {
+            TxIdentifier::Txid(txid) => Some(txid),
+            TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
         };
-        self.txs.read().unwrap().get(&txid).cloned()
+
+        txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
+            .flatten()
     }
 
     /// Return whether or not the mempool contains a transaction with a given txid
     pub fn has_tx(&self, txid: &Txid) -> bool {
-        self.txs.read().unwrap().contains_key(txid)
+        self.0.read().unwrap().txs.contains_key(txid)
     }
 
     /// Return the list of transactions contained in the mempool
     pub fn iter_txs(&self) -> Vec<Transaction> {
-        self.txs.read().unwrap().values().cloned().collect()
+        self.0.read().unwrap().txs.values().cloned().collect()
     }
 }
 
@@ -508,6 +541,10 @@ impl InvPeer for Peer {
     }
 
     fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
+        if !self.version.services.has(ServiceFlags::BLOOM) {
+            return Err(CompactFiltersError::PeerBloomDisabled);
+        }
+
         self.send(NetworkMessage::MemPool)?;
         let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
             None => return Ok(()), // empty mempool
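
With the updated `Inventory` enum, a peer can request a mempool transaction by `Txid`, by wtxid (`Inventory::WTx`), or with an unknown inventory type, so the mempool now keeps a secondary `Wtxid -> Txid` index next to the main `Txid -> Transaction` map, both behind one `RwLock`. A simplified sketch of that dual-index layout, using `String` stand-ins for `Txid`, `Wtxid` and `Transaction` so it runs without the `bitcoin` crate:

use std::collections::HashMap;
use std::sync::RwLock;

// String stand-ins for bitcoin::{Txid, Wtxid, Transaction}.
type Txid = String;
type Wtxid = String;
type Transaction = String;

// One RwLock guards both maps so they can never drift apart,
// mirroring the `Mempool(RwLock<InnerMempool>)` layout in the diff.
#[derive(Default)]
struct Mempool(RwLock<Inner>);

#[derive(Default)]
struct Inner {
    txs: HashMap<Txid, Transaction>,
    wtxids: HashMap<Wtxid, Txid>,
}

impl Mempool {
    // The real `add_tx` derives both ids from the transaction itself via
    // `tx.txid()` and `tx.wtxid()`; here they are passed in explicitly.
    fn add_tx(&self, txid: Txid, wtxid: Wtxid, tx: Transaction) {
        let mut guard = self.0.write().unwrap();
        guard.wtxids.insert(wtxid, txid.clone());
        guard.txs.insert(txid, tx);
    }

    // A wtxid look-up first resolves to a txid through the secondary index.
    fn get_by_wtxid(&self, wtxid: &Wtxid) -> Option<Transaction> {
        let guard = self.0.read().unwrap();
        let txid = guard.wtxids.get(wtxid)?;
        guard.txs.get(txid).cloned()
    }
}

fn main() {
    let mempool = Mempool::default();
    mempool.add_tx("txid-1".into(), "wtxid-1".into(), "raw tx".into());
    assert_eq!(
        mempool.get_by_wtxid(&"wtxid-1".to_string()),
        Some("raw tx".to_string())
    );
}
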
diff --git a/src/blockchain/compact_filters/store.rs b/src/blockchain/compact_filters/store.rs
index f552fde31c3b7be5a103d507b543d02cc25b9b94..5e278f24917b5387dd03a583d3d5f35b5b714393 100644
@@ -36,9 +36,9 @@ use rand::{thread_rng, Rng};
 use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
 
 use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable};
-use bitcoin::hash_types::FilterHash;
+use bitcoin::hash_types::{FilterHash, FilterHeader};
 use bitcoin::hashes::hex::FromHex;
-use bitcoin::hashes::{sha256d, Hash};
+use bitcoin::hashes::Hash;
 use bitcoin::util::bip158::BlockFilter;
 use bitcoin::util::uint::Uint256;
 use bitcoin::Block;
@@ -52,6 +52,7 @@ lazy_static! {
     static ref MAINNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
     static ref TESTNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
     static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
+    static ref SIGNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A008F4D5FAE77031E8AD222030101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
 }
 
 pub trait StoreType: Default + fmt::Debug {}
@@ -122,34 +123,8 @@ where
     }
 }
 
-impl Encodable for FilterHeader {
-    fn consensus_encode<W: Write>(
-        &self,
-        mut e: W,
-    ) -> Result<usize, bitcoin::consensus::encode::Error> {
-        let mut written = self.prev_header_hash.consensus_encode(&mut e)?;
-        written += self.filter_hash.consensus_encode(&mut e)?;
-        Ok(written)
-    }
-}
-
-impl Decodable for FilterHeader {
-    fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
-        let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?;
-        let filter_hash = FilterHash::consensus_decode(&mut d)?;
-
-        Ok(FilterHeader {
-            prev_header_hash,
-            filter_hash,
-        })
-    }
-}
-
 impl Encodable for BundleStatus {
-    fn consensus_encode<W: Write>(
-        &self,
-        mut e: W,
-    ) -> Result<usize, bitcoin::consensus::encode::Error> {
+    fn consensus_encode<W: Write>(&self, mut e: W) -> Result<usize, std::io::Error> {
         let mut written = 0;
 
         match self {
@@ -264,6 +239,7 @@ impl ChainStore<Full> {
             Network::Bitcoin => MAINNET_GENESIS.deref(),
             Network::Testnet => TESTNET_GENESIS.deref(),
             Network::Regtest => REGTEST_GENESIS.deref(),
+            Network::Signet => SIGNET_GENESIS.deref(),
         };
 
         let cf_name = "default".to_string();
@@ -658,22 +634,6 @@ impl<T: StoreType> fmt::Debug for ChainStore<T> {
     }
 }
 
-pub type FilterHeaderHash = FilterHash;
-
-#[derive(Debug, Clone)]
-pub struct FilterHeader {
-    prev_header_hash: FilterHeaderHash,
-    filter_hash: FilterHash,
-}
-
-impl FilterHeader {
-    fn header_hash(&self) -> FilterHeaderHash {
-        let mut hash_data = self.filter_hash.into_inner().to_vec();
-        hash_data.extend_from_slice(&self.prev_header_hash);
-        sha256d::Hash::hash(&hash_data).into()
-    }
-}
-
 pub enum BundleStatus {
     Init,
     CFHeaders { cf_headers: Vec<FilterHeader> },
@@ -688,7 +648,7 @@ pub struct CFStore {
     filter_type: u8,
 }
 
-type BundleEntry = (BundleStatus, FilterHeaderHash);
+type BundleEntry = (BundleStatus, FilterHeader);
 
 impl CFStore {
     pub fn new(
@@ -704,6 +664,7 @@ impl CFStore {
             Network::Bitcoin => MAINNET_GENESIS.deref(),
             Network::Testnet => TESTNET_GENESIS.deref(),
             Network::Regtest => REGTEST_GENESIS.deref(),
+            Network::Signet => SIGNET_GENESIS.deref(),
         };
 
         let filter = BlockFilter::new_script_filter(genesis, |utxo| {
@@ -717,7 +678,11 @@ impl CFStore {
             if read_store.get_pinned(&first_key)?.is_none() {
                 read_store.put(
                     &first_key,
-                    (BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(),
+                    (
+                        BundleStatus::Init,
+                        filter.filter_header(&FilterHeader::from_hash(Default::default())),
+                    )
+                        .serialize(),
                 )?;
             }
         }
@@ -743,7 +708,7 @@ impl CFStore {
             .collect::<Result<_, _>>()
     }
 
-    pub fn get_checkpoints(&self) -> Result<Vec<FilterHash>, CompactFiltersError> {
+    pub fn get_checkpoints(&self) -> Result<Vec<FilterHeader>, CompactFiltersError> {
         let read_store = self.store.read().unwrap();
 
         let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
@@ -760,7 +725,7 @@ impl CFStore {
 
     pub fn replace_checkpoints(
         &self,
-        checkpoints: Vec<FilterHash>,
+        checkpoints: Vec<FilterHeader>,
     ) -> Result<(), CompactFiltersError> {
         let current_checkpoints = self.get_checkpoints()?;
 
@@ -802,20 +767,16 @@ impl CFStore {
     pub fn advance_to_cf_headers(
         &self,
         bundle: usize,
-        checkpoint_hash: FilterHeaderHash,
-        filter_headers: Vec<FilterHash>,
+        checkpoint: FilterHeader,
+        filter_hashes: Vec<FilterHash>,
     ) -> Result<BundleStatus, CompactFiltersError> {
-        let mut last_hash = checkpoint_hash;
-        let cf_headers = filter_headers
+        let cf_headers: Vec<FilterHeader> = filter_hashes
             .into_iter()
-            .map(|filter_hash| {
-                let filter_header = FilterHeader {
-                    prev_header_hash: last_hash,
-                    filter_hash,
-                };
-                last_hash = filter_header.header_hash();
-
-                filter_header
+            .scan(checkpoint, |prev_header, filter_hash| {
+                let filter_header = filter_hash.filter_header(&prev_header);
+                *prev_header = filter_header;
+
+                Some(filter_header)
             })
             .collect();
 
@@ -828,13 +789,13 @@ impl CFStore {
             .transpose()?
         {
             // check connection with the next bundle if present
-            if last_hash != next_checkpoint {
+            if cf_headers.iter().last() != Some(&next_checkpoint) {
                 return Err(CompactFiltersError::InvalidFilterHeader);
             }
         }
 
         let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
-        let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash);
+        let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint);
 
         read_store.put(key, value.serialize())?;
 
@@ -844,24 +805,26 @@ impl CFStore {
     pub fn advance_to_cf_filters(
         &self,
         bundle: usize,
-        checkpoint_hash: FilterHeaderHash,
+        checkpoint: FilterHeader,
         headers: Vec<FilterHeader>,
         filters: Vec<(usize, Vec<u8>)>,
     ) -> Result<BundleStatus, CompactFiltersError> {
         let cf_filters = filters
             .into_iter()
-            .zip(headers.iter())
-            .map(|((_, filter_content), header)| {
-                if header.filter_hash != sha256d::Hash::hash(&filter_content).into() {
-                    return Err(CompactFiltersError::InvalidFilter);
+            .zip(headers.into_iter())
+            .scan(checkpoint, |prev_header, ((_, filter_content), header)| {
+                let filter = BlockFilter::new(&filter_content);
+                if header != filter.filter_header(&prev_header) {
+                    return Some(Err(CompactFiltersError::InvalidFilter));
                 }
+                *prev_header = header;
 
-                Ok::<_, CompactFiltersError>(filter_content)
+                Some(Ok::<_, CompactFiltersError>(filter_content))
             })
             .collect::<Result<_, _>>()?;
 
         let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
-        let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash);
+        let value = (BundleStatus::CFilters { cf_filters }, checkpoint);
 
         let read_store = self.store.read().unwrap();
         read_store.put(key, value.serialize())?;
@@ -872,10 +835,10 @@ impl CFStore {
     pub fn prune_filters(
         &self,
         bundle: usize,
-        checkpoint_hash: FilterHeaderHash,
+        checkpoint: FilterHeader,
     ) -> Result<BundleStatus, CompactFiltersError> {
         let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
-        let value = (BundleStatus::Pruned, checkpoint_hash);
+        let value = (BundleStatus::Pruned, checkpoint);
 
         let read_store = self.store.read().unwrap();
         read_store.put(key, value.serialize())?;
@@ -887,10 +850,10 @@ impl CFStore {
         &self,
         bundle: usize,
         cf_filters: Vec<Vec<u8>>,
-        checkpoint_hash: FilterHeaderHash,
+        checkpoint: FilterHeader,
     ) -> Result<BundleStatus, CompactFiltersError> {
         let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
-        let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash);
+        let value = (BundleStatus::Tip { cf_filters }, checkpoint);
 
         let read_store = self.store.read().unwrap();
         read_store.put(key, value.serialize())?;
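
`advance_to_cf_headers` now folds the peer's filter hashes into the chain of filter headers with `Iterator::scan`, seeding the accumulator with the previous bundle's checkpoint and letting rust-bitcoin's `FilterHash::filter_header` compute each link, instead of the hand-rolled double-SHA256 in the removed `FilterHeader::header_hash`. The sketch below shows only the `scan` chaining pattern; the toy `chain_step` combiner stands in for the real hash so the example stays dependency-free:

// Toy combiner standing in for `FilterHash::filter_header`, which hashes
// `filter_hash || prev_header` (double SHA-256) to produce the next header.
fn chain_step(prev_header: u64, filter_hash: u64) -> u64 {
    prev_header.wrapping_mul(31).wrapping_add(filter_hash)
}

// Fold a bundle of filter hashes into its chain of filter headers, starting
// from the previous bundle's checkpoint, as the `scan` in the diff does.
fn headers_from_hashes(checkpoint: u64, filter_hashes: Vec<u64>) -> Vec<u64> {
    filter_hashes
        .into_iter()
        .scan(checkpoint, |prev_header, filter_hash| {
            let header = chain_step(*prev_header, filter_hash);
            *prev_header = header;
            Some(header)
        })
        .collect()
}

fn main() {
    let headers = headers_from_hashes(7, vec![1, 2, 3]);
    // The last header is the one that must match the next bundle's checkpoint.
    assert_eq!(headers.len(), 3);
    assert_eq!(
        headers.last(),
        Some(&chain_step(chain_step(chain_step(7, 1), 2), 3))
    );
}
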
diff --git a/src/blockchain/compact_filters/sync.rs b/src/blockchain/compact_filters/sync.rs
index a935fc1fc16823e0f04a2ef7b663305c7441581d..5b8a3aaac64501ea9f41959e2f594f5b9a1e68da 100644
@@ -26,7 +26,7 @@ use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
-use bitcoin::hash_types::{BlockHash, FilterHash};
+use bitcoin::hash_types::{BlockHash, FilterHeader};
 use bitcoin::network::message::NetworkMessage;
 use bitcoin::network::message_blockdata::GetHeadersMessage;
 use bitcoin::util::bip158::BlockFilter;
@@ -42,7 +42,7 @@ pub struct CFSync {
     headers_store: Arc<ChainStore<Full>>,
     cf_store: Arc<CFStore>,
     skip_blocks: usize,
-    bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
+    bundles: Mutex<VecDeque<(BundleStatus, FilterHeader, usize)>>,
 }
 
 impl CFSync {
@@ -148,7 +148,7 @@ impl CFSync {
 
                 let resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?;
 
-                assert!(resp.previous_filter == checkpoint);
+                assert!(resp.previous_filter_header == checkpoint);
                 status =
                     self.cf_store
                         .advance_to_cf_headers(index, checkpoint, resp.filter_hashes)?;
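
The sync loop now compares full `FilterHeader`s instead of bare `FilterHash`es: the header chain returned by the peer only connects if its reported previous filter header equals the checkpoint stored for the bundle (`resp.previous_filter_header == checkpoint` above). A small sketch of that connectivity guard, with placeholder types instead of the real `cfheaders` response and an error return in place of the `assert!` used in the diff:

// Placeholders for bitcoin::FilterHeader and for the `cfheaders` response
// returned by `Peer::get_cf_headers`; only the field used here is modelled.
type FilterHeader = [u8; 32];

struct CfHeadersResponse {
    previous_filter_header: FilterHeader,
}

#[derive(Debug)]
enum SyncError {
    InvalidFilterHeader,
}

// Only advance the bundle if the peer's header chain connects to our checkpoint.
fn check_connection(resp: &CfHeadersResponse, checkpoint: &FilterHeader) -> Result<(), SyncError> {
    if &resp.previous_filter_header != checkpoint {
        return Err(SyncError::InvalidFilterHeader);
    }
    Ok(())
}

fn main() {
    let checkpoint = [0u8; 32];
    let resp = CfHeadersResponse {
        previous_filter_header: checkpoint,
    };
    assert!(check_connection(&resp, &checkpoint).is_ok());
}
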