]> Untitled Git - bdk/commitdiff
feat(bitcoind_rpc): introduce `bitcoind_rpc` crate
author志宇 <hello@evanlinjin.me>
Wed, 4 Oct 2023 10:11:17 +0000 (18:11 +0800)
committer志宇 <hello@evanlinjin.me>
Mon, 9 Oct 2023 14:14:03 +0000 (22:14 +0800)
Cargo.toml
crates/bitcoind_rpc/Cargo.toml [new file with mode: 0644]
crates/bitcoind_rpc/src/lib.rs [new file with mode: 0644]

index b20ef222d77814dbae9d7f7ff40206397b39a4e8..a5058ebc45352ffa8e0d2048dfe7401c3bf3c3b7 100644 (file)
@@ -6,6 +6,7 @@ members = [
     "crates/file_store",
     "crates/electrum",
     "crates/esplora",
+    "crates/bitcoind_rpc",
     "example-crates/example_cli",
     "example-crates/example_electrum",
     "example-crates/example_esplora",
diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml
new file mode 100644 (file)
index 0000000..eeb9de5
--- /dev/null
@@ -0,0 +1,21 @@
+[package]
+name = "bdk_bitcoind_rpc"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+# For no-std, remember to enable the bitcoin/no-std feature
+bitcoin = { version = "0.30", default-features = false }
+bitcoincore-rpc = { version = "0.17" }
+
+[dev-dependencies]
+bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] }
+bitcoind = { version = "0.33", features = ["25_0"] }
+anyhow = { version = "1" }
+
+[features]
+default = ["std"]
+std = ["bitcoin/std"]
+serde = ["bitcoin/serde"]
diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs
new file mode 100644 (file)
index 0000000..a4b28c8
--- /dev/null
@@ -0,0 +1,243 @@
+//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface (excluding the
+//! RPC wallet API).
+//!
+//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
+//!
+//! To only get block updates (exclude mempool transactions), the caller can use
+//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
+//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
+//! mempool.
+#![warn(missing_docs)]
+
+use std::collections::BTreeMap;
+
+use bitcoin::{block::Header, Block, BlockHash, Transaction};
+pub use bitcoincore_rpc;
+use bitcoincore_rpc::bitcoincore_rpc_json;
+
+/// A structure that emits data sourced from [`bitcoincore_rpc::Client`].
+///
+/// Refer to [module-level documentation] for more.
+///
+/// [module-level documentation]: crate
+pub struct Emitter<'c, C> {
+    client: &'c C,
+    start_height: u32,
+
+    emitted_blocks: BTreeMap<u32, BlockHash>,
+    last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
+
+    /// The latest first-seen epoch of emitted mempool transactions. This is used to determine
+    /// whether a mempool transaction is already emitted.
+    last_mempool_time: usize,
+
+    /// The last emitted block during our last mempool emission. This is used to determine whether
+    /// there has been a reorg since our last mempool emission.
+    last_mempool_tip: Option<(u32, BlockHash)>,
+}
+
+impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
+    /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
+    ///
+    /// `start_height` is the block height to start emitting blocks from.
+    pub fn new(client: &'c C, start_height: u32) -> Self {
+        Self {
+            client,
+            start_height,
+            emitted_blocks: BTreeMap::new(),
+            last_block: None,
+            last_mempool_time: 0,
+            last_mempool_tip: None,
+        }
+    }
+
+    /// Emit mempool transactions, alongside their first-seen unix timestamps.
+    ///
+    /// Ideally, this method would only emit the same transaction once. However, if the receiver
+    /// filters transactions based on whether it alters the output set of tracked script pubkeys,
+    /// there are situations where we would want to re-emit. For example, if an emitted mempool
+    /// transaction spends a tracked UTXO which is confirmed at height `h`, but the receiver has
+    /// only seen up to block of height `h-1`, we want to re-emit this transaction until the
+    /// receiver has seen the block at height `h`.
+    ///
+    /// In other words, we want to re-emit a transaction if we cannot guarantee it's ancestors are
+    /// already emitted.
+    pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
+        let client = self.client;
+
+        let prev_mempool_tip = match self.last_mempool_tip {
+            // use 'avoid-re-emission' logic if there is no reorg
+            Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height,
+            _ => 0,
+        };
+
+        let prev_mempool_time = self.last_mempool_time;
+        let mut latest_time = prev_mempool_time;
+
+        let txs_to_emit = client
+            .get_raw_mempool_verbose()?
+            .into_iter()
+            .filter_map({
+                let latest_time = &mut latest_time;
+                move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
+                    let tx_time = tx_entry.time as usize;
+                    if tx_time > *latest_time {
+                        *latest_time = tx_time;
+                    }
+
+                    // Avoid emitting transactions that are already emitted if we can guarantee
+                    // blocks containing ancestors are already emitted. The bitcoind rpc interface
+                    // provides us with the block height that the tx is introduced to the mempool.
+                    // If we have already emitted the block of height, we can assume that all
+                    // ancestor txs have been processed by the receiver.
+                    let is_already_emitted = tx_time <= prev_mempool_time;
+                    let is_within_height = tx_entry.height <= prev_mempool_tip as _;
+                    if is_already_emitted && is_within_height {
+                        return None;
+                    }
+
+                    let tx = match client.get_raw_transaction(&txid, None) {
+                        Ok(tx) => tx,
+                        // the tx is confirmed or evicted since `get_raw_mempool_verbose`
+                        Err(err) if err.is_not_found_error() => return None,
+                        Err(err) => return Some(Err(err)),
+                    };
+
+                    Some(Ok((tx, tx_time as u64)))
+                }
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        self.last_mempool_time = latest_time;
+        self.last_mempool_tip = self
+            .emitted_blocks
+            .iter()
+            .last()
+            .map(|(&height, &hash)| (height, hash));
+
+        Ok(txs_to_emit)
+    }
+
+    /// Emit the next block height and header (if any).
+    pub fn next_header(&mut self) -> Result<Option<(u32, Header)>, bitcoincore_rpc::Error> {
+        poll(self, |hash| self.client.get_block_header(hash))
+    }
+
+    /// Emit the next block height and block (if any).
+    pub fn next_block(&mut self) -> Result<Option<(u32, Block)>, bitcoincore_rpc::Error> {
+        poll(self, |hash| self.client.get_block(hash))
+    }
+}
+
+enum PollResponse {
+    Block(bitcoincore_rpc_json::GetBlockResult),
+    NoMoreBlocks,
+    /// Fetched block is not in the best chain.
+    BlockNotInBestChain,
+    AgreementFound(bitcoincore_rpc_json::GetBlockResult),
+    AgreementPointNotFound,
+}
+
+fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
+where
+    C: bitcoincore_rpc::RpcApi,
+{
+    let client = emitter.client;
+
+    if let Some(last_res) = &emitter.last_block {
+        assert!(!emitter.emitted_blocks.is_empty());
+
+        let next_hash = match last_res.nextblockhash {
+            None => return Ok(PollResponse::NoMoreBlocks),
+            Some(next_hash) => next_hash,
+        };
+
+        let res = client.get_block_info(&next_hash)?;
+        if res.confirmations < 0 {
+            return Ok(PollResponse::BlockNotInBestChain);
+        }
+        return Ok(PollResponse::Block(res));
+    }
+
+    if emitter.emitted_blocks.is_empty() {
+        let hash = client.get_block_hash(emitter.start_height as _)?;
+
+        let res = client.get_block_info(&hash)?;
+        if res.confirmations < 0 {
+            return Ok(PollResponse::BlockNotInBestChain);
+        }
+        return Ok(PollResponse::Block(res));
+    }
+
+    for (&_, hash) in emitter.emitted_blocks.iter().rev() {
+        let res = client.get_block_info(hash)?;
+        if res.confirmations < 0 {
+            // block is not in best chain
+            continue;
+        }
+
+        // agreement point found
+        return Ok(PollResponse::AgreementFound(res));
+    }
+
+    Ok(PollResponse::AgreementPointNotFound)
+}
+
+fn poll<C, V, F>(
+    emitter: &mut Emitter<C>,
+    get_item: F,
+) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
+where
+    C: bitcoincore_rpc::RpcApi,
+    F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
+{
+    loop {
+        match poll_once(emitter)? {
+            PollResponse::Block(res) => {
+                let height = res.height as u32;
+                let item = get_item(&res.hash)?;
+                assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None);
+                emitter.last_block = Some(res);
+                return Ok(Some((height, item)));
+            }
+            PollResponse::NoMoreBlocks => {
+                emitter.last_block = None;
+                return Ok(None);
+            }
+            PollResponse::BlockNotInBestChain => {
+                emitter.last_block = None;
+                continue;
+            }
+            PollResponse::AgreementFound(res) => {
+                emitter.emitted_blocks.split_off(&(res.height as u32 + 1));
+                emitter.last_block = Some(res);
+                continue;
+            }
+            PollResponse::AgreementPointNotFound => {
+                emitter.emitted_blocks.clear();
+                emitter.last_block = None;
+                continue;
+            }
+        }
+    }
+}
+
+/// Extends [`bitcoincore_rpc::Error`].
+pub trait BitcoindRpcErrorExt {
+    /// Returns whether the error is a "not found" error.
+    ///
+    /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as
+    /// [`Iterator::Item`].
+    fn is_not_found_error(&self) -> bool;
+}
+
+impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
+    fn is_not_found_error(&self) -> bool {
+        if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
+        {
+            rpc_err.code == -5
+        } else {
+            false
+        }
+    }
+}