const DB_MAGIC: &[u8] = b"bdk_example_rpc";
const DB_PATH: &str = ".bdk_example_rpc.db";
+/// The mpsc channel bound for emissions from [`Emitter`].
const CHANNEL_BOUND: usize = 10;
/// Delay for printing status to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
/// Delay between mempool emissions.
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
-/// Delay for commiting to persistance.
+/// Delay for committing to persistance.
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
type ChangeSet = (
}
fn main() -> anyhow::Result<()> {
+ let start = Instant::now();
+
let (args, keymap, index, db, init_changeset) =
example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
+ println!(
+ "[{:>10}s] loaded initial changeset from db",
+ start.elapsed().as_secs_f32()
+ );
let graph = Mutex::new({
let mut graph = IndexedTxGraph::new(index);
graph.apply_changeset(init_changeset.1);
graph
});
- println!("loaded indexed tx graph from db");
+ println!(
+ "[{:>10}s] loaded indexed tx graph from changeset",
+ start.elapsed().as_secs_f32()
+ );
let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
- println!("loaded local chain from db");
+ println!(
+ "[{:>10}s] loaded local chain from changeset",
+ start.elapsed().as_secs_f32()
+ );
let rpc_cmd = match args.command {
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
..
} = rpc_args;
- let mut chain = chain.lock().unwrap();
- let mut graph = graph.lock().unwrap();
- let mut db = db.lock().unwrap();
-
- graph.index.set_lookahead_for_all(lookahead);
+ graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
+ let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?;
- let mut emitter = match chain.tip() {
+ let mut emitter = match chain_tip {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
None => Emitter::from_height(&rpc_client, fallback_height),
};
let mut last_print = Instant::now();
while let Some((height, block)) = emitter.next_block()? {
+ let mut chain = chain.lock().unwrap();
+ let mut graph = graph.lock().unwrap();
+ let mut db = db.lock().unwrap();
+
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain
last_db_commit = Instant::now();
db.commit()?;
println!(
- "commited to db (took {}s)",
+ "[{:>10}s] commited to db (took {}s)",
+ start.elapsed().as_secs_f32(),
last_db_commit.elapsed().as_secs_f32()
);
}
)
};
println!(
- "synced to {} @ {} | total: {} sats",
+ "[{:>10}s] synced to {} @ {} | total: {} sats",
+ start.elapsed().as_secs_f32(),
synced_to.hash(),
synced_to.height(),
balance.total()
}
}
- // mempool
let mempool_txs = emitter.mempool()?;
- let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs);
- db.stage((local_chain::ChangeSet::default(), graph_changeset));
-
- // commit one last time!
- db.commit()?;
+ let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
+ mempool_txs.iter().map(|(tx, time)| (tx, *time)),
+ );
+ {
+ let mut db = db.lock().unwrap();
+ db.stage((local_chain::ChangeSet::default(), graph_changeset));
+ db.commit()?; // commit one last time
+ }
}
RpcCommands::Live { rpc_args } => {
let RpcArgs {
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
let last_cp = chain.lock().unwrap().tip();
+ println!(
+ "[{:>10}s] starting emitter thread...",
+ start.elapsed().as_secs_f32()
+ );
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
- println!("emitter thread started...");
-
let rpc_client = rpc_args.new_client()?;
let mut emitter = match last_cp {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
Ok(())
});
- let mut db = db.lock().unwrap();
- let mut graph = graph.lock().unwrap();
- let mut chain = chain.lock().unwrap();
let mut tip_height = 0_u32;
-
let mut last_db_commit = Instant::now();
let mut last_print = Option::<Instant>::None;
for emission in rx {
+ let mut db = db.lock().unwrap();
+ let mut graph = graph.lock().unwrap();
+ let mut chain = chain.lock().unwrap();
+
let changeset = match emission {
Emission::Block { height, block } => {
let chain_update =
last_db_commit = Instant::now();
db.commit()?;
println!(
- "commited to db (took {}s)",
+ "[{:>10}s] commited to db (took {}s)",
+ start.elapsed().as_secs_f32(),
last_db_commit.elapsed().as_secs_f32()
);
}
)
};
println!(
- "synced to {} @ {} / {} | total: {} sats",
+ "[{:>10}s] synced to {} @ {} / {} | total: {} sats",
+ start.elapsed().as_secs_f32(),
synced_to.hash(),
synced_to.height(),
tip_height,