bdk/commitdiff
Merge branch 'master' into sync_pipeline
author LLFourn <lloyd.fourn@gmail.com>
Tue, 23 Nov 2021 00:28:18 +0000 (11:28 +1100)
committer LLFourn <lloyd.fourn@gmail.com>
Tue, 23 Nov 2021 01:53:40 +0000 (12:53 +1100)
CHANGELOG.md
src/blockchain/electrum.rs
src/blockchain/esplora/api.rs
src/blockchain/script_sync.rs
src/testutils/blockchain_tests.rs

diff --cc CHANGELOG.md
index fae09c5627546a27b6cb57b490caac356e34488c,48b0bca3c71ce91b860b4ec584d3ddc08ea99e14..866dc2e14874a81a8c0646f1f40e7e151ccfbdf3
@@@ -8,9 -8,8 +8,11 @@@ and this project adheres to [Semantic V
  
  - BIP39 implementation dependency, in `keys::bip39` changed from tiny-bip39 to rust-bip39.
  - Add new method on the `TxBuilder` to embed data in the transaction via `OP_RETURN`. To allow that, the dust check has been fixed to apply only to spendable outputs.
 +- Overhauled sync logic for electrum and esplora.
 +- Unify ureq and reqwest esplora backends to have the same configuration parameters. This means reqwest now has a timeout parameter and ureq has a concurrency parameter.
 +- Fixed esplora fee estimation.
+ - Update the `Database` trait to store the last sync timestamp and block height.
+ - Rename `ConfirmationTime` to `BlockTime` (see the sketch below this hunk).
  
  ## [v0.13.0] - [v0.12.0]
  
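The last two entries are the API changes most likely to touch downstream code. A minimal sketch of the `ConfirmationTime` to `BlockTime` rename (not part of this commit; it assumes the type stays re-exported at the crate root, as the `use crate::BlockTime` lines in the diffs below suggest, and that the `height`/`timestamp` fields are unchanged):

use bdk::BlockTime; // was: use bdk::ConfirmationTime;

fn describe(conf: &Option<BlockTime>) -> String {
    match conf {
        // field names and types match the struct literals in the diffs below
        Some(BlockTime { height, timestamp }) => {
            format!("confirmed at height {} (unix time {})", height, timestamp)
        }
        None => "unconfirmed".to_string(),
    }
}
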
diff --cc src/blockchain/electrum.rs
index 53fb6e8691da40f387b7a7f7e1b1b6f17b679bcd,53d4dabb92b40bc6f2ed66bde5b3de90df50a539..cb079a8743818718d84df0774016750261d66200
@@@ -33,11 -33,11 +33,11 @@@ use bitcoin::{Transaction, Txid}
  
  use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config};
  
 -use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
 +use super::script_sync::Request;
  use super::*;
 -use crate::database::BatchDatabase;
 +use crate::database::{BatchDatabase, Database};
  use crate::error::Error;
- use crate::{ConfirmationTime, FeeRate};
 -use crate::FeeRate;
++use crate::{BlockTime, FeeRate};
  
  /// Wrapper over an Electrum Client that implements the required blockchain traits
  ///
@@@ -71,139 -71,10 +71,139 @@@ impl Blockchain for ElectrumBlockchain 
      fn setup<D: BatchDatabase, P: Progress>(
          &self,
          database: &mut D,
 -        progress_update: P,
 +        _progress_update: P,
      ) -> Result<(), Error> {
 -        self.client
 -            .electrum_like_setup(self.stop_gap, database, progress_update)
 +        let mut request = script_sync::start(database, self.stop_gap)?;
 +        let mut block_times = HashMap::<u32, u32>::new();
 +        let mut txid_to_height = HashMap::<Txid, u32>::new();
 +        let mut tx_cache = TxCache::new(database, &self.client);
 +        let chunk_size = self.stop_gap;
 +        // The electrum server has sometimes been inconsistent in its responses during sync. For
 +        // example, we make a batch request for transactions and the response contains fewer
 +        // transactions than we asked for. This should never happen, but we don't want to panic.
 +        let electrum_goof = || Error::Generic("electrum server misbehaving".to_string());
 +
 +        let batch_update = loop {
 +            request = match request {
 +                Request::Script(script_req) => {
 +                    let scripts = script_req.request().take(chunk_size);
 +                    let txids_per_script: Vec<Vec<_>> = self
 +                        .client
 +                        .batch_script_get_history(scripts)
 +                        .map_err(Error::Electrum)?
 +                        .into_iter()
 +                        .map(|txs| {
 +                            txs.into_iter()
 +                                .map(|tx| {
 +                                    let tx_height = match tx.height {
 +                                        // electrum reports height <= 0 for unconfirmed txs
 +                                        none if none <= 0 => None,
 +                                        height => {
 +                                            txid_to_height.insert(tx.tx_hash, height as u32);
 +                                            Some(height as u32)
 +                                        }
 +                                    };
 +                                    (tx.tx_hash, tx_height)
 +                                })
 +                                .collect()
 +                        })
 +                        .collect();
 +
 +                    script_req.satisfy(txids_per_script)?
 +                }
 +
 +                Request::Conftime(conftime_req) => {
 +                    // collect up to chunk_size heights to fetch from electrum
 +                    let needs_block_height = {
 +                        let mut needs_block_height_iter = conftime_req
 +                            .request()
 +                            .filter_map(|txid| txid_to_height.get(txid).cloned())
 +                            .filter(|height| block_times.get(height).is_none());
 +                        let mut needs_block_height = HashSet::new();
 +
 +                        while needs_block_height.len() < chunk_size {
 +                            match needs_block_height_iter.next() {
 +                                Some(height) => needs_block_height.insert(height),
 +                                None => break,
 +                            };
 +                        }
 +                        needs_block_height
 +                    };
 +
 +                    let new_block_headers = self
 +                        .client
 +                        .batch_block_header(needs_block_height.iter().cloned())?;
 +
 +                    for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
 +                        block_times.insert(height, header.time);
 +                    }
 +
 +                    let conftimes = conftime_req
 +                        .request()
 +                        .take(chunk_size)
 +                        .map(|txid| {
 +                            let confirmation_time = txid_to_height
 +                                .get(txid)
 +                                .map(|height| {
 +                                    let timestamp =
 +                                        *block_times.get(height).ok_or_else(electrum_goof)?;
-                                     Result::<_, Error>::Ok(ConfirmationTime {
++                                    Result::<_, Error>::Ok(BlockTime {
 +                                        height: *height,
 +                                        timestamp: timestamp.into(),
 +                                    })
 +                                })
 +                                .transpose()?;
 +                            Ok(confirmation_time)
 +                        })
 +                        .collect::<Result<_, Error>>()?;
 +
 +                    conftime_req.satisfy(conftimes)?
 +                }
 +                Request::Tx(tx_req) => {
 +                    let needs_full = tx_req.request().take(chunk_size);
 +                    tx_cache.save_txs(needs_full.clone())?;
 +                    let full_transactions = needs_full
 +                        .map(|txid| tx_cache.get(*txid).ok_or_else(electrum_goof))
 +                        .collect::<Result<Vec<_>, _>>()?;
 +                    let input_txs = full_transactions.iter().flat_map(|tx| {
 +                        tx.input
 +                            .iter()
 +                            .filter(|input| !input.previous_output.is_null())
 +                            .map(|input| &input.previous_output.txid)
 +                    });
 +                    tx_cache.save_txs(input_txs)?;
 +
 +                    let full_details = full_transactions
 +                        .into_iter()
 +                        .map(|tx| {
 +                            let prev_outputs = tx
 +                                .input
 +                                .iter()
 +                                .map(|input| {
 +                                    if input.previous_output.is_null() {
 +                                        return Ok(None);
 +                                    }
 +                                    let prev_tx = tx_cache
 +                                        .get(input.previous_output.txid)
 +                                        .ok_or_else(electrum_goof)?;
 +                                    let txout = prev_tx
 +                                        .output
 +                                        .get(input.previous_output.vout as usize)
 +                                        .ok_or_else(electrum_goof)?;
 +                                    Ok(Some(txout.clone()))
 +                                })
 +                                .collect::<Result<Vec<_>, Error>>()?;
 +                            Ok((prev_outputs, tx))
 +                        })
 +                        .collect::<Result<Vec<_>, Error>>()?;
 +
 +                    tx_req.satisfy(full_details)?
 +                }
 +                Request::Finish(batch_update) => break batch_update,
 +            }
 +        };
 +
 +        database.commit_batch(batch_update)?;
 +        Ok(())
      }
  
      fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
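
For context, the rewritten setup above is what runs during a wallet sync against an Electrum server. A rough usage sketch, not taken from this commit: `ElectrumBlockchain::from(Client)`, `MemoryDatabase`, `noop_progress()`, and the server URL are assumptions drawn from other bdk examples of this period, and the database is assumed to already hold the wallet's cached script pubkeys (normally arranged by `Wallet`).

use bdk::blockchain::{noop_progress, Blockchain, ElectrumBlockchain};
use bdk::database::MemoryDatabase;
use electrum_client::Client;

fn sync_once(db: &mut MemoryDatabase) -> Result<(), bdk::Error> {
    // Assumes `db` already contains the script pubkeys to scan.
    let client = Client::new("ssl://electrum.blockstream.info:60002")
        .map_err(bdk::Error::Electrum)?;
    let blockchain = ElectrumBlockchain::from(client);
    // Runs the Request/satisfy loop shown above until Request::Finish,
    // then commits the resulting batch update to the database.
    blockchain.setup(db, noop_progress())
}
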
diff --cc src/blockchain/esplora/api.rs
index 74c46c885722abe80c3d259c1077c7312a5a6921,0000000000000000000000000000000000000000..4e0e3f88b8b38509bd2d346ca82f8f4d31feaf41
mode 100644,000000..100644
--- /dev/null
+++ b/src/blockchain/esplora/api.rs
@@@ -1,117 -1,0 +1,117 @@@
- use crate::ConfirmationTime;
 +//! structs from the esplora API
 +//!
 +//! see: <https://github.com/Blockstream/esplora/blob/master/API.md>
-     pub fn confirmation_time(&self) -> Option<ConfirmationTime> {
++use crate::BlockTime;
 +use bitcoin::{OutPoint, Script, Transaction, TxIn, TxOut, Txid};
 +
 +#[derive(serde::Deserialize, Clone, Debug)]
 +pub struct PrevOut {
 +    pub value: u64,
 +    pub scriptpubkey: Script,
 +}
 +
 +#[derive(serde::Deserialize, Clone, Debug)]
 +pub struct Vin {
 +    pub txid: Txid,
 +    pub vout: u32,
 +    // None if coinbase
 +    pub prevout: Option<PrevOut>,
 +    pub scriptsig: Script,
 +    #[serde(deserialize_with = "deserialize_witness")]
 +    pub witness: Vec<Vec<u8>>,
 +    pub sequence: u32,
 +    pub is_coinbase: bool,
 +}
 +
 +#[derive(serde::Deserialize, Clone, Debug)]
 +pub struct Vout {
 +    pub value: u64,
 +    pub scriptpubkey: Script,
 +}
 +
 +#[derive(serde::Deserialize, Clone, Debug)]
 +pub struct TxStatus {
 +    pub confirmed: bool,
 +    pub block_height: Option<u32>,
 +    pub block_time: Option<u64>,
 +}
 +
 +#[derive(serde::Deserialize, Clone, Debug)]
 +pub struct Tx {
 +    pub txid: Txid,
 +    pub version: i32,
 +    pub locktime: u32,
 +    pub vin: Vec<Vin>,
 +    pub vout: Vec<Vout>,
 +    pub status: TxStatus,
 +    pub fee: u64,
 +}
 +
 +impl Tx {
 +    pub fn to_tx(&self) -> Transaction {
 +        Transaction {
 +            version: self.version,
 +            lock_time: self.locktime,
 +            input: self
 +                .vin
 +                .iter()
 +                .cloned()
 +                .map(|vin| TxIn {
 +                    previous_output: OutPoint {
 +                        txid: vin.txid,
 +                        vout: vin.vout,
 +                    },
 +                    script_sig: vin.scriptsig,
 +                    sequence: vin.sequence,
 +                    witness: vin.witness,
 +                })
 +                .collect(),
 +            output: self
 +                .vout
 +                .iter()
 +                .cloned()
 +                .map(|vout| TxOut {
 +                    value: vout.value,
 +                    script_pubkey: vout.scriptpubkey,
 +                })
 +                .collect(),
 +        }
 +    }
 +
-             } => Some(ConfirmationTime { timestamp, height }),
++    pub fn confirmation_time(&self) -> Option<BlockTime> {
 +        match self.status {
 +            TxStatus {
 +                confirmed: true,
 +                block_height: Some(height),
 +                block_time: Some(timestamp),
++            } => Some(BlockTime { timestamp, height }),
 +            _ => None,
 +        }
 +    }
 +
 +    pub fn previous_outputs(&self) -> Vec<Option<TxOut>> {
 +        self.vin
 +            .iter()
 +            .cloned()
 +            .map(|vin| {
 +                vin.prevout.map(|po| TxOut {
 +                    script_pubkey: po.scriptpubkey,
 +                    value: po.value,
 +                })
 +            })
 +            .collect()
 +    }
 +}
 +
 +fn deserialize_witness<'de, D>(d: D) -> Result<Vec<Vec<u8>>, D::Error>
 +where
 +    D: serde::de::Deserializer<'de>,
 +{
 +    use crate::serde::Deserialize;
 +    use bitcoin::hashes::hex::FromHex;
 +    let list = Vec::<String>::deserialize(d)?;
 +    list.into_iter()
 +        .map(|hex_str| Vec::<u8>::from_hex(&hex_str))
 +        .collect::<Result<Vec<Vec<u8>>, _>>()
 +        .map_err(serde::de::Error::custom)
 +}
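
These structs are plain serde mirrors of Esplora's JSON. A small test-style sketch of the status handling, assuming `serde_json` is available (it is not shown in this diff) and using made-up height/time values:

fn parse_status(json: &str) -> Option<BlockTime> {
    // Deserialize Esplora's `status` object and map it the same way
    // `Tx::confirmation_time` does above.
    let status: TxStatus = serde_json::from_str(json).ok()?;
    match status {
        TxStatus {
            confirmed: true,
            block_height: Some(height),
            block_time: Some(timestamp),
        } => Some(BlockTime { timestamp, height }),
        _ => None,
    }
}

// e.g. parse_status(r#"{"confirmed":true,"block_height":709632,"block_time":1636866927}"#)
//      => Some(BlockTime { height: 709632, timestamp: 1636866927 })
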
diff --cc src/blockchain/script_sync.rs
index e7ae2763d554a35dd008896b2b5775252ebad020,0000000000000000000000000000000000000000..4c9b02227e279c92ee3c2f4d13a107c021176556
mode 100644,000000..100644
--- /dev/null
+++ b/src/blockchain/script_sync.rs
@@@ -1,394 -1,0 +1,394 @@@
-     ConfirmationTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
 +/*!
 +This models how a sync happens when you have a server that you send your script pubkeys to and it
 +returns the associated transactions, i.e. Electrum (a sketch of the driver loop appears after this file's diff).
 +*/
 +#![allow(dead_code)]
 +use crate::{
 +    database::{BatchDatabase, BatchOperations, DatabaseUtils},
 +    wallet::time::Instant,
-         confirmation_times: Vec<Option<ConfirmationTime>>,
++    BlockTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
 +};
 +use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
 +use log::*;
 +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
 +
 +/// A request for on-chain information
 +pub enum Request<'a, D: BatchDatabase> {
 +    /// A request for transactions related to script pubkeys.
 +    Script(ScriptReq<'a, D>),
 +    /// A request for confirmation times for some transactions.
 +    Conftime(ConftimeReq<'a, D>),
 +    /// A request for full transaction details of some transactions.
 +    Tx(TxReq<'a, D>),
 +    /// Requests are finished; here is a batch database update reflecting the gathered data.
 +    Finish(D::Batch),
 +}
 +
 +/// Starts a sync.
 +pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>, Error> {
 +    use rand::seq::SliceRandom;
 +    let mut keychains = vec![KeychainKind::Internal, KeychainKind::External];
 +    // Shuffling improves privacy: the server can't tell whether our first request is for internal or external addresses
 +    keychains.shuffle(&mut rand::thread_rng());
 +    let keychain = keychains.pop().unwrap();
 +    let scripts_needed = db
 +        .iter_script_pubkeys(Some(keychain))?
 +        .into_iter()
 +        .collect();
 +    let state = State::new(db);
 +
 +    Ok(Request::Script(ScriptReq {
 +        state,
 +        scripts_needed,
 +        script_index: 0,
 +        stop_gap,
 +        keychain,
 +        next_keychains: keychains,
 +    }))
 +}
 +
 +pub struct ScriptReq<'a, D: BatchDatabase> {
 +    state: State<'a, D>,
 +    script_index: usize,
 +    scripts_needed: VecDeque<Script>,
 +    stop_gap: usize,
 +    keychain: KeychainKind,
 +    next_keychains: Vec<KeychainKind>,
 +}
 +
 +/// The sync starts by returning script pubkeys we are interested in.
 +impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
 +    pub fn request(&self) -> impl Iterator<Item = &Script> + Clone {
 +        self.scripts_needed.iter()
 +    }
 +
 +    pub fn satisfy(
 +        mut self,
 +        // we want to know the txids associated with each script and their heights
 +        txids: Vec<Vec<(Txid, Option<u32>)>>,
 +    ) -> Result<Request<'a, D>, Error> {
 +        for (txid_list, script) in txids.iter().zip(self.scripts_needed.iter()) {
 +            debug!(
 +                "found {} transactions for script pubkey {}",
 +                txid_list.len(),
 +                script
 +            );
 +            if !txid_list.is_empty() {
 +                // the address is active
 +                self.state
 +                    .last_active_index
 +                    .insert(self.keychain, self.script_index);
 +            }
 +
 +            for (txid, height) in txid_list {
 +                // have we seen this txid already?
 +                match self.state.db.get_tx(txid, true)? {
 +                    Some(mut details) => {
 +                        let old_height = details.confirmation_time.as_ref().map(|x| x.height);
 +                        match (old_height, height) {
 +                            (None, Some(_)) => {
 +                                // It looks like the tx has confirmed since we last saw it -- we
 +                                // need to know the confirmation time.
 +                                self.state.tx_missing_conftime.insert(*txid, details);
 +                            }
 +                            (Some(old_height), Some(new_height)) if old_height != *new_height => {
 +                                // The height of the tx has changed!? It's a reorg; get the new confirmation time.
 +                                self.state.tx_missing_conftime.insert(*txid, details);
 +                            }
 +                            (Some(_), None) => {
 +                                // A re-org where the tx is not in the chain anymore.
 +                                details.confirmation_time = None;
 +                                self.state.finished_txs.push(details);
 +                            }
 +                            _ => self.state.finished_txs.push(details),
 +                        }
 +                    }
 +                    None => {
 +                        // we've never seen it; let's fetch the whole transaction
 +                        self.state.tx_needed.insert(*txid);
 +                    }
 +                };
 +            }
 +
 +            self.script_index += 1;
 +        }
 +
 +        for _ in txids {
 +            self.scripts_needed.pop_front();
 +        }
 +
 +        let last_active_index = self
 +            .state
 +            .last_active_index
 +            .get(&self.keychain)
 +            .map(|x| x + 1)
 +            .unwrap_or(0); // no active addresses maps to 0
 +
 +        Ok(
 +            if self.script_index > last_active_index + self.stop_gap
 +                || self.scripts_needed.is_empty()
 +            {
 +                debug!(
 +                    "finished scanning for transactions for keychain {:?} at index {}",
 +                    self.keychain, last_active_index
 +                );
 +                // we're done here -- check if we need to do the next keychain
 +                if let Some(keychain) = self.next_keychains.pop() {
 +                    self.keychain = keychain;
 +                    self.script_index = 0;
 +                    self.scripts_needed = self
 +                        .state
 +                        .db
 +                        .iter_script_pubkeys(Some(keychain))?
 +                        .into_iter()
 +                        .collect();
 +                    Request::Script(self)
 +                } else {
 +                    Request::Tx(TxReq { state: self.state })
 +                }
 +            } else {
 +                Request::Script(self)
 +            },
 +        )
 +    }
 +}
 +
 +/// Then we get full transactions
 +pub struct TxReq<'a, D> {
 +    state: State<'a, D>,
 +}
 +
 +impl<'a, D: BatchDatabase> TxReq<'a, D> {
 +    pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
 +        self.state.tx_needed.iter()
 +    }
 +
 +    pub fn satisfy(
 +        mut self,
 +        tx_details: Vec<(Vec<Option<TxOut>>, Transaction)>,
 +    ) -> Result<Request<'a, D>, Error> {
 +        let tx_details: Vec<TransactionDetails> = tx_details
 +            .into_iter()
 +            .zip(self.state.tx_needed.iter())
 +            .map(|((vout, tx), txid)| {
 +                debug!("found tx_details for {}", txid);
 +                assert_eq!(tx.txid(), *txid);
 +                let mut sent: u64 = 0;
 +                let mut received: u64 = 0;
 +                let mut inputs_sum: u64 = 0;
 +                let mut outputs_sum: u64 = 0;
 +
 +                for (txout, input) in vout.into_iter().zip(tx.input.iter()) {
 +                    let txout = match txout {
 +                        Some(txout) => txout,
 +                        None => {
 +                            // skip coinbase inputs
 +                            debug_assert!(
 +                                input.previous_output.is_null(),
 +                                "prevout should only be missing for coinbase"
 +                            );
 +                            continue;
 +                        }
 +                    };
 +
 +                    inputs_sum += txout.value;
 +                    if self.state.db.is_mine(&txout.script_pubkey)? {
 +                        sent += txout.value;
 +                    }
 +                }
 +
 +                for out in &tx.output {
 +                    outputs_sum += out.value;
 +                    if self.state.db.is_mine(&out.script_pubkey)? {
 +                        received += out.value;
 +                    }
 +                }
 +                // we need a saturating subtraction since we want coinbase txs to map to a fee of 0,
 +                // and for coinbase txs this subtraction would otherwise be negative.
 +                let fee = inputs_sum.saturating_sub(outputs_sum);
 +                Result::<_, Error>::Ok(TransactionDetails {
 +                    txid: *txid,
 +                    transaction: Some(tx),
 +                    received,
 +                    sent,
 +                    // we're going to fill this in later
 +                    confirmation_time: None,
 +                    fee: Some(fee),
 +                    verified: false,
 +                })
 +            })
 +            .collect::<Result<Vec<_>, _>>()?;
 +
 +        for tx_detail in tx_details {
 +            self.state.tx_needed.remove(&tx_detail.txid);
 +            self.state
 +                .tx_missing_conftime
 +                .insert(tx_detail.txid, tx_detail);
 +        }
 +
 +        if !self.state.tx_needed.is_empty() {
 +            Ok(Request::Tx(self))
 +        } else {
 +            Ok(Request::Conftime(ConftimeReq { state: self.state }))
 +        }
 +    }
 +}
 +
 +/// Final step is to get confirmation times
 +pub struct ConftimeReq<'a, D> {
 +    state: State<'a, D>,
 +}
 +
 +impl<'a, D: BatchDatabase> ConftimeReq<'a, D> {
 +    pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
 +        self.state.tx_missing_conftime.keys()
 +    }
 +
 +    pub fn satisfy(
 +        mut self,
++        confirmation_times: Vec<Option<BlockTime>>,
 +    ) -> Result<Request<'a, D>, Error> {
 +        let conftime_needed = self
 +            .request()
 +            .cloned()
 +            .take(confirmation_times.len())
 +            .collect::<Vec<_>>();
 +        for (confirmation_time, txid) in confirmation_times.into_iter().zip(conftime_needed.iter())
 +        {
 +            debug!("confirmation time for {} was {:?}", txid, confirmation_time);
 +            if let Some(mut tx_details) = self.state.tx_missing_conftime.remove(txid) {
 +                tx_details.confirmation_time = confirmation_time;
 +                self.state.finished_txs.push(tx_details);
 +            }
 +        }
 +
 +        if self.state.tx_missing_conftime.is_empty() {
 +            Ok(Request::Finish(self.state.into_db_update()?))
 +        } else {
 +            Ok(Request::Conftime(self))
 +        }
 +    }
 +}
 +
 +struct State<'a, D> {
 +    db: &'a D,
 +    last_active_index: HashMap<KeychainKind, usize>,
 +    /// Transactions where we need to get the full details
 +    tx_needed: BTreeSet<Txid>,
 +    /// Transactions that we know everything about
 +    finished_txs: Vec<TransactionDetails>,
 +    /// Transactions into which newly discovered conftimes should be inserted
 +    tx_missing_conftime: BTreeMap<Txid, TransactionDetails>,
 +    /// The start of the sync
 +    start_time: Instant,
 +}
 +
 +impl<'a, D: BatchDatabase> State<'a, D> {
 +    fn new(db: &'a D) -> Self {
 +        State {
 +            db,
 +            last_active_index: HashMap::default(),
 +            finished_txs: vec![],
 +            tx_needed: BTreeSet::default(),
 +            tx_missing_conftime: BTreeMap::default(),
 +            start_time: Instant::new(),
 +        }
 +    }
 +    fn into_db_update(self) -> Result<D::Batch, Error> {
 +        debug_assert!(self.tx_needed.is_empty() && self.tx_missing_conftime.is_empty());
 +        let existing_txs = self.db.iter_txs(false)?;
 +        let existing_txids: HashSet<Txid> = existing_txs.iter().map(|tx| tx.txid).collect();
 +        let finished_txs = make_txs_consistent(&self.finished_txs);
 +        let observed_txids: HashSet<Txid> = finished_txs.iter().map(|tx| tx.txid).collect();
 +        let txids_to_delete = existing_txids.difference(&observed_txids);
 +        let mut batch = self.db.begin_batch();
 +
 +        // Delete old txs that no longer exist
 +        for txid in txids_to_delete {
 +            if let Some(raw_tx) = self.db.get_raw_tx(txid)? {
 +                for i in 0..raw_tx.output.len() {
 +                    // Also delete any utxos from the txs that no longer exist.
 +                    let _ = batch.del_utxo(&OutPoint {
 +                        txid: *txid,
 +                        vout: i as u32,
 +                    })?;
 +                }
 +            } else {
 +                unreachable!("we should always have the raw tx");
 +            }
 +            batch.del_tx(txid, true)?;
 +        }
 +
 +        // Set every tx we observed
 +        for finished_tx in &finished_txs {
 +            let tx = finished_tx
 +                .transaction
 +                .as_ref()
 +                .expect("transaction will always be present here");
 +            for (i, output) in tx.output.iter().enumerate() {
 +                if let Some((keychain, _)) =
 +                    self.db.get_path_from_script_pubkey(&output.script_pubkey)?
 +                {
 +                    // add utxos we own from the new transactions we've seen.
 +                    batch.set_utxo(&LocalUtxo {
 +                        outpoint: OutPoint {
 +                            txid: finished_tx.txid,
 +                            vout: i as u32,
 +                        },
 +                        txout: output.clone(),
 +                        keychain,
 +                    })?;
 +                }
 +            }
 +            batch.set_tx(finished_tx)?;
 +        }
 +
 +        // we don't do this in the loop above since we may need to delete some of the utxos we
 +        // just added, in case new transactions spend from each other.
 +        for finished_tx in &finished_txs {
 +            let tx = finished_tx
 +                .transaction
 +                .as_ref()
 +                .expect("transaction will always be present here");
 +            for input in &tx.input {
 +                // Delete any spent utxos
 +                batch.del_utxo(&input.previous_output)?;
 +            }
 +        }
 +
 +        for (keychain, last_active_index) in self.last_active_index {
 +            batch.set_last_index(keychain, last_active_index as u32)?;
 +        }
 +
 +        info!(
 +            "finished setup, elapsed {:?}ms",
 +            self.start_time.elapsed().as_millis()
 +        );
 +        Ok(batch)
 +    }
 +}
 +
 +/// Remove conflicting transactions, tie-breaking them by fee.
 +fn make_txs_consistent(txs: &[TransactionDetails]) -> Vec<&TransactionDetails> {
 +    let mut utxo_index: HashMap<OutPoint, &TransactionDetails> = HashMap::default();
 +    for tx in txs {
 +        for input in &tx.transaction.as_ref().unwrap().input {
 +            utxo_index
 +                .entry(input.previous_output)
 +                .and_modify(|existing| match (tx.fee, existing.fee) {
 +                    (Some(fee), Some(existing_fee)) if fee > existing_fee => *existing = tx,
 +                    (Some(_), None) => *existing = tx,
 +                    _ => { /* leave it the same */ }
 +                })
 +                .or_insert(tx);
 +        }
 +    }
 +
 +    utxo_index
 +        .into_iter()
 +        .map(|(_, tx)| (tx.txid, tx))
 +        .collect::<HashMap<_, _>>()
 +        .into_iter()
 +        .map(|(_, tx)| tx)
 +        .collect()
 +}
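
To tie the module together: a backend drives it by looping over `Request` variants until `Finish`, exactly as the electrum implementation earlier in this commit does. A condensed sketch of that driver loop, with hypothetical `fetch_*` helpers standing in for real server calls:

// Sketch of the driver loop a blockchain backend implements against this module.
// `fetch_history`, `fetch_full_transactions` and `fetch_confirmation_times` are
// hypothetical stand-ins for real server calls; the control flow mirrors the
// electrum implementation above. Real backends answer each request in chunks;
// `satisfy` accepts partial answers and simply re-issues the same request kind.
fn drive_sync<D: BatchDatabase>(database: &mut D, stop_gap: usize) -> Result<(), Error> {
    let mut request = script_sync::start(database, stop_gap)?;
    let batch_update = loop {
        request = match request {
            Request::Script(req) => {
                // expects Vec<Vec<(Txid, Option<u32>)>>: txids (and heights) per script
                let history = fetch_history(req.request())?;
                req.satisfy(history)?
            }
            Request::Tx(req) => {
                // expects Vec<(Vec<Option<TxOut>>, Transaction)>: prev outputs + full tx
                let full_txs = fetch_full_transactions(req.request())?;
                req.satisfy(full_txs)?
            }
            Request::Conftime(req) => {
                // expects Vec<Option<BlockTime>>: confirmation time per txid
                let conftimes = fetch_confirmation_times(req.request())?;
                req.satisfy(conftimes)?
            }
            Request::Finish(batch_update) => break batch_update,
        };
    };
    database.commit_batch(batch_update)?;
    Ok(())
}
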
src/testutils/blockchain_tests.rs: Simple merge