Add a generalized "Blockchain" interface
author Alekos Filini <alekos.filini@gmail.com>
Sun, 3 May 2020 14:15:11 +0000 (16:15 +0200)
committer Alekos Filini <alekos.filini@gmail.com>
Sun, 3 May 2020 16:16:11 +0000 (18:16 +0200)
Cargo.toml
examples/repl.rs
src/blockchain/electrum.rs [new file with mode: 0644]
src/blockchain/mod.rs [new file with mode: 0644]
src/database/memory.rs
src/database/mod.rs
src/error.rs
src/lib.rs
src/wallet/mod.rs
src/wallet/offline_stream.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0edd8c32bf852be71c21ce374c69da79c8235bbb..a3aa967e459241b8d7a6f7b72adc4920b3873b1f 100644 (file)
@@ -18,7 +18,7 @@ electrum-client = { version = "0.1.0-beta.5", optional = true }
 [features]
 minimal = []
 compiler = ["miniscript/compiler"]
-default = ["sled", "electrum-client"]
+default = ["key-value-db", "electrum"]
 electrum = ["electrum-client"]
 key-value-db = ["sled"]
 
diff --git a/examples/repl.rs b/examples/repl.rs
index 82c41f308703c566c9ac5448ac8d936f938d9129..66598514d37f6f1caaf59dbb26bda71d2de0d27f 100644 (file)
@@ -23,6 +23,7 @@ use bitcoin::util::psbt::PartiallySignedTransaction;
 use bitcoin::{Address, Network, OutPoint};
 
 use magical_bitcoin_wallet::bitcoin;
+use magical_bitcoin_wallet::blockchain::ElectrumBlockchain;
 use magical_bitcoin_wallet::sled;
 use magical_bitcoin_wallet::types::ScriptType;
 use magical_bitcoin_wallet::{Client, Wallet};
@@ -255,7 +256,14 @@ fn main() {
     debug!("database opened successfully");
 
     let client = Client::new(matches.value_of("server").unwrap()).unwrap();
-    let wallet = Wallet::new(descriptor, change_descriptor, network, tree, client).unwrap();
+    let wallet = Wallet::new(
+        descriptor,
+        change_descriptor,
+        network,
+        tree,
+        ElectrumBlockchain::from(client),
+    )
+    .unwrap();
 
     // TODO: print errors in a nice way
     let handle_matches = |matches: ArgMatches<'_>| {
diff --git a/src/blockchain/electrum.rs b/src/blockchain/electrum.rs
new file mode 100644 (file)
index 0000000..592953e
--- /dev/null
@@ -0,0 +1,333 @@
+use std::cmp;
+use std::collections::{HashSet, VecDeque};
+use std::convert::TryFrom;
+use std::io::{Read, Write};
+
+#[allow(unused_imports)]
+use log::{debug, error, info, trace};
+
+use bitcoin::{Address, Network, OutPoint, Script, Transaction, Txid};
+
+use electrum_client::types::*;
+use electrum_client::Client;
+
+use super::*;
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
+use crate::error::Error;
+use crate::types::{ScriptType, TransactionDetails, UTXO};
+use crate::wallet::utils::ChunksIterator;
+
+pub struct ElectrumBlockchain<T: Read + Write>(Option<Client<T>>);
+
+impl<T: Read + Write> std::convert::From<Client<T>> for ElectrumBlockchain<T> {
+    fn from(client: Client<T>) -> Self {
+        ElectrumBlockchain(Some(client))
+    }
+}
+
+impl<T: Read + Write> Blockchain for ElectrumBlockchain<T> {
+    fn offline() -> Self {
+        ElectrumBlockchain(None)
+    }
+}
+
+impl<T: Read + Write> OnlineBlockchain for ElectrumBlockchain<T> {
+    fn get_capabilities(&self) -> HashSet<Capability> {
+        vec![Capability::FullHistory, Capability::GetAnyTx]
+            .into_iter()
+            .collect()
+    }
+
+    fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
+        &mut self,
+        stop_gap: Option<usize>,
+        database: &mut D,
+        _progress_update: P,
+    ) -> Result<(), Error> {
+        // TODO: progress
+
+        let stop_gap = stop_gap.unwrap_or(20);
+        let batch_query_size = 20;
+
+        // check unconfirmed tx, delete so they are retrieved later
+        let mut del_batch = database.begin_batch();
+        for tx in database.iter_txs(false)? {
+            if tx.height.is_none() {
+                del_batch.del_tx(&tx.txid, false)?;
+            }
+        }
+        database.commit_batch(del_batch)?;
+
+        // maximum derivation index for a change address that we've seen during sync
+        let mut change_max_deriv = 0;
+
+        let mut already_checked: HashSet<Script> = HashSet::new();
+        let mut to_check_later = VecDeque::with_capacity(batch_query_size);
+
+        // insert the first chunk
+        let mut iter_scriptpubkeys = database
+            .iter_script_pubkeys(Some(ScriptType::External))?
+            .into_iter();
+        let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
+        for item in chunk.into_iter().rev() {
+            to_check_later.push_front(item);
+        }
+
+        let mut iterating_external = true;
+        let mut index = 0;
+        let mut last_found = 0;
+        while !to_check_later.is_empty() {
+            trace!("to_check_later size {}", to_check_later.len());
+
+            let until = cmp::min(to_check_later.len(), batch_query_size);
+            let chunk: Vec<Script> = to_check_later.drain(..until).collect();
+            let call_result = self
+                .0
+                .as_mut()
+                .unwrap()
+                .batch_script_get_history(chunk.iter())?;
+
+            for (script, history) in chunk.into_iter().zip(call_result.into_iter()) {
+                trace!("received history for {:?}, size {}", script, history.len());
+
+                if !history.is_empty() {
+                    last_found = index;
+
+                    let mut check_later_scripts = self
+                        .check_history(database, script, history, &mut change_max_deriv)?
+                        .into_iter()
+                        .filter(|x| already_checked.insert(x.clone()))
+                        .collect();
+                    to_check_later.append(&mut check_later_scripts);
+                }
+
+                index += 1;
+            }
+
+            match iterating_external {
+                true if index - last_found >= stop_gap => iterating_external = false,
+                true => {
+                    trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {}, stop_gap = {}", index, last_found, stop_gap);
+
+                    let chunk: Vec<Script> =
+                        iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
+                    for item in chunk.into_iter().rev() {
+                        to_check_later.push_front(item);
+                    }
+                }
+                _ => {}
+            }
+        }
+
+        // check utxo
+        // TODO: try to minimize network requests and re-use scripts if possible
+        let mut batch = database.begin_batch();
+        for chunk in ChunksIterator::new(database.iter_utxos()?.into_iter(), batch_query_size) {
+            let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect();
+            let call_result = self
+                .0
+                .as_mut()
+                .unwrap()
+                .batch_script_list_unspent(scripts)?;
+
+            // check which utxos are actually still unspent
+            for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) {
+                debug!(
+                    "outpoint {:?} is unspent for me, list unspent is {:?}",
+                    utxo.outpoint, list_unspent
+                );
+
+                let mut spent = true;
+                for unspent in list_unspent {
+                    let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32);
+                    if utxo.outpoint == res_outpoint {
+                        spent = false;
+                        break;
+                    }
+                }
+                if spent {
+                    info!("{} not anymore unspent, removing", utxo.outpoint);
+                    batch.del_utxo(&utxo.outpoint)?;
+                }
+            }
+        }
+
+        let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0);
+        let first_ext_new = last_found as u32 + 1;
+        if first_ext_new > current_ext {
+            info!("Setting external index to {}", first_ext_new);
+            database.set_last_index(ScriptType::External, first_ext_new)?;
+        }
+
+        let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0);
+        let first_int_new = change_max_deriv + 1;
+        if first_int_new > current_int {
+            info!("Setting internal index to {}", first_int_new);
+            database.set_last_index(ScriptType::Internal, first_int_new)?;
+        }
+
+        database.commit_batch(batch)?;
+
+        Ok(())
+    }
+
+    fn get_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error> {
+        Ok(self
+            .0
+            .as_mut()
+            .unwrap()
+            .transaction_get(txid)
+            .map(Option::Some)?)
+    }
+
+    fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error> {
+        Ok(self
+            .0
+            .as_mut()
+            .unwrap()
+            .transaction_broadcast(tx)
+            .map(|_| ())?)
+    }
+
+    fn get_height(&mut self) -> Result<usize, Error> {
+        Ok(self
+            .0
+            .as_mut()
+            .unwrap()
+            .block_headers_subscribe()
+            .map(|data| data.height)?)
+    }
+}
+
+impl<T: Read + Write> ElectrumBlockchain<T> {
+    fn check_tx_and_descendant<D: DatabaseUtils + BatchDatabase>(
+        &mut self,
+        database: &mut D,
+        txid: &Txid,
+        height: Option<u32>,
+        cur_script: &Script,
+        change_max_deriv: &mut u32,
+    ) -> Result<Vec<Script>, Error> {
+        debug!(
+            "check_tx_and_descendant of {}, height: {:?}, script: {}",
+            txid, height, cur_script
+        );
+        let mut updates = database.begin_batch();
+        let tx = match database.get_tx(&txid, true)? {
+            // TODO: do we need the raw?
+            Some(mut saved_tx) => {
+                // update the height if it's different (in case of reorg)
+                if saved_tx.height != height {
+                    info!(
+                        "updating height from {:?} to {:?} for tx {}",
+                        saved_tx.height, height, txid
+                    );
+                    saved_tx.height = height;
+                    updates.set_tx(&saved_tx)?;
+                }
+
+                debug!("already have {} in db, returning the cached version", txid);
+
+                // unwrap since we explicitly ask for the raw_tx, if it's not present something
+                // went wrong
+                saved_tx.transaction.unwrap()
+            }
+            None => self.0.as_mut().unwrap().transaction_get(&txid)?,
+        };
+
+        let mut incoming: u64 = 0;
+        let mut outgoing: u64 = 0;
+
+        // look for our own inputs
+        for (i, input) in tx.input.iter().enumerate() {
+            // the fact that we visit addresses in a BFS fashion starting from the external addresses
+            // should ensure that this query is always consistent (i.e. when we get to call this all
+            // the transactions at a lower depth have already been indexed, so if an outpoint is ours
+            // we are guaranteed to have it in the db).
+            if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
+                if database.is_mine(&previous_output.script_pubkey)? {
+                    outgoing += previous_output.value;
+
+                    debug!("{} input #{} is mine, removing from utxo", txid, i);
+                    updates.del_utxo(&input.previous_output)?;
+                }
+            }
+        }
+
+        let mut to_check_later = vec![];
+        for (i, output) in tx.output.iter().enumerate() {
+            // this output is ours, we have a path to derive it
+            if let Some((script_type, path)) =
+                database.get_path_from_script_pubkey(&output.script_pubkey)?
+            {
+                debug!("{} output #{} is mine, adding utxo", txid, i);
+                updates.set_utxo(&UTXO {
+                    outpoint: OutPoint::new(tx.txid(), i as u32),
+                    txout: output.clone(),
+                })?;
+                incoming += output.value;
+
+                if output.script_pubkey != *cur_script {
+                    debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey);
+                    to_check_later.push(output.script_pubkey.clone())
+                }
+
+                // derive as many change addrs as external addresses that we've seen
+                if script_type == ScriptType::Internal
+                    && u32::from(path.as_ref()[0]) > *change_max_deriv
+                {
+                    *change_max_deriv = u32::from(path.as_ref()[0]);
+                }
+            }
+        }
+
+        let tx = TransactionDetails {
+            txid: tx.txid(),
+            transaction: Some(tx),
+            received: incoming,
+            sent: outgoing,
+            height,
+            timestamp: 0,
+        };
+        info!("Saving tx {}", txid);
+        updates.set_tx(&tx)?;
+
+        database.commit_batch(updates)?;
+
+        Ok(to_check_later)
+    }
+
+    fn check_history<D: DatabaseUtils + BatchDatabase>(
+        &mut self,
+        database: &mut D,
+        script_pubkey: Script,
+        txs: Vec<GetHistoryRes>,
+        change_max_deriv: &mut u32,
+    ) -> Result<Vec<Script>, Error> {
+        let mut to_check_later = Vec::new();
+
+        debug!(
+            "history of {} script {} has {} tx",
+            Address::from_script(&script_pubkey, Network::Testnet).unwrap(),
+            script_pubkey,
+            txs.len()
+        );
+
+        for tx in txs {
+            let height: Option<u32> = match tx.height {
+                0 | -1 => None,
+                x => u32::try_from(x).ok(),
+            };
+
+            to_check_later.extend_from_slice(&self.check_tx_and_descendant(
+                database,
+                &tx.tx_hash,
+                height,
+                &script_pubkey,
+                change_max_deriv,
+            )?);
+        }
+
+        Ok(to_check_later)
+    }
+}
diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs
new file mode 100644 (file)
index 0000000..5ea3fdb
--- /dev/null
@@ -0,0 +1,86 @@
+use std::collections::HashSet;
+use std::sync::mpsc::{channel, Receiver, Sender};
+
+use bitcoin::{Transaction, Txid};
+
+use crate::database::{BatchDatabase, DatabaseUtils};
+use crate::error::Error;
+
+#[cfg(feature = "electrum")]
+pub mod electrum;
+#[cfg(feature = "electrum")]
+pub use self::electrum::ElectrumBlockchain;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum Capability {
+    FullHistory,
+    GetAnyTx,
+}
+
+pub trait Blockchain {
+    fn offline() -> Self;
+}
+
+pub struct OfflineBlockchain;
+impl Blockchain for OfflineBlockchain {
+    fn offline() -> Self {
+        OfflineBlockchain
+    }
+}
+
+pub trait OnlineBlockchain: Blockchain {
+    fn get_capabilities(&self) -> HashSet<Capability>;
+
+    fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
+        &mut self,
+        stop_gap: Option<usize>,
+        database: &mut D,
+        progress_update: P,
+    ) -> Result<(), Error>;
+    fn sync<D: BatchDatabase + DatabaseUtils, P: Progress>(
+        &mut self,
+        stop_gap: Option<usize>,
+        database: &mut D,
+        progress_update: P,
+    ) -> Result<(), Error> {
+        self.setup(stop_gap, database, progress_update)
+    }
+
+    fn get_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error>;
+    fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error>;
+
+    fn get_height(&mut self) -> Result<usize, Error>;
+}
+
+pub type ProgressData = (f32, Option<String>);
+
+pub trait Progress {
+    fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error>;
+}
+
+pub fn progress() -> (Sender<ProgressData>, Receiver<ProgressData>) {
+    channel()
+}
+
+impl Progress for Sender<ProgressData> {
+    fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error> {
+        if progress < 0.0 || progress > 100.0 {
+            return Err(Error::InvalidProgressValue(progress));
+        }
+
+        self.send((progress, message))
+            .map_err(|_| Error::ProgressUpdateError)
+    }
+}
+
+pub struct NoopProgress;
+
+pub fn noop_progress() -> NoopProgress {
+    NoopProgress
+}
+
+impl Progress for NoopProgress {
+    fn update(&self, _progress: f32, _message: Option<String>) -> Result<(), Error> {
+        Ok(())
+    }
+}
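
For illustration, a caller can plug its own progress reporting into `setup`/`sync` by implementing the `Progress` trait defined above. The following is only a sketch against the trait as introduced here; the `LogProgress` type and its output are invented for the example:

    use magical_bitcoin_wallet::blockchain::Progress;
    use magical_bitcoin_wallet::error::Error;

    // Hypothetical reporter that just prints every update it receives.
    struct LogProgress;

    impl Progress for LogProgress {
        fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error> {
            println!("sync progress: {:.1}% ({:?})", progress, message);
            Ok(())
        }
    }

`noop_progress()` stays the default reporter used by the wallet code further below, while the `Sender<ProgressData>` impl lets updates be shipped across a channel instead.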
diff --git a/src/database/memory.rs b/src/database/memory.rs
index ddf2ceafa65c1546e4c2d48ceb223f6258ff7c12..495d9393c79fd62e93df2d2e565abd4072a9ac78 100644 (file)
@@ -1,9 +1,6 @@
 use std::collections::BTreeMap;
-use std::convert::{From, TryInto};
 use std::ops::Bound::{Excluded, Included};
 
-use serde::Serialize;
-
 use bitcoin::consensus::encode::{deserialize, serialize};
 use bitcoin::hash_types::Txid;
 use bitcoin::util::bip32::{ChildNumber, DerivationPath};
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 5c4a3e2ffdd931be38f4ff49705683611432a773..e4a69234d46ae7df77aefec2dbf78febfc7d9404 100644 (file)
@@ -1,6 +1,6 @@
 use bitcoin::hash_types::Txid;
 use bitcoin::util::bip32::{ChildNumber, DerivationPath};
-use bitcoin::{OutPoint, Script, Transaction};
+use bitcoin::{OutPoint, Script, Transaction, TxOut};
 
 use crate::error::Error;
 use crate::types::*;
@@ -76,3 +76,34 @@ pub trait BatchDatabase: Database {
     fn begin_batch(&self) -> Self::Batch;
     fn commit_batch(&mut self, batch: Self::Batch) -> Result<(), Error>;
 }
+
+pub trait DatabaseUtils: Database {
+    fn is_mine(&self, script: &Script) -> Result<bool, Error> {
+        self.get_path_from_script_pubkey(script)
+            .map(|o| o.is_some())
+    }
+
+    fn get_raw_tx_or<F>(&self, txid: &Txid, f: F) -> Result<Option<Transaction>, Error>
+    where
+        F: FnOnce() -> Result<Option<Transaction>, Error>,
+    {
+        self.get_tx(txid, true)?
+            .map(|t| t.transaction)
+            .flatten()
+            .map_or_else(f, |t| Ok(Some(t)))
+    }
+
+    fn get_previous_output(&self, outpoint: &OutPoint) -> Result<Option<TxOut>, Error> {
+        self.get_raw_tx(&outpoint.txid)?
+            .and_then(|previous_tx| {
+                if outpoint.vout as usize >= previous_tx.output.len() {
+                    Some(Err(Error::InvalidOutpoint(outpoint.clone())))
+                } else {
+                    Some(Ok(previous_tx.output[outpoint.vout as usize].clone()))
+                }
+            })
+            .transpose()
+    }
+}
+
+impl<T: Database> DatabaseUtils for T {}
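
Because of the blanket `impl<T: Database> DatabaseUtils for T` on the last line, every database backend picks up the new helpers for free. A hypothetical generic function using them (the function name and message are made up; imports assume the crate's re-exported `bitcoin`):

    use magical_bitcoin_wallet::bitcoin::Script;
    use magical_bitcoin_wallet::database::{Database, DatabaseUtils};
    use magical_bitcoin_wallet::error::Error;

    // Works for any `Database` thanks to the blanket impl above.
    fn log_if_mine<D: Database>(db: &D, script: &Script) -> Result<(), Error> {
        if db.is_mine(script)? {
            println!("script {} belongs to this wallet", script);
        }
        Ok(())
    }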
diff --git a/src/error.rs b/src/error.rs
index fa17199e432c13baa66bf411d2732c341cdd2488..64fb07a72c52c00fc3ff2751c5351022f544d660 100644 (file)
@@ -27,6 +27,13 @@ pub enum Error {
     InputMissingWitnessScript(usize),
     MissingUTXO,
 
+    // Blockchain interface errors
+    Uncapable(crate::blockchain::Capability),
+    InvalidProgressValue(f32),
+    ProgressUpdateError,
+    MissingCachedAddresses,
+    InvalidOutpoint(OutPoint),
+
     Descriptor(crate::descriptor::error::Error),
 
     Encode(bitcoin::consensus::encode::Error),
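
Code driving a sync can match on the new blockchain-related variants; a hypothetical handler (the function name and messages are invented):

    use magical_bitcoin_wallet::error::Error;

    // Hypothetical pretty-printer for the new error variants.
    fn describe(err: &Error) -> String {
        match err {
            Error::InvalidProgressValue(p) => format!("progress value out of range: {}", p),
            Error::ProgressUpdateError => "progress receiver hung up".to_string(),
            Error::InvalidOutpoint(op) => format!("outpoint {} does not exist", op),
            other => format!("{:?}", other),
        }
    }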
diff --git a/src/lib.rs b/src/lib.rs
index 2d122d987bd1326260b3c9b9ed2adc935e490124..f1e40deef1d777921dffce09569286c1104b0057 100644 (file)
@@ -18,6 +18,7 @@ pub extern crate sled;
 
 #[macro_use]
 pub mod error;
+pub mod blockchain;
 pub mod database;
 pub mod descriptor;
 pub mod psbt;
diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs
index 871d09ed688f4027c1684d042c7e6dae9d24f65d..2350441dc422916dca9b23e2a1b0850602664f60 100644 (file)
@@ -1,8 +1,6 @@
 use std::cell::RefCell;
-use std::cmp;
-use std::collections::{BTreeMap, HashSet, VecDeque};
-use std::convert::TryFrom;
-use std::io::{Read, Write};
+use std::collections::{BTreeMap, HashSet};
+use std::ops::DerefMut;
 use std::str::FromStr;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
@@ -20,39 +18,35 @@ use miniscript::BitcoinSig;
 #[allow(unused_imports)]
 use log::{debug, error, info, trace};
 
-pub mod offline_stream;
 pub mod utils;
 
-pub type OfflineWallet<D> = Wallet<offline_stream::OfflineStream, D>;
-
-use self::utils::{ChunksIterator, IsDust};
-use crate::database::{BatchDatabase, BatchOperations};
+use self::utils::IsDust;
+use crate::blockchain::{noop_progress, Blockchain, OfflineBlockchain, OnlineBlockchain};
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
 use crate::descriptor::{get_checksum, DescriptorMeta, ExtendedDescriptor, ExtractPolicy, Policy};
 use crate::error::Error;
 use crate::psbt::{utils::PSBTUtils, PSBTSatisfier, PSBTSigner};
 use crate::signer::Signer;
 use crate::types::*;
 
-#[cfg(any(feature = "electrum", feature = "default"))]
-use electrum_client::types::*;
-#[cfg(any(feature = "electrum", feature = "default"))]
-use electrum_client::Client;
-#[cfg(not(any(feature = "electrum", feature = "default")))]
-use std::marker::PhantomData as Client;
+pub type OfflineWallet<D> = Wallet<OfflineBlockchain, D>;
+
+//#[cfg(feature = "electrum")]
+//pub type ElectrumWallet<S, D> = Wallet<crate::blockchain::ElectrumBlockchain<electrum_client::Client<S>>, D>;
 
-pub struct Wallet<S: Read + Write, D: BatchDatabase> {
+pub struct Wallet<B: Blockchain, D: BatchDatabase> {
     descriptor: ExtendedDescriptor,
     change_descriptor: Option<ExtendedDescriptor>,
     network: Network,
 
-    client: Option<RefCell<Client<S>>>,
+    client: RefCell<B>,
     database: RefCell<D>,
 }
 
 // offline actions, always available
-impl<S, D> Wallet<S, D>
+impl<B, D> Wallet<B, D>
 where
-    S: Read + Write,
+    B: Blockchain,
     D: BatchDatabase,
 {
     pub fn new_offline(
@@ -88,7 +82,7 @@ where
             change_descriptor,
             network,
 
-            client: None,
+            client: RefCell::new(B::offline()),
             database: RefCell::new(database),
         })
     }
@@ -107,7 +101,7 @@ where
     }
 
     pub fn is_mine(&self, script: &Script) -> Result<bool, Error> {
-        self.get_path(script).map(|x| x.is_some())
+        self.database.borrow().is_mine(script)
     }
 
     pub fn list_unspent(&self) -> Result<Vec<UTXO>, Error> {
@@ -510,10 +504,6 @@ where
             .as_secs()
     }
 
-    fn get_path(&self, script: &Script) -> Result<Option<(ScriptType, DerivationPath)>, Error> {
-        self.database.borrow().get_path_from_script_pubkey(script)
-    }
-
     fn get_descriptor_for(&self, script_type: ScriptType) -> &ExtendedDescriptor {
         let desc = match script_type {
             ScriptType::External => &self.descriptor,
@@ -679,10 +669,9 @@ where
     }
 }
 
-#[cfg(any(feature = "electrum", feature = "default"))]
-impl<S, D> Wallet<S, D>
+impl<B, D> Wallet<B, D>
 where
-    S: Read + Write,
+    B: OnlineBlockchain,
     D: BatchDatabase,
 {
     pub fn new(
@@ -690,7 +679,7 @@ where
         change_descriptor: Option<&str>,
         network: Network,
         mut database: D,
-        client: Client<S>,
+        client: B,
     ) -> Result<Self, Error> {
         database.check_descriptor_checksum(
             ScriptType::External,
@@ -719,154 +708,15 @@ where
             change_descriptor,
             network,
 
-            client: Some(RefCell::new(client)),
+            client: RefCell::new(client),
             database: RefCell::new(database),
         })
     }
 
-    fn get_previous_output(&self, outpoint: &OutPoint) -> Option<TxOut> {
-        // the fact that we visit addresses in a BFS fashion starting from the external addresses
-        // should ensure that this query is always consistent (i.e. when we get to call this all
-        // the transactions at a lower depth have already been indexed, so if an outpoint is ours
-        // we are guaranteed to have it in the db).
-        self.database
-            .borrow()
-            .get_raw_tx(&outpoint.txid)
-            .unwrap()
-            .map(|previous_tx| previous_tx.output[outpoint.vout as usize].clone())
-    }
-
-    fn check_tx_and_descendant(
-        &self,
-        txid: &Txid,
-        height: Option<u32>,
-        cur_script: &Script,
-        change_max_deriv: &mut u32,
-    ) -> Result<Vec<Script>, Error> {
-        debug!(
-            "check_tx_and_descendant of {}, height: {:?}, script: {}",
-            txid, height, cur_script
-        );
-        let mut updates = self.database.borrow().begin_batch();
-        let tx = match self.database.borrow().get_tx(&txid, true)? {
-            // TODO: do we need the raw?
-            Some(mut saved_tx) => {
-                // update the height if it's different (in case of reorg)
-                if saved_tx.height != height {
-                    info!(
-                        "updating height from {:?} to {:?} for tx {}",
-                        saved_tx.height, height, txid
-                    );
-                    saved_tx.height = height;
-                    updates.set_tx(&saved_tx)?;
-                }
-
-                debug!("already have {} in db, returning the cached version", txid);
-
-                // unwrap since we explicitly ask for the raw_tx, if it's not present something
-                // went wrong
-                saved_tx.transaction.unwrap()
-            }
-            None => self
-                .client
-                .as_ref()
-                .unwrap()
-                .borrow_mut()
-                .transaction_get(&txid)?,
-        };
-
-        let mut incoming: u64 = 0;
-        let mut outgoing: u64 = 0;
-
-        // look for our own inputs
-        for (i, input) in tx.input.iter().enumerate() {
-            if let Some(previous_output) = self.get_previous_output(&input.previous_output) {
-                if self.is_mine(&previous_output.script_pubkey)? {
-                    outgoing += previous_output.value;
-
-                    debug!("{} input #{} is mine, removing from utxo", txid, i);
-                    updates.del_utxo(&input.previous_output)?;
-                }
-            }
-        }
-
-        let mut to_check_later = vec![];
-        for (i, output) in tx.output.iter().enumerate() {
-            // this output is ours, we have a path to derive it
-            if let Some((script_type, path)) = self.get_path(&output.script_pubkey)? {
-                debug!("{} output #{} is mine, adding utxo", txid, i);
-                updates.set_utxo(&UTXO {
-                    outpoint: OutPoint::new(tx.txid(), i as u32),
-                    txout: output.clone(),
-                })?;
-                incoming += output.value;
-
-                if output.script_pubkey != *cur_script {
-                    debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey);
-                    to_check_later.push(output.script_pubkey.clone())
-                }
-
-                // derive as many change addrs as external addresses that we've seen
-                if script_type == ScriptType::Internal
-                    && u32::from(path.as_ref()[0]) > *change_max_deriv
-                {
-                    *change_max_deriv = u32::from(path.as_ref()[0]);
-                }
-            }
-        }
-
-        let tx = TransactionDetails {
-            txid: tx.txid(),
-            transaction: Some(tx),
-            received: incoming,
-            sent: outgoing,
-            height,
-            timestamp: 0,
-        };
-        info!("Saving tx {}", txid);
-
-        updates.set_tx(&tx)?;
-        self.database.borrow_mut().commit_batch(updates)?;
-
-        Ok(to_check_later)
-    }
-
-    fn check_history(
-        &self,
-        script_pubkey: Script,
-        txs: Vec<GetHistoryRes>,
-        change_max_deriv: &mut u32,
-    ) -> Result<Vec<Script>, Error> {
-        let mut to_check_later = Vec::new();
-
-        debug!(
-            "history of {} script {} has {} tx",
-            Address::from_script(&script_pubkey, self.network).unwrap(),
-            script_pubkey,
-            txs.len()
-        );
-
-        for tx in txs {
-            let height: Option<u32> = match tx.height {
-                0 | -1 => None,
-                x => u32::try_from(x).ok(),
-            };
-
-            to_check_later.extend_from_slice(&self.check_tx_and_descendant(
-                &tx.tx_hash,
-                height,
-                &script_pubkey,
-                change_max_deriv,
-            )?);
-        }
-
-        Ok(to_check_later)
-    }
-
     pub fn sync(
         &self,
         max_address: Option<u32>,
-        batch_query_size: Option<usize>,
+        _batch_query_size: Option<usize>,
     ) -> Result<(), Error> {
         debug!("begin sync...");
         // TODO: consider taking an RwLock as writere here to prevent other "read-only" calls to
@@ -878,8 +728,8 @@ where
             max_address.unwrap_or(100)
         };
 
-        let batch_query_size = batch_query_size.unwrap_or(20);
-        let stop_gap = batch_query_size;
+        // TODO:
+        // let batch_query_size = batch_query_size.unwrap_or(20);
 
         let path = DerivationPath::from(vec![ChildNumber::Normal { index: max_address }]);
         let last_addr = self
@@ -923,154 +773,16 @@ where
             self.database.borrow_mut().commit_batch(address_batch)?;
         }
 
-        // check unconfirmed tx, delete so they are retrieved later
-        let mut del_batch = self.database.borrow().begin_batch();
-        for tx in self.database.borrow().iter_txs(false)? {
-            if tx.height.is_none() {
-                del_batch.del_tx(&tx.txid, false)?;
-            }
-        }
-        self.database.borrow_mut().commit_batch(del_batch)?;
-
-        // maximum derivation index for a change address that we've seen during sync
-        let mut change_max_deriv = 0;
-
-        let mut already_checked: HashSet<Script> = HashSet::new();
-        let mut to_check_later = VecDeque::with_capacity(batch_query_size);
-
-        // insert the first chunk
-        let mut iter_scriptpubkeys = self
-            .database
-            .borrow()
-            .iter_script_pubkeys(Some(ScriptType::External))?
-            .into_iter();
-        let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
-        for item in chunk.into_iter().rev() {
-            to_check_later.push_front(item);
-        }
-
-        let mut iterating_external = true;
-        let mut index = 0;
-        let mut last_found = 0;
-        while !to_check_later.is_empty() {
-            trace!("to_check_later size {}", to_check_later.len());
-
-            let until = cmp::min(to_check_later.len(), batch_query_size);
-            let chunk: Vec<Script> = to_check_later.drain(..until).collect();
-            let call_result = self
-                .client
-                .as_ref()
-                .unwrap()
-                .borrow_mut()
-                .batch_script_get_history(chunk.iter())?;
-
-            for (script, history) in chunk.into_iter().zip(call_result.into_iter()) {
-                trace!("received history for {:?}, size {}", script, history.len());
-
-                if !history.is_empty() {
-                    last_found = index;
-
-                    let mut check_later_scripts = self
-                        .check_history(script, history, &mut change_max_deriv)?
-                        .into_iter()
-                        .filter(|x| already_checked.insert(x.clone()))
-                        .collect();
-                    to_check_later.append(&mut check_later_scripts);
-                }
-
-                index += 1;
-            }
-
-            match iterating_external {
-                true if index - last_found >= stop_gap => iterating_external = false,
-                true => {
-                    trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {}, stop_gap = {}", index, last_found, stop_gap);
-
-                    let chunk: Vec<Script> =
-                        iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
-                    for item in chunk.into_iter().rev() {
-                        to_check_later.push_front(item);
-                    }
-                }
-                _ => {}
-            }
-        }
-
-        // check utxo
-        // TODO: try to minimize network requests and re-use scripts if possible
-        let mut batch = self.database.borrow().begin_batch();
-        for chunk in ChunksIterator::new(
-            self.database.borrow().iter_utxos()?.into_iter(),
-            batch_query_size,
-        ) {
-            let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect();
-            let call_result = self
-                .client
-                .as_ref()
-                .unwrap()
-                .borrow_mut()
-                .batch_script_list_unspent(scripts)?;
-
-            // check which utxos are actually still unspent
-            for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) {
-                debug!(
-                    "outpoint {:?} is unspent for me, list unspent is {:?}",
-                    utxo.outpoint, list_unspent
-                );
-
-                let mut spent = true;
-                for unspent in list_unspent {
-                    let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32);
-                    if utxo.outpoint == res_outpoint {
-                        spent = false;
-                        break;
-                    }
-                }
-                if spent {
-                    info!("{} not anymore unspent, removing", utxo.outpoint);
-                    batch.del_utxo(&utxo.outpoint)?;
-                }
-            }
-        }
-
-        let current_ext = self
-            .database
-            .borrow()
-            .get_last_index(ScriptType::External)?
-            .unwrap_or(0);
-        let first_ext_new = last_found as u32 + 1;
-        if first_ext_new > current_ext {
-            info!("Setting external index to {}", first_ext_new);
-            self.database
-                .borrow_mut()
-                .set_last_index(ScriptType::External, first_ext_new)?;
-        }
-
-        let current_int = self
-            .database
-            .borrow()
-            .get_last_index(ScriptType::Internal)?
-            .unwrap_or(0);
-        let first_int_new = change_max_deriv + 1;
-        if first_int_new > current_int {
-            info!("Setting internal index to {}", first_int_new);
-            self.database
-                .borrow_mut()
-                .set_last_index(ScriptType::Internal, first_int_new)?;
-        }
-
-        self.database.borrow_mut().commit_batch(batch)?;
-
-        Ok(())
+        self.client.borrow_mut().sync(
+            None,
+            self.database.borrow_mut().deref_mut(),
+            noop_progress(),
+        )
     }
 
     pub fn broadcast(&self, psbt: PSBT) -> Result<(Txid, Transaction), Error> {
         let extracted = psbt.extract_tx();
-        self.client
-            .as_ref()
-            .unwrap()
-            .borrow_mut()
-            .transaction_broadcast(&extracted)?;
+        self.client.borrow_mut().broadcast(&extracted)?;
 
         Ok((extracted.txid(), extracted))
     }
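
Putting the pieces together: a caller that used to hand a raw `electrum_client::Client` to `Wallet::new` now wraps it in an `ElectrumBlockchain` first, and both `sync` and `broadcast` are routed through the blockchain backend. Below is a condensed sketch mirroring examples/repl.rs above; the helper name and exact argument types are assumptions, and the descriptors, network, sled tree and server address all come from the caller:

    use magical_bitcoin_wallet::bitcoin::Network;
    use magical_bitcoin_wallet::blockchain::ElectrumBlockchain;
    use magical_bitcoin_wallet::sled;
    use magical_bitcoin_wallet::{Client, Wallet};

    // Hypothetical helper; argument types follow the repl example.
    fn open_and_sync(
        descriptor: &str,
        change_descriptor: Option<&str>,
        network: Network,
        tree: sled::Tree,
        server: &str,
    ) {
        let client = Client::new(server).unwrap();
        let wallet = Wallet::new(
            descriptor,
            change_descriptor,
            network,
            tree,
            ElectrumBlockchain::from(client),
        )
        .unwrap();

        // sync() now simply delegates to OnlineBlockchain::sync() on the wrapped
        // backend, passing noop_progress() as the reporter.
        wallet.sync(None, None).unwrap();
    }

Offline-only wallets take the other branch of the split: the `OfflineWallet<D>` alias is now `Wallet<OfflineBlockchain, D>`, built through `Wallet::new_offline`, and never touches a network backend.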
diff --git a/src/wallet/offline_stream.rs b/src/wallet/offline_stream.rs
index 721032d277c36f3512373dbfe19d465ec3c56f35..69ea645194526167b66341a35441223ed3dde9ec 100644 (file)
@@ -1,7 +1,7 @@
 use std::io::{self, Error, ErrorKind, Read, Write};
 
 #[derive(Clone, Debug)]
-pub struct OfflineStream {}
+pub struct OfflineStream;
 
 impl Read for OfflineStream {
     fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {