From a28076af9b1016d5456de6c2b81d502beb05737d Mon Sep 17 00:00:00 2001 From: Mehmet Efe Umit Date: Tue, 18 Nov 2025 22:14:22 -0800 Subject: [PATCH] refactor: move sync to a helper function This is a pre-requisite for adding Payjoin support. When the receiver sends the Payjoin proposal to the sender to be broadcasted, they need to sync the blockchain before checking if the Payjoin has indeed been broadcasted. To do that, the sync function will need to be shared between the two online commands. --- src/handlers.rs | 169 ++++++++++++++++++++++++++---------------------- 1 file changed, 92 insertions(+), 77 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index 6c58a83..1882b33 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -683,83 +683,7 @@ pub(crate) async fn handle_online_wallet_subcommand( Ok(serde_json::to_string_pretty(&json!({}))?) } Sync => { - #[cfg(any(feature = "electrum", feature = "esplora"))] - let request = wallet - .start_sync_with_revealed_spks() - .inspect(|item, progress| { - let pc = (100 * progress.consumed()) as f32 / progress.total() as f32; - eprintln!("[ SCANNING {pc:03.0}% ] {item}"); - }); - match client { - #[cfg(feature = "electrum")] - Electrum { client, batch_size } => { - // Populate the electrum client's transaction cache so it doesn't re-download transaction we - // already have. - client - .populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx)); - - let update = client.sync(request, batch_size, false)?; - wallet.apply_update(update)?; - } - #[cfg(feature = "esplora")] - Esplora { - client, - parallel_requests, - } => { - let update = client - .sync(request, parallel_requests) - .await - .map_err(|e| *e)?; - wallet.apply_update(update)?; - } - #[cfg(feature = "rpc")] - RpcClient { client } => { - let blockchain_info = client.get_blockchain_info()?; - let wallet_cp = wallet.latest_checkpoint(); - - // reload the last 200 blocks in case of a reorg - let emitter_height = wallet_cp.height().saturating_sub(200); - let mut emitter = Emitter::new( - &*client, - wallet_cp, - emitter_height, - wallet - .tx_graph() - .list_canonical_txs( - wallet.local_chain(), - wallet.local_chain().tip().block_id(), - CanonicalizationParams::default(), - ) - .filter(|tx| tx.chain_position.is_unconfirmed()), - ); - - while let Some(block_event) = emitter.next_block()? { - if block_event.block_height() % 10_000 == 0 { - let percent_done = f64::from(block_event.block_height()) - / f64::from(blockchain_info.headers as u32) - * 100f64; - println!( - "Applying block at height: {}, {:.2}% done.", - block_event.block_height(), - percent_done - ); - } - - wallet.apply_block_connected_to( - &block_event.block, - block_event.block_height(), - block_event.connected_to(), - )?; - } - - let mempool_txs = emitter.mempool()?; - wallet.apply_unconfirmed_txs(mempool_txs.update); - } - #[cfg(feature = "cbf")] - KyotoClient { client } => { - sync_kyoto_client(wallet, client).await?; - } - } + sync_wallet(client, wallet).await?; Ok(serde_json::to_string_pretty(&json!({}))?) } Broadcast { psbt, tx } => { @@ -1325,6 +1249,97 @@ async fn respond( } } +#[cfg(any( + feature = "electrum", + feature = "esplora", + feature = "cbf", + feature = "rpc" +))] +/// Syncs a given wallet using the blockchain client. +pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> { + #[cfg(any(feature = "electrum", feature = "esplora"))] + let request = wallet + .start_sync_with_revealed_spks() + .inspect(|item, progress| { + let pc = (100 * progress.consumed()) as f32 / progress.total() as f32; + eprintln!("[ SCANNING {pc:03.0}% ] {item}"); + }); + match client { + #[cfg(feature = "electrum")] + Electrum { client, batch_size } => { + // Populate the electrum client's transaction cache so it doesn't re-download transaction we + // already have. + client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx)); + + let update = client.sync(request, batch_size, false)?; + wallet + .apply_update(update) + .map_err(|e| Error::Generic(e.to_string())) + } + #[cfg(feature = "esplora")] + Esplora { + client, + parallel_requests, + } => { + let update = client + .sync(request, parallel_requests) + .await + .map_err(|e| *e)?; + wallet + .apply_update(update) + .map_err(|e| Error::Generic(e.to_string())) + } + #[cfg(feature = "rpc")] + RpcClient { client } => { + let blockchain_info = client.get_blockchain_info()?; + let wallet_cp = wallet.latest_checkpoint(); + + // reload the last 200 blocks in case of a reorg + let emitter_height = wallet_cp.height().saturating_sub(200); + let mut emitter = Emitter::new( + &*client, + wallet_cp, + emitter_height, + wallet + .tx_graph() + .list_canonical_txs( + wallet.local_chain(), + wallet.local_chain().tip().block_id(), + CanonicalizationParams::default(), + ) + .filter(|tx| tx.chain_position.is_unconfirmed()), + ); + + while let Some(block_event) = emitter.next_block()? { + if block_event.block_height() % 10_000 == 0 { + let percent_done = f64::from(block_event.block_height()) + / f64::from(blockchain_info.headers as u32) + * 100f64; + println!( + "Applying block at height: {}, {:.2}% done.", + block_event.block_height(), + percent_done + ); + } + + wallet.apply_block_connected_to( + &block_event.block, + block_event.block_height(), + block_event.connected_to(), + )?; + } + + let mempool_txs = emitter.mempool()?; + wallet.apply_unconfirmed_txs(mempool_txs.update); + Ok(()) + } + #[cfg(feature = "cbf")] + KyotoClient { client } => sync_kyoto_client(wallet, client) + .await + .map_err(|e| Error::Generic(e.to_string())), + } +} + #[cfg(feature = "repl")] fn readline() -> Result { write!(std::io::stdout(), "> ").map_err(|e| Error::Generic(e.to_string()))?; -- 2.49.0