use bitcoin::consensus::Encodable;
use bitcoin::hash_types::BlockHash;
-use bitcoin::hashes::Hash;
use bitcoin::network::constants::ServiceFlags;
use bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
use bitcoin::network::message_blockdata::*;
use bitcoin::network::message_network::VersionMessage;
use bitcoin::network::stream_reader::StreamReader;
use bitcoin::network::Address;
-use bitcoin::{Block, Network, Transaction, Txid};
+use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
use super::CompactFiltersError;
/// It is normally shared between [`Peer`]s with the use of [`Arc`], so that transactions are not
/// duplicated in memory.
#[derive(Debug, Default)]
-pub struct Mempool {
- txs: RwLock<HashMap<Txid, Transaction>>,
+pub struct Mempool(RwLock<InnerMempool>);
+
+#[derive(Debug, Default)]
+struct InnerMempool {
+ txs: HashMap<Txid, Transaction>,
+ wtxids: HashMap<Wtxid, Txid>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum TxIdentifier {
+ Wtxid(Wtxid),
+ Txid(Txid),
}
impl Mempool {
+ /// Create a new empty mempool
+ pub fn new() -> Self {
+ Self::default()
+ }
+
/// Add a transaction to the mempool
///
/// Note that this doesn't propagate the transaction to other
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
pub fn add_tx(&self, tx: Transaction) {
- self.txs.write().unwrap().insert(tx.txid(), tx);
+ let mut guard = self.0.write().unwrap();
+
+ guard.wtxids.insert(tx.wtxid(), tx.txid());
+ guard.txs.insert(tx.txid(), tx);
}
/// Look-up a transaction in the mempool given an [`Inventory`] request
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
- let txid = match inventory {
+ let identifer = match inventory {
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
- Inventory::Transaction(txid) => *txid,
- Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()),
+ Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
+ Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
+ Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
+ Inventory::Unknown { inv_type, hash } => {
+ log::warn!(
+ "Unknown inventory request type `{}`, hash `{:?}`",
+ inv_type,
+ hash
+ );
+ return None;
+ }
+ };
+
+ let txid = match identifer {
+ TxIdentifier::Txid(txid) => Some(txid),
+ TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
};
- self.txs.read().unwrap().get(&txid).cloned()
+
+ txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
+ .flatten()
}
/// Return whether or not the mempool contains a transaction with a given txid
pub fn has_tx(&self, txid: &Txid) -> bool {
- self.txs.read().unwrap().contains_key(txid)
+ self.0.read().unwrap().txs.contains_key(txid)
}
/// Return the list of transactions contained in the mempool
pub fn iter_txs(&self) -> Vec<Transaction> {
- self.txs.read().unwrap().values().cloned().collect()
+ self.0.read().unwrap().txs.values().cloned().collect()
}
}
}
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
+ if !self.version.services.has(ServiceFlags::BLOOM) {
+ return Err(CompactFiltersError::PeerBloomDisabled);
+ }
+
self.send(NetworkMessage::MemPool)?;
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
None => return Ok(()), // empty mempool
use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable};
-use bitcoin::hash_types::FilterHash;
+use bitcoin::hash_types::{FilterHash, FilterHeader};
use bitcoin::hashes::hex::FromHex;
-use bitcoin::hashes::{sha256d, Hash};
+use bitcoin::hashes::Hash;
use bitcoin::util::bip158::BlockFilter;
use bitcoin::util::uint::Uint256;
use bitcoin::Block;
static ref MAINNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
static ref TESTNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
+ static ref SIGNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A008F4D5FAE77031E8AD222030101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
}
pub trait StoreType: Default + fmt::Debug {}
}
}
-impl Encodable for FilterHeader {
- fn consensus_encode<W: Write>(
- &self,
- mut e: W,
- ) -> Result<usize, bitcoin::consensus::encode::Error> {
- let mut written = self.prev_header_hash.consensus_encode(&mut e)?;
- written += self.filter_hash.consensus_encode(&mut e)?;
- Ok(written)
- }
-}
-
-impl Decodable for FilterHeader {
- fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
- let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?;
- let filter_hash = FilterHash::consensus_decode(&mut d)?;
-
- Ok(FilterHeader {
- prev_header_hash,
- filter_hash,
- })
- }
-}
-
impl Encodable for BundleStatus {
- fn consensus_encode<W: Write>(
- &self,
- mut e: W,
- ) -> Result<usize, bitcoin::consensus::encode::Error> {
+ fn consensus_encode<W: Write>(&self, mut e: W) -> Result<usize, std::io::Error> {
let mut written = 0;
match self {
Network::Bitcoin => MAINNET_GENESIS.deref(),
Network::Testnet => TESTNET_GENESIS.deref(),
Network::Regtest => REGTEST_GENESIS.deref(),
+ Network::Signet => SIGNET_GENESIS.deref(),
};
let cf_name = "default".to_string();
}
}
-pub type FilterHeaderHash = FilterHash;
-
-#[derive(Debug, Clone)]
-pub struct FilterHeader {
- prev_header_hash: FilterHeaderHash,
- filter_hash: FilterHash,
-}
-
-impl FilterHeader {
- fn header_hash(&self) -> FilterHeaderHash {
- let mut hash_data = self.filter_hash.into_inner().to_vec();
- hash_data.extend_from_slice(&self.prev_header_hash);
- sha256d::Hash::hash(&hash_data).into()
- }
-}
-
pub enum BundleStatus {
Init,
CFHeaders { cf_headers: Vec<FilterHeader> },
filter_type: u8,
}
-type BundleEntry = (BundleStatus, FilterHeaderHash);
+type BundleEntry = (BundleStatus, FilterHeader);
impl CFStore {
pub fn new(
Network::Bitcoin => MAINNET_GENESIS.deref(),
Network::Testnet => TESTNET_GENESIS.deref(),
Network::Regtest => REGTEST_GENESIS.deref(),
+ Network::Signet => SIGNET_GENESIS.deref(),
};
let filter = BlockFilter::new_script_filter(genesis, |utxo| {
if read_store.get_pinned(&first_key)?.is_none() {
read_store.put(
&first_key,
- (BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(),
+ (
+ BundleStatus::Init,
+ filter.filter_header(&FilterHeader::from_hash(Default::default())),
+ )
+ .serialize(),
)?;
}
}
.collect::<Result<_, _>>()
}
- pub fn get_checkpoints(&self) -> Result<Vec<FilterHash>, CompactFiltersError> {
+ pub fn get_checkpoints(&self) -> Result<Vec<FilterHeader>, CompactFiltersError> {
let read_store = self.store.read().unwrap();
let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
pub fn replace_checkpoints(
&self,
- checkpoints: Vec<FilterHash>,
+ checkpoints: Vec<FilterHeader>,
) -> Result<(), CompactFiltersError> {
let current_checkpoints = self.get_checkpoints()?;
pub fn advance_to_cf_headers(
&self,
bundle: usize,
- checkpoint_hash: FilterHeaderHash,
- filter_headers: Vec<FilterHash>,
+ checkpoint: FilterHeader,
+ filter_hashes: Vec<FilterHash>,
) -> Result<BundleStatus, CompactFiltersError> {
- let mut last_hash = checkpoint_hash;
- let cf_headers = filter_headers
+ let cf_headers: Vec<FilterHeader> = filter_hashes
.into_iter()
- .map(|filter_hash| {
- let filter_header = FilterHeader {
- prev_header_hash: last_hash,
- filter_hash,
- };
- last_hash = filter_header.header_hash();
-
- filter_header
+ .scan(checkpoint, |prev_header, filter_hash| {
+ let filter_header = filter_hash.filter_header(&prev_header);
+ *prev_header = filter_header;
+
+ Some(filter_header)
})
.collect();
.transpose()?
{
// check connection with the next bundle if present
- if last_hash != next_checkpoint {
+ if cf_headers.iter().last() != Some(&next_checkpoint) {
return Err(CompactFiltersError::InvalidFilterHeader);
}
}
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
- let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash);
+ let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint);
read_store.put(key, value.serialize())?;
pub fn advance_to_cf_filters(
&self,
bundle: usize,
- checkpoint_hash: FilterHeaderHash,
+ checkpoint: FilterHeader,
headers: Vec<FilterHeader>,
filters: Vec<(usize, Vec<u8>)>,
) -> Result<BundleStatus, CompactFiltersError> {
let cf_filters = filters
.into_iter()
- .zip(headers.iter())
- .map(|((_, filter_content), header)| {
- if header.filter_hash != sha256d::Hash::hash(&filter_content).into() {
- return Err(CompactFiltersError::InvalidFilter);
+ .zip(headers.into_iter())
+ .scan(checkpoint, |prev_header, ((_, filter_content), header)| {
+ let filter = BlockFilter::new(&filter_content);
+ if header != filter.filter_header(&prev_header) {
+ return Some(Err(CompactFiltersError::InvalidFilter));
}
+ *prev_header = header;
- Ok::<_, CompactFiltersError>(filter_content)
+ Some(Ok::<_, CompactFiltersError>(filter_content))
})
.collect::<Result<_, _>>()?;
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
- let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash);
+ let value = (BundleStatus::CFilters { cf_filters }, checkpoint);
let read_store = self.store.read().unwrap();
read_store.put(key, value.serialize())?;
pub fn prune_filters(
&self,
bundle: usize,
- checkpoint_hash: FilterHeaderHash,
+ checkpoint: FilterHeader,
) -> Result<BundleStatus, CompactFiltersError> {
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
- let value = (BundleStatus::Pruned, checkpoint_hash);
+ let value = (BundleStatus::Pruned, checkpoint);
let read_store = self.store.read().unwrap();
read_store.put(key, value.serialize())?;
&self,
bundle: usize,
cf_filters: Vec<Vec<u8>>,
- checkpoint_hash: FilterHeaderHash,
+ checkpoint: FilterHeader,
) -> Result<BundleStatus, CompactFiltersError> {
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
- let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash);
+ let value = (BundleStatus::Tip { cf_filters }, checkpoint);
let read_store = self.store.read().unwrap();
read_store.put(key, value.serialize())?;