-use crate::{bincode_options, EntryIter, FileError, IterError};
+use crate::{bincode_options, EntryIter, StoreError};
use bdk_core::Merge;
use bincode::Options;
use std::{
fmt::{self, Debug},
fs::{File, OpenOptions},
- io::{self, Read, Seek, Write},
+ io::{self, Read, Write},
marker::PhantomData,
path::Path,
};
/// > ⚠ This is a development/testing database. It does not natively support backwards compatible
/// > BDK version upgrades so should not be used in production.
#[derive(Debug)]
-pub struct Store<C>
-where
- C: Sync + Send,
-{
+pub struct Store<C> {
magic_len: usize,
db_file: File,
marker: PhantomData<C>,
impl<C> Store<C>
where
- C: Merge
- + serde::Serialize
- + serde::de::DeserializeOwned
- + core::marker::Send
- + core::marker::Sync,
+ C: Merge + serde::Serialize + serde::de::DeserializeOwned,
{
/// Create a new [`Store`] file in write-only mode; error if the file exists.
///
- /// `magic` is the prefixed bytes to write to the new file. This will be checked when opening
- /// the `Store` in the future with [`open`].
+ /// `magic` is the prefixed bytes to write to the new file. This will be checked when loading
+ /// the [`Store`] in the future with [`load`].
///
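+ /// # Examples
+ ///
+ /// A minimal sketch of creating a fresh store; the magic bytes and changeset type here are
+ /// illustrative, not fixed by the API:
+ ///
+ /// ```
+ /// # use std::collections::BTreeSet;
+ /// use bdk_file_store::Store;
+ ///
+ /// let dir = tempfile::tempdir().expect("must create tempdir");
+ /// let path = dir.path().join("db_file");
+ /// // Errors if a file already exists at `path`.
+ /// let _store = Store::<BTreeSet<String>>::create(b"illustrative_0", &path).expect("must create");
+ /// ```
+ ///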
- /// [`open`]: Store::open
- pub fn create_new<P>(magic: &[u8], file_path: P) -> Result<Self, FileError>
+ /// [`load`]: Store::load
+ pub fn create<P>(magic: &[u8], file_path: P) -> Result<Self, StoreError>
where
P: AsRef<Path>,
{
})
}
- /// Open an existing [`Store`].
+ /// Load an existing [`Store`].
///
- /// Use [`create_new`] to create a new `Store`.
+ /// Use [`create`] to create a new [`Store`].
///
/// # Errors
///
- /// If the prefixed bytes of the opened file does not match the provided `magic`, the
- /// [`FileError::InvalidMagicBytes`] error variant will be returned.
+ /// If the prefixed bytes of the loaded file do not match the provided `magic`, a
+ /// [`StoreErrorWithDump`] will be returned with the [`StoreError::InvalidMagicBytes`] error
+ /// variant in its `error` field and its `changeset` field set to [`Option::None`].
+ ///
+ /// If there exist changesets in the file, [`load`] will try to aggregate them into a single
+ /// changeset to verify their integrity. If aggregation fails, a [`StoreErrorWithDump`] will be
+ /// returned with the [`StoreError::Bincode`] error variant in its `error` field and the
+ /// changesets aggregated so far in its `changeset` field.
+ ///
+ /// To get a new working file store from this error, use [`Store::create`] and [`Store::append`]
+ /// to append the aggregated changeset obtained from [`StoreErrorWithDump`] to the new file.
///
- /// [`create_new`]: Store::create_new
- pub fn open<P>(magic: &[u8], file_path: P) -> Result<Self, FileError>
+ /// To analyze the causes of the problem in the original database, do not recreate the [`Store`]
+ /// at the same file path: doing so would overwrite the previous file without any chance of
+ /// recovering its original data.
+ ///
+ /// # Examples
+ /// ```
+ /// use bdk_file_store::{Store, StoreErrorWithDump};
+ /// # use std::fs::OpenOptions;
+ /// # use bdk_core::Merge;
+ /// # use std::collections::BTreeSet;
+ /// # use std::io;
+ /// # use std::io::SeekFrom;
+ /// # use std::io::Seek;
+ /// #
+ /// # fn main() -> io::Result<()> {
+ /// # const MAGIC_BYTES_LEN: usize = 12;
+ /// # const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] =
+ /// # [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
+ /// #
+ /// # type TestChangeSet = BTreeSet<String>;
+ /// #
+ /// # let temp_dir = tempfile::tempdir()?;
+ /// # let file_path = temp_dir.path().join("db_file");
+ /// # let mut store = Store::<TestChangeSet>::create(&MAGIC_BYTES, &file_path).unwrap();
+ /// # let changesets = [
+ /// # TestChangeSet::from(["1".into()]),
+ /// # TestChangeSet::from(["2".into(), "3".into()]),
+ /// # TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
+ /// # ];
+ /// #
+ /// # for changeset in &changesets[..] {
+ /// # store.append(changeset)?;
+ /// # }
+ /// #
+ /// # drop(store);
+ /// #
+ /// # // Simulate a broken file by truncating the last two bytes, leaving the final
+ /// # // changeset entry undecodable
+ /// # let mut file = OpenOptions::new().append(true).open(file_path.clone())?;
+ /// # let new_len = file.seek(SeekFrom::End(-2))?;
+ /// # file.set_len(new_len)?;
+ ///
+ /// let mut new_store = match Store::<TestChangeSet>::load(&MAGIC_BYTES, &file_path) {
+ /// # Ok(_) => panic!("should have errored"),
+ /// Ok((store, _aggregated_changeset)) => store,
+ /// Err(StoreErrorWithDump { changeset, .. }) => {
+ /// let new_file_path = file_path.with_extension("bkp");
+ /// let mut new_store = Store::create(&MAGIC_BYTES, &new_file_path).unwrap();
+ /// if let Some(aggregated_changeset) = changeset {
+ /// new_store.append(&aggregated_changeset)?;
+ /// }
+ /// new_store
+ /// }
+ /// };
+ /// #
+ /// # assert_eq!(
+ /// # new_store.dump().expect("should dump the recovered changeset {1, 2, 3}"),
+ /// # changesets[..2].iter().cloned().reduce(|mut acc, cs| {
+ /// # Merge::merge(&mut acc, cs);
+ /// # acc
+ /// # }),
+ /// # "should recover all changesets",
+ /// # );
+ /// #
+ /// # Ok(())
+ /// # }
+ /// ```
+ /// [`create`]: Store::create
+ /// [`load`]: Store::load
+ pub fn load<P>(magic: &[u8], file_path: P) -> Result<(Self, Option<C>), StoreErrorWithDump<C>>
where
P: AsRef<Path>,
{
let mut magic_buf = vec![0_u8; magic.len()];
f.read_exact(&mut magic_buf)?;
if magic_buf != magic {
- return Err(FileError::InvalidMagicBytes {
- got: magic_buf,
- expected: magic.to_vec(),
+ return Err(StoreErrorWithDump {
+ changeset: Option::<C>::None,
+ error: StoreError::InvalidMagicBytes {
+ got: magic_buf,
+ expected: magic.to_vec(),
+ },
});
}
- Ok(Self {
+ let mut store = Self {
magic_len: magic.len(),
db_file: f,
marker: Default::default(),
- })
+ };
+
+ // Get aggregated changeset
+ let aggregated_changeset = store.dump()?;
+
+ Ok((store, aggregated_changeset))
+ }
+
+ /// Dump the aggregate of all changesets in [`Store`].
+ ///
+ /// # Errors
+ ///
+ /// If there exist changesets in the file, [`dump`] will try to aggregate them into a single
+ /// changeset. If aggregation fails, a [`StoreErrorWithDump`] will be returned with the
+ /// [`StoreError::Bincode`] error variant in its `error` field and the changesets aggregated so
+ /// far in its `changeset` field.
+ ///
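+ /// # Examples
+ ///
+ /// A minimal sketch; the magic bytes and changeset type are illustrative:
+ ///
+ /// ```
+ /// # use std::collections::BTreeSet;
+ /// # use bdk_file_store::Store;
+ /// # let dir = tempfile::tempdir().unwrap();
+ /// # let path = dir.path().join("db_file");
+ /// type ChangeSet = BTreeSet<String>;
+ /// let mut store = Store::<ChangeSet>::create(b"illustrative_0", &path).expect("must create");
+ /// store.append(&ChangeSet::from(["a".into()])).expect("must append");
+ /// store.append(&ChangeSet::from(["b".into()])).expect("must append");
+ /// // All decodable entries are merged into a single changeset.
+ /// let aggregated = store.dump().expect("must aggregate");
+ /// assert_eq!(aggregated, Some(ChangeSet::from(["a".into(), "b".into()])));
+ /// ```
+ ///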
+ /// [`dump`]: Store::dump
+ pub fn dump(&mut self) -> Result<Option<C>, StoreErrorWithDump<C>> {
+ EntryIter::new(self.magic_len as u64, &mut self.db_file).try_fold(
+ Option::<C>::None,
+ |mut aggregated_changeset: Option<C>, next_changeset| match next_changeset {
+ Ok(next_changeset) => {
+ match &mut aggregated_changeset {
+ Some(aggregated_changeset) => aggregated_changeset.merge(next_changeset),
+ aggregated_changeset => *aggregated_changeset = Some(next_changeset),
+ }
+ Ok(aggregated_changeset)
+ }
+ Err(iter_error) => Err(StoreErrorWithDump {
+ changeset: aggregated_changeset,
+ error: iter_error,
+ }),
+ },
+ )
}
- /// Attempt to open existing [`Store`] file; create it if the file is non-existent.
+ /// Attempt to load existing [`Store`] file; create it if the file does not exist.
///
- /// Internally, this calls either [`open`] or [`create_new`].
+ /// Internally, this calls either [`load`] or [`create`].
///
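+ /// # Examples
+ ///
+ /// A minimal sketch; the magic bytes and changeset type are illustrative:
+ ///
+ /// ```
+ /// # use std::collections::BTreeSet;
+ /// # use bdk_file_store::Store;
+ /// # let dir = tempfile::tempdir().unwrap();
+ /// # let path = dir.path().join("db_file");
+ /// type ChangeSet = BTreeSet<String>;
+ /// // The first call creates the file, so there is no aggregated changeset yet.
+ /// let (_store, changeset) =
+ ///     Store::<ChangeSet>::load_or_create(b"illustrative_0", &path).expect("must create");
+ /// assert_eq!(changeset, None);
+ /// ```
+ ///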
- /// [`open`]: Store::open
- /// [`create_new`]: Store::create_new
- pub fn open_or_create_new<P>(magic: &[u8], file_path: P) -> Result<Self, FileError>
+ /// [`load`]: Store::load
+ /// [`create`]: Store::create
+ pub fn load_or_create<P>(
+ magic: &[u8],
+ file_path: P,
+ ) -> Result<(Self, Option<C>), StoreErrorWithDump<C>>
where
P: AsRef<Path>,
{
if file_path.as_ref().exists() {
- Self::open(magic, file_path)
+ Self::load(magic, file_path)
} else {
- Self::create_new(magic, file_path)
+ Self::create(magic, file_path)
+ .map(|store| (store, Option::<C>::None))
+ .map_err(|err: StoreError| StoreErrorWithDump {
+ changeset: Option::<C>::None,
+ error: err,
+ })
}
}
- /// Iterates over the stored changeset from first to last, changing the seek position at each
- /// iteration.
- ///
- /// The iterator may fail to read an entry and therefore return an error. However, the first time
- /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
- ///
- /// **WARNING**: This method changes the write position in the underlying file. You should
- /// always iterate over all entries until `None` is returned if you want your next write to go
- /// at the end; otherwise, you will write over existing entries.
- pub fn iter_changesets(&mut self) -> EntryIter<C> {
- EntryIter::new(self.magic_len as u64, &mut self.db_file)
- }
-
- /// Loads all the changesets that have been stored as one giant changeset.
- ///
- /// This function returns the aggregate changeset, or `None` if nothing was persisted.
- /// If reading or deserializing any of the entries fails, an error is returned that
- /// consists of all those it was able to read.
+ /// Append a new changeset to the file. Does nothing if the changeset is empty.
+ ///
+ /// Truncation is not needed because the file pointer is always left at the end of the last
+ /// decodable data, having scanned the file from beginning to end.
///
- /// You should usually check the error. In many applications, it may make sense to do a full
- /// wallet scan with a stop-gap after getting an error, since it is likely that one of the
- /// changesets was unable to read changes of the derivation indices of a keychain.
+ /// If multiple garbage writes are produced on the file, the next load will only retrieve the
+ /// first chunk of valid changesets.
///
- /// **WARNING**: This method changes the write position of the underlying file. The next
- /// changeset will be written over the erroring entry (or the end of the file if none existed).
- pub fn aggregate_changesets(&mut self) -> Result<Option<C>, AggregateChangesetsError<C>> {
- let mut changeset = Option::<C>::None;
- for next_changeset in self.iter_changesets() {
- let next_changeset = match next_changeset {
- Ok(next_changeset) => next_changeset,
- Err(iter_error) => {
- return Err(AggregateChangesetsError {
- changeset,
- iter_error,
- })
- }
- };
- match &mut changeset {
- Some(changeset) => changeset.merge(next_changeset),
- changeset => *changeset = Some(next_changeset),
- }
- }
- Ok(changeset)
- }
-
- /// Append a new changeset to the file and truncate the file to the end of the appended
- /// changeset.
- ///
- /// The truncation is to avoid the possibility of having a valid but inconsistent changeset
- /// directly after the appended changeset.
- pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> {
+ /// If garbage data is written and then followed by valid changesets, the next load will still
+ /// only retrieve the first chunk of valid changesets. Recovering the valid changesets that come
+ /// after the garbage data is the responsibility of the user.
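+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch; the magic bytes and changeset type are illustrative:
+ ///
+ /// ```
+ /// # use std::collections::BTreeSet;
+ /// # use bdk_file_store::Store;
+ /// # let dir = tempfile::tempdir().unwrap();
+ /// # let path = dir.path().join("db_file");
+ /// type ChangeSet = BTreeSet<String>;
+ /// let mut store = Store::<ChangeSet>::create(b"illustrative_0", &path).expect("must create");
+ /// store.append(&ChangeSet::from(["entry".into()])).expect("must append");
+ /// // Appending an empty changeset is a no-op.
+ /// store.append(&ChangeSet::new()).expect("must succeed");
+ /// ```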
+ pub fn append(&mut self, changeset: &C) -> Result<(), io::Error> {
// no need to write anything if changeset is empty
if changeset.is_empty() {
return Ok(());
unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
})?;
- // truncate file after this changeset addition
- // if this is not done, data after this changeset may represent valid changesets, however
- // applying those changesets on top of this one may result in an inconsistent state
- let pos = self.db_file.stream_position()?;
- self.db_file.set_len(pos)?;
-
Ok(())
}
}
-/// Error type for [`Store::aggregate_changesets`].
+/// Error type for [`Store::dump`], also returned by [`Store::load`] and [`Store::load_or_create`].
#[derive(Debug)]
-pub struct AggregateChangesetsError<C> {
+pub struct StoreErrorWithDump<C> {
/// The partially-aggregated changeset.
pub changeset: Option<C>,
- /// The error returned by [`EntryIter`].
- pub iter_error: IterError,
+ /// The underlying [`StoreError`].
+ pub error: StoreError,
}
-impl<C> std::fmt::Display for AggregateChangesetsError<C> {
+impl<C> From<io::Error> for StoreErrorWithDump<C> {
+ fn from(value: io::Error) -> Self {
+ Self {
+ changeset: Option::<C>::None,
+ error: StoreError::Io(value),
+ }
+ }
+}
+
+impl<C> std::fmt::Display for StoreErrorWithDump<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- std::fmt::Display::fmt(&self.iter_error, f)
+ std::fmt::Display::fmt(&self.error, f)
}
}
-impl<C: fmt::Debug> std::error::Error for AggregateChangesetsError<C> {}
+impl<C: fmt::Debug> std::error::Error for StoreErrorWithDump<C> {}
#[cfg(test)]
mod test {
use super::*;
- use bincode::DefaultOptions;
use std::{
collections::BTreeSet,
- io::{Read, Write},
- vec::Vec,
+ fs,
+ io::{Seek, Write},
};
- use tempfile::NamedTempFile;
const TEST_MAGIC_BYTES_LEN: usize = 12;
const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
type TestChangeSet = BTreeSet<String>;
- /// Check behavior of [`Store::create_new`] and [`Store::open`].
+ /// Check behavior of [`Store::create`] and [`Store::load`].
#[test]
fn construct_store() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("db_file");
- let _ = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+ let _ = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
.expect_err("must not open as file does not exist yet");
- let _ = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path)
+ let _ = Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path)
.expect("must create file");
// cannot create new as file already exists
- let _ = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path)
+ let _ = Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path)
.expect_err("must fail as file already exists now");
- let _ = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+ let _ = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
.expect("must open as file exists now");
}
#[test]
- fn open_or_create_new() {
- let temp_dir = tempfile::tempdir().unwrap();
- let file_path = temp_dir.path().join("db_file");
- let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
-
- {
- let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
- .expect("must create");
- assert!(file_path.exists());
- db.append_changeset(&changeset).expect("must succeed");
- }
-
- {
- let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
- .expect("must recover");
- let recovered_changeset = db.aggregate_changesets().expect("must succeed");
- assert_eq!(recovered_changeset, Some(changeset));
- }
- }
-
- #[test]
- fn new_fails_if_file_is_too_short() {
- let mut file = NamedTempFile::new().unwrap();
- file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1])
- .expect("should write");
-
- match Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, file.path()) {
- Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
+ fn load_fails_if_file_is_too_short() {
+ let tempdir = tempfile::tempdir().unwrap();
+ let file_path = tempdir.path().join("db_file");
+ fs::write(&file_path, &TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]).expect("should write");
+
+ match Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path) {
+ Err(StoreErrorWithDump {
+ error: StoreError::Io(e),
+ ..
+ }) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
unexpected => panic!("unexpected result: {:?}", unexpected),
};
}
#[test]
- fn new_fails_if_magic_bytes_are_invalid() {
+ fn load_fails_if_magic_bytes_are_invalid() {
let invalid_magic_bytes = "ldkfs0000000";
- let mut file = NamedTempFile::new().unwrap();
- file.write_all(invalid_magic_bytes.as_bytes())
- .expect("should write");
+ let tempdir = tempfile::tempdir().unwrap();
+ let file_path = tempdir.path().join("db_file");
+ fs::write(&file_path, invalid_magic_bytes.as_bytes()).expect("should write");
- match Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, file.path()) {
- Err(FileError::InvalidMagicBytes { got, .. }) => {
+ match Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path) {
+ Err(StoreErrorWithDump {
+ error: StoreError::InvalidMagicBytes { got, .. },
+ ..
+ }) => {
assert_eq!(got, invalid_magic_bytes.as_bytes())
}
unexpected => panic!("unexpected result: {:?}", unexpected),
}
#[test]
- fn append_changeset_truncates_invalid_bytes() {
+ fn load_fails_if_undecodable_bytes() {
// initial data to write to file (magic bytes + invalid data)
let mut data = [255_u8; 2000];
data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
- let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
+ let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
- let mut file = NamedTempFile::new().unwrap();
- file.write_all(&data).expect("should write");
+ let temp_dir = tempfile::tempdir().unwrap();
+ let file_path = temp_dir.path().join("db_file");
+ let mut store =
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).expect("should create");
+ store.append(&test_changesets).expect("should append");
+
+ // Write garbage to file
+ store.db_file.write_all(&data).expect("should write");
+
+ drop(store);
+ match Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, file_path) {
+ Err(StoreErrorWithDump {
+ changeset,
+ error: StoreError::Bincode(_),
+ }) => {
+ assert_eq!(changeset, Some(test_changesets))
+ }
+ unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
+ }
+ }
+
+ #[test]
+ fn dump_fails_if_undecodable_bytes() {
+ // garbage data to write to file (magic bytes + invalid data)
+ let mut data = [255_u8; 2000];
+ data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
+
+ let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
+
+ let temp_dir = tempfile::tempdir().unwrap();
+ let file_path = temp_dir.path().join("db_file");
let mut store =
- Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, file.path()).expect("should open");
- match store.iter_changesets().next() {
- Some(Err(IterError::Bincode(_))) => {}
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, file_path).expect("should create");
+ store.append(&test_changesets).expect("should append");
+
+ // Write garbage to file
+ store.db_file.write_all(&data).expect("should write");
+
+ match store.dump() {
+ Err(StoreErrorWithDump {
+ changeset,
+ error: StoreError::Bincode(_),
+ }) => {
+ assert_eq!(changeset, Some(test_changesets))
+ }
unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
}
+ }
- store.append_changeset(&changeset).expect("should append");
+ #[test]
+ fn append() {
+ let temp_dir = tempfile::tempdir().unwrap();
+ let file_path = temp_dir.path().join("db_file");
- drop(store);
+ let not_empty_changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
- let got_bytes = {
- let mut buf = Vec::new();
- file.reopen()
- .unwrap()
- .read_to_end(&mut buf)
- .expect("should read");
- buf
- };
+ let mut store =
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, file_path).expect("must create");
+
+ store
.append(&not_empty_changeset)
+ .expect("must append changeset");
+ let aggregated_changeset = store
+ .dump()
+ .expect("should aggregate")
+ .expect("should not be empty");
+ assert_eq!(not_empty_changeset, aggregated_changeset);
+ }
- let expected_bytes = {
- let mut buf = TEST_MAGIC_BYTES.to_vec();
- DefaultOptions::new()
- .with_varint_encoding()
- .serialize_into(&mut buf, &changeset)
- .expect("should encode");
- buf
- };
+ #[test]
+ fn append_empty_changeset_does_nothing() {
+ let temp_dir = tempfile::tempdir().unwrap();
+ let file_path = temp_dir.path().join("db_file");
+
+ let empty_changeset = BTreeSet::new();
- assert_eq!(got_bytes, expected_bytes);
+ let mut store =
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, file_path).expect("must create");
+
+ store
+ .append(&empty_changeset)
+ .expect("must append changeset");
+ let aggregated_changeset = store.dump().expect("should aggregate");
+ assert_eq!(None, aggregated_changeset);
+ }
+
+ #[test]
+ fn load_or_create() {
+ let temp_dir = tempfile::tempdir().unwrap();
+ let file_path = temp_dir.path().join("db_file");
+ let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
+
+ {
+ let (mut store, _) =
+ Store::<TestChangeSet>::load_or_create(&TEST_MAGIC_BYTES, &file_path)
+ .expect("must create");
+ assert!(file_path.exists());
+ store.append(&changeset).expect("must succeed");
+ }
+
+ {
+ let (_, recovered_changeset) =
+ Store::<TestChangeSet>::load_or_create(&TEST_MAGIC_BYTES, &file_path)
+ .expect("must load");
+ assert_eq!(recovered_changeset, Some(changeset));
+ }
}
#[test]
// simulate creating a file, writing data where the last write is incomplete
{
- let mut db =
- Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+ let mut store =
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).unwrap();
for changeset in &changesets {
- db.append_changeset(changeset).unwrap();
+ store.append(changeset).unwrap();
}
// this is the incomplete write
- db.db_file
+ store
+ .db_file
.write_all(&last_changeset_bytes[..short_write_len])
.unwrap();
}
// load file again and aggregate changesets
// write the last changeset again (this time it succeeds)
{
- let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
- let err = db
- .aggregate_changesets()
- .expect_err("should return error as last read is short");
+ let err = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
+ .expect_err("should fail to aggregate");
assert_eq!(
err.changeset,
changesets.iter().cloned().reduce(|mut acc, cs| {
}),
"should recover all changesets that are written in full",
);
- db.db_file.write_all(&last_changeset_bytes).unwrap();
+ // Remove file and start again
+ fs::remove_file(&file_path).expect("should remove file");
+ let mut store =
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).unwrap();
+ for changeset in &changesets {
+ store.append(changeset).unwrap();
+ }
+ // this is the complete write
+ store
+ .db_file
+ .write_all(&last_changeset_bytes)
+ .expect("should write last changeset in full");
}
// load file again - this time we should successfully aggregate all changesets
{
- let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
- let aggregated_changesets = db
- .aggregate_changesets()
- .expect("aggregating all changesets should succeed");
+ let (_, aggregated_changeset) =
+ Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path).unwrap();
assert_eq!(
- aggregated_changesets,
+ aggregated_changeset,
changesets
.iter()
.cloned()
}
#[test]
- fn write_after_short_read() {
+ fn test_load_recovers_state_after_last_write() {
let temp_dir = tempfile::tempdir().unwrap();
+ let file_path = temp_dir.path().join("db_file");
+ let changeset1 = BTreeSet::from(["hello".to_string(), "world".to_string()]);
+ let changeset2 = BTreeSet::from(["change after write".to_string()]);
- let changesets = (0..20)
- .map(|n| TestChangeSet::from([format!("{}", n)]))
- .collect::<Vec<_>>();
- let last_changeset = TestChangeSet::from(["last".into()]);
+ {
+ // create new store
+ let mut store =
+ Store::<TestChangeSet>::create(&TEST_MAGIC_BYTES, &file_path).expect("must create");
- for read_count in 0..changesets.len() {
- let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+ // append first changeset to store
+ store.append(&changeset1).expect("must succeed");
+ }
- // First, we create the file with all the changesets!
- let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
- for changeset in &changesets {
- db.append_changeset(changeset).unwrap();
- }
- drop(db);
-
- // We re-open the file and read `read_count` number of changesets.
- let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
- let mut exp_aggregation = db
- .iter_changesets()
- .take(read_count)
- .map(|r| r.expect("must read valid changeset"))
- .fold(TestChangeSet::default(), |mut acc, v| {
- Merge::merge(&mut acc, v);
- acc
- });
- // We write after a short read.
- db.append_changeset(&last_changeset)
- .expect("last write must succeed");
- Merge::merge(&mut exp_aggregation, last_changeset.clone());
- drop(db);
-
- // We open the file again and check whether aggregate changeset is expected.
- let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
- .unwrap()
- .aggregate_changesets()
- .expect("must aggregate changesets")
- .unwrap_or_default();
- assert_eq!(aggregation, exp_aggregation);
+ {
+ // open store
+ let (mut store, _) = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
+ .expect("failed to load store");
+
+ // now append the second changeset
+ store.append(&changeset2).expect("must succeed");
+
+ // Retrieve stored changesets from the database
+ let stored_changesets = store
+ .dump()
+ .expect("must succeed")
+ .expect("must be not empty");
+
+ // expected changeset must be changeset2 + changeset1
+ let mut expected_changeset = changeset2.clone();
+ expected_changeset.extend(changeset1);
+
+ // Assert that stored_changesets matches expected_changeset but not changeset2
+ assert_eq!(stored_changesets, expected_changeset);
+ assert_ne!(stored_changesets, changeset2);
}
+
+ // Open the store again to verify file pointer position at the end of the file
+ let (mut store, _) = Store::<TestChangeSet>::load(&TEST_MAGIC_BYTES, &file_path)
+ .expect("should load correctly");
+
+ // get the current position of file pointer just after loading store
+ let current_pointer = store.db_file.stream_position().expect("must succeed");
+
+ // end pointer for the loaded store
+ let expected_pointer = store
+ .db_file
+ .seek(io::SeekFrom::End(0))
+ .expect("must succeed");
+
+ // current position matches EOF
+ assert_eq!(current_pointer, expected_pointer);
}
}