]> Untitled Git - bdk/commitdiff
example_bitcoind_rpc: tweaks
author志宇 <hello@evanlinjin.me>
Sat, 7 Oct 2023 18:29:04 +0000 (02:29 +0800)
committer志宇 <hello@evanlinjin.me>
Mon, 9 Oct 2023 17:04:30 +0000 (01:04 +0800)
* avoid holding mutex lock over io
* document `CHANNEL_BOUND` const
* use the `relevant` variant of `batch_insert_unconfirmed`
* print elapsed time in stdout for various updates

example-crates/example_bitcoind_rpc_polling/src/main.rs

index ad77030aec85a7582e65f6ad2de43260cedd89a6..32735022db53a31296e8263639845e83d44be0fd 100644 (file)
@@ -26,12 +26,13 @@ use example_cli::{
 const DB_MAGIC: &[u8] = b"bdk_example_rpc";
 const DB_PATH: &str = ".bdk_example_rpc.db";
 
+/// The mpsc channel bound for emissions from [`Emitter`].
 const CHANNEL_BOUND: usize = 10;
 /// Delay for printing status to stdout.
 const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
 /// Delay between mempool emissions.
 const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
-/// Delay for commiting to persistance.
+/// Delay for committing to persistance.
 const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
 
 type ChangeSet = (
@@ -111,18 +112,30 @@ enum RpcCommands {
 }
 
 fn main() -> anyhow::Result<()> {
+    let start = Instant::now();
+
     let (args, keymap, index, db, init_changeset) =
         example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
+    println!(
+        "[{:>10}s] loaded initial changeset from db",
+        start.elapsed().as_secs_f32()
+    );
 
     let graph = Mutex::new({
         let mut graph = IndexedTxGraph::new(index);
         graph.apply_changeset(init_changeset.1);
         graph
     });
-    println!("loaded indexed tx graph from db");
+    println!(
+        "[{:>10}s] loaded indexed tx graph from changeset",
+        start.elapsed().as_secs_f32()
+    );
 
     let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
-    println!("loaded local chain from db");
+    println!(
+        "[{:>10}s] loaded local chain from changeset",
+        start.elapsed().as_secs_f32()
+    );
 
     let rpc_cmd = match args.command {
         example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
@@ -153,14 +166,11 @@ fn main() -> anyhow::Result<()> {
                 ..
             } = rpc_args;
 
-            let mut chain = chain.lock().unwrap();
-            let mut graph = graph.lock().unwrap();
-            let mut db = db.lock().unwrap();
-
-            graph.index.set_lookahead_for_all(lookahead);
+            graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
 
+            let chain_tip = chain.lock().unwrap().tip();
             let rpc_client = rpc_args.new_client()?;
-            let mut emitter = match chain.tip() {
+            let mut emitter = match chain_tip {
                 Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
                 None => Emitter::from_height(&rpc_client, fallback_height),
             };
@@ -169,6 +179,10 @@ fn main() -> anyhow::Result<()> {
             let mut last_print = Instant::now();
 
             while let Some((height, block)) = emitter.next_block()? {
+                let mut chain = chain.lock().unwrap();
+                let mut graph = graph.lock().unwrap();
+                let mut db = db.lock().unwrap();
+
                 let chain_update =
                     CheckPoint::from_header(&block.header, height).into_update(false);
                 let chain_changeset = chain
@@ -182,7 +196,8 @@ fn main() -> anyhow::Result<()> {
                     last_db_commit = Instant::now();
                     db.commit()?;
                     println!(
-                        "commited to db (took {}s)",
+                        "[{:>10}s] commited to db (took {}s)",
+                        start.elapsed().as_secs_f32(),
                         last_db_commit.elapsed().as_secs_f32()
                     );
                 }
@@ -200,7 +215,8 @@ fn main() -> anyhow::Result<()> {
                             )
                         };
                         println!(
-                            "synced to {} @ {} | total: {} sats",
+                            "[{:>10}s] synced to {} @ {} | total: {} sats",
+                            start.elapsed().as_secs_f32(),
                             synced_to.hash(),
                             synced_to.height(),
                             balance.total()
@@ -209,13 +225,15 @@ fn main() -> anyhow::Result<()> {
                 }
             }
 
-            // mempool
             let mempool_txs = emitter.mempool()?;
-            let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs);
-            db.stage((local_chain::ChangeSet::default(), graph_changeset));
-
-            // commit one last time!
-            db.commit()?;
+            let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
+                mempool_txs.iter().map(|(tx, time)| (tx, *time)),
+            );
+            {
+                let mut db = db.lock().unwrap();
+                db.stage((local_chain::ChangeSet::default(), graph_changeset));
+                db.commit()?; // commit one last time
+            }
         }
         RpcCommands::Live { rpc_args } => {
             let RpcArgs {
@@ -228,10 +246,12 @@ fn main() -> anyhow::Result<()> {
             graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
             let last_cp = chain.lock().unwrap().tip();
 
+            println!(
+                "[{:>10}s] starting emitter thread...",
+                start.elapsed().as_secs_f32()
+            );
             let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
             let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
-                println!("emitter thread started...");
-
                 let rpc_client = rpc_args.new_client()?;
                 let mut emitter = match last_cp {
                     Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
@@ -270,15 +290,15 @@ fn main() -> anyhow::Result<()> {
                 Ok(())
             });
 
-            let mut db = db.lock().unwrap();
-            let mut graph = graph.lock().unwrap();
-            let mut chain = chain.lock().unwrap();
             let mut tip_height = 0_u32;
-
             let mut last_db_commit = Instant::now();
             let mut last_print = Option::<Instant>::None;
 
             for emission in rx {
+                let mut db = db.lock().unwrap();
+                let mut graph = graph.lock().unwrap();
+                let mut chain = chain.lock().unwrap();
+
                 let changeset = match emission {
                     Emission::Block { height, block } => {
                         let chain_update =
@@ -307,7 +327,8 @@ fn main() -> anyhow::Result<()> {
                     last_db_commit = Instant::now();
                     db.commit()?;
                     println!(
-                        "commited to db (took {}s)",
+                        "[{:>10}s] commited to db (took {}s)",
+                        start.elapsed().as_secs_f32(),
                         last_db_commit.elapsed().as_secs_f32()
                     );
                 }
@@ -324,7 +345,8 @@ fn main() -> anyhow::Result<()> {
                             )
                         };
                         println!(
-                            "synced to {} @ {} / {} | total: {} sats",
+                            "[{:>10}s] synced to {} @ {} / {} | total: {} sats",
+                            start.elapsed().as_secs_f32(),
                             synced_to.hash(),
                             synced_to.height(),
                             tip_height,