]> Untitled Git - bdk/commitdiff
feat(example): use changeset staging with rpc polling example
author志宇 <hello@evanlinjin.me>
Thu, 13 Jun 2024 14:36:12 +0000 (22:36 +0800)
committerSteve Myers <steve@notmandatory.org>
Thu, 13 Jun 2024 17:58:39 +0000 (12:58 -0500)
example-crates/example_bitcoind_rpc_polling/src/main.rs

index 55ff6f007e79409d5dbf334c16592530816dc8ea..76b9ad7992f393fd8e5eb92dfd8148e9ffa74224 100644 (file)
@@ -11,12 +11,12 @@ use bdk_bitcoind_rpc::{
     bitcoincore_rpc::{Auth, Client, RpcApi},
     Emitter,
 };
-use bdk_chain::persist::PersistBackend;
+use bdk_chain::persist::{PersistBackend, StageExt};
 use bdk_chain::{
     bitcoin::{constants::genesis_block, Block, Transaction},
     indexed_tx_graph, keychain,
     local_chain::{self, LocalChain},
-    ConfirmationTimeHeightAnchor, IndexedTxGraph,
+    Append, ConfirmationTimeHeightAnchor, IndexedTxGraph,
 };
 use example_cli::{
     anyhow,
@@ -176,6 +176,7 @@ fn main() -> anyhow::Result<()> {
             let chain_tip = chain.lock().unwrap().tip();
             let rpc_client = rpc_args.new_client()?;
             let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
+            let mut db_stage = ChangeSet::default();
 
             let mut last_db_commit = Instant::now();
             let mut last_print = Instant::now();
@@ -185,17 +186,18 @@ fn main() -> anyhow::Result<()> {
 
                 let mut chain = chain.lock().unwrap();
                 let mut graph = graph.lock().unwrap();
-                let mut db = db.lock().unwrap();
 
                 let chain_changeset = chain
                     .apply_update(emission.checkpoint)
                     .expect("must always apply as we receive blocks in order from emitter");
                 let graph_changeset = graph.apply_block_relevant(&emission.block, height);
-                db.write_changes(&(chain_changeset, graph_changeset))?;
+                db_stage.append((chain_changeset, graph_changeset));
 
                 // commit staged db changes in intervals
                 if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
+                    let db = &mut *db.lock().unwrap();
                     last_db_commit = Instant::now();
+                    db_stage.commit_to(db)?;
                     println!(
                         "[{:>10}s] committed to db (took {}s)",
                         start.elapsed().as_secs_f32(),
@@ -230,8 +232,11 @@ fn main() -> anyhow::Result<()> {
                 mempool_txs.iter().map(|(tx, time)| (tx, *time)),
             );
             {
-                let mut db = db.lock().unwrap();
-                db.write_changes(&(local_chain::ChangeSet::default(), graph_changeset))?;
+                let db = &mut *db.lock().unwrap();
+                db_stage.append_and_commit_to(
+                    (local_chain::ChangeSet::default(), graph_changeset),
+                    db,
+                )?;
             }
         }
         RpcCommands::Live { rpc_args } => {
@@ -287,9 +292,9 @@ fn main() -> anyhow::Result<()> {
             let mut tip_height = 0_u32;
             let mut last_db_commit = Instant::now();
             let mut last_print = Option::<Instant>::None;
+            let mut db_stage = ChangeSet::default();
 
             for emission in rx {
-                let mut db = db.lock().unwrap();
                 let mut graph = graph.lock().unwrap();
                 let mut chain = chain.lock().unwrap();
 
@@ -314,11 +319,12 @@ fn main() -> anyhow::Result<()> {
                         continue;
                     }
                 };
-
-                db.write_changes(&changeset)?;
+                db_stage.append(changeset);
 
                 if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
+                    let db = &mut *db.lock().unwrap();
                     last_db_commit = Instant::now();
+                    db_stage.commit_to(db)?;
                     println!(
                         "[{:>10}s] committed to db (took {}s)",
                         start.elapsed().as_secs_f32(),