[electrum_redesign] Introduce redesigned `ElectrumExt`
author 志宇 <hello@evanlinjin.me>
Thu, 11 May 2023 16:08:16 +0000 (00:08 +0800)
committer 志宇 <hello@evanlinjin.me>
Sat, 3 Jun 2023 19:32:17 +0000 (03:32 +0800)
There are a number of improvements that could still be made, but the
code is in a decent state to be usable.

Possible improvements:

* Remove the requirement to retry obtaining ALL data after a reorg is
  detected. Transactions can be anchored to a lower block (not the
  block tip), and an `assume_final_depth` value can be used.

* The logic that finalizes an update with confirmation times can be
  improved so that reorgs do not require returning an error.

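For context, here is a rough usage sketch (not part of this commit) of how a caller might drive the redesigned API, assuming the crate is consumed as `bdk_electrum` and the caller already has a `local_chain` checkpoint map and prepared keychain spks. The `sync` function name, the string keychain keys, the stop-gap/batch-size values, and the empty stand-in `TxGraph` are all illustrative:

```rust
use std::collections::BTreeMap;

use bdk_electrum::{
    bdk_chain::{
        bitcoin::{BlockHash, OutPoint, Script, Txid},
        tx_graph::TxGraph,
        ConfirmationHeightAnchor,
    },
    electrum_client::{Client, ElectrumApi, Error},
    v2::{ElectrumExt, ElectrumUpdate},
};

/// Illustrative sync routine: scan, fetch missing full txs, then finalize with
/// confirmation times. Applying the resulting `LocalUpdate` to a keychain
/// tracker is left to the caller.
fn sync(
    client: &Client,
    local_chain: &BTreeMap<u32, BlockHash>,
    keychain_spks: BTreeMap<&'static str, Vec<(u32, Script)>>,
) -> Result<(), Error> {
    let no_txids: Vec<Txid> = vec![];
    let no_outpoints: Vec<OutPoint> = vec![];

    // Scan spks only, with an (illustrative) stop gap of 10 and batch size of 20.
    let update: ElectrumUpdate<&'static str, ConfirmationHeightAnchor> =
        client.scan(local_chain, keychain_spks, no_txids, no_outpoints, 10, 20)?;

    // Fetch full transactions missing from the wallet's graph (an empty graph
    // stands in for it here), then finalize into confirmation-time anchors.
    let existing_graph = TxGraph::<ConfirmationHeightAnchor>::default();
    let missing = update
        .missing_full_txs(&existing_graph)
        .cloned()
        .collect::<Vec<Txid>>();
    let new_txs = client.batch_transaction_get(&missing)?;
    let _local_update = update.finalize_as_confirmation_time(client, None, new_txs)?;
    Ok(())
}
```

The resulting `LocalUpdate` would then be applied to the caller's keychain tracker and persisted, which is out of scope here.
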
crates/electrum/src/lib.rs
crates/electrum/src/v2.rs [new file with mode: 0644]

index 6d352ca10eef11635780f8877cbfe59b890d7d55..051b6375d0a2a55be74bdb078f39376b79534a41 100644 (file)
 //! [`batch_transaction_get`]: ElectrumApi::batch_transaction_get
 //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example
 
-use std::{
-    collections::{BTreeMap, HashMap},
-    fmt::Debug,
-};
-
-pub use bdk_chain;
 use bdk_chain::{
     bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
     chain_graph::{self, ChainGraph},
@@ -34,8 +28,15 @@ use bdk_chain::{
     tx_graph::TxGraph,
     BlockId, ConfirmationTime, TxHeight,
 };
-pub use electrum_client;
 use electrum_client::{Client, ElectrumApi, Error};
+use std::{
+    collections::{BTreeMap, HashMap},
+    fmt::Debug,
+};
+
+pub mod v2;
+pub use bdk_chain;
+pub use electrum_client;
 
 /// Trait to extend [`electrum_client::Client`] functionality.
 ///
diff --git a/crates/electrum/src/v2.rs b/crates/electrum/src/v2.rs
new file mode 100644 (file)
index 0000000..6a942a1
--- /dev/null
@@ -0,0 +1,507 @@
+use bdk_chain::{
+    bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
+    keychain::LocalUpdate,
+    local_chain::LocalChain,
+    tx_graph::{self, TxGraph},
+    Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
+};
+use electrum_client::{Client, ElectrumApi, Error};
+use std::{
+    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
+    fmt::Debug,
+};
+
+use crate::InternalError;
+
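+/// Update data gathered by [`ElectrumExt::scan`].
+///
+/// `graph_update` maps each relevant txid to the anchors observed for it, `chain_update` is the
+/// chain of checkpoints to connect to the wallet's local chain, and `keychain_update` records the
+/// last active derivation index discovered for each keychain.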
+#[derive(Debug, Clone)]
+pub struct ElectrumUpdate<K, A> {
+    pub graph_update: HashMap<Txid, BTreeSet<A>>,
+    pub chain_update: LocalChain,
+    pub keychain_update: BTreeMap<K, u32>,
+}
+
+impl<K, A> Default for ElectrumUpdate<K, A> {
+    fn default() -> Self {
+        Self {
+            graph_update: Default::default(),
+            chain_update: Default::default(),
+            keychain_update: Default::default(),
+        }
+    }
+}
+
+impl<'a, K, A: Anchor> ElectrumUpdate<K, A> {
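+    /// Return the txids of this update for which `graph` does not contain the full transaction.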
+    pub fn missing_full_txs<A2>(
+        &'a self,
+        graph: &'a TxGraph<A2>,
+    ) -> impl Iterator<Item = &'a Txid> + 'a {
+        self.graph_update
+            .keys()
+            .filter(move |&&txid| graph.as_ref().get_tx(txid).is_none())
+    }
+
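+    /// Combine this update with the full transactions in `new_txs` into a [`LocalUpdate`],
+    /// recording `seen_at` (if provided) as the last-seen time of each updated transaction.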
+    pub fn finalize<T>(self, seen_at: Option<u64>, new_txs: T) -> LocalUpdate<K, A>
+    where
+        T: IntoIterator<Item = Transaction>,
+    {
+        let mut graph_update = TxGraph::<A>::new(new_txs);
+        for (txid, anchors) in self.graph_update {
+            if let Some(seen_at) = seen_at {
+                let _ = graph_update.insert_seen_at(txid, seen_at);
+            }
+            for anchor in anchors {
+                let _ = graph_update.insert_anchor(txid, anchor);
+            }
+        }
+        LocalUpdate {
+            keychain: self.keychain_update,
+            graph: graph_update,
+            chain: self.chain_update,
+        }
+    }
+}
+
+impl<K> ElectrumUpdate<K, ConfirmationHeightAnchor> {
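+    /// Like [`ElectrumUpdate::finalize`], but upgrades each [`ConfirmationHeightAnchor`] to a
+    /// [`ConfirmationTimeAnchor`] by fetching the block header of every confirmation height from
+    /// the Electrum server.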
+    pub fn finalize_as_confirmation_time<T>(
+        self,
+        client: &Client,
+        seen_at: Option<u64>,
+        new_txs: T,
+    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>
+    where
+        T: IntoIterator<Item = Transaction>,
+    {
+        let update = self.finalize(seen_at, new_txs);
+        let update_tip = update.chain.tip().expect("must have tip");
+
+        let relevant_heights = {
+            let mut visited_heights = HashSet::new();
+            update
+                .graph
+                .all_anchors()
+                .iter()
+                .map(|(a, _)| a.confirmation_height_upper_bound())
+                .filter(move |&h| visited_heights.insert(h))
+                .collect::<Vec<_>>()
+        };
+
+        let height_to_time = relevant_heights
+            .clone()
+            .into_iter()
+            .zip(
+                client
+                    .batch_block_header(relevant_heights)?
+                    .into_iter()
+                    .map(|bh| bh.time as u64),
+            )
+            .collect::<HashMap<u32, u64>>();
+
+        if update_tip.hash != client.block_header(update_tip.height as _)?.block_hash() {
+            // TODO: We should alter the logic so we won't have to return an error. This is to
+            // ensure obtained block times are "anchored" to our tip. If we exclude this, it
+            // should be "safe" as well; tx confirmation times would just vary slightly.
+            return Err(Error::Message(format!(
+                "tip changed during update: update_tip={:?}",
+                update_tip
+            )));
+        }
+
+        let graph_additions = {
+            let old_additions = TxGraph::default().determine_additions(&update.graph);
+            tx_graph::Additions {
+                tx: old_additions.tx,
+                txout: old_additions.txout,
+                last_seen: old_additions.last_seen,
+                anchors: old_additions
+                    .anchors
+                    .into_iter()
+                    .map(|(height_anchor, txid)| {
+                        let confirmation_height = height_anchor.confirmation_height;
+                        let confirmation_time = height_to_time[&confirmation_height];
+                        let time_anchor = ConfirmationTimeAnchor {
+                            anchor_block: height_anchor.anchor_block,
+                            confirmation_height,
+                            confirmation_time,
+                        };
+                        (time_anchor, txid)
+                    })
+                    .collect(),
+            }
+        };
+
+        Ok(LocalUpdate {
+            keychain: update.keychain,
+            graph: {
+                let mut graph = TxGraph::default();
+                graph.apply_additions(graph_additions);
+                graph
+            },
+            chain: update.chain,
+        })
+    }
+}
+
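+/// Extension trait for [`electrum_client::Client`] that produces [`ElectrumUpdate`]s.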
+pub trait ElectrumExt<A> {
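+    /// Fetch the current chain tip (height and block hash) as seen by the Electrum server.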
+    fn get_tip(&self) -> Result<(u32, BlockHash), Error>;
+
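+    /// Scan the given `keychain_spks`, `txids` and `outpoints` against the server, restarting
+    /// whenever a reorg is detected mid-scan, and return the gathered [`ElectrumUpdate`].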
+    fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        batch_size: usize,
+    ) -> Result<ElectrumUpdate<K, A>, Error>;
+
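+    /// Convenience wrapper around [`ElectrumExt::scan`] for callers without keychains: the spks
+    /// are enumerated under a single `()` keychain and no stop gap is applied.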
+    fn scan_without_keychain(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        misc_spks: impl IntoIterator<Item = Script>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        batch_size: usize,
+    ) -> Result<ElectrumUpdate<(), A>, Error> {
+        let spk_iter = misc_spks
+            .into_iter()
+            .enumerate()
+            .map(|(i, spk)| (i as u32, spk));
+
+        self.scan(
+            local_chain,
+            [((), spk_iter)].into(),
+            txids,
+            outpoints,
+            usize::MAX,
+            batch_size,
+        )
+    }
+}
+
+impl ElectrumExt<ConfirmationHeightAnchor> for Client {
+    fn get_tip(&self) -> Result<(u32, BlockHash), Error> {
+        // TODO: unsubscribe when added to the client, or is there a better call to use here?
+        self.block_headers_subscribe()
+            .map(|data| (data.height as u32, data.header.block_hash()))
+    }
+
+    fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        batch_size: usize,
+    ) -> Result<ElectrumUpdate<K, ConfirmationHeightAnchor>, Error> {
+        let mut request_spks = keychain_spks
+            .into_iter()
+            .map(|(k, s)| (k, s.into_iter()))
+            .collect::<BTreeMap<K, _>>();
+        let mut scanned_spks = BTreeMap::<(K, u32), (Script, bool)>::new();
+
+        let txids = txids.into_iter().collect::<Vec<_>>();
+        let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+        let update = loop {
+            let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor> {
+                chain_update: prepare_chain_update(self, local_chain)?,
+                ..Default::default()
+            };
+            let anchor_block = update
+                .chain_update
+                .tip()
+                .expect("must have atleast one block");
+
+            if !request_spks.is_empty() {
+                if !scanned_spks.is_empty() {
+                    let mut scanned_spk_iter = scanned_spks
+                        .iter()
+                        .map(|(i, (spk, _))| (i.clone(), spk.clone()));
+                    match populate_with_spks(
+                        self,
+                        anchor_block,
+                        &mut update,
+                        &mut scanned_spk_iter,
+                        stop_gap,
+                        batch_size,
+                    ) {
+                        Err(InternalError::Reorg) => continue,
+                        Err(InternalError::ElectrumError(e)) => return Err(e),
+                        Ok(mut spks) => scanned_spks.append(&mut spks),
+                    };
+                }
+                for (keychain, keychain_spks) in &mut request_spks {
+                    match populate_with_spks(
+                        self,
+                        anchor_block,
+                        &mut update,
+                        keychain_spks,
+                        stop_gap,
+                        batch_size,
+                    ) {
+                        Err(InternalError::Reorg) => continue,
+                        Err(InternalError::ElectrumError(e)) => return Err(e),
+                        Ok(spks) => scanned_spks.extend(
+                            spks.into_iter()
+                                .map(|(spk_i, spk)| ((keychain.clone(), spk_i), spk)),
+                        ),
+                    };
+                }
+            }
+
+            match populate_with_txids(self, anchor_block, &mut update, &mut txids.iter().cloned()) {
+                Err(InternalError::Reorg) => continue,
+                Err(InternalError::ElectrumError(e)) => return Err(e),
+                Ok(_) => {}
+            }
+
+            match populate_with_outpoints(
+                self,
+                anchor_block,
+                &mut update,
+                &mut outpoints.iter().cloned(),
+            ) {
+                Err(InternalError::Reorg) => continue,
+                Err(InternalError::ElectrumError(e)) => return Err(e),
+                Ok(_txs) => { /* TODO: cache full txs to reduce bandwidth */ }
+            }
+
+            // check for reorgs during scan process
+            let server_blockhash = self
+                .block_header(anchor_block.height as usize)?
+                .block_hash();
+            if anchor_block.hash != server_blockhash {
+                continue; // reorg
+            }
+
+            update.keychain_update = request_spks
+                .into_keys()
+                .filter_map(|k| {
+                    scanned_spks
+                        .range((k.clone(), u32::MIN)..=(k.clone(), u32::MAX))
+                        .rev()
+                        .find(|(_, (_, active))| *active)
+                        .map(|((_, i), _)| (k, *i))
+                })
+                .collect::<BTreeMap<_, _>>();
+            break update;
+        };
+
+        Ok(update)
+    }
+}
+
+/// Prepare an update "template" based on the checkpoints of the `local_chain`.
+fn prepare_chain_update(
+    client: &Client,
+    local_chain: &BTreeMap<u32, BlockHash>,
+) -> Result<LocalChain, Error> {
+    let mut update = LocalChain::default();
+
+    // Find the highest local chain block that the server still has, so our update can connect
+    // to the local chain.
+    for (&existing_height, &existing_hash) in local_chain.iter().rev() {
+        // TODO: a batch request may be safer, as a reorg that happens when we are obtaining
+        //       `block_header`s will result in inconsistencies
+        let current_hash = client.block_header(existing_height as usize)?.block_hash();
+        let _ = update
+            .insert_block(BlockId {
+                height: existing_height,
+                hash: current_hash,
+            })
+            .expect("This never errors because we are working with a fresh chain");
+
+        if current_hash == existing_hash {
+            break;
+        }
+    }
+
+    // Insert the new tip so new transactions will be accepted into the update chain.
+    let tip = {
+        let (height, hash) = crate::get_tip(client)?;
+        BlockId { height, hash }
+    };
+    if update.insert_block(tip).is_err() {
+        // There has been a reorg before we even began scanning addresses.
+        // Just call this function recursively (this should practically never happen).
+        return prepare_chain_update(client, local_chain);
+    }
+
+    Ok(update)
+}
+
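+/// Convert the raw height an Electrum server reports for `txid` into an anchor against
+/// `anchor_block`.
+///
+/// Returns `None` for unconfirmed transactions (heights 0 and -1) and for confirmations above
+/// `anchor_block.height`, which cannot be anchored to our update tip. The genesis coinbase txid
+/// is special-cased and anchored at height 0.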
+fn determine_tx_anchor(
+    anchor_block: BlockId,
+    raw_height: i32,
+    txid: Txid,
+) -> Option<ConfirmationHeightAnchor> {
+    if txid
+        == Txid::from_hex("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b")
+            .expect("must deserialize genesis coinbase txid")
+    {
+        return Some(ConfirmationHeightAnchor {
+            anchor_block,
+            confirmation_height: 0,
+        });
+    }
+    match raw_height {
+        h if h <= 0 => {
+            debug_assert!(h == 0 || h == -1, "unexpected height ({}) from electrum", h);
+            None
+        }
+        h => {
+            let h = h as u32;
+            if h > anchor_block.height {
+                None
+            } else {
+                Some(ConfirmationHeightAnchor {
+                    anchor_block,
+                    confirmation_height: h,
+                })
+            }
+        }
+    }
+}
+
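+/// Populate `update` with, for each outpoint, the transaction it resides in and (if any) the
+/// transaction spending it, using the outpoint's script history to determine anchors. Returns
+/// the full transactions fetched along the way so the caller may cache them.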
+fn populate_with_outpoints<K>(
+    client: &Client,
+    anchor_block: BlockId,
+    update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
+    outpoints: &mut impl Iterator<Item = OutPoint>,
+) -> Result<HashMap<Txid, Transaction>, InternalError> {
+    let mut full_txs = HashMap::new();
+    for outpoint in outpoints {
+        let txid = outpoint.txid;
+        let tx = client.transaction_get(&txid)?;
+        debug_assert_eq!(tx.txid(), txid);
+        let txout = match tx.output.get(outpoint.vout as usize) {
+            Some(txout) => txout,
+            None => continue,
+        };
+        // attempt to find the following transactions (alongside their anchors), and
+        // add them to the graph `update`:
+        let mut has_residing = false; // tx in which the outpoint resides
+        let mut has_spending = false; // tx that spends the outpoint
+        for res in client.script_get_history(&txout.script_pubkey)? {
+            if has_residing && has_spending {
+                break;
+            }
+
+            if res.tx_hash == txid {
+                if has_residing {
+                    continue;
+                }
+                has_residing = true;
+                full_txs.insert(res.tx_hash, tx.clone());
+            } else {
+                if has_spending {
+                    continue;
+                }
+                let res_tx = match full_txs.get(&res.tx_hash) {
+                    Some(tx) => tx,
+                    None => {
+                        let res_tx = client.transaction_get(&res.tx_hash)?;
+                        full_txs.insert(res.tx_hash, res_tx);
+                        full_txs.get(&res.tx_hash).expect("just inserted")
+                    }
+                };
+                has_spending = res_tx
+                    .input
+                    .iter()
+                    .any(|txin| txin.previous_output == outpoint);
+                if !has_spending {
+                    continue;
+                }
+            };
+
+            let anchor = determine_tx_anchor(anchor_block, res.height, res.tx_hash);
+
+            let tx_entry = update.graph_update.entry(res.tx_hash).or_default();
+            if let Some(anchor) = anchor {
+                tx_entry.insert(anchor);
+            }
+        }
+    }
+    Ok(full_txs)
+}
+
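+/// Populate `update` with the given `txids`, skipping transactions the server does not know
+/// about. The anchor of each transaction is determined from the script history of its first
+/// output.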
+fn populate_with_txids<K>(
+    client: &Client,
+    anchor_block: BlockId,
+    update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
+    txids: &mut impl Iterator<Item = Txid>,
+) -> Result<(), InternalError> {
+    for txid in txids {
+        let tx = match client.transaction_get(&txid) {
+            Ok(tx) => tx,
+            Err(electrum_client::Error::Protocol(_)) => continue,
+            Err(other_err) => return Err(other_err.into()),
+        };
+
+        let spk = tx
+            .output
+            .get(0)
+            .map(|txo| &txo.script_pubkey)
+            .expect("tx must have an output");
+
+        let anchor = match client
+            .script_get_history(spk)?
+            .into_iter()
+            .find(|r| r.tx_hash == txid)
+        {
+            Some(r) => determine_tx_anchor(anchor_block, r.height, txid),
+            None => continue,
+        };
+
+        let tx_entry = update.graph_update.entry(txid).or_default();
+        if let Some(anchor) = anchor {
+            tx_entry.insert(anchor);
+        }
+    }
+    Ok(())
+}
+
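+/// Populate `update` from the script histories of `spks`, requested in batches of `batch_size`.
+/// Scanning stops once more than `stop_gap` consecutive spks without history are encountered.
+/// Returns the scanned spks, each flagged with whether it has any history.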
+fn populate_with_spks<K, I: Ord + Clone>(
+    client: &Client,
+    anchor_block: BlockId,
+    update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
+    spks: &mut impl Iterator<Item = (I, Script)>,
+    stop_gap: usize,
+    batch_size: usize,
+) -> Result<BTreeMap<I, (Script, bool)>, InternalError> {
+    let mut unused_spk_count = 0_usize;
+    let mut scanned_spks = BTreeMap::new();
+
+    loop {
+        let spks = (0..batch_size)
+            .map_while(|_| spks.next())
+            .collect::<Vec<_>>();
+        if spks.is_empty() {
+            return Ok(scanned_spks);
+        }
+
+        let spk_histories = client.batch_script_get_history(spks.iter().map(|(_, s)| s))?;
+
+        for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
+            if spk_history.is_empty() {
+                scanned_spks.insert(spk_index, (spk, false));
+                unused_spk_count += 1;
+                if unused_spk_count > stop_gap {
+                    return Ok(scanned_spks);
+                }
+                continue;
+            } else {
+                scanned_spks.insert(spk_index, (spk, true));
+                unused_spk_count = 0;
+            }
+
+            for tx in spk_history {
+                let tx_entry = update.graph_update.entry(tx.tx_hash).or_default();
+                if let Some(anchor) = determine_tx_anchor(anchor_block, tx.height, tx.tx_hash) {
+                    tx_entry.insert(anchor);
+                }
+            }
+        }
+    }
+}