]> Untitled Git - bdk/commitdiff
feat(example_bitcoind_rpc_polling): add example for RPC polling
author志宇 <hello@evanlinjin.me>
Wed, 4 Oct 2023 10:22:03 +0000 (18:22 +0800)
committer志宇 <hello@evanlinjin.me>
Mon, 9 Oct 2023 14:14:03 +0000 (22:14 +0800)
Cargo.toml
example-crates/example_bitcoind_rpc_polling/Cargo.toml [new file with mode: 0644]
example-crates/example_bitcoind_rpc_polling/src/main.rs [new file with mode: 0644]

index a5058ebc45352ffa8e0d2048dfe7401c3bf3c3b7..0e1efc902f5b45e3d73f009c93473a1a7b504aa2 100644 (file)
@@ -10,6 +10,7 @@ members = [
     "example-crates/example_cli",
     "example-crates/example_electrum",
     "example-crates/example_esplora",
+    "example-crates/example_bitcoind_rpc_polling",
     "example-crates/wallet_electrum",
     "example-crates/wallet_esplora_blocking",
     "example-crates/wallet_esplora_async",
diff --git a/example-crates/example_bitcoind_rpc_polling/Cargo.toml b/example-crates/example_bitcoind_rpc_polling/Cargo.toml
new file mode 100644 (file)
index 0000000..6728bb1
--- /dev/null
@@ -0,0 +1,12 @@
+[package]
+name = "example_bitcoind_rpc_polling"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+bdk_chain = { path = "../../crates/chain", features = ["serde"] }
+bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" }
+example_cli = { path = "../example_cli" }
+ctrlc = { version = "^2" }
diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs
new file mode 100644 (file)
index 0000000..6fb557f
--- /dev/null
@@ -0,0 +1,366 @@
+use std::{
+    path::PathBuf,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, Mutex,
+    },
+    time::{Duration, Instant},
+};
+
+use bdk_bitcoind_rpc::{
+    bitcoincore_rpc::{Auth, Client, RpcApi},
+    Emitter,
+};
+use bdk_chain::{
+    bitcoin::{Block, Transaction},
+    indexed_tx_graph, keychain,
+    local_chain::{self, CheckPoint, LocalChain},
+    ConfirmationTimeAnchor, IndexedTxGraph,
+};
+use example_cli::{
+    anyhow,
+    clap::{self, Args, Subcommand},
+    Keychain,
+};
+
+const DB_MAGIC: &[u8] = b"bdk_example_rpc";
+const DB_PATH: &str = ".bdk_example_rpc.db";
+
+const CHANNEL_BOUND: usize = 10;
+/// The block depth which we assume no reorgs can happen at.
+const ASSUME_FINAL_DEPTH: u32 = 6;
+/// Delay for printing status to stdout.
+const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
+/// Delay between mempool emissions.
+const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
+/// Delay for commiting to persistance.
+const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
+
+type ChangeSet = (
+    local_chain::ChangeSet,
+    indexed_tx_graph::ChangeSet<ConfirmationTimeAnchor, keychain::ChangeSet<Keychain>>,
+);
+
+#[derive(Debug)]
+enum Emission {
+    Block { height: u32, block: Block },
+    Mempool(Vec<(Transaction, u64)>),
+    Tip(u32),
+}
+
+#[derive(Args, Debug, Clone)]
+struct RpcArgs {
+    /// RPC URL
+    #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
+    url: String,
+    /// RPC auth cookie file
+    #[clap(env = "RPC_COOKIE", long)]
+    rpc_cookie: Option<PathBuf>,
+    /// RPC auth username
+    #[clap(env = "RPC_USER", long)]
+    rpc_user: Option<String>,
+    /// RPC auth password
+    #[clap(env = "RPC_PASS", long)]
+    rpc_password: Option<String>,
+    /// Starting block height to fallback to if no point of agreement if found
+    #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
+    fallback_height: u32,
+    /// The unused-scripts lookahead will be kept at this size
+    #[clap(long, default_value = "10")]
+    lookahead: u32,
+}
+
+impl From<RpcArgs> for Auth {
+    fn from(args: RpcArgs) -> Self {
+        match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
+            (None, None, None) => Self::None,
+            (Some(path), _, _) => Self::CookieFile(path),
+            (_, Some(user), Some(pass)) => Self::UserPass(user, pass),
+            (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
+            (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
+        }
+    }
+}
+
+impl RpcArgs {
+    fn new_client(&self) -> anyhow::Result<Client> {
+        Ok(Client::new(
+            &self.url,
+            match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) {
+                (None, None, None) => Auth::None,
+                (Some(path), _, _) => Auth::CookieFile(path.clone()),
+                (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
+                (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
+                (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
+            },
+        )?)
+    }
+}
+
+#[derive(Subcommand, Debug, Clone)]
+enum RpcCommands {
+    /// Syncs local state with remote state via RPC (starting from last point of agreement) and
+    /// stores/indexes relevant transactions
+    Sync {
+        #[clap(flatten)]
+        rpc_args: RpcArgs,
+    },
+    /// Sync by having the emitter logic in a separate thread
+    Live {
+        #[clap(flatten)]
+        rpc_args: RpcArgs,
+    },
+}
+
+fn main() -> anyhow::Result<()> {
+    let (args, keymap, index, db, init_changeset) =
+        example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
+
+    let graph = Mutex::new({
+        let mut graph = IndexedTxGraph::new(index);
+        graph.apply_changeset(init_changeset.1);
+        graph
+    });
+    println!("loaded indexed tx graph from db");
+
+    let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
+    println!("loaded local chain from db");
+
+    let rpc_cmd = match args.command {
+        example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
+        general_cmd => {
+            let res = example_cli::handle_commands(
+                &graph,
+                &db,
+                &chain,
+                &keymap,
+                args.network,
+                |rpc_args, tx| {
+                    let client = rpc_args.new_client()?;
+                    client.send_raw_transaction(tx)?;
+                    Ok(())
+                },
+                general_cmd,
+            );
+            db.lock().unwrap().commit()?;
+            return res;
+        }
+    };
+
+    match rpc_cmd {
+        RpcCommands::Sync { rpc_args } => {
+            let RpcArgs {
+                fallback_height,
+                lookahead,
+                ..
+            } = rpc_args;
+
+            let mut chain = chain.lock().unwrap();
+            let mut graph = graph.lock().unwrap();
+            let mut db = db.lock().unwrap();
+
+            graph.index.set_lookahead_for_all(lookahead);
+            // we start at a height lower than last-seen tip in case of reorgs
+            let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
+                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
+            });
+
+            let rpc_client = rpc_args.new_client()?;
+            let mut emitter = Emitter::new(&rpc_client, start_height);
+
+            let mut last_db_commit = Instant::now();
+            let mut last_print = Instant::now();
+
+            while let Some((height, block)) = emitter.next_block()? {
+                let chain_update =
+                    CheckPoint::from_header(&block.header, height).into_update(false);
+                let chain_changeset = chain.apply_update(chain_update)?;
+                let graph_changeset = graph.apply_block_relevant(block, height);
+                db.stage((chain_changeset, graph_changeset));
+
+                // commit staged db changes in intervals
+                if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
+                    last_db_commit = Instant::now();
+                    db.commit()?;
+                    println!(
+                        "commited to db (took {}s)",
+                        last_db_commit.elapsed().as_secs_f32()
+                    );
+                }
+
+                // print synced-to height and current balance in intervals
+                if last_print.elapsed() >= STDOUT_PRINT_DELAY {
+                    last_print = Instant::now();
+                    if let Some(synced_to) = chain.tip() {
+                        let balance = {
+                            graph.graph().balance(
+                                &*chain,
+                                synced_to.block_id(),
+                                graph.index.outpoints().iter().cloned(),
+                                |(k, _), _| k == &Keychain::Internal,
+                            )
+                        };
+                        println!(
+                            "synced to {} @ {} | total: {} sats",
+                            synced_to.hash(),
+                            synced_to.height(),
+                            balance.total()
+                        );
+                    }
+                }
+            }
+
+            // mempool
+            let mempool_txs = emitter.mempool()?;
+            let graph_changeset = graph
+                .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
+            db.stage((local_chain::ChangeSet::default(), graph_changeset));
+
+            // commit one last time!
+            db.commit()?;
+        }
+        RpcCommands::Live { rpc_args } => {
+            let RpcArgs {
+                fallback_height,
+                lookahead,
+                ..
+            } = rpc_args;
+            let sigterm_flag = start_ctrlc_handler();
+
+            graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
+            // we start at a height lower than last-seen tip in case of reorgs
+            let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
+                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
+            });
+
+            let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
+            let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
+                println!("emitter thread started...");
+
+                let rpc_client = rpc_args.new_client()?;
+                let mut emitter = Emitter::new(&rpc_client, start_height);
+
+                let mut block_count = rpc_client.get_block_count()? as u32;
+                tx.send(Emission::Tip(block_count))?;
+
+                loop {
+                    match emitter.next_block()? {
+                        Some((height, block)) => {
+                            if sigterm_flag.load(Ordering::Acquire) {
+                                break;
+                            }
+                            if height > block_count {
+                                block_count = rpc_client.get_block_count()? as u32;
+                                tx.send(Emission::Tip(block_count))?;
+                            }
+                            tx.send(Emission::Block { height, block })?;
+                        }
+                        None => {
+                            if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
+                                break;
+                            }
+                            println!("preparing mempool emission...");
+                            let now = Instant::now();
+                            tx.send(Emission::Mempool(emitter.mempool()?))?;
+                            println!("mempool emission prepared in {}s", now.elapsed().as_secs());
+                            continue;
+                        }
+                    };
+                }
+
+                println!("emitter thread shutting down...");
+                Ok(())
+            });
+
+            let mut db = db.lock().unwrap();
+            let mut graph = graph.lock().unwrap();
+            let mut chain = chain.lock().unwrap();
+            let mut tip_height = 0_u32;
+
+            let mut last_db_commit = Instant::now();
+            let mut last_print = Option::<Instant>::None;
+
+            for emission in rx {
+                let changeset = match emission {
+                    Emission::Block { height, block } => {
+                        let chain_update =
+                            CheckPoint::from_header(&block.header, height).into_update(false);
+                        let chain_changeset = chain.apply_update(chain_update)?;
+                        let graph_changeset = graph.apply_block_relevant(block, height);
+                        (chain_changeset, graph_changeset)
+                    }
+                    Emission::Mempool(mempool_txs) => {
+                        let graph_changeset = graph.batch_insert_relevant_unconfirmed(
+                            mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))),
+                        );
+                        (local_chain::ChangeSet::default(), graph_changeset)
+                    }
+                    Emission::Tip(h) => {
+                        tip_height = h;
+                        continue;
+                    }
+                };
+
+                db.stage(changeset);
+
+                if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
+                    last_db_commit = Instant::now();
+                    db.commit()?;
+                    println!(
+                        "commited to db (took {}s)",
+                        last_db_commit.elapsed().as_secs_f32()
+                    );
+                }
+
+                if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY {
+                    last_print = Some(Instant::now());
+                    if let Some(synced_to) = chain.tip() {
+                        let balance = {
+                            graph.graph().balance(
+                                &*chain,
+                                synced_to.block_id(),
+                                graph.index.outpoints().iter().cloned(),
+                                |(k, _), _| k == &Keychain::Internal,
+                            )
+                        };
+                        println!(
+                            "synced to {} @ {} / {} | total: {} sats",
+                            synced_to.hash(),
+                            synced_to.height(),
+                            tip_height,
+                            balance.total()
+                        );
+                    }
+                }
+            }
+
+            emission_jh.join().expect("must join emitter thread")?;
+        }
+    }
+
+    Ok(())
+}
+
+#[allow(dead_code)]
+fn start_ctrlc_handler() -> Arc<AtomicBool> {
+    let flag = Arc::new(AtomicBool::new(false));
+    let cloned_flag = flag.clone();
+
+    ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release));
+
+    flag
+}
+
+#[allow(dead_code)]
+fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
+    let start = Instant::now();
+    loop {
+        if flag.load(Ordering::Acquire) {
+            return true;
+        }
+        if start.elapsed() >= duration {
+            return false;
+        }
+        std::thread::sleep(Duration::from_secs(1));
+    }
+}