# NOTE(review): diff hunk from Cargo.toml — '+' lines are additions, '-' removals.
# Optional dependencies; each is switched on by one of the feature flags below.
base64 = { version = "^0.11", optional = true }
async-trait = { version = "0.1", optional = true }
rocksdb = { version = "0.14", optional = true }
# New: SOCKS5 client, used by the new `Peer::new_proxy` constructor further down.
+socks = { version = "0.3", optional = true }
lazy_static = { version = "1.4", optional = true }
# Platform-specific dependencies
default = ["key-value-db", "electrum"]
electrum = ["electrum-client"]
esplora = ["reqwest", "futures"]
# `compact_filters` now also pulls in `socks` so peers can be reached through a proxy.
-compact_filters = ["rocksdb", "lazy_static"]
+compact_filters = ["rocksdb", "socks", "lazy_static"]
key-value-db = ["sled"]
cli-utils = ["clap", "base64"]
async-interface = ["async-trait"]
use std::collections::HashSet;
-use std::fmt;
-use std::net::ToSocketAddrs;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use log::{debug, error, info, trace};
use bitcoin::network::message_blockdata::Inventory;
-use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid};
+use bitcoin::{BitcoinHash, OutPoint, Transaction, Txid};
use rocksdb::{Options, SliceTransform, DB};
use store::*;
use sync::*;
+pub use peer::{Mempool, Peer};
+
// Relative cost weights used to estimate overall sync progress
// (combined below as `headers_cost + filters_cost + PROCESS_BLOCKS_COST`).
// NOTE(review): units are arbitrary weights, not seconds — TODO confirm calibration.
const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
// Public wrapper; `None` means the blockchain is offline and operations return
// `Error::OfflineClient` (see the `ok_or(Error::OfflineClient)` calls below).
pub struct CompactFiltersBlockchain(Option<CompactFilters>);
impl CompactFiltersBlockchain {
// Diff: the constructor migrates from a single `address` + explicit
// `num_threads`/`network` to a caller-supplied list of already-constructed
// `Peer`s; the network is now derived from the peers themselves (see
// `CompactFilters::new` below) and `skip_blocks` becomes optional.
- pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
- address: A,
+ pub fn new<P: AsRef<Path>>(
+ peers: Vec<Peer>,
storage_dir: P,
- num_threads: usize,
- skip_blocks: usize,
- network: Network,
+ skip_blocks: Option<usize>,
) -> Result<Self, CompactFiltersError> {
// Thin wrapper: all validation/setup lives in `CompactFilters::new`.
Ok(CompactFiltersBlockchain(Some(CompactFilters::new(
- address,
+ peers,
storage_dir,
- num_threads,
skip_blocks,
- network,
)?)))
}
}
// `Debug` is now derived; the hand-written `impl fmt::Debug` removed further
// down this patch is superseded by this derive.
+#[derive(Debug)]
struct CompactFilters {
// Diff: single `peer` replaced by a vector of shared peers (one sync thread
// per peer below); the store type was renamed HeadersStore -> ChainStore.
- peer: Arc<Peer>,
- headers: Arc<HeadersStore<Full>>,
- skip_blocks: usize,
- num_threads: usize,
+ peers: Vec<Arc<Peer>>,
+ headers: Arc<ChainStore<Full>>,
+ skip_blocks: Option<usize>,
}
impl CompactFilters {
// Builds the RocksDB-backed store and takes ownership of the peer list.
// NOTE(review): hunk is truncated — the `for cf_name in &cfs` body and the
// closing braces of this fn/impl are not visible here.
- pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
- address: A,
+ pub fn new<P: AsRef<Path>>(
+ peers: Vec<Peer>,
storage_dir: P,
- num_threads: usize,
- skip_blocks: usize,
- network: Network,
+ skip_blocks: Option<usize>,
) -> Result<Self, CompactFiltersError> {
// Guard added by this patch: `peers[0]` is indexed below (and elsewhere),
// so an empty list must be rejected up front with the new NoPeers variant.
+ if peers.is_empty() {
+ return Err(CompactFiltersError::NoPeers);
+ }
+
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));
// Network is no longer a parameter; it is taken from the first peer.
// NOTE(review): peers on different networks are not rejected here — TODO confirm
// whether mixed-network peer lists should be an error.
+ let network = peers[0].get_network();
+
let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]);
let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
- let headers = Arc::new(HeadersStore::new(db, network)?);
+ let headers = Arc::new(ChainStore::new(db, network)?);
// try to recover partial snapshots
for cf_name in &cfs {
}
Ok(CompactFilters {
// Peer construction moved out to the caller; each peer is wrapped in an
// Arc so it can be shared with its per-peer sync thread.
- peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?),
- num_threads,
+ peers: peers.into_iter().map(Arc::new).collect(),
headers,
skip_blocks,
})
// NOTE(review): heavily truncated hunk from the main sync routine — the fn
// header, closure bodies and thread bodies are outside this view.
progress_update: P,
) -> Result<(), Error> {
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
// The first peer is used for the single-connection work (header sync,
// mempool); the full peer list drives the parallel filter download below.
+ let first_peer = &inner.peers[0];
- let cf_sync = Arc::new(CFSync::new(
- Arc::clone(&inner.headers),
- inner.skip_blocks,
- 0x00,
- )?);
// `skip_blocks` is now optional; absent means start from genesis (0).
+ let skip_blocks = inner.skip_blocks.unwrap_or(0);
+
+ let cf_sync = Arc::new(CFSync::new(Arc::clone(&inner.headers), skip_blocks, 0x00)?);
let initial_height = inner.headers.get_height()?;
// Bundle count estimate: 1000 filters per bundle, derived from the peer's
// advertised start_height; saturates at 0 on underflow via checked_sub.
- let total_bundles = (inner.peer.get_version().start_height as usize)
- .checked_sub(inner.skip_blocks)
+ let total_bundles = (first_peer.get_version().start_height as usize)
+ .checked_sub(skip_blocks)
.map(|x| x / 1000)
.unwrap_or(0)
+ 1;
.checked_sub(cf_sync.pruned_bundles()?)
.unwrap_or(0);
- let headers_cost = (inner.peer.get_version().start_height as usize)
+ let headers_cost = (first_peer.get_version().start_height as usize)
.checked_sub(initial_height)
.unwrap_or(0) as f32
* SYNC_HEADERS_COST;
let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;
if let Some(snapshot) = sync::sync_headers(
- Arc::clone(&inner.peer),
+ Arc::clone(&first_peer),
Arc::clone(&inner.headers),
|new_height| {
let local_headers_cost =
.unwrap_or(0);
info!("Synced headers to height: {}", synced_height);
- cf_sync.prepare_sync(Arc::clone(&inner.peer))?;
+ cf_sync.prepare_sync(Arc::clone(&first_peer))?;
let all_scripts = Arc::new(
database
let synced_bundles = Arc::new(AtomicUsize::new(0));
let progress_update = Arc::new(Mutex::new(progress_update));
// Diff: one worker thread per configured peer replaces the old
// `num_threads` model, which cloned new connections off a single peer.
- let mut threads = Vec::with_capacity(inner.num_threads);
- for _ in 0..inner.num_threads {
+ let mut threads = Vec::with_capacity(inner.peers.len());
+ for peer in &inner.peers {
let cf_sync = Arc::clone(&cf_sync);
- let peer = Arc::new(inner.peer.new_connection()?);
+ let peer = Arc::clone(&peer);
let headers = Arc::clone(&inner.headers);
let all_scripts = Arc::clone(&all_scripts);
let last_synced_block = Arc::clone(&last_synced_block);
}
database.commit_batch(updates)?;
- inner.peer.ask_for_mempool()?;
+ first_peer.ask_for_mempool()?;
let mut internal_max_deriv = 0;
let mut external_max_deriv = 0;
)?;
}
}
- for tx in inner.peer.get_mempool().iter_txs().iter() {
+ for tx in first_peer.get_mempool().iter_txs().iter() {
inner.process_tx(
database,
tx,
// Look up a transaction in the (first) peer's mempool cache; returns Ok(None)
// when unknown. Errors with OfflineClient if the blockchain wrapper is None.
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
// Diff: `inner.peer` -> `inner.peers[0]`; safe because the constructor
// rejects empty peer lists with NoPeers.
- Ok(inner
- .peer
+ Ok(inner.peers[0]
.get_mempool()
.get_tx(&Inventory::Transaction(*txid)))
}
// Broadcast through the first peer only.
// NOTE(review): other peers are not used for broadcast — TODO confirm intended.
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
- inner.peer.broadcast_tx(tx.clone())?;
+ inner.peers[0].broadcast_tx(tx.clone())?;
Ok(())
}
}
}
// Removed: the manual Debug impl is superseded by `#[derive(Debug)]` on
// `CompactFilters` added earlier in this patch.
-impl fmt::Debug for CompactFilters {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("CompactFilters")
- .field("peer", &self.peer)
- .finish()
- }
-}
-
// Error type for the compact-filters backend.
// NOTE(review): hunk is truncated — the enum's closing brace and any further
// variants are not visible here.
#[derive(Debug)]
pub enum CompactFiltersError {
InvalidResponse,
InvalidFilter,
MissingBlock,
DataCorruption,
+
// New in this patch: peer socket is down / not yet established.
+ NotConnected,
Timeout,
// New in this patch: constructor was given an empty peer list.
+ NoPeers,
+
// Wrapped underlying errors.
DB(rocksdb::Error),
IO(std::io::Error),
BIP158(bitcoin::util::bip158::Error),
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use socks::{Socks5Stream, ToTargetAddr};
+
use rand::{thread_rng, Rng};
use bitcoin::consensus::Encodable;
// Map from network-message command name to a queue of pending responses plus
// a condvar used to wake waiting receivers.
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
// New in this patch: single shared receive timeout, replacing the hard-coded
// `Duration::from_secs(10)` literals scattered through the recv call sites below.
+pub(crate) const TIMEOUT_SECS: u64 = 10;
+
#[derive(Debug, Default)]
pub struct Mempool {
// Cached transactions keyed by txid (hunk truncated below this field).
txs: RwLock<HashMap<Txid, Transaction>>,
// NOTE(review): tail of `Peer::new`'s parameter list — the fn header is
// outside this hunk.
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
// Diff: `Peer::new` is reduced to "open a TCP stream, then delegate"; the
// shared setup moves into the new private `from_stream`.
- let connection = TcpStream::connect(address)?;
+ let stream = TcpStream::connect(address)?;
+
+ Peer::from_stream(stream, mempool, network)
+ }
+
// New in this patch: connect to `target` through a SOCKS5 `proxy`,
// optionally authenticating with username/password credentials.
+ pub fn new_proxy<T: ToTargetAddr, P: ToSocketAddrs>(
+ target: T,
+ proxy: P,
+ credentials: Option<(&str, &str)>,
+ mempool: Arc<Mempool>,
+ network: Network,
+ ) -> Result<Self, CompactFiltersError> {
+ let socks_stream = if let Some((username, password)) = credentials {
+ Socks5Stream::connect_with_password(proxy, target, username, password)?
+ } else {
+ Socks5Stream::connect(proxy, target)?
+ };
- let writer = Arc::new(Mutex::new(connection.try_clone()?));
// `into_inner` unwraps the Socks5Stream back to its underlying TcpStream.
+ Peer::from_stream(socks_stream.into_inner(), mempool, network)
+ }
+
// New shared constructor body: clones the stream for the writer half and
// spawns the reader thread on the original.
+ fn from_stream(
+ stream: TcpStream,
+ mempool: Arc<Mempool>,
+ network: Network,
+ ) -> Result<Self, CompactFiltersError> {
+ let writer = Arc::new(Mutex::new(stream.try_clone()?));
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
let connected = Arc::new(RwLock::new(true));
let reader_thread = thread::spawn(move || {
Self::reader_thread(
network,
- connection,
+ stream,
reader_thread_responses,
reader_thread_writer,
reader_thread_mempool,
})
}
// Removed: re-dialing the same socket address no longer fits the
// multi-peer model (and would bypass a proxy); callers now pass multiple
// Peers explicitly.
- pub fn new_connection(&self) -> Result<Self, CompactFiltersError> {
- let socket_addr = self.writer.lock().unwrap().peer_addr()?;
- Self::new(socket_addr, Arc::clone(&self.mempool), self.network)
- }
-
-
// NOTE(review): `_send` header fragment only — body cut by the hunk boundary.
fn _send(
writer: &mut TcpStream,
magic: u32,
&self.version
}
// New accessor: exposes the peer's network so `CompactFilters::new` can
// derive the network from its first peer instead of taking a parameter.
+ pub fn get_network(&self) -> Network {
+ self.network
+ }
+
// Shared handle to this peer's mempool cache (cheap Arc clone).
pub fn get_mempool(&self) -> Arc<Mempool> {
Arc::clone(&self.mempool)
}
// Diff theme for this whole region: every hard-coded 10-second receive
// timeout is replaced by the shared `TIMEOUT_SECS` constant; a `None` (timed
// out) reply maps to `CompactFiltersError::Timeout`.
}))?;
let response = self
- .recv("cfcheckpt", Some(Duration::from_secs(10)))?
+ .recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = match response {
NetworkMessage::CFCheckpt(response) => response,
}))?;
let response = self
- .recv("cfheaders", Some(Duration::from_secs(10)))?
+ .recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = match response {
NetworkMessage::CFHeaders(response) => response,
// Pops one queued `cfilter` message, failing on timeout or a wrong type.
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
let response = self
- .recv("cfilter", Some(Duration::from_secs(10)))?
+ .recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = match response {
NetworkMessage::CFilter(response) => response,
block_hash,
)]))?;
// Block fetch: a timeout here yields Ok(None) rather than an error.
- match self.recv("block", Some(Duration::from_secs(10)))? {
+ match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
None => Ok(None),
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
_ => Err(CompactFiltersError::InvalidResponse),
for _ in 0..num_txs {
let tx = self
- .recv("tx", Some(Duration::from_secs(10)))?
+ .recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let tx = match tx {
NetworkMessage::Tx(tx) => tx,
use std::convert::TryInto;
+use std::fmt;
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::ops::Deref;
static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
}
// Marker trait for store type-state (`Full` vs `Snapshot`). The patch adds a
// `fmt::Debug` bound (and derives) so the new `Debug` impl for
// `ChainStore<T>` below can print the store-type name via `T::default()`.
-pub trait StoreType: Default {}
+pub trait StoreType: Default + fmt::Debug {}
-#[derive(Default)]
+#[derive(Default, Debug)]
pub struct Full;
impl StoreType for Full {}
-#[derive(Default)]
+#[derive(Default, Debug)]
pub struct Snapshot;
impl StoreType for Snapshot {}
}
}
// Renamed across this patch: HeadersStore -> ChainStore (same shape).
// Wraps a shared RocksDB handle; `cf_name` selects the column family and
// `min_height` is the first height this view covers (non-zero for snapshots).
-pub struct HeadersStore<T: StoreType> {
+pub struct ChainStore<T: StoreType> {
store: Arc<RwLock<DB>>,
cf_name: String,
min_height: usize,
// Zero-sized marker carrying the Full/Snapshot type-state.
phantom: PhantomData<T>,
}
// Mechanical rename hunks (HeadersStore -> ChainStore) inside the Full-store
// impl; bodies are heavily truncated by the hunk boundaries.
-impl HeadersStore<Full> {
+impl ChainStore<Full> {
pub fn new(store: DB, network: Network) -> Result<Self, CompactFiltersError> {
// Picks the genesis block for the given network (hunk shows Bitcoin only).
let genesis = match network {
Network::Bitcoin => MAINNET_GENESIS.deref(),
store.write(batch)?;
}
- Ok(HeadersStore {
+ Ok(ChainStore {
store: Arc::new(RwLock::new(store)),
cf_name,
min_height: 0,
Ok(answer)
}
// Signature collapsed to one line by the patch (rustfmt), behavior unchanged:
// begins a snapshot column family starting at height `from`.
- pub fn start_snapshot(
- &self,
- from: usize,
- ) -> Result<HeadersStore<Snapshot>, CompactFiltersError> {
+ pub fn start_snapshot(&self, from: usize) -> Result<ChainStore<Snapshot>, CompactFiltersError> {
// Random 16-char suffix keeps concurrent snapshot CF names unique.
let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect();
let new_cf_name = format!("_headers:{}", new_cf_name);
write_store.write(batch)?;
let store = Arc::clone(&self.store);
- Ok(HeadersStore {
+ Ok(ChainStore {
store,
cf_name: new_cf_name,
min_height: from,
// Explicit drops release the iterator/borrow before the store is reused.
std::mem::drop(iterator);
std::mem::drop(write_store);
- let snapshot = HeadersStore {
+ let snapshot = ChainStore {
store: Arc::clone(&self.store),
cf_name: cf_name.into(),
min_height,
pub fn apply_snapshot(
&self,
// NOTE(review): parameter name "snaphost" is a typo for "snapshot" —
// pre-existing, untouched by this patch.
- snaphost: HeadersStore<Snapshot>,
+ snaphost: ChainStore<Snapshot>,
) -> Result<(), CompactFiltersError> {
let mut batch = WriteBatch::default();
}
}
// Rename hunk for the type-state-generic impl; `work` body is truncated.
-impl<T: StoreType> HeadersStore<T> {
+impl<T: StoreType> ChainStore<T> {
pub fn work(&self) -> Result<Uint256, CompactFiltersError> {
let read_store = self.store.read().unwrap();
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
}
}
// New Debug impl enabled by the `StoreType: Debug` bound added above; prints
// the store-type name (Full/Snapshot) via `T::default()`.
+impl<T: StoreType> fmt::Debug for ChainStore<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct(&format!("ChainStore<{:?}>", T::default()))
+ .field("cf_name", &self.cf_name)
+ .field("min_height", &self.min_height)
// NOTE(review): `self.network` is not among the ChainStore fields shown in
// this patch (store/cf_name/min_height/phantom) — confirm the struct
// actually gains a `network` field, otherwise this will not compile.
+ .field("network", &self.network)
+ .field("headers_height", &self.get_height())
+ .field("tip_hash", &self.get_tip_hash())
+ .finish()
+ }
+}
+
+
// Filter-header hashes share the FilterHash representation.
pub type FilterHeaderHash = FilterHash;
#[derive(Debug, Clone)]
impl CFStore {
// Rename hunk only: the headers-store parameter follows the
// HeadersStore -> ChainStore rename; body truncated below.
pub fn new(
- headers_store: &HeadersStore<Full>,
+ headers_store: &ChainStore<Full>,
filter_type: u8,
) -> Result<Self, CompactFiltersError> {
let cf_store = CFStore {
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{Arc, Mutex};
+use std::time::Duration;
use bitcoin::hash_types::{BlockHash, FilterHash};
use bitcoin::network::message::NetworkMessage;
use super::peer::*;
use super::store::*;
use super::CompactFiltersError;
+use crate::error::Error;
// Depth after which a block's filters are considered final/buried.
pub(crate) const BURIED_CONFIRMATIONS: usize = 100;
// Compact-filter sync coordinator; only rename hunks here
// (HeadersStore -> ChainStore), structure otherwise unchanged.
pub struct CFSync {
- headers_store: Arc<HeadersStore<Full>>,
+ headers_store: Arc<ChainStore<Full>>,
cf_store: Arc<CFStore>,
skip_blocks: usize,
// Work queue of (status, filter hash, height) bundles shared across the
// per-peer sync threads.
bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
impl CFSync {
pub fn new(
- headers_store: Arc<HeadersStore<Full>>,
+ headers_store: Arc<ChainStore<Full>>,
skip_blocks: usize,
filter_type: u8,
) -> Result<Self, CompactFiltersError> {
// Hunk tail of a generic fn: only change is using the imported `Error`
// alias (`use crate::error::Error` added above) instead of the full path.
) -> Result<(), CompactFiltersError>
where
F: Fn(&BlockHash, &BlockFilter) -> Result<bool, CompactFiltersError>,
- Q: Fn(usize) -> Result<(), crate::error::Error>,
+ Q: Fn(usize) -> Result<(), Error>,
{
let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated
// Syncs block headers from `peer` into `store`, returning a snapshot store
// when new headers were applied (None when the peer had nothing new).
// Diff theme: the previously unbounded `recv("headers", None)` calls gain a
// TIMEOUT_SECS deadline, with timeouts surfaced as CompactFiltersError::Timeout.
pub fn sync_headers<F>(
peer: Arc<Peer>,
- store: Arc<HeadersStore<Full>>,
+ store: Arc<ChainStore<Full>>,
sync_fn: F,
-) -> Result<Option<HeadersStore<Snapshot>>, CompactFiltersError>
+) -> Result<Option<ChainStore<Snapshot>>, CompactFiltersError>
where
- F: Fn(usize) -> Result<(), crate::error::Error>,
+ F: Fn(usize) -> Result<(), Error>,
{
let locators = store.get_locators()?;
let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect();
locators_vec,
Default::default(),
)))?;
// Rewritten receive: timeout -> Timeout error, then the Some/None split of
// the old `if let Some(...)` becomes an explicit ok_or + destructure.
- let (mut snapshot, mut last_hash) =
- if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
- if headers.is_empty() {
- return Ok(None);
- }
+ let (mut snapshot, mut last_hash) = if let NetworkMessage::Headers(headers) = peer
+ .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))?
+ .ok_or(CompactFiltersError::Timeout)?
+ {
+ if headers.is_empty() {
+ return Ok(None);
+ }
// Unchanged logic, re-indented: the first header must attach to one of
// our locators, otherwise the peer sent an unrelated chain.
- match locators_map.get(&headers[0].prev_blockhash) {
- None => return Err(CompactFiltersError::InvalidHeaders),
- Some(from) => (
- store.start_snapshot(*from)?,
- headers[0].prev_blockhash.clone(),
- ),
- }
- } else {
- return Err(CompactFiltersError::InvalidResponse);
- };
+ match locators_map.get(&headers[0].prev_blockhash) {
+ None => return Err(CompactFiltersError::InvalidHeaders),
+ Some(from) => (
+ store.start_snapshot(*from)?,
+ headers[0].prev_blockhash.clone(),
+ ),
+ }
+ } else {
+ return Err(CompactFiltersError::InvalidResponse);
+ };
let mut sync_height = store.get_height()?;
// Keep requesting header batches until we reach the peer's advertised tip.
while sync_height < peer.get_version().start_height as usize {
vec![last_hash],
Default::default(),
)))?;
// Same timeout treatment for each subsequent batch.
- if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
+ if let NetworkMessage::Headers(headers) = peer
+ .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))?
+ .ok_or(CompactFiltersError::Timeout)?
+ {
let batch_len = headers.len();
last_hash = snapshot.apply(sync_height, headers)?;