]> Untitled Git - bdk/commitdiff
refactor(store)!: change Store's method and error names
authornymius <155548262+nymius@users.noreply.github.com>
Fri, 28 Feb 2025 04:39:57 +0000 (15:39 +1100)
committernymius <155548262+nymius@users.noreply.github.com>
Wed, 5 Mar 2025 23:49:59 +0000 (10:49 +1100)
The changes in this commit were motivated due to a bug in the
`StoreFile` which caused old data to be lost if the file was `open`
instead of created and new data was appended. The bugfix later motivated
a general name cleanup in StoreFile's methods and errors and some minor
changes in their signatures. FileError was renamed to StoreError, which
now includes the IterError variants, allowing the remplacement of the
former form. The new StoreFile methods are:
- create: create file in write only mode or fail if file exists.
- load: open existing file, check integrity of content and retrieve
  Store.
- append: add new changesets to Store. Do nothing if changeset is empty.
- dump: aggregate and retrieve all stored changesets in store.
- load_or_create: load if file exists, create if not, and retrieve
  Store.

README.md
crates/file_store/src/entry_iter.rs
crates/file_store/src/lib.rs
crates/file_store/src/store.rs
example-crates/example_bitcoind_rpc_polling/src/main.rs
example-crates/example_cli/src/lib.rs
example-crates/example_electrum/src/main.rs
example-crates/example_esplora/src/main.rs

index c6c212f1f52bca6d54fda81532141375cfc0a955..f78350a975de12a7491a0e8045049773d26a3214 100644 (file)
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ The project is split up into several crates in the `/crates` directory:
 - [`wallet`](./crates/wallet): Contains the central high level `Wallet` type that is built from the low-level mechanisms provided by the other components
 - [`chain`](./crates/chain): Tools for storing and indexing chain data
 - [`persist`](./crates/persist): Types that define data persistence of a BDK wallet
-- [`file_store`](./crates/file_store): A (experimental) persistence backend for storing chain data in a single file.
+- [`file_store`](./crates/file_store): Persistence backend for storing chain data in a single file. Intended for testing and development purposes, not for production.
 - [`esplora`](./crates/esplora): Extends the [`esplora-client`] crate with methods to fetch chain data from an esplora HTTP server in the form that [`bdk_chain`] and `Wallet` can consume.
 - [`electrum`](./crates/electrum): Extends the [`electrum-client`] crate with methods to fetch chain data from an electrum server in the form that [`bdk_chain`] and `Wallet` can consume.
 
index ad34c77ded7aff33366512d8e740f11b39aa1150..8b284f1814c6f708a1b10ef83db1839a7d38efd1 100644 (file)
@@ -1,3 +1,4 @@
+use crate::StoreError;
 use bincode::Options;
 use std::{
     fs::File,
@@ -37,7 +38,7 @@ impl<T> Iterator for EntryIter<'_, T>
 where
     T: serde::de::DeserializeOwned,
 {
-    type Item = Result<T, IterError>;
+    type Item = Result<T, StoreError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.finished {
@@ -63,7 +64,7 @@ where
                         }
                     }
                     self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
-                    Err(IterError::Bincode(*e))
+                    Err(StoreError::Bincode(*e))
                 }
             }
         })()
@@ -80,29 +81,3 @@ impl<T> Drop for EntryIter<'_, T> {
         }
     }
 }
-
-/// Error type for [`EntryIter`].
-#[derive(Debug)]
-pub enum IterError {
-    /// Failure to read from the file.
-    Io(io::Error),
-    /// Failure to decode data from the file.
-    Bincode(bincode::ErrorKind),
-}
-
-impl core::fmt::Display for IterError {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        match self {
-            IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
-            IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
-        }
-    }
-}
-
-impl From<io::Error> for IterError {
-    fn from(value: io::Error) -> Self {
-        IterError::Io(value)
-    }
-}
-
-impl std::error::Error for IterError {}
index dfe5c5c679eac1ec82f1cb654f562c24e3256bd0..8703b1a4dbe581c9c7683256592900fdcd98566c 100644 (file)
@@ -13,7 +13,7 @@ pub(crate) fn bincode_options() -> impl bincode::Options {
 
 /// Error that occurs due to problems encountered with the file.
 #[derive(Debug)]
-pub enum FileError {
+pub enum StoreError {
     /// IO error, this may mean that the file is too short.
     Io(io::Error),
     /// Magic bytes do not match what is expected.
@@ -22,7 +22,7 @@ pub enum FileError {
     Bincode(bincode::ErrorKind),
 }
 
-impl core::fmt::Display for FileError {
+impl core::fmt::Display for StoreError {
     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
         match self {
             Self::Io(e) => write!(f, "io error trying to read file: {}", e),
@@ -36,10 +36,10 @@ impl core::fmt::Display for FileError {
     }
 }
 
-impl From<io::Error> for FileError {
+impl From<io::Error> for StoreError {
     fn from(value: io::Error) -> Self {
         Self::Io(value)
     }
 }
 
-impl std::error::Error for FileError {}
+impl std::error::Error for StoreError {}
index 27523f9d10433aaa5d781a722c4c57b2e2d01142..cad61af1b801b8ee269a5e0c29e3cb7cb5e9e31c 100644 (file)
@@ -1,10 +1,10 @@
-use crate::{bincode_options, EntryIter, FileError, IterError};
+use crate::{bincode_options, EntryIter, StoreError};
 use bdk_core::Merge;
 use bincode::Options;
 use std::{
     fmt::{self, Debug},
     fs::{File, OpenOptions},
-    io::{self, Read, Seek, Write},
+    io::{self, Read, Write},
     marker::PhantomData,
     path::Path,
 };
@@ -14,10 +14,7 @@ use std::{
 /// > ⚠ This is a development/testing database. It does not natively support backwards compatible
 /// > BDK version upgrades so should not be used in production.
 #[derive(Debug)]
-pub struct Store<C>
-where
-    C: Sync + Send,
-{
+pub struct Store<C> {
     magic_len: usize,
     db_file: File,
     marker: PhantomData<C>,
@@ -25,19 +22,15 @@ where
 
 impl<C> Store<C>
 where
-    C: Merge
-        + serde::Serialize
-        + serde::de::DeserializeOwned
-        + core::marker::Send
-        + core::marker::Sync,
+    C: Merge + serde::Serialize + serde::de::DeserializeOwned,
 {
     /// Create a new [`Store`] file in write-only mode; error if the file exists.
     ///
-    /// `magic` is the prefixed bytes to write to the new file. This will be checked when opening
-    /// the `Store` in the future with [`open`].
+    /// `magic` is the prefixed bytes to write to the new file. This will be checked when loading
+    /// the [`Store`] in the future with [`load`].
     ///
-    /// [`open`]: Store::open
-    pub fn create_new<P>(magic: &[u8], file_path: P) -> Result<Self, FileError>
+    /// [`load`]: Store::load
+    pub fn create<P>(magic: &[u8], file_path: P) -> Result<Self, StoreError>
     where
         P: AsRef<Path>,
     {
@@ -55,17 +48,95 @@ where
         })
     }
 
-    /// Open an existing [`Store`].
+    /// Load an existing [`Store`].
     ///
-    /// Use [`create_new`] to create a new `Store`.
+    /// Use [`create`] to create a new [`Store`].
     ///
     /// # Errors
     ///
-    /// If the prefixed bytes of the opened file does not match the provided `magic`, the
-    /// [`FileError::InvalidMagicBytes`] error variant will be returned.
+    /// If the prefixed bytes of the loaded file do not match the provided `magic`, a
+    /// [`StoreErrorWithDump`] will be returned with the [`StoreError::InvalidMagicBytes`] error variant in
+    /// its error field and changeset field set to [`Option::None`]
+    ///
+    /// If there exist changesets in the file, [`load`] will try to aggregate them in
+    /// a single changeset to verify their integrity. If aggregation fails
+    /// [`StoreErrorWithDump`] will be returned with the [`StoreError::Bincode`] error variant in
+    /// its error field and the aggregated changeset so far in the changeset field.
+    ///
+    /// To get a new working file store from this error use [`Store::create`] and [`Store::append`]
+    /// to add the aggregated changeset obtained from [`StoreErrorWithDump`].
     ///
-    /// [`create_new`]: Store::create_new
-    pub fn open<P>(magic: &[u8], file_path: P) -> Result<Self, FileError>
+    /// To analyze the causes of the problem in the original database do not recreate the [`Store`]
+    /// using the same file path. Not changing the file path will overwrite previous file without
+    /// being able to recover its original data.
+    ///
+    /// # Examples
+    /// ```
+    /// use bdk_file_store::{Store, StoreErrorWithDump};
+    /// # use std::fs::OpenOptions;
+    /// # use bdk_core::Merge;
+    /// # use std::collections::BTreeSet;
+    /// # use std::io;
+    /// # use std::io::SeekFrom;
+    /// # use std::io::{Seek, Write};
+    /// #
+    /// # fn main() -> io::Result<()> {
+    /// # const MAGIC_BYTES_LEN: usize = 12;
+    /// # const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] =
+    /// #     [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
+    /// #
+    /// # type TestChangeSet = BTreeSet<String>;
+    /// #
+    /// # let temp_dir = tempfile::tempdir()?;
+    /// # let file_path = temp_dir.path().join("db_file");
+    /// # let mut store = Store::<TestChangeSet>::create(&MAGIC_BYTES, &file_path).unwrap();
+    /// # let changesets = [
+    /// #     TestChangeSet::from(["1".into()]),
+    /// #     TestChangeSet::from(["2".into(), "3".into()]),
+    /// #     TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
+    /// # ];
+    /// #
+    /// # for changeset in &changesets[..] {
+    /// #     store.append(changeset)?;
+    /// # }
+    /// #
+    /// # drop(store);
+    /// #
+    /// # // Simulate the file is broken
+    /// # let mut data = [255_u8; 2000];
+    /// # data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES);
+    /// # let mut file = OpenOptions::new().append(true).open(file_path.clone())?;
+    /// # let new_len = file.seek(SeekFrom::End(-2))?;
+    /// # file.set_len(new_len)?;
+    ///
+    /// let mut new_store = match Store::<TestChangeSet>::load(&MAGIC_BYTES, &file_path) {
+    /// #   Ok(_) => panic!("should have errored"),
+    ///     Ok((store, _aggregated_changeset)) => store,
+    ///     Err(StoreErrorWithDump { changeset, .. }) => {
+    ///         let new_file_path = file_path.with_extension("bkp");
+    ///         let mut new_store = Store::create(&MAGIC_BYTES, &new_file_path).unwrap();
+    ///         if let Some(aggregated_changeset) = changeset {
+    ///             new_store.append(&aggregated_changeset)?;
+    ///         }
+    ///         new_store
+    ///     }
+    /// };
+    /// #
+    /// # assert_eq!(
+    /// #     new_store.dump().expect("should dump changeset: {1, 2, 3} "),
+    /// #     changesets[..2].iter().cloned().reduce(|mut acc, cs| {
+    /// #         Merge::merge(&mut acc, cs);
+    /// #         acc
+    /// #     }),
+    /// #     "should recover all changesets",
+    /// # );
+    /// #
+    /// # Ok(())
+    /// # }
+    /// ```
+    /// [`create`]: Store::create
+    /// [`load`]: Store::load
+    pub fn load<P>(magic: &[u8], file_path: P) -> Result<(Self, Option<C>), StoreErrorWithDump<C>>
     where
         P: AsRef<Path>,
     {
@@ -74,87 +145,92 @@ where
         let mut magic_buf = vec![0_u8; magic.len()];
         f.read_exact(&mut magic_buf)?;
         if magic_buf != magic {
-            return Err(FileError::InvalidMagicBytes {
-                got: magic_buf,
-                expected: magic.to_vec(),
+            return Err(StoreErrorWithDump {
+                changeset: Option::<C>::None,
+                error: StoreError::InvalidMagicBytes {
+                    got: magic_buf,
+                    expected: magic.to_vec(),
+                },
             });
         }
 
-        Ok(Self {
+        let mut store = Self {
             magic_len: magic.len(),
             db_file: f,
             marker: Default::default(),
-        })
+        };
+
+        // Get aggregated changeset
+        let aggregated_changeset = store.dump()?;
+
+        Ok((store, aggregated_changeset))
+    }
+
+    /// Dump the aggregate of all changesets in [`Store`].
+    ///
+    /// # Errors
+    ///
+    /// If there exist changesets in the file, [`dump`] will try to aggregate them in a single
+    /// changeset. If aggregation fails [`StoreErrorWithDump`] will be returned with the
+    /// [`StoreError::Bincode`] error variant in its error field and the aggregated changeset so
+    /// far in the changeset field.
+    ///
+    /// [`dump`]: Store::dump
+    pub fn dump(&mut self) -> Result<Option<C>, StoreErrorWithDump<C>> {
+        EntryIter::new(self.magic_len as u64, &mut self.db_file).try_fold(
+            Option::<C>::None,
+            |mut aggregated_changeset: Option<C>, next_changeset| match next_changeset {
+                Ok(next_changeset) => {
+                    match &mut aggregated_changeset {
+                        Some(aggregated_changeset) => aggregated_changeset.merge(next_changeset),
+                        aggregated_changeset => *aggregated_changeset = Some(next_changeset),
+                    }
+                    Ok(aggregated_changeset)
+                }
+                Err(iter_error) => Err(StoreErrorWithDump {
+                    changeset: aggregated_changeset,
+                    error: iter_error,
+                }),
+            },
+        )
     }
 
-    /// Attempt to open existing [`Store`] file; create it if the file is non-existent.
+    /// Attempt to load existing [`Store`] file; create it if the file does not exist.
     ///
-    /// Internally, this calls either [`open`] or [`create_new`].
+    /// Internally, this calls either [`load`] or [`create`].
     ///
-    /// [`open`]: Store::open
-    /// [`create_new`]: Store::create_new
-    pub fn open_or_create_new<P>(magic: &[u8], file_path: P) -> Result<Self, FileError>
+    /// [`load`]: Store::load
+    /// [`create`]: Store::create
+    pub fn load_or_create<P>(
+        magic: &[u8],
+        file_path: P,
+    ) -> Result<(Self, Option<C>), StoreErrorWithDump<C>>
     where
         P: AsRef<Path>,
     {
         if file_path.as_ref().exists() {
-            Self::open(magic, file_path)
+            Self::load(magic, file_path)
         } else {
-            Self::create_new(magic, file_path)
+            Self::create(magic, file_path)
+                .map(|store| (store, Option::<C>::None))
+                .map_err(|err: StoreError| StoreErrorWithDump {
+                    changeset: Option::<C>::None,
+                    error: err,
+                })
         }
     }
 
-    /// Iterates over the stored changeset from first to last, changing the seek position at each
-    /// iteration.
-    ///
-    /// The iterator may fail to read an entry and therefore return an error. However, the first time
-    /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
-    ///
-    /// **WARNING**: This method changes the write position in the underlying file. You should
-    /// always iterate over all entries until `None` is returned if you want your next write to go
-    /// at the end; otherwise, you will write over existing entries.
-    pub fn iter_changesets(&mut self) -> EntryIter<C> {
-        EntryIter::new(self.magic_len as u64, &mut self.db_file)
-    }
-
-    /// Loads all the changesets that have been stored as one giant changeset.
-    ///
-    /// This function returns the aggregate changeset, or `None` if nothing was persisted.
-    /// If reading or deserializing any of the entries fails, an error is returned that
-    /// consists of all those it was able to read.
+    /// Append a new changeset to the file. Does nothing if the changeset is empty. Truncation is
+    /// not needed because file pointer is always moved to the end of the last decodable data from
+    /// beginning to end.
     ///
-    /// You should usually check the error. In many applications, it may make sense to do a full
-    /// wallet scan with a stop-gap after getting an error, since it is likely that one of the
-    /// changesets was unable to read changes of the derivation indices of a keychain.
+    /// If multiple garbage writes are produced on the file, the next load will only retrieve the
+    /// first chunk of valid changesets.
     ///
-    /// **WARNING**: This method changes the write position of the underlying file. The next
-    /// changeset will be written over the erroring entry (or the end of the file if none existed).
-    pub fn aggregate_changesets(&mut self) -> Result<Option<C>, AggregateChangesetsError<C>> {
-        let mut changeset = Option::<C>::None;
-        for next_changeset in self.iter_changesets() {
-            let next_changeset = match next_changeset {
-                Ok(next_changeset) => next_changeset,
-                Err(iter_error) => {
-                    return Err(AggregateChangesetsError {
-                        changeset,
-                        iter_error,
-                    })
-                }
-            };
-            match &mut changeset {
-                Some(changeset) => changeset.merge(next_changeset),
-                changeset => *changeset = Some(next_changeset),
-            }
-        }
-        Ok(changeset)
-    }
-
-    /// Append a new changeset to the file and truncate the file to the end of the appended
-    /// changeset.
-    ///
-    /// The truncation is to avoid the possibility of having a valid but inconsistent changeset
-    /// directly after the appended changeset.
-    pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> {
+    /// If garbage data is written and then valid changesets, the next load will still only
+    /// retrieve the first chunk of valid changesets. The recovery of those valid changesets after
+    /// the garbage data is responsibility of the user.
+    pub fn append(&mut self, changeset: &C) -> Result<(), io::Error> {
         // no need to write anything if changeset is empty
         if changeset.is_empty() {
             return Ok(());
@@ -167,45 +243,46 @@ where
                 unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
             })?;
 
-        // truncate file after this changeset addition
-        // if this is not done, data after this changeset may represent valid changesets, however
-        // applying those changesets on top of this one may result in an inconsistent state
-        let pos = self.db_file.stream_position()?;
-        self.db_file.set_len(pos)?;
-
         Ok(())
     }
 }
 
-/// Error type for [`Store::aggregate_changesets`].
+/// Error type for [`Store::dump`].
 #[derive(Debug)]
-pub struct AggregateChangesetsError<C> {
+pub struct StoreErrorWithDump<C> {
     /// The partially-aggregated changeset.
     pub changeset: Option<C>,
 
-    /// The error returned by [`EntryIter`].
-    pub iter_error: IterError,
+    /// The [`StoreError`]
+    pub error: StoreError,
 }
 
-impl<C> std::fmt::Display for AggregateChangesetsError<C> {
+impl<C> From<io::Error> for StoreErrorWithDump<C> {
+    fn from(value: io::Error) -> Self {
+        Self {
+            changeset: Option::<C>::None,
+            error: StoreError::Io(value),
+        }
+    }
+}
+
+impl<C> std::fmt::Display for StoreErrorWithDump<C> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        std::fmt::Display::fmt(&self.iter_error, f)
+        std::fmt::Display::fmt(&self.error, f)
     }
 }
 
-impl<C: fmt::Debug> std::error::Error for AggregateChangesetsError<C> {}
+impl<C: fmt::Debug> std::error::Error for StoreErrorWithDump<C> {}
 
 #[cfg(test)]
 mod test {
     use super::*;
 
-    use bincode::DefaultOptions;
     use std::{
         collections::BTreeSet,
-        io::{Read, Write},
-        vec::Vec,
+        fs,
+        io::{Seek, Write},
     };
-    use tempfile::NamedTempFile;
 
     const TEST_MAGIC_BYTES_LEN: usize = 12;
     const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
@@ -213,65 +290,50 @@ mod test {
 
     type TestChangeSet = BTreeSet<String>;
 
-    /// Check behavior of [`Store::create_new`] and [`Store::open`].
+    /// Check behavior of [`Store::create`] and [`Store::load`].
     #[test]
     fn construct_store() {
         let temp_dir = tempfile::tempdir().unwrap();
         let file_path = temp_dir.path().join("db_file");
-        let _ = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+        let _ = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
             .expect_err("must not open as file does not exist yet");
-        let _ = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path)
+        let _ = Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path)
             .expect("must create file");
         // cannot create new as file already exists
-        let _ = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path)
+        let _ = Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path)
             .expect_err("must fail as file already exists now");
-        let _ = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+        let _ = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
             .expect("must open as file exists now");
     }
 
     #[test]
-    fn open_or_create_new() {
-        let temp_dir = tempfile::tempdir().unwrap();
-        let file_path = temp_dir.path().join("db_file");
-        let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
-
-        {
-            let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
-                .expect("must create");
-            assert!(file_path.exists());
-            db.append_changeset(&changeset).expect("must succeed");
-        }
-
-        {
-            let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
-                .expect("must recover");
-            let recovered_changeset = db.aggregate_changesets().expect("must succeed");
-            assert_eq!(recovered_changeset, Some(changeset));
-        }
-    }
-
-    #[test]
-    fn new_fails_if_file_is_too_short() {
-        let mut file = NamedTempFile::new().unwrap();
-        file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1])
-            .expect("should write");
-
-        match Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, file.path()) {
-            Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
+    fn load_fails_if_file_is_too_short() {
+        let tempdir = tempfile::tempdir().unwrap();
+        let file_path = tempdir.path().join("db_file");
+        fs::write(&file_path, &TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]).expect("should write");
+
+        match Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path) {
+            Err(StoreErrorWithDump {
+                error: StoreError::Io(e),
+                ..
+            }) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
             unexpected => panic!("unexpected result: {:?}", unexpected),
         };
     }
 
     #[test]
-    fn new_fails_if_magic_bytes_are_invalid() {
+    fn load_fails_if_magic_bytes_are_invalid() {
         let invalid_magic_bytes = "ldkfs0000000";
 
-        let mut file = NamedTempFile::new().unwrap();
-        file.write_all(invalid_magic_bytes.as_bytes())
-            .expect("should write");
+        let tempdir = tempfile::tempdir().unwrap();
+        let file_path = tempdir.path().join("db_file");
+        fs::write(&file_path, invalid_magic_bytes.as_bytes()).expect("should write");
 
-        match Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, file.path()) {
-            Err(FileError::InvalidMagicBytes { got, .. }) => {
+        match Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path) {
+            Err(StoreErrorWithDump {
+                error: StoreError::InvalidMagicBytes { got, .. },
+                ..
+            }) => {
                 assert_eq!(got, invalid_magic_bytes.as_bytes())
             }
             unexpected => panic!("unexpected result: {:?}", unexpected),
@@ -279,46 +341,120 @@ mod test {
     }
 
     #[test]
-    fn append_changeset_truncates_invalid_bytes() {
+    fn load_fails_if_undecodable_bytes() {
         // initial data to write to file (magic bytes + invalid data)
         let mut data = [255_u8; 2000];
         data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
 
-        let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
+        let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
 
-        let mut file = NamedTempFile::new().unwrap();
-        file.write_all(&data).expect("should write");
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("db_file");
+        let mut store =
+            Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).expect("should create");
+        store.append(&test_changesets).expect("should append");
+
+        // Write garbage to file
+        store.db_file.write_all(&data).expect("should write");
+
+        drop(store);
 
+        match Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, file_path) {
+            Err(StoreErrorWithDump {
+                changeset,
+                error: StoreError::Bincode(_),
+            }) => {
+                assert_eq!(changeset, Some(test_changesets))
+            }
+            unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
+        }
+    }
+
+    #[test]
+    fn dump_fails_if_undecodable_bytes() {
+        // initial data to write to file (magic bytes + invalid data)
+        let mut data = [255_u8; 2000];
+        data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
+
+        let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
+
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("db_file");
         let mut store =
-            Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, file.path()).expect("should open");
-        match store.iter_changesets().next() {
-            Some(Err(IterError::Bincode(_))) => {}
+            Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, file_path).expect("should create");
+        store.append(&test_changesets).expect("should append");
+
+        // Write garbage to file
+        store.db_file.write_all(&data).expect("should write");
+
+        match store.dump() {
+            Err(StoreErrorWithDump {
+                changeset,
+                error: StoreError::Bincode(_),
+            }) => {
+                assert_eq!(changeset, Some(test_changesets))
+            }
             unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
         }
+    }
 
-        store.append_changeset(&changeset).expect("should append");
+    #[test]
+    fn append() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("db_file");
 
-        drop(store);
+        let not_empty_changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
 
-        let got_bytes = {
-            let mut buf = Vec::new();
-            file.reopen()
-                .unwrap()
-                .read_to_end(&mut buf)
-                .expect("should read");
-            buf
-        };
+        let mut store =
+            Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, file_path).expect("must create");
+
+        store
+            .append(&not_empty_changeset)
+            .expect("must append changeset");
+        let aggregated_changeset = store
+            .dump()
+            .expect("should aggregate")
+            .expect("should not be empty");
+        assert_eq!(not_empty_changeset, aggregated_changeset);
+    }
 
-        let expected_bytes = {
-            let mut buf = TEST_MAGIC_BYTES.to_vec();
-            DefaultOptions::new()
-                .with_varint_encoding()
-                .serialize_into(&mut buf, &changeset)
-                .expect("should encode");
-            buf
-        };
+    #[test]
+    fn append_empty_changeset_does_nothing() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("db_file");
+
+        let empty_changeset = BTreeSet::new();
 
-        assert_eq!(got_bytes, expected_bytes);
+        let mut store =
+            Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, file_path).expect("must create");
+
+        store
+            .append(&empty_changeset)
+            .expect("must append changeset");
+        let aggregated_changeset = store.dump().expect("should aggregate");
+        assert_eq!(None, aggregated_changeset);
+    }
+
+    #[test]
+    fn load_or_create() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("db_file");
+        let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
+
+        {
+            let (mut store, _) =
+                Store::<TestChangeSet>::load_or_create(&TEST_MAGIC_BYTES, &file_path)
+                    .expect("must create");
+            assert!(file_path.exists());
+            store.append(&changeset).expect("must succeed");
+        }
+
+        {
+            let (_, recovered_changeset) =
+                Store::<TestChangeSet>::load_or_create(&TEST_MAGIC_BYTES, &file_path)
+                    .expect("must load");
+            assert_eq!(recovered_changeset, Some(changeset));
+        }
     }
 
     #[test]
@@ -338,13 +474,14 @@ mod test {
 
             // simulate creating a file, writing data where the last write is incomplete
             {
-                let mut db =
-                    Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let mut store =
+                    Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).unwrap();
                 for changeset in &changesets {
-                    db.append_changeset(changeset).unwrap();
+                    store.append(changeset).unwrap();
                 }
                 // this is the incomplete write
-                db.db_file
+                store
+                    .db_file
                     .write_all(&last_changeset_bytes[..short_write_len])
                     .unwrap();
             }
@@ -352,10 +489,8 @@ mod test {
             // load file again and aggregate changesets
             // write the last changeset again (this time it succeeds)
             {
-                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
-                let err = db
-                    .aggregate_changesets()
-                    .expect_err("should return error as last read is short");
+                let err = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
+                    .expect_err("should fail to aggregate");
                 assert_eq!(
                     err.changeset,
                     changesets.iter().cloned().reduce(|mut acc, cs| {
@@ -364,17 +499,26 @@ mod test {
                     }),
                     "should recover all changesets that are written in full",
                 );
-                db.db_file.write_all(&last_changeset_bytes).unwrap();
+                // Remove file and start again
+                fs::remove_file(&file_path).expect("should remove file");
+                let mut store =
+                    Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                for changeset in &changesets {
+                    store.append(changeset).unwrap();
+                }
+                // this is the complete write
+                store
+                    .db_file
+                    .write_all(&last_changeset_bytes)
+                    .expect("should write last changeset in full");
             }
 
             // load file again - this time we should successfully aggregate all changesets
             {
-                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
-                let aggregated_changesets = db
-                    .aggregate_changesets()
-                    .expect("aggregating all changesets should succeed");
+                let (_, aggregated_changeset) =
+                    Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path).unwrap();
                 assert_eq!(
-                    aggregated_changesets,
+                    aggregated_changeset,
                     changesets
                         .iter()
                         .cloned()
@@ -390,47 +534,58 @@ mod test {
     }
 
     #[test]
-    fn write_after_short_read() {
+    fn test_load_recovers_state_after_last_write() {
         let temp_dir = tempfile::tempdir().unwrap();
+        let file_path = temp_dir.path().join("db_file");
+        let changeset1 = BTreeSet::from(["hello".to_string(), "world".to_string()]);
+        let changeset2 = BTreeSet::from(["change after write".to_string()]);
 
-        let changesets = (0..20)
-            .map(|n| TestChangeSet::from([format!("{}", n)]))
-            .collect::<Vec<_>>();
-        let last_changeset = TestChangeSet::from(["last".into()]);
+        {
+            // create new store
+            let mut store =
+                Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).expect("must create");
 
-        for read_count in 0..changesets.len() {
-            let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+            // append first changeset to store
+            store.append(&changeset1).expect("must succeed");
+        }
 
-            // First, we create the file with all the changesets!
-            let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
-            for changeset in &changesets {
-                db.append_changeset(changeset).unwrap();
-            }
-            drop(db);
-
-            // We re-open the file and read `read_count` number of changesets.
-            let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
-            let mut exp_aggregation = db
-                .iter_changesets()
-                .take(read_count)
-                .map(|r| r.expect("must read valid changeset"))
-                .fold(TestChangeSet::default(), |mut acc, v| {
-                    Merge::merge(&mut acc, v);
-                    acc
-                });
-            // We write after a short read.
-            db.append_changeset(&last_changeset)
-                .expect("last write must succeed");
-            Merge::merge(&mut exp_aggregation, last_changeset.clone());
-            drop(db);
-
-            // We open the file again and check whether aggregate changeset is expected.
-            let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
-                .unwrap()
-                .aggregate_changesets()
-                .expect("must aggregate changesets")
-                .unwrap_or_default();
-            assert_eq!(aggregation, exp_aggregation);
+        {
+            // open store
+            let (mut store, _) = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
+                .expect("failed to load store");
+
+            // now append the second changeset
+            store.append(&changeset2).expect("must succeed");
+
+            // Retrieve stored changesets from the database
+            let stored_changesets = store
+                .dump()
+                .expect("must succeed")
+                .expect("must be not empty");
+
+            // expected changeset must be changeset2 + changeset1
+            let mut expected_changeset = changeset2.clone();
+            expected_changeset.extend(changeset1);
+
+            // Assert that stored_changesets matches expected_changeset but not changeset2
+            assert_eq!(stored_changesets, expected_changeset);
+            assert_ne!(stored_changesets, changeset2);
         }
+
+        // Open the store again to verify file pointer position at the end of the file
+        let (mut store, _) = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
+            .expect("should load correctly");
+
+        // get the current position of file pointer just after loading store
+        let current_pointer = store.db_file.stream_position().expect("must suceed");
+
+        // end pointer for the loaded store
+        let expected_pointer = store
+            .db_file
+            .seek(io::SeekFrom::End(0))
+            .expect("must succeed");
+
+        // current position matches EOF
+        assert_eq!(current_pointer, expected_pointer);
     }
 }
index 95c5479676e3947eb37bd83d516997c38089c6fb..83cb25f8a524fa60cec24db78e9e0120431bc132 100644 (file)
@@ -169,7 +169,7 @@ fn main() -> anyhow::Result<()> {
                     let db = &mut *db.lock().unwrap();
                     last_db_commit = Instant::now();
                     if let Some(changeset) = db_stage.take() {
-                        db.append_changeset(&changeset)?;
+                        db.append(&changeset)?;
                     }
                     println!(
                         "[{:>10}s] committed to db (took {}s)",
@@ -213,7 +213,7 @@ fn main() -> anyhow::Result<()> {
                     ..Default::default()
                 });
                 if let Some(changeset) = db_stage.take() {
-                    db.append_changeset(&changeset)?;
+                    db.append(&changeset)?;
                 }
             }
         }
@@ -307,7 +307,7 @@ fn main() -> anyhow::Result<()> {
                     let db = &mut *db.lock().unwrap();
                     last_db_commit = Instant::now();
                     if let Some(changeset) = db_stage.take() {
-                        db.append_changeset(&changeset)?;
+                        db.append(&changeset)?;
                     }
                     println!(
                         "[{:>10}s] committed to db (took {}s)",
index 3a700db3a120a8e46f542778d1618027b86cbc1e..e965495e05562a6dd0b57dd9c151e7179abc82cd 100644 (file)
@@ -466,7 +466,7 @@ pub fn handle_commands<CS: clap::Subcommand, S: clap::Args>(
                     let ((spk_i, spk), index_changeset) =
                         spk_chooser(index, Keychain::External).expect("Must exist");
                     let db = &mut *db.lock().unwrap();
-                    db.append_changeset(&ChangeSet {
+                    db.append(&ChangeSet {
                         indexer: index_changeset,
                         ..Default::default()
                     })?;
@@ -629,7 +629,7 @@ pub fn handle_commands<CS: clap::Subcommand, S: clap::Args>(
                     // If we're unable to persist this, then we don't want to broadcast.
                     {
                         let db = &mut *db.lock().unwrap();
-                        db.append_changeset(&ChangeSet {
+                        db.append(&ChangeSet {
                             indexer,
                             ..Default::default()
                         })?;
@@ -719,7 +719,7 @@ pub fn handle_commands<CS: clap::Subcommand, S: clap::Args>(
                             // We know the tx is at least unconfirmed now. Note if persisting here fails,
                             // it's not a big deal since we can always find it again from the
                             // blockchain.
-                            db.lock().unwrap().append_changeset(&ChangeSet {
+                            db.lock().unwrap().append(&ChangeSet {
                                 tx_graph: changeset.tx_graph,
                                 indexer: changeset.indexer,
                                 ..Default::default()
@@ -789,9 +789,10 @@ pub fn init_or_load<CS: clap::Subcommand, S: clap::Args>(
         Commands::Generate { network } => generate_bip86_helper(network).map(|_| None),
         // try load
         _ => {
-            let mut db =
-                Store::<ChangeSet>::open(db_magic, db_path).context("could not open file store")?;
-            let changeset = db.aggregate_changesets()?.expect("db must not be empty");
+            let (db, changeset) =
+                Store::<ChangeSet>::load(db_magic, db_path).context("could not open file store")?;
+
+            let changeset = changeset.expect("should not be empty");
 
             let network = changeset.network.expect("changeset network");
 
@@ -866,8 +867,8 @@ where
             LocalChain::from_genesis_hash(constants::genesis_block(network).block_hash());
         changeset.network = Some(network);
         changeset.local_chain = chain_changeset;
-        let mut db = Store::<ChangeSet>::create_new(db_magic, db_path)?;
-        db.append_changeset(&changeset)?;
+        let mut db = Store::<ChangeSet>::create(db_magic, db_path)?;
+        db.append(&changeset)?;
         println!("New database {db_path}");
     }
 
index 9c705a3df65a1834f1256937ba31e5a9c48a0a82..8e3110d68c5728fb9ebbad82ed13a52e26923a8b 100644 (file)
@@ -278,6 +278,6 @@ fn main() -> anyhow::Result<()> {
     };
 
     let mut db = db.lock().unwrap();
-    db.append_changeset(&db_changeset)?;
+    db.append(&db_changeset)?;
     Ok(())
 }
index cba86b86200cab99e1b870c0ce163ac231b96970..2c00751c2acd89637cb860091682cd1df35b9a58 100644 (file)
@@ -278,7 +278,7 @@ fn main() -> anyhow::Result<()> {
 
     // We persist the changes
     let mut db = db.lock().unwrap();
-    db.append_changeset(&ChangeSet {
+    db.append(&ChangeSet {
         local_chain: local_chain_changeset,
         tx_graph: indexed_tx_graph_changeset.tx_graph,
         indexer: indexed_tx_graph_changeset.indexer,