From: 志宇 Date: Tue, 9 May 2023 01:59:42 +0000 (+0800) Subject: [persist_redesign] Introduce redesigned `persist` types X-Git-Tag: v1.0.0-alpha.1~13^2~1 X-Git-Url: http://internal-gitweb-vhost/script/%22https:/struct.CodeLengthError.html?a=commitdiff_plain;h=2aa08a5898545f670df9ed9c4804231f321d811a;p=bdk [persist_redesign] Introduce redesigned `persist` types This is a more generic version of `keychain::persist::*` structures. Additional changes: * The `Append` trait has a new method `is_empty`. * Introduce `Store` structure for `bdk_file_store`. --- diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index c550d86f..7ab0ffa8 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -301,6 +301,10 @@ impl Append for IndexedAdditions { self.graph_additions.append(other.graph_additions); self.index_additions.append(other.index_additions); } + + fn is_empty(&self) -> bool { + self.graph_additions.is_empty() && self.index_additions.is_empty() + } } /// Represents a structure that can index transaction data. diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index 81503049..f4d398ab 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -84,6 +84,10 @@ impl Append for DerivationAdditions { self.0.append(&mut other.0); } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } } impl Default for DerivationAdditions { diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index cf3cda3b..cbadf170 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -33,6 +33,8 @@ pub mod tx_graph; pub use tx_data_traits::*; mod chain_oracle; pub use chain_oracle::*; +mod persist; +pub use persist::*; #[doc(hidden)] pub mod example_utils; diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs new file mode 100644 index 00000000..188f88f2 --- /dev/null +++ b/crates/chain/src/persist.rs @@ -0,0 +1,89 @@ +use core::convert::Infallible; + +use crate::Append; + +/// `Persist` wraps a [`PersistBackend`] (`B`) to create a convenient staging area for changes (`C`) +/// before they are persisted. +/// +/// Not all changes to the in-memory representation needs to be written to disk right away, so +/// [`Persist::stage`] can be used to *stage* changes first and then [`Persist::commit`] can be used +/// to write changes to disk. +#[derive(Debug)] +pub struct Persist { + backend: B, + stage: C, +} + +impl Persist +where + B: PersistBackend, + C: Default + Append, +{ + /// Create a new [`Persist`] from [`PersistBackend`]. + pub fn new(backend: B) -> Self { + Self { + backend, + stage: Default::default(), + } + } + + /// Stage a `changeset` to be commited later with [`commit`]. + /// + /// [`commit`]: Self::commit + pub fn stage(&mut self, changeset: C) { + self.stage.append(changeset) + } + + /// Get the changes that have not been commited yet. + pub fn staged(&self) -> &C { + &self.stage + } + + /// Commit the staged changes to the underlying persistance backend. + /// + /// Returns a backend-defined error if this fails. + pub fn commit(&mut self) -> Result<(), B::WriteError> { + let mut temp = C::default(); + core::mem::swap(&mut temp, &mut self.stage); + self.backend.write_changes(&temp) + } +} + +/// A persistence backend for [`Persist`]. +/// +/// `C` represents the changeset; a datatype that records changes made to in-memory data structures +/// that are to be persisted, or retrieved from persistence. +pub trait PersistBackend { + /// The error the backend returns when it fails to write. + type WriteError: core::fmt::Debug; + + /// The error the backend returns when it fails to load changesets `C`. + type LoadError: core::fmt::Debug; + + /// Writes a changeset to the persistence backend. + /// + /// It is up to the backend what it does with this. It could store every changeset in a list or + /// it inserts the actual changes into a more structured database. All it needs to guarantee is + /// that [`load_from_persistence`] restores a keychain tracker to what it should be if all + /// changesets had been applied sequentially. + /// + /// [`load_from_persistence`]: Self::load_from_persistence + fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>; + + /// Return the aggregate changeset `C` from persistence. + fn load_from_persistence(&mut self) -> Result; +} + +impl PersistBackend for () { + type WriteError = Infallible; + + type LoadError = Infallible; + + fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> { + Ok(()) + } + + fn load_from_persistence(&mut self) -> Result { + Ok(C::default()) + } +} diff --git a/crates/chain/src/tx_data_traits.rs b/crates/chain/src/tx_data_traits.rs index 8ec695ad..bd7f138f 100644 --- a/crates/chain/src/tx_data_traits.rs +++ b/crates/chain/src/tx_data_traits.rs @@ -64,20 +64,35 @@ impl Anchor for &'static A { pub trait Append { /// Append another object of the same type onto `self`. fn append(&mut self, other: Self); + + /// Returns whether the structure is considered empty. + fn is_empty(&self) -> bool; } impl Append for () { fn append(&mut self, _other: Self) {} + + fn is_empty(&self) -> bool { + true + } } impl Append for BTreeMap { fn append(&mut self, mut other: Self) { BTreeMap::append(self, &mut other) } + + fn is_empty(&self) -> bool { + BTreeMap::is_empty(self) + } } impl Append for BTreeSet { fn append(&mut self, mut other: Self) { BTreeSet::append(self, &mut other) } + + fn is_empty(&self) -> bool { + BTreeSet::is_empty(self) + } } diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index e75255e4..ef3f3847 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -940,6 +940,13 @@ impl Append for Additions { .collect::>(), ); } + + fn is_empty(&self) -> bool { + self.tx.is_empty() + && self.txout.is_empty() + && self.anchors.is_empty() + && self.last_seen.is_empty() + } } impl AsRef> for TxGraph { diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs new file mode 100644 index 00000000..770f264f --- /dev/null +++ b/crates/file_store/src/entry_iter.rs @@ -0,0 +1,100 @@ +use bincode::Options; +use std::{ + fs::File, + io::{self, Seek}, + marker::PhantomData, +}; + +use crate::bincode_options; + +/// Iterator over entries in a file store. +/// +/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the +/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`. +/// +/// [`next`]: Self::next +pub struct EntryIter<'t, T> { + db_file: Option<&'t mut File>, + + /// The file position for the first read of `db_file`. + start_pos: Option, + types: PhantomData, +} + +impl<'t, T> EntryIter<'t, T> { + pub fn new(start_pos: u64, db_file: &'t mut File) -> Self { + Self { + db_file: Some(db_file), + start_pos: Some(start_pos), + types: PhantomData, + } + } +} + +impl<'t, T> Iterator for EntryIter<'t, T> +where + T: serde::de::DeserializeOwned, +{ + type Item = Result; + + fn next(&mut self) -> Option { + // closure which reads a single entry starting from `self.pos` + let read_one = |f: &mut File, start_pos: Option| -> Result, IterError> { + let pos = match start_pos { + Some(pos) => f.seek(io::SeekFrom::Start(pos))?, + None => f.stream_position()?, + }; + + match bincode_options().deserialize_from(&*f) { + Ok(changeset) => { + f.stream_position()?; + Ok(Some(changeset)) + } + Err(e) => { + if let bincode::ErrorKind::Io(inner) = &*e { + if inner.kind() == io::ErrorKind::UnexpectedEof { + let eof = f.seek(io::SeekFrom::End(0))?; + if pos == eof { + return Ok(None); + } + } + } + f.seek(io::SeekFrom::Start(pos))?; + Err(IterError::Bincode(*e)) + } + } + }; + + let result = read_one(self.db_file.as_mut()?, self.start_pos.take()); + if result.is_err() { + self.db_file = None; + } + result.transpose() + } +} + +impl From for IterError { + fn from(value: io::Error) -> Self { + IterError::Io(value) + } +} + +/// Error type for [`EntryIter`]. +#[derive(Debug)] +pub enum IterError { + /// Failure to read from the file. + Io(io::Error), + /// Failure to decode data from the file. + Bincode(bincode::ErrorKind), +} + +impl core::fmt::Display for IterError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + IterError::Io(e) => write!(f, "io error trying to read entry {}", e), + IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e), + } + } +} + +impl std::error::Error for IterError {} diff --git a/crates/file_store/src/file_store.rs b/crates/file_store/src/file_store.rs deleted file mode 100644 index 824e3ccc..00000000 --- a/crates/file_store/src/file_store.rs +++ /dev/null @@ -1,404 +0,0 @@ -//! Module for persisting data on disk. -//! -//! The star of the show is [`KeychainStore`], which maintains an append-only file of -//! [`KeychainChangeSet`]s which can be used to restore a [`KeychainTracker`]. -use bdk_chain::{ - keychain::{KeychainChangeSet, KeychainTracker}, - sparse_chain, -}; -use bincode::{DefaultOptions, Options}; -use core::marker::PhantomData; -use std::{ - fs::{File, OpenOptions}, - io::{self, Read, Seek, Write}, - path::Path, -}; - -/// BDK File Store magic bytes length. -const MAGIC_BYTES_LEN: usize = 12; - -/// BDK File Store magic bytes. -const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] = [98, 100, 107, 102, 115, 48, 48, 48, 48, 48, 48, 48]; - -/// Persists an append only list of `KeychainChangeSet` to a single file. -/// [`KeychainChangeSet`] record the changes made to a [`KeychainTracker`]. -#[derive(Debug)] -pub struct KeychainStore { - db_file: File, - changeset_type_params: core::marker::PhantomData<(K, P)>, -} - -fn bincode() -> impl bincode::Options { - DefaultOptions::new().with_varint_encoding() -} - -impl KeychainStore -where - K: Ord + Clone + core::fmt::Debug, - P: sparse_chain::ChainPosition, - KeychainChangeSet: serde::Serialize + serde::de::DeserializeOwned, -{ - /// Creates a new store from a [`File`]. - /// - /// The file must have been opened with read and write permissions. - /// - /// [`File`]: std::fs::File - pub fn new(mut file: File) -> Result { - file.rewind()?; - - let mut magic_bytes = [0_u8; MAGIC_BYTES_LEN]; - file.read_exact(&mut magic_bytes)?; - - if magic_bytes != MAGIC_BYTES { - return Err(FileError::InvalidMagicBytes(magic_bytes)); - } - - Ok(Self { - db_file: file, - changeset_type_params: Default::default(), - }) - } - - /// Creates or loads a store from `db_path`. If no file exists there, it will be created. - pub fn new_from_path>(db_path: D) -> Result { - let already_exists = db_path.as_ref().exists(); - - let mut db_file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(db_path)?; - - if !already_exists { - db_file.write_all(&MAGIC_BYTES)?; - } - - Self::new(db_file) - } - - /// Iterates over the stored changeset from first to last, changing the seek position at each - /// iteration. - /// - /// The iterator may fail to read an entry and therefore return an error. However, the first time - /// it returns an error will be the last. After doing so, the iterator will always yield `None`. - /// - /// **WARNING**: This method changes the write position in the underlying file. You should - /// always iterate over all entries until `None` is returned if you want your next write to go - /// at the end; otherwise, you will write over existing entries. - pub fn iter_changesets(&mut self) -> Result>, io::Error> { - self.db_file - .seek(io::SeekFrom::Start(MAGIC_BYTES_LEN as _))?; - - Ok(EntryIter::new(&mut self.db_file)) - } - - /// Loads all the changesets that have been stored as one giant changeset. - /// - /// This function returns a tuple of the aggregate changeset and a result that indicates - /// whether an error occurred while reading or deserializing one of the entries. If so the - /// changeset will consist of all of those it was able to read. - /// - /// You should usually check the error. In many applications, it may make sense to do a full - /// wallet scan with a stop-gap after getting an error, since it is likely that one of the - /// changesets it was unable to read changed the derivation indices of the tracker. - /// - /// **WARNING**: This method changes the write position of the underlying file. The next - /// changeset will be written over the erroring entry (or the end of the file if none existed). - pub fn aggregate_changeset(&mut self) -> (KeychainChangeSet, Result<(), IterError>) { - let mut changeset = KeychainChangeSet::default(); - let result = (|| { - let iter_changeset = self.iter_changesets()?; - for next_changeset in iter_changeset { - changeset.append(next_changeset?); - } - Ok(()) - })(); - - (changeset, result) - } - - /// Reads and applies all the changesets stored sequentially to the tracker, stopping when it fails - /// to read the next one. - /// - /// **WARNING**: This method changes the write position of the underlying file. The next - /// changeset will be written over the erroring entry (or the end of the file if none existed). - pub fn load_into_keychain_tracker( - &mut self, - tracker: &mut KeychainTracker, - ) -> Result<(), IterError> { - for changeset in self.iter_changesets()? { - tracker.apply_changeset(changeset?) - } - Ok(()) - } - - /// Append a new changeset to the file and truncate the file to the end of the appended changeset. - /// - /// The truncation is to avoid the possibility of having a valid but inconsistent changeset - /// directly after the appended changeset. - pub fn append_changeset( - &mut self, - changeset: &KeychainChangeSet, - ) -> Result<(), io::Error> { - if changeset.is_empty() { - return Ok(()); - } - - bincode() - .serialize_into(&mut self.db_file, changeset) - .map_err(|e| match *e { - bincode::ErrorKind::Io(inner) => inner, - unexpected_err => panic!("unexpected bincode error: {}", unexpected_err), - })?; - - // truncate file after this changeset addition - // if this is not done, data after this changeset may represent valid changesets, however - // applying those changesets on top of this one may result in an inconsistent state - let pos = self.db_file.stream_position()?; - self.db_file.set_len(pos)?; - - // We want to make sure that derivation indices changes are written to disk as soon as - // possible, so you know about the write failure before you give out the address in the application. - if !changeset.derivation_indices.is_empty() { - self.db_file.sync_data()?; - } - - Ok(()) - } -} - -/// Error that occurs due to problems encountered with the file. -#[derive(Debug)] -pub enum FileError { - /// IO error, this may mean that the file is too short. - Io(io::Error), - /// Magic bytes do not match what is expected. - InvalidMagicBytes([u8; MAGIC_BYTES_LEN]), -} - -impl core::fmt::Display for FileError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - Self::Io(e) => write!(f, "io error trying to read file: {}", e), - Self::InvalidMagicBytes(b) => write!( - f, - "file has invalid magic bytes: expected={:?} got={:?}", - MAGIC_BYTES, b - ), - } - } -} - -impl From for FileError { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} - -impl std::error::Error for FileError {} - -/// Error type for [`EntryIter`]. -#[derive(Debug)] -pub enum IterError { - /// Failure to read from the file. - Io(io::Error), - /// Failure to decode data from the file. - Bincode(bincode::ErrorKind), -} - -impl core::fmt::Display for IterError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - IterError::Io(e) => write!(f, "io error trying to read entry {}", e), - IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e), - } - } -} - -impl std::error::Error for IterError {} - -/// Iterator over entries in a file store. -/// -/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the -/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`. -/// -/// [`next`]: Self::next -pub struct EntryIter<'a, V> { - db_file: &'a mut File, - types: PhantomData, - error_exit: bool, -} - -impl<'a, V> EntryIter<'a, V> { - pub fn new(db_file: &'a mut File) -> Self { - Self { - db_file, - types: PhantomData, - error_exit: false, - } - } -} - -impl<'a, V> Iterator for EntryIter<'a, V> -where - V: serde::de::DeserializeOwned, -{ - type Item = Result; - - fn next(&mut self) -> Option { - let result = (|| { - let pos = self.db_file.stream_position()?; - - match bincode().deserialize_from(&mut self.db_file) { - Ok(changeset) => Ok(Some(changeset)), - Err(e) => { - if let bincode::ErrorKind::Io(inner) = &*e { - if inner.kind() == io::ErrorKind::UnexpectedEof { - let eof = self.db_file.seek(io::SeekFrom::End(0))?; - if pos == eof { - return Ok(None); - } - } - } - - self.db_file.seek(io::SeekFrom::Start(pos))?; - Err(IterError::Bincode(*e)) - } - } - })(); - - let result = result.transpose(); - - if let Some(Err(_)) = &result { - self.error_exit = true; - } - - result - } -} - -impl From for IterError { - fn from(value: io::Error) -> Self { - IterError::Io(value) - } -} - -#[cfg(test)] -mod test { - use super::*; - use bdk_chain::{ - keychain::{DerivationAdditions, KeychainChangeSet}, - TxHeight, - }; - use std::{ - io::{Read, Write}, - vec::Vec, - }; - use tempfile::NamedTempFile; - #[derive( - Debug, - Clone, - Copy, - PartialOrd, - Ord, - PartialEq, - Eq, - Hash, - serde::Serialize, - serde::Deserialize, - )] - enum TestKeychain { - External, - Internal, - } - - impl core::fmt::Display for TestKeychain { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::External => write!(f, "external"), - Self::Internal => write!(f, "internal"), - } - } - } - - #[test] - fn magic_bytes() { - assert_eq!(&MAGIC_BYTES, "bdkfs0000000".as_bytes()); - } - - #[test] - fn new_fails_if_file_is_too_short() { - let mut file = NamedTempFile::new().unwrap(); - file.write_all(&MAGIC_BYTES[..MAGIC_BYTES_LEN - 1]) - .expect("should write"); - - match KeychainStore::::new(file.reopen().unwrap()) { - Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), - unexpected => panic!("unexpected result: {:?}", unexpected), - }; - } - - #[test] - fn new_fails_if_magic_bytes_are_invalid() { - let invalid_magic_bytes = "ldkfs0000000"; - - let mut file = NamedTempFile::new().unwrap(); - file.write_all(invalid_magic_bytes.as_bytes()) - .expect("should write"); - - match KeychainStore::::new(file.reopen().unwrap()) { - Err(FileError::InvalidMagicBytes(b)) => { - assert_eq!(b, invalid_magic_bytes.as_bytes()) - } - unexpected => panic!("unexpected result: {:?}", unexpected), - }; - } - - #[test] - fn append_changeset_truncates_invalid_bytes() { - // initial data to write to file (magic bytes + invalid data) - let mut data = [255_u8; 2000]; - data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES); - - let changeset = KeychainChangeSet { - derivation_indices: DerivationAdditions( - vec![(TestKeychain::External, 42)].into_iter().collect(), - ), - chain_graph: Default::default(), - }; - - let mut file = NamedTempFile::new().unwrap(); - file.write_all(&data).expect("should write"); - - let mut store = KeychainStore::::new(file.reopen().unwrap()) - .expect("should open"); - match store.iter_changesets().expect("seek should succeed").next() { - Some(Err(IterError::Bincode(_))) => {} - unexpected_res => panic!("unexpected result: {:?}", unexpected_res), - } - - store.append_changeset(&changeset).expect("should append"); - - drop(store); - - let got_bytes = { - let mut buf = Vec::new(); - file.reopen() - .unwrap() - .read_to_end(&mut buf) - .expect("should read"); - buf - }; - - let expected_bytes = { - let mut buf = MAGIC_BYTES.to_vec(); - DefaultOptions::new() - .with_varint_encoding() - .serialize_into(&mut buf, &changeset) - .expect("should encode"); - buf - }; - - assert_eq!(got_bytes, expected_bytes); - } -} diff --git a/crates/file_store/src/keychain_store.rs b/crates/file_store/src/keychain_store.rs new file mode 100644 index 00000000..5f5074d5 --- /dev/null +++ b/crates/file_store/src/keychain_store.rs @@ -0,0 +1,313 @@ +//! Module for persisting data on disk. +//! +//! The star of the show is [`KeychainStore`], which maintains an append-only file of +//! [`KeychainChangeSet`]s which can be used to restore a [`KeychainTracker`]. +use bdk_chain::{ + keychain::{KeychainChangeSet, KeychainTracker}, + sparse_chain, +}; +use bincode::Options; +use std::{ + fs::{File, OpenOptions}, + io::{self, Read, Seek, Write}, + path::Path, +}; + +use crate::{bincode_options, EntryIter, IterError}; + +/// BDK File Store magic bytes length. +const MAGIC_BYTES_LEN: usize = 12; + +/// BDK File Store magic bytes. +const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] = [98, 100, 107, 102, 115, 48, 48, 48, 48, 48, 48, 48]; + +/// Persists an append only list of `KeychainChangeSet` to a single file. +/// [`KeychainChangeSet`] record the changes made to a [`KeychainTracker`]. +#[derive(Debug)] +pub struct KeychainStore { + db_file: File, + changeset_type_params: core::marker::PhantomData<(K, P)>, +} + +impl KeychainStore +where + K: Ord + Clone + core::fmt::Debug, + P: sparse_chain::ChainPosition, + KeychainChangeSet: serde::Serialize + serde::de::DeserializeOwned, +{ + /// Creates a new store from a [`File`]. + /// + /// The file must have been opened with read and write permissions. + /// + /// [`File`]: std::fs::File + pub fn new(mut file: File) -> Result { + file.rewind()?; + + let mut magic_bytes = [0_u8; MAGIC_BYTES_LEN]; + file.read_exact(&mut magic_bytes)?; + + if magic_bytes != MAGIC_BYTES { + return Err(FileError::InvalidMagicBytes(magic_bytes)); + } + + Ok(Self { + db_file: file, + changeset_type_params: Default::default(), + }) + } + + /// Creates or loads a store from `db_path`. If no file exists there, it will be created. + pub fn new_from_path>(db_path: D) -> Result { + let already_exists = db_path.as_ref().exists(); + + let mut db_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(db_path)?; + + if !already_exists { + db_file.write_all(&MAGIC_BYTES)?; + } + + Self::new(db_file) + } + + /// Iterates over the stored changeset from first to last, changing the seek position at each + /// iteration. + /// + /// The iterator may fail to read an entry and therefore return an error. However, the first time + /// it returns an error will be the last. After doing so, the iterator will always yield `None`. + /// + /// **WARNING**: This method changes the write position in the underlying file. You should + /// always iterate over all entries until `None` is returned if you want your next write to go + /// at the end; otherwise, you will write over existing entries. + pub fn iter_changesets(&mut self) -> Result>, io::Error> { + Ok(EntryIter::new(MAGIC_BYTES_LEN as u64, &mut self.db_file)) + } + + /// Loads all the changesets that have been stored as one giant changeset. + /// + /// This function returns a tuple of the aggregate changeset and a result that indicates + /// whether an error occurred while reading or deserializing one of the entries. If so the + /// changeset will consist of all of those it was able to read. + /// + /// You should usually check the error. In many applications, it may make sense to do a full + /// wallet scan with a stop-gap after getting an error, since it is likely that one of the + /// changesets it was unable to read changed the derivation indices of the tracker. + /// + /// **WARNING**: This method changes the write position of the underlying file. The next + /// changeset will be written over the erroring entry (or the end of the file if none existed). + pub fn aggregate_changeset(&mut self) -> (KeychainChangeSet, Result<(), IterError>) { + let mut changeset = KeychainChangeSet::default(); + let result = (|| { + let iter_changeset = self.iter_changesets()?; + for next_changeset in iter_changeset { + changeset.append(next_changeset?); + } + Ok(()) + })(); + + (changeset, result) + } + + /// Reads and applies all the changesets stored sequentially to the tracker, stopping when it fails + /// to read the next one. + /// + /// **WARNING**: This method changes the write position of the underlying file. The next + /// changeset will be written over the erroring entry (or the end of the file if none existed). + pub fn load_into_keychain_tracker( + &mut self, + tracker: &mut KeychainTracker, + ) -> Result<(), IterError> { + for changeset in self.iter_changesets()? { + tracker.apply_changeset(changeset?) + } + Ok(()) + } + + /// Append a new changeset to the file and truncate the file to the end of the appended changeset. + /// + /// The truncation is to avoid the possibility of having a valid but inconsistent changeset + /// directly after the appended changeset. + pub fn append_changeset( + &mut self, + changeset: &KeychainChangeSet, + ) -> Result<(), io::Error> { + if changeset.is_empty() { + return Ok(()); + } + + bincode_options() + .serialize_into(&mut self.db_file, changeset) + .map_err(|e| match *e { + bincode::ErrorKind::Io(inner) => inner, + unexpected_err => panic!("unexpected bincode error: {}", unexpected_err), + })?; + + // truncate file after this changeset addition + // if this is not done, data after this changeset may represent valid changesets, however + // applying those changesets on top of this one may result in an inconsistent state + let pos = self.db_file.stream_position()?; + self.db_file.set_len(pos)?; + + // We want to make sure that derivation indices changes are written to disk as soon as + // possible, so you know about the write failure before you give out the address in the application. + if !changeset.derivation_indices.is_empty() { + self.db_file.sync_data()?; + } + + Ok(()) + } +} + +/// Error that occurs due to problems encountered with the file. +#[derive(Debug)] +pub enum FileError { + /// IO error, this may mean that the file is too short. + Io(io::Error), + /// Magic bytes do not match what is expected. + InvalidMagicBytes([u8; MAGIC_BYTES_LEN]), +} + +impl core::fmt::Display for FileError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Io(e) => write!(f, "io error trying to read file: {}", e), + Self::InvalidMagicBytes(b) => write!( + f, + "file has invalid magic bytes: expected={:?} got={:?}", + MAGIC_BYTES, b + ), + } + } +} + +impl From for FileError { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +impl std::error::Error for FileError {} + +#[cfg(test)] +mod test { + use super::*; + use bdk_chain::{ + keychain::{DerivationAdditions, KeychainChangeSet}, + TxHeight, + }; + use bincode::DefaultOptions; + use std::{ + io::{Read, Write}, + vec::Vec, + }; + use tempfile::NamedTempFile; + #[derive( + Debug, + Clone, + Copy, + PartialOrd, + Ord, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + )] + enum TestKeychain { + External, + Internal, + } + + impl core::fmt::Display for TestKeychain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::External => write!(f, "external"), + Self::Internal => write!(f, "internal"), + } + } + } + + #[test] + fn magic_bytes() { + assert_eq!(&MAGIC_BYTES, "bdkfs0000000".as_bytes()); + } + + #[test] + fn new_fails_if_file_is_too_short() { + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&MAGIC_BYTES[..MAGIC_BYTES_LEN - 1]) + .expect("should write"); + + match KeychainStore::::new(file.reopen().unwrap()) { + Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), + unexpected => panic!("unexpected result: {:?}", unexpected), + }; + } + + #[test] + fn new_fails_if_magic_bytes_are_invalid() { + let invalid_magic_bytes = "ldkfs0000000"; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(invalid_magic_bytes.as_bytes()) + .expect("should write"); + + match KeychainStore::::new(file.reopen().unwrap()) { + Err(FileError::InvalidMagicBytes(b)) => { + assert_eq!(b, invalid_magic_bytes.as_bytes()) + } + unexpected => panic!("unexpected result: {:?}", unexpected), + }; + } + + #[test] + fn append_changeset_truncates_invalid_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES); + + let changeset = KeychainChangeSet { + derivation_indices: DerivationAdditions( + vec![(TestKeychain::External, 42)].into_iter().collect(), + ), + chain_graph: Default::default(), + }; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&data).expect("should write"); + + let mut store = KeychainStore::::new(file.reopen().unwrap()) + .expect("should open"); + match store.iter_changesets().expect("seek should succeed").next() { + Some(Err(IterError::Bincode(_))) => {} + unexpected_res => panic!("unexpected result: {:?}", unexpected_res), + } + + store.append_changeset(&changeset).expect("should append"); + + drop(store); + + let got_bytes = { + let mut buf = Vec::new(); + file.reopen() + .unwrap() + .read_to_end(&mut buf) + .expect("should read"); + buf + }; + + let expected_bytes = { + let mut buf = MAGIC_BYTES.to_vec(); + DefaultOptions::new() + .with_varint_encoding() + .serialize_into(&mut buf, &changeset) + .expect("should encode"); + buf + }; + + assert_eq!(got_bytes, expected_bytes); + } +} diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index e3347419..b10c8c29 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -1,10 +1,51 @@ #![doc = include_str!("../README.md")] -mod file_store; +mod entry_iter; +mod keychain_store; +mod store; +use std::io; + use bdk_chain::{ keychain::{KeychainChangeSet, KeychainTracker, PersistBackend}, sparse_chain::ChainPosition, }; -pub use file_store::*; +use bincode::{DefaultOptions, Options}; +pub use entry_iter::*; +pub use keychain_store::*; +pub use store::*; + +pub(crate) fn bincode_options() -> impl bincode::Options { + DefaultOptions::new().with_varint_encoding() +} + +/// Error that occurs due to problems encountered with the file. +#[derive(Debug)] +pub enum FileError<'a> { + /// IO error, this may mean that the file is too short. + Io(io::Error), + /// Magic bytes do not match what is expected. + InvalidMagicBytes { got: Vec, expected: &'a [u8] }, +} + +impl<'a> core::fmt::Display for FileError<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Io(e) => write!(f, "io error trying to read file: {}", e), + Self::InvalidMagicBytes { got, expected } => write!( + f, + "file has invalid magic bytes: expected={:?} got={:?}", + expected, got, + ), + } + } +} + +impl<'a> From for FileError<'a> { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +impl<'a> std::error::Error for FileError<'a> {} impl PersistBackend for KeychainStore where diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs new file mode 100644 index 00000000..b82fb396 --- /dev/null +++ b/crates/file_store/src/store.rs @@ -0,0 +1,289 @@ +use std::{ + fmt::Debug, + fs::{File, OpenOptions}, + io::{self, Read, Seek, Write}, + marker::PhantomData, + path::Path, +}; + +use bdk_chain::{Append, PersistBackend}; +use bincode::Options; + +use crate::{bincode_options, EntryIter, FileError, IterError}; + +/// Persists an append-only list of changesets (`C`) to a single file. +/// +/// The changesets are the results of altering a tracker implementation (`T`). +#[derive(Debug)] +pub struct Store<'a, C> { + magic: &'a [u8], + db_file: File, + marker: PhantomData, +} + +impl<'a, C> PersistBackend for Store<'a, C> +where + C: Default + Append + serde::Serialize + serde::de::DeserializeOwned, +{ + type WriteError = std::io::Error; + + type LoadError = IterError; + + fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> { + self.append_changeset(changeset) + } + + fn load_from_persistence(&mut self) -> Result { + let (changeset, result) = self.aggregate_changesets(); + result.map(|_| changeset) + } +} + +impl<'a, C> Store<'a, C> +where + C: Default + Append + serde::Serialize + serde::de::DeserializeOwned, +{ + /// Creates a new store from a [`File`]. + /// + /// The file must have been opened with read and write permissions. + /// + /// [`File`]: std::fs::File + pub fn new(magic: &'a [u8], mut db_file: File) -> Result { + db_file.rewind()?; + + let mut magic_buf = Vec::from_iter((0..).take(magic.len())); + db_file.read_exact(magic_buf.as_mut())?; + + if magic_buf != magic { + return Err(FileError::InvalidMagicBytes { + got: magic_buf, + expected: magic, + }); + } + + Ok(Self { + magic, + db_file, + marker: Default::default(), + }) + } + + /// Creates or loads a store from `db_path`. + /// + /// If no file exists there, it will be created. + pub fn new_from_path

(magic: &'a [u8], db_path: P) -> Result + where + P: AsRef, + { + let already_exists = db_path.as_ref().exists(); + + let mut db_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(db_path)?; + + if !already_exists { + db_file.write_all(magic)?; + } + + Self::new(magic, db_file) + } + + /// Iterates over the stored changeset from first to last, changing the seek position at each + /// iteration. + /// + /// The iterator may fail to read an entry and therefore return an error. However, the first time + /// it returns an error will be the last. After doing so, the iterator will always yield `None`. + /// + /// **WARNING**: This method changes the write position in the underlying file. You should + /// always iterate over all entries until `None` is returned if you want your next write to go + /// at the end; otherwise, you will write over existing entries. + pub fn iter_changesets(&mut self) -> EntryIter { + EntryIter::new(self.magic.len() as u64, &mut self.db_file) + } + + /// Loads all the changesets that have been stored as one giant changeset. + /// + /// This function returns a tuple of the aggregate changeset and a result that indicates + /// whether an error occurred while reading or deserializing one of the entries. If so the + /// changeset will consist of all of those it was able to read. + /// + /// You should usually check the error. In many applications, it may make sense to do a full + /// wallet scan with a stop-gap after getting an error, since it is likely that one of the + /// changesets it was unable to read changed the derivation indices of the tracker. + /// + /// **WARNING**: This method changes the write position of the underlying file. The next + /// changeset will be written over the erroring entry (or the end of the file if none existed). + pub fn aggregate_changesets(&mut self) -> (C, Result<(), IterError>) { + let mut changeset = C::default(); + let result = (|| { + for next_changeset in self.iter_changesets() { + changeset.append(next_changeset?); + } + Ok(()) + })(); + + (changeset, result) + } + + /// Append a new changeset to the file and truncate the file to the end of the appended + /// changeset. + /// + /// The truncation is to avoid the possibility of having a valid but inconsistent changeset + /// directly after the appended changeset. + pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> { + // no need to write anything if changeset is empty + if changeset.is_empty() { + return Ok(()); + } + + bincode_options() + .serialize_into(&mut self.db_file, changeset) + .map_err(|e| match *e { + bincode::ErrorKind::Io(inner) => inner, + unexpected_err => panic!("unexpected bincode error: {}", unexpected_err), + })?; + + // truncate file after this changeset addition + // if this is not done, data after this changeset may represent valid changesets, however + // applying those changesets on top of this one may result in an inconsistent state + let pos = self.db_file.stream_position()?; + self.db_file.set_len(pos)?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use bincode::DefaultOptions; + use std::{ + io::{Read, Write}, + vec::Vec, + }; + use tempfile::NamedTempFile; + + const TEST_MAGIC_BYTES_LEN: usize = 12; + const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] = + [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49]; + + #[derive( + Debug, + Clone, + Copy, + PartialOrd, + Ord, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + )] + enum TestKeychain { + External, + Internal, + } + + impl core::fmt::Display for TestKeychain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::External => write!(f, "external"), + Self::Internal => write!(f, "internal"), + } + } + } + + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] + struct TestChangeSet { + pub changes: Vec, + } + + impl Append for TestChangeSet { + fn append(&mut self, mut other: Self) { + self.changes.append(&mut other.changes) + } + + fn is_empty(&self) -> bool { + self.changes.is_empty() + } + } + + #[derive(Debug)] + struct TestTracker; + + #[test] + fn new_fails_if_file_is_too_short() { + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]) + .expect("should write"); + + match Store::::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) { + Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), + unexpected => panic!("unexpected result: {:?}", unexpected), + }; + } + + #[test] + fn new_fails_if_magic_bytes_are_invalid() { + let invalid_magic_bytes = "ldkfs0000000"; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(invalid_magic_bytes.as_bytes()) + .expect("should write"); + + match Store::::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) { + Err(FileError::InvalidMagicBytes { got, .. }) => { + assert_eq!(got, invalid_magic_bytes.as_bytes()) + } + unexpected => panic!("unexpected result: {:?}", unexpected), + }; + } + + #[test] + fn append_changeset_truncates_invalid_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); + + let changeset = TestChangeSet { + changes: vec!["one".into(), "two".into(), "three!".into()], + }; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&data).expect("should write"); + + let mut store = Store::::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) + .expect("should open"); + match store.iter_changesets().next() { + Some(Err(IterError::Bincode(_))) => {} + unexpected_res => panic!("unexpected result: {:?}", unexpected_res), + } + + store.append_changeset(&changeset).expect("should append"); + + drop(store); + + let got_bytes = { + let mut buf = Vec::new(); + file.reopen() + .unwrap() + .read_to_end(&mut buf) + .expect("should read"); + buf + }; + + let expected_bytes = { + let mut buf = TEST_MAGIC_BYTES.to_vec(); + DefaultOptions::new() + .with_varint_encoding() + .serialize_into(&mut buf, &changeset) + .expect("should encode"); + buf + }; + + assert_eq!(got_bytes, expected_bytes); + } +}