]> Untitled Git - bdk/commitdiff
[persist_redesign] Introduce redesigned `persist` types
author志宇 <hello@evanlinjin.me>
Tue, 9 May 2023 01:59:42 +0000 (09:59 +0800)
committer志宇 <hello@evanlinjin.me>
Tue, 9 May 2023 04:55:44 +0000 (12:55 +0800)
This is a more generic version of `keychain::persist::*` structures.

Additional changes:

* The `Append` trait has a new method `is_empty`.
* Introduce `Store` structure for `bdk_file_store`.

crates/chain/src/indexed_tx_graph.rs
crates/chain/src/keychain.rs
crates/chain/src/lib.rs
crates/chain/src/persist.rs [new file with mode: 0644]
crates/chain/src/tx_data_traits.rs
crates/chain/src/tx_graph.rs
crates/file_store/src/entry_iter.rs [new file with mode: 0644]
crates/file_store/src/file_store.rs [deleted file]
crates/file_store/src/keychain_store.rs [new file with mode: 0644]
crates/file_store/src/lib.rs
crates/file_store/src/store.rs [new file with mode: 0644]

index c550d86f07ddd42e804b35678a9fc2dc8edae77c..7ab0ffa8a488d69b28b72382f0a19cbcf7126452 100644 (file)
@@ -301,6 +301,10 @@ impl<A: Anchor, IA: Append> Append for IndexedAdditions<A, IA> {
         self.graph_additions.append(other.graph_additions);
         self.index_additions.append(other.index_additions);
     }
+
+    fn is_empty(&self) -> bool {
+        self.graph_additions.is_empty() && self.index_additions.is_empty()
+    }
 }
 
 /// Represents a structure that can index transaction data.
index 81503049bd9fac3ef1bfced1922368e5bc493e6a..f4d398ab0c359c2b69abe78f652f36791f372b9b 100644 (file)
@@ -84,6 +84,10 @@ impl<K: Ord> Append for DerivationAdditions<K> {
 
         self.0.append(&mut other.0);
     }
+
+    fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
 }
 
 impl<K> Default for DerivationAdditions<K> {
index cf3cda3b02021df23208db296b575a9298ea43e7..cbadf1709a70c495d6e1d1cc7abc8f22ca3638a1 100644 (file)
@@ -33,6 +33,8 @@ pub mod tx_graph;
 pub use tx_data_traits::*;
 mod chain_oracle;
 pub use chain_oracle::*;
+mod persist;
+pub use persist::*;
 
 #[doc(hidden)]
 pub mod example_utils;
diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs
new file mode 100644 (file)
index 0000000..188f88f
--- /dev/null
@@ -0,0 +1,89 @@
+use core::convert::Infallible;
+
+use crate::Append;
+
+/// `Persist` wraps a [`PersistBackend`] (`B`) to create a convenient staging area for changes (`C`)
+/// before they are persisted.
+///
+/// Not all changes to the in-memory representation needs to be written to disk right away, so
+/// [`Persist::stage`] can be used to *stage* changes first and then [`Persist::commit`] can be used
+/// to write changes to disk.
+#[derive(Debug)]
+pub struct Persist<B, C> {
+    backend: B,
+    stage: C,
+}
+
+impl<B, C> Persist<B, C>
+where
+    B: PersistBackend<C>,
+    C: Default + Append,
+{
+    /// Create a new [`Persist`] from [`PersistBackend`].
+    pub fn new(backend: B) -> Self {
+        Self {
+            backend,
+            stage: Default::default(),
+        }
+    }
+
+    /// Stage a `changeset` to be commited later with [`commit`].
+    ///
+    /// [`commit`]: Self::commit
+    pub fn stage(&mut self, changeset: C) {
+        self.stage.append(changeset)
+    }
+
+    /// Get the changes that have not been commited yet.
+    pub fn staged(&self) -> &C {
+        &self.stage
+    }
+
+    /// Commit the staged changes to the underlying persistance backend.
+    ///
+    /// Returns a backend-defined error if this fails.
+    pub fn commit(&mut self) -> Result<(), B::WriteError> {
+        let mut temp = C::default();
+        core::mem::swap(&mut temp, &mut self.stage);
+        self.backend.write_changes(&temp)
+    }
+}
+
+/// A persistence backend for [`Persist`].
+///
+/// `C` represents the changeset; a datatype that records changes made to in-memory data structures
+/// that are to be persisted, or retrieved from persistence.
+pub trait PersistBackend<C> {
+    /// The error the backend returns when it fails to write.
+    type WriteError: core::fmt::Debug;
+
+    /// The error the backend returns when it fails to load changesets `C`.
+    type LoadError: core::fmt::Debug;
+
+    /// Writes a changeset to the persistence backend.
+    ///
+    /// It is up to the backend what it does with this. It could store every changeset in a list or
+    /// it inserts the actual changes into a more structured database. All it needs to guarantee is
+    /// that [`load_from_persistence`] restores a keychain tracker to what it should be if all
+    /// changesets had been applied sequentially.
+    ///
+    /// [`load_from_persistence`]: Self::load_from_persistence
+    fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;
+
+    /// Return the aggregate changeset `C` from persistence.
+    fn load_from_persistence(&mut self) -> Result<C, Self::LoadError>;
+}
+
+impl<C: Default> PersistBackend<C> for () {
+    type WriteError = Infallible;
+
+    type LoadError = Infallible;
+
+    fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> {
+        Ok(())
+    }
+
+    fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
+        Ok(C::default())
+    }
+}
index 8ec695add3bd04edd377304d584e62822505ce83..bd7f138f07293678cf8fb64d389fa2a12148210c 100644 (file)
@@ -64,20 +64,35 @@ impl<A: Anchor> Anchor for &'static A {
 pub trait Append {
     /// Append another object of the same type onto `self`.
     fn append(&mut self, other: Self);
+
+    /// Returns whether the structure is considered empty.
+    fn is_empty(&self) -> bool;
 }
 
 impl Append for () {
     fn append(&mut self, _other: Self) {}
+
+    fn is_empty(&self) -> bool {
+        true
+    }
 }
 
 impl<K: Ord, V> Append for BTreeMap<K, V> {
     fn append(&mut self, mut other: Self) {
         BTreeMap::append(self, &mut other)
     }
+
+    fn is_empty(&self) -> bool {
+        BTreeMap::is_empty(self)
+    }
 }
 
 impl<T: Ord> Append for BTreeSet<T> {
     fn append(&mut self, mut other: Self) {
         BTreeSet::append(self, &mut other)
     }
+
+    fn is_empty(&self) -> bool {
+        BTreeSet::is_empty(self)
+    }
 }
index e75255e4af4fc7dbe1ee998f121183f9b3415cca..ef3f3847cebed66ba9ac2401f78da5cde8066dc5 100644 (file)
@@ -940,6 +940,13 @@ impl<A: Ord> Append for Additions<A> {
                 .collect::<Vec<_>>(),
         );
     }
+
+    fn is_empty(&self) -> bool {
+        self.tx.is_empty()
+            && self.txout.is_empty()
+            && self.anchors.is_empty()
+            && self.last_seen.is_empty()
+    }
 }
 
 impl<A> AsRef<TxGraph<A>> for TxGraph<A> {
diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
new file mode 100644 (file)
index 0000000..770f264
--- /dev/null
@@ -0,0 +1,100 @@
+use bincode::Options;
+use std::{
+    fs::File,
+    io::{self, Seek},
+    marker::PhantomData,
+};
+
+use crate::bincode_options;
+
+/// Iterator over entries in a file store.
+///
+/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
+/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
+///
+/// [`next`]: Self::next
+pub struct EntryIter<'t, T> {
+    db_file: Option<&'t mut File>,
+
+    /// The file position for the first read of `db_file`.
+    start_pos: Option<u64>,
+    types: PhantomData<T>,
+}
+
+impl<'t, T> EntryIter<'t, T> {
+    pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
+        Self {
+            db_file: Some(db_file),
+            start_pos: Some(start_pos),
+            types: PhantomData,
+        }
+    }
+}
+
+impl<'t, T> Iterator for EntryIter<'t, T>
+where
+    T: serde::de::DeserializeOwned,
+{
+    type Item = Result<T, IterError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // closure which reads a single entry starting from `self.pos`
+        let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
+            let pos = match start_pos {
+                Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
+                None => f.stream_position()?,
+            };
+
+            match bincode_options().deserialize_from(&*f) {
+                Ok(changeset) => {
+                    f.stream_position()?;
+                    Ok(Some(changeset))
+                }
+                Err(e) => {
+                    if let bincode::ErrorKind::Io(inner) = &*e {
+                        if inner.kind() == io::ErrorKind::UnexpectedEof {
+                            let eof = f.seek(io::SeekFrom::End(0))?;
+                            if pos == eof {
+                                return Ok(None);
+                            }
+                        }
+                    }
+                    f.seek(io::SeekFrom::Start(pos))?;
+                    Err(IterError::Bincode(*e))
+                }
+            }
+        };
+
+        let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
+        if result.is_err() {
+            self.db_file = None;
+        }
+        result.transpose()
+    }
+}
+
+impl From<io::Error> for IterError {
+    fn from(value: io::Error) -> Self {
+        IterError::Io(value)
+    }
+}
+
+/// Error type for [`EntryIter`].
+#[derive(Debug)]
+pub enum IterError {
+    /// Failure to read from the file.
+    Io(io::Error),
+    /// Failure to decode data from the file.
+    Bincode(bincode::ErrorKind),
+}
+
+impl core::fmt::Display for IterError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
+            IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
+        }
+    }
+}
+
+impl std::error::Error for IterError {}
diff --git a/crates/file_store/src/file_store.rs b/crates/file_store/src/file_store.rs
deleted file mode 100644 (file)
index 824e3cc..0000000
+++ /dev/null
@@ -1,404 +0,0 @@
-//! Module for persisting data on disk.
-//!
-//! The star of the show is [`KeychainStore`], which maintains an append-only file of
-//! [`KeychainChangeSet`]s which can be used to restore a [`KeychainTracker`].
-use bdk_chain::{
-    keychain::{KeychainChangeSet, KeychainTracker},
-    sparse_chain,
-};
-use bincode::{DefaultOptions, Options};
-use core::marker::PhantomData;
-use std::{
-    fs::{File, OpenOptions},
-    io::{self, Read, Seek, Write},
-    path::Path,
-};
-
-/// BDK File Store magic bytes length.
-const MAGIC_BYTES_LEN: usize = 12;
-
-/// BDK File Store magic bytes.
-const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] = [98, 100, 107, 102, 115, 48, 48, 48, 48, 48, 48, 48];
-
-/// Persists an append only list of `KeychainChangeSet<K,P>` to a single file.
-/// [`KeychainChangeSet<K,P>`] record the changes made to a [`KeychainTracker<K,P>`].
-#[derive(Debug)]
-pub struct KeychainStore<K, P> {
-    db_file: File,
-    changeset_type_params: core::marker::PhantomData<(K, P)>,
-}
-
-fn bincode() -> impl bincode::Options {
-    DefaultOptions::new().with_varint_encoding()
-}
-
-impl<K, P> KeychainStore<K, P>
-where
-    K: Ord + Clone + core::fmt::Debug,
-    P: sparse_chain::ChainPosition,
-    KeychainChangeSet<K, P>: serde::Serialize + serde::de::DeserializeOwned,
-{
-    /// Creates a new store from a [`File`].
-    ///
-    /// The file must have been opened with read and write permissions.
-    ///
-    /// [`File`]: std::fs::File
-    pub fn new(mut file: File) -> Result<Self, FileError> {
-        file.rewind()?;
-
-        let mut magic_bytes = [0_u8; MAGIC_BYTES_LEN];
-        file.read_exact(&mut magic_bytes)?;
-
-        if magic_bytes != MAGIC_BYTES {
-            return Err(FileError::InvalidMagicBytes(magic_bytes));
-        }
-
-        Ok(Self {
-            db_file: file,
-            changeset_type_params: Default::default(),
-        })
-    }
-
-    /// Creates or loads a store from `db_path`. If no file exists there, it will be created.
-    pub fn new_from_path<D: AsRef<Path>>(db_path: D) -> Result<Self, FileError> {
-        let already_exists = db_path.as_ref().exists();
-
-        let mut db_file = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .create(true)
-            .open(db_path)?;
-
-        if !already_exists {
-            db_file.write_all(&MAGIC_BYTES)?;
-        }
-
-        Self::new(db_file)
-    }
-
-    /// Iterates over the stored changeset from first to last, changing the seek position at each
-    /// iteration.
-    ///
-    /// The iterator may fail to read an entry and therefore return an error. However, the first time
-    /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
-    ///
-    /// **WARNING**: This method changes the write position in the underlying file. You should
-    /// always iterate over all entries until `None` is returned if you want your next write to go
-    /// at the end; otherwise, you will write over existing entries.
-    pub fn iter_changesets(&mut self) -> Result<EntryIter<'_, KeychainChangeSet<K, P>>, io::Error> {
-        self.db_file
-            .seek(io::SeekFrom::Start(MAGIC_BYTES_LEN as _))?;
-
-        Ok(EntryIter::new(&mut self.db_file))
-    }
-
-    /// Loads all the changesets that have been stored as one giant changeset.
-    ///
-    /// This function returns a tuple of the aggregate changeset and a result that indicates
-    /// whether an error occurred while reading or deserializing one of the entries. If so the
-    /// changeset will consist of all of those it was able to read.
-    ///
-    /// You should usually check the error. In many applications, it may make sense to do a full
-    /// wallet scan with a stop-gap after getting an error, since it is likely that one of the
-    /// changesets it was unable to read changed the derivation indices of the tracker.
-    ///
-    /// **WARNING**: This method changes the write position of the underlying file. The next
-    /// changeset will be written over the erroring entry (or the end of the file if none existed).
-    pub fn aggregate_changeset(&mut self) -> (KeychainChangeSet<K, P>, Result<(), IterError>) {
-        let mut changeset = KeychainChangeSet::default();
-        let result = (|| {
-            let iter_changeset = self.iter_changesets()?;
-            for next_changeset in iter_changeset {
-                changeset.append(next_changeset?);
-            }
-            Ok(())
-        })();
-
-        (changeset, result)
-    }
-
-    /// Reads and applies all the changesets stored sequentially to the tracker, stopping when it fails
-    /// to read the next one.
-    ///
-    /// **WARNING**: This method changes the write position of the underlying file. The next
-    /// changeset will be written over the erroring entry (or the end of the file if none existed).
-    pub fn load_into_keychain_tracker(
-        &mut self,
-        tracker: &mut KeychainTracker<K, P>,
-    ) -> Result<(), IterError> {
-        for changeset in self.iter_changesets()? {
-            tracker.apply_changeset(changeset?)
-        }
-        Ok(())
-    }
-
-    /// Append a new changeset to the file and truncate the file to the end of the appended changeset.
-    ///
-    /// The truncation is to avoid the possibility of having a valid but inconsistent changeset
-    /// directly after the appended changeset.
-    pub fn append_changeset(
-        &mut self,
-        changeset: &KeychainChangeSet<K, P>,
-    ) -> Result<(), io::Error> {
-        if changeset.is_empty() {
-            return Ok(());
-        }
-
-        bincode()
-            .serialize_into(&mut self.db_file, changeset)
-            .map_err(|e| match *e {
-                bincode::ErrorKind::Io(inner) => inner,
-                unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
-            })?;
-
-        // truncate file after this changeset addition
-        // if this is not done, data after this changeset may represent valid changesets, however
-        // applying those changesets on top of this one may result in an inconsistent state
-        let pos = self.db_file.stream_position()?;
-        self.db_file.set_len(pos)?;
-
-        // We want to make sure that derivation indices changes are written to disk as soon as
-        // possible, so you know about the write failure before you give out the address in the application.
-        if !changeset.derivation_indices.is_empty() {
-            self.db_file.sync_data()?;
-        }
-
-        Ok(())
-    }
-}
-
-/// Error that occurs due to problems encountered with the file.
-#[derive(Debug)]
-pub enum FileError {
-    /// IO error, this may mean that the file is too short.
-    Io(io::Error),
-    /// Magic bytes do not match what is expected.
-    InvalidMagicBytes([u8; MAGIC_BYTES_LEN]),
-}
-
-impl core::fmt::Display for FileError {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        match self {
-            Self::Io(e) => write!(f, "io error trying to read file: {}", e),
-            Self::InvalidMagicBytes(b) => write!(
-                f,
-                "file has invalid magic bytes: expected={:?} got={:?}",
-                MAGIC_BYTES, b
-            ),
-        }
-    }
-}
-
-impl From<io::Error> for FileError {
-    fn from(value: io::Error) -> Self {
-        Self::Io(value)
-    }
-}
-
-impl std::error::Error for FileError {}
-
-/// Error type for [`EntryIter`].
-#[derive(Debug)]
-pub enum IterError {
-    /// Failure to read from the file.
-    Io(io::Error),
-    /// Failure to decode data from the file.
-    Bincode(bincode::ErrorKind),
-}
-
-impl core::fmt::Display for IterError {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        match self {
-            IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
-            IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
-        }
-    }
-}
-
-impl std::error::Error for IterError {}
-
-/// Iterator over entries in a file store.
-///
-/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
-/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
-///
-/// [`next`]: Self::next
-pub struct EntryIter<'a, V> {
-    db_file: &'a mut File,
-    types: PhantomData<V>,
-    error_exit: bool,
-}
-
-impl<'a, V> EntryIter<'a, V> {
-    pub fn new(db_file: &'a mut File) -> Self {
-        Self {
-            db_file,
-            types: PhantomData,
-            error_exit: false,
-        }
-    }
-}
-
-impl<'a, V> Iterator for EntryIter<'a, V>
-where
-    V: serde::de::DeserializeOwned,
-{
-    type Item = Result<V, IterError>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let result = (|| {
-            let pos = self.db_file.stream_position()?;
-
-            match bincode().deserialize_from(&mut self.db_file) {
-                Ok(changeset) => Ok(Some(changeset)),
-                Err(e) => {
-                    if let bincode::ErrorKind::Io(inner) = &*e {
-                        if inner.kind() == io::ErrorKind::UnexpectedEof {
-                            let eof = self.db_file.seek(io::SeekFrom::End(0))?;
-                            if pos == eof {
-                                return Ok(None);
-                            }
-                        }
-                    }
-
-                    self.db_file.seek(io::SeekFrom::Start(pos))?;
-                    Err(IterError::Bincode(*e))
-                }
-            }
-        })();
-
-        let result = result.transpose();
-
-        if let Some(Err(_)) = &result {
-            self.error_exit = true;
-        }
-
-        result
-    }
-}
-
-impl From<io::Error> for IterError {
-    fn from(value: io::Error) -> Self {
-        IterError::Io(value)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use bdk_chain::{
-        keychain::{DerivationAdditions, KeychainChangeSet},
-        TxHeight,
-    };
-    use std::{
-        io::{Read, Write},
-        vec::Vec,
-    };
-    use tempfile::NamedTempFile;
-    #[derive(
-        Debug,
-        Clone,
-        Copy,
-        PartialOrd,
-        Ord,
-        PartialEq,
-        Eq,
-        Hash,
-        serde::Serialize,
-        serde::Deserialize,
-    )]
-    enum TestKeychain {
-        External,
-        Internal,
-    }
-
-    impl core::fmt::Display for TestKeychain {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            match self {
-                Self::External => write!(f, "external"),
-                Self::Internal => write!(f, "internal"),
-            }
-        }
-    }
-
-    #[test]
-    fn magic_bytes() {
-        assert_eq!(&MAGIC_BYTES, "bdkfs0000000".as_bytes());
-    }
-
-    #[test]
-    fn new_fails_if_file_is_too_short() {
-        let mut file = NamedTempFile::new().unwrap();
-        file.write_all(&MAGIC_BYTES[..MAGIC_BYTES_LEN - 1])
-            .expect("should write");
-
-        match KeychainStore::<TestKeychain, TxHeight>::new(file.reopen().unwrap()) {
-            Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
-            unexpected => panic!("unexpected result: {:?}", unexpected),
-        };
-    }
-
-    #[test]
-    fn new_fails_if_magic_bytes_are_invalid() {
-        let invalid_magic_bytes = "ldkfs0000000";
-
-        let mut file = NamedTempFile::new().unwrap();
-        file.write_all(invalid_magic_bytes.as_bytes())
-            .expect("should write");
-
-        match KeychainStore::<TestKeychain, TxHeight>::new(file.reopen().unwrap()) {
-            Err(FileError::InvalidMagicBytes(b)) => {
-                assert_eq!(b, invalid_magic_bytes.as_bytes())
-            }
-            unexpected => panic!("unexpected result: {:?}", unexpected),
-        };
-    }
-
-    #[test]
-    fn append_changeset_truncates_invalid_bytes() {
-        // initial data to write to file (magic bytes + invalid data)
-        let mut data = [255_u8; 2000];
-        data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES);
-
-        let changeset = KeychainChangeSet {
-            derivation_indices: DerivationAdditions(
-                vec![(TestKeychain::External, 42)].into_iter().collect(),
-            ),
-            chain_graph: Default::default(),
-        };
-
-        let mut file = NamedTempFile::new().unwrap();
-        file.write_all(&data).expect("should write");
-
-        let mut store = KeychainStore::<TestKeychain, TxHeight>::new(file.reopen().unwrap())
-            .expect("should open");
-        match store.iter_changesets().expect("seek should succeed").next() {
-            Some(Err(IterError::Bincode(_))) => {}
-            unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
-        }
-
-        store.append_changeset(&changeset).expect("should append");
-
-        drop(store);
-
-        let got_bytes = {
-            let mut buf = Vec::new();
-            file.reopen()
-                .unwrap()
-                .read_to_end(&mut buf)
-                .expect("should read");
-            buf
-        };
-
-        let expected_bytes = {
-            let mut buf = MAGIC_BYTES.to_vec();
-            DefaultOptions::new()
-                .with_varint_encoding()
-                .serialize_into(&mut buf, &changeset)
-                .expect("should encode");
-            buf
-        };
-
-        assert_eq!(got_bytes, expected_bytes);
-    }
-}
diff --git a/crates/file_store/src/keychain_store.rs b/crates/file_store/src/keychain_store.rs
new file mode 100644 (file)
index 0000000..5f5074d
--- /dev/null
@@ -0,0 +1,313 @@
+//! Module for persisting data on disk.
+//!
+//! The star of the show is [`KeychainStore`], which maintains an append-only file of
+//! [`KeychainChangeSet`]s which can be used to restore a [`KeychainTracker`].
+use bdk_chain::{
+    keychain::{KeychainChangeSet, KeychainTracker},
+    sparse_chain,
+};
+use bincode::Options;
+use std::{
+    fs::{File, OpenOptions},
+    io::{self, Read, Seek, Write},
+    path::Path,
+};
+
+use crate::{bincode_options, EntryIter, IterError};
+
+/// BDK File Store magic bytes length.
+const MAGIC_BYTES_LEN: usize = 12;
+
+/// BDK File Store magic bytes.
+const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] = [98, 100, 107, 102, 115, 48, 48, 48, 48, 48, 48, 48];
+
+/// Persists an append only list of `KeychainChangeSet<K,P>` to a single file.
+/// [`KeychainChangeSet<K,P>`] record the changes made to a [`KeychainTracker<K,P>`].
+#[derive(Debug)]
+pub struct KeychainStore<K, P> {
+    db_file: File,
+    changeset_type_params: core::marker::PhantomData<(K, P)>,
+}
+
+impl<K, P> KeychainStore<K, P>
+where
+    K: Ord + Clone + core::fmt::Debug,
+    P: sparse_chain::ChainPosition,
+    KeychainChangeSet<K, P>: serde::Serialize + serde::de::DeserializeOwned,
+{
+    /// Creates a new store from a [`File`].
+    ///
+    /// The file must have been opened with read and write permissions.
+    ///
+    /// [`File`]: std::fs::File
+    pub fn new(mut file: File) -> Result<Self, FileError> {
+        file.rewind()?;
+
+        let mut magic_bytes = [0_u8; MAGIC_BYTES_LEN];
+        file.read_exact(&mut magic_bytes)?;
+
+        if magic_bytes != MAGIC_BYTES {
+            return Err(FileError::InvalidMagicBytes(magic_bytes));
+        }
+
+        Ok(Self {
+            db_file: file,
+            changeset_type_params: Default::default(),
+        })
+    }
+
+    /// Creates or loads a store from `db_path`. If no file exists there, it will be created.
+    pub fn new_from_path<D: AsRef<Path>>(db_path: D) -> Result<Self, FileError> {
+        let already_exists = db_path.as_ref().exists();
+
+        let mut db_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(db_path)?;
+
+        if !already_exists {
+            db_file.write_all(&MAGIC_BYTES)?;
+        }
+
+        Self::new(db_file)
+    }
+
+    /// Iterates over the stored changeset from first to last, changing the seek position at each
+    /// iteration.
+    ///
+    /// The iterator may fail to read an entry and therefore return an error. However, the first time
+    /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
+    ///
+    /// **WARNING**: This method changes the write position in the underlying file. You should
+    /// always iterate over all entries until `None` is returned if you want your next write to go
+    /// at the end; otherwise, you will write over existing entries.
+    pub fn iter_changesets(&mut self) -> Result<EntryIter<KeychainChangeSet<K, P>>, io::Error> {
+        Ok(EntryIter::new(MAGIC_BYTES_LEN as u64, &mut self.db_file))
+    }
+
+    /// Loads all the changesets that have been stored as one giant changeset.
+    ///
+    /// This function returns a tuple of the aggregate changeset and a result that indicates
+    /// whether an error occurred while reading or deserializing one of the entries. If so the
+    /// changeset will consist of all of those it was able to read.
+    ///
+    /// You should usually check the error. In many applications, it may make sense to do a full
+    /// wallet scan with a stop-gap after getting an error, since it is likely that one of the
+    /// changesets it was unable to read changed the derivation indices of the tracker.
+    ///
+    /// **WARNING**: This method changes the write position of the underlying file. The next
+    /// changeset will be written over the erroring entry (or the end of the file if none existed).
+    pub fn aggregate_changeset(&mut self) -> (KeychainChangeSet<K, P>, Result<(), IterError>) {
+        let mut changeset = KeychainChangeSet::default();
+        let result = (|| {
+            let iter_changeset = self.iter_changesets()?;
+            for next_changeset in iter_changeset {
+                changeset.append(next_changeset?);
+            }
+            Ok(())
+        })();
+
+        (changeset, result)
+    }
+
+    /// Reads and applies all the changesets stored sequentially to the tracker, stopping when it fails
+    /// to read the next one.
+    ///
+    /// **WARNING**: This method changes the write position of the underlying file. The next
+    /// changeset will be written over the erroring entry (or the end of the file if none existed).
+    pub fn load_into_keychain_tracker(
+        &mut self,
+        tracker: &mut KeychainTracker<K, P>,
+    ) -> Result<(), IterError> {
+        for changeset in self.iter_changesets()? {
+            tracker.apply_changeset(changeset?)
+        }
+        Ok(())
+    }
+
+    /// Append a new changeset to the file and truncate the file to the end of the appended changeset.
+    ///
+    /// The truncation is to avoid the possibility of having a valid but inconsistent changeset
+    /// directly after the appended changeset.
+    pub fn append_changeset(
+        &mut self,
+        changeset: &KeychainChangeSet<K, P>,
+    ) -> Result<(), io::Error> {
+        if changeset.is_empty() {
+            return Ok(());
+        }
+
+        bincode_options()
+            .serialize_into(&mut self.db_file, changeset)
+            .map_err(|e| match *e {
+                bincode::ErrorKind::Io(inner) => inner,
+                unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
+            })?;
+
+        // truncate file after this changeset addition
+        // if this is not done, data after this changeset may represent valid changesets, however
+        // applying those changesets on top of this one may result in an inconsistent state
+        let pos = self.db_file.stream_position()?;
+        self.db_file.set_len(pos)?;
+
+        // We want to make sure that derivation indices changes are written to disk as soon as
+        // possible, so you know about the write failure before you give out the address in the application.
+        if !changeset.derivation_indices.is_empty() {
+            self.db_file.sync_data()?;
+        }
+
+        Ok(())
+    }
+}
+
+/// Error that occurs due to problems encountered with the file.
+#[derive(Debug)]
+pub enum FileError {
+    /// IO error, this may mean that the file is too short.
+    Io(io::Error),
+    /// Magic bytes do not match what is expected.
+    InvalidMagicBytes([u8; MAGIC_BYTES_LEN]),
+}
+
+impl core::fmt::Display for FileError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            Self::Io(e) => write!(f, "io error trying to read file: {}", e),
+            Self::InvalidMagicBytes(b) => write!(
+                f,
+                "file has invalid magic bytes: expected={:?} got={:?}",
+                MAGIC_BYTES, b
+            ),
+        }
+    }
+}
+
+impl From<io::Error> for FileError {
+    fn from(value: io::Error) -> Self {
+        Self::Io(value)
+    }
+}
+
+impl std::error::Error for FileError {}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use bdk_chain::{
+        keychain::{DerivationAdditions, KeychainChangeSet},
+        TxHeight,
+    };
+    use bincode::DefaultOptions;
+    use std::{
+        io::{Read, Write},
+        vec::Vec,
+    };
+    use tempfile::NamedTempFile;
+    #[derive(
+        Debug,
+        Clone,
+        Copy,
+        PartialOrd,
+        Ord,
+        PartialEq,
+        Eq,
+        Hash,
+        serde::Serialize,
+        serde::Deserialize,
+    )]
+    enum TestKeychain {
+        External,
+        Internal,
+    }
+
+    impl core::fmt::Display for TestKeychain {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            match self {
+                Self::External => write!(f, "external"),
+                Self::Internal => write!(f, "internal"),
+            }
+        }
+    }
+
+    #[test]
+    fn magic_bytes() {
+        assert_eq!(&MAGIC_BYTES, "bdkfs0000000".as_bytes());
+    }
+
+    #[test]
+    fn new_fails_if_file_is_too_short() {
+        let mut file = NamedTempFile::new().unwrap();
+        file.write_all(&MAGIC_BYTES[..MAGIC_BYTES_LEN - 1])
+            .expect("should write");
+
+        match KeychainStore::<TestKeychain, TxHeight>::new(file.reopen().unwrap()) {
+            Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
+            unexpected => panic!("unexpected result: {:?}", unexpected),
+        };
+    }
+
+    #[test]
+    fn new_fails_if_magic_bytes_are_invalid() {
+        let invalid_magic_bytes = "ldkfs0000000";
+
+        let mut file = NamedTempFile::new().unwrap();
+        file.write_all(invalid_magic_bytes.as_bytes())
+            .expect("should write");
+
+        match KeychainStore::<TestKeychain, TxHeight>::new(file.reopen().unwrap()) {
+            Err(FileError::InvalidMagicBytes(b)) => {
+                assert_eq!(b, invalid_magic_bytes.as_bytes())
+            }
+            unexpected => panic!("unexpected result: {:?}", unexpected),
+        };
+    }
+
+    #[test]
+    fn append_changeset_truncates_invalid_bytes() {
+        // initial data to write to file (magic bytes + invalid data)
+        let mut data = [255_u8; 2000];
+        data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES);
+
+        let changeset = KeychainChangeSet {
+            derivation_indices: DerivationAdditions(
+                vec![(TestKeychain::External, 42)].into_iter().collect(),
+            ),
+            chain_graph: Default::default(),
+        };
+
+        let mut file = NamedTempFile::new().unwrap();
+        file.write_all(&data).expect("should write");
+
+        let mut store = KeychainStore::<TestKeychain, TxHeight>::new(file.reopen().unwrap())
+            .expect("should open");
+        match store.iter_changesets().expect("seek should succeed").next() {
+            Some(Err(IterError::Bincode(_))) => {}
+            unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
+        }
+
+        store.append_changeset(&changeset).expect("should append");
+
+        drop(store);
+
+        let got_bytes = {
+            let mut buf = Vec::new();
+            file.reopen()
+                .unwrap()
+                .read_to_end(&mut buf)
+                .expect("should read");
+            buf
+        };
+
+        let expected_bytes = {
+            let mut buf = MAGIC_BYTES.to_vec();
+            DefaultOptions::new()
+                .with_varint_encoding()
+                .serialize_into(&mut buf, &changeset)
+                .expect("should encode");
+            buf
+        };
+
+        assert_eq!(got_bytes, expected_bytes);
+    }
+}
index e334741947221ab81d9d32788bbf6d3c3c94aa08..b10c8c29ef807288a4f7fa3fad131e89fcba1473 100644 (file)
@@ -1,10 +1,51 @@
 #![doc = include_str!("../README.md")]
-mod file_store;
+mod entry_iter;
+mod keychain_store;
+mod store;
+use std::io;
+
 use bdk_chain::{
     keychain::{KeychainChangeSet, KeychainTracker, PersistBackend},
     sparse_chain::ChainPosition,
 };
-pub use file_store::*;
+use bincode::{DefaultOptions, Options};
+pub use entry_iter::*;
+pub use keychain_store::*;
+pub use store::*;
+
+pub(crate) fn bincode_options() -> impl bincode::Options {
+    DefaultOptions::new().with_varint_encoding()
+}
+
+/// Error that occurs due to problems encountered with the file.
+#[derive(Debug)]
+pub enum FileError<'a> {
+    /// IO error, this may mean that the file is too short.
+    Io(io::Error),
+    /// Magic bytes do not match what is expected.
+    InvalidMagicBytes { got: Vec<u8>, expected: &'a [u8] },
+}
+
+impl<'a> core::fmt::Display for FileError<'a> {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            Self::Io(e) => write!(f, "io error trying to read file: {}", e),
+            Self::InvalidMagicBytes { got, expected } => write!(
+                f,
+                "file has invalid magic bytes: expected={:?} got={:?}",
+                expected, got,
+            ),
+        }
+    }
+}
+
+impl<'a> From<io::Error> for FileError<'a> {
+    fn from(value: io::Error) -> Self {
+        Self::Io(value)
+    }
+}
+
+impl<'a> std::error::Error for FileError<'a> {}
 
 impl<K, P> PersistBackend<K, P> for KeychainStore<K, P>
 where
diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
new file mode 100644 (file)
index 0000000..b82fb39
--- /dev/null
@@ -0,0 +1,289 @@
+use std::{
+    fmt::Debug,
+    fs::{File, OpenOptions},
+    io::{self, Read, Seek, Write},
+    marker::PhantomData,
+    path::Path,
+};
+
+use bdk_chain::{Append, PersistBackend};
+use bincode::Options;
+
+use crate::{bincode_options, EntryIter, FileError, IterError};
+
+/// Persists an append-only list of changesets (`C`) to a single file.
+///
+/// The changesets are the results of altering a tracker implementation (`T`).
+#[derive(Debug)]
+pub struct Store<'a, C> {
+    magic: &'a [u8],
+    db_file: File,
+    marker: PhantomData<C>,
+}
+
+impl<'a, C> PersistBackend<C> for Store<'a, C>
+where
+    C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
+{
+    type WriteError = std::io::Error;
+
+    type LoadError = IterError;
+
+    fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
+        self.append_changeset(changeset)
+    }
+
+    fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
+        let (changeset, result) = self.aggregate_changesets();
+        result.map(|_| changeset)
+    }
+}
+
+impl<'a, C> Store<'a, C>
+where
+    C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
+{
+    /// Creates a new store from a [`File`].
+    ///
+    /// The file must have been opened with read and write permissions.
+    ///
+    /// [`File`]: std::fs::File
+    pub fn new(magic: &'a [u8], mut db_file: File) -> Result<Self, FileError> {
+        db_file.rewind()?;
+
+        let mut magic_buf = Vec::from_iter((0..).take(magic.len()));
+        db_file.read_exact(magic_buf.as_mut())?;
+
+        if magic_buf != magic {
+            return Err(FileError::InvalidMagicBytes {
+                got: magic_buf,
+                expected: magic,
+            });
+        }
+
+        Ok(Self {
+            magic,
+            db_file,
+            marker: Default::default(),
+        })
+    }
+
+    /// Creates or loads a store from `db_path`.
+    ///
+    /// If no file exists there, it will be created.
+    pub fn new_from_path<P>(magic: &'a [u8], db_path: P) -> Result<Self, FileError>
+    where
+        P: AsRef<Path>,
+    {
+        let already_exists = db_path.as_ref().exists();
+
+        let mut db_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(db_path)?;
+
+        if !already_exists {
+            db_file.write_all(magic)?;
+        }
+
+        Self::new(magic, db_file)
+    }
+
+    /// Iterates over the stored changeset from first to last, changing the seek position at each
+    /// iteration.
+    ///
+    /// The iterator may fail to read an entry and therefore return an error. However, the first time
+    /// it returns an error will be the last. After doing so, the iterator will always yield `None`.
+    ///
+    /// **WARNING**: This method changes the write position in the underlying file. You should
+    /// always iterate over all entries until `None` is returned if you want your next write to go
+    /// at the end; otherwise, you will write over existing entries.
+    pub fn iter_changesets(&mut self) -> EntryIter<C> {
+        EntryIter::new(self.magic.len() as u64, &mut self.db_file)
+    }
+
+    /// Loads all the changesets that have been stored as one giant changeset.
+    ///
+    /// This function returns a tuple of the aggregate changeset and a result that indicates
+    /// whether an error occurred while reading or deserializing one of the entries. If so the
+    /// changeset will consist of all of those it was able to read.
+    ///
+    /// You should usually check the error. In many applications, it may make sense to do a full
+    /// wallet scan with a stop-gap after getting an error, since it is likely that one of the
+    /// changesets it was unable to read changed the derivation indices of the tracker.
+    ///
+    /// **WARNING**: This method changes the write position of the underlying file. The next
+    /// changeset will be written over the erroring entry (or the end of the file if none existed).
+    pub fn aggregate_changesets(&mut self) -> (C, Result<(), IterError>) {
+        let mut changeset = C::default();
+        let result = (|| {
+            for next_changeset in self.iter_changesets() {
+                changeset.append(next_changeset?);
+            }
+            Ok(())
+        })();
+
+        (changeset, result)
+    }
+
+    /// Append a new changeset to the file and truncate the file to the end of the appended
+    /// changeset.
+    ///
+    /// The truncation is to avoid the possibility of having a valid but inconsistent changeset
+    /// directly after the appended changeset.
+    pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> {
+        // no need to write anything if changeset is empty
+        if changeset.is_empty() {
+            return Ok(());
+        }
+
+        bincode_options()
+            .serialize_into(&mut self.db_file, changeset)
+            .map_err(|e| match *e {
+                bincode::ErrorKind::Io(inner) => inner,
+                unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
+            })?;
+
+        // truncate file after this changeset addition
+        // if this is not done, data after this changeset may represent valid changesets, however
+        // applying those changesets on top of this one may result in an inconsistent state
+        let pos = self.db_file.stream_position()?;
+        self.db_file.set_len(pos)?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use bincode::DefaultOptions;
+    use std::{
+        io::{Read, Write},
+        vec::Vec,
+    };
+    use tempfile::NamedTempFile;
+
+    const TEST_MAGIC_BYTES_LEN: usize = 12;
+    const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
+        [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
+
+    #[derive(
+        Debug,
+        Clone,
+        Copy,
+        PartialOrd,
+        Ord,
+        PartialEq,
+        Eq,
+        Hash,
+        serde::Serialize,
+        serde::Deserialize,
+    )]
+    enum TestKeychain {
+        External,
+        Internal,
+    }
+
+    impl core::fmt::Display for TestKeychain {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            match self {
+                Self::External => write!(f, "external"),
+                Self::Internal => write!(f, "internal"),
+            }
+        }
+    }
+
+    #[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
+    struct TestChangeSet {
+        pub changes: Vec<String>,
+    }
+
+    impl Append for TestChangeSet {
+        fn append(&mut self, mut other: Self) {
+            self.changes.append(&mut other.changes)
+        }
+
+        fn is_empty(&self) -> bool {
+            self.changes.is_empty()
+        }
+    }
+
+    #[derive(Debug)]
+    struct TestTracker;
+
+    #[test]
+    fn new_fails_if_file_is_too_short() {
+        let mut file = NamedTempFile::new().unwrap();
+        file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1])
+            .expect("should write");
+
+        match Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) {
+            Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
+            unexpected => panic!("unexpected result: {:?}", unexpected),
+        };
+    }
+
+    #[test]
+    fn new_fails_if_magic_bytes_are_invalid() {
+        let invalid_magic_bytes = "ldkfs0000000";
+
+        let mut file = NamedTempFile::new().unwrap();
+        file.write_all(invalid_magic_bytes.as_bytes())
+            .expect("should write");
+
+        match Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) {
+            Err(FileError::InvalidMagicBytes { got, .. }) => {
+                assert_eq!(got, invalid_magic_bytes.as_bytes())
+            }
+            unexpected => panic!("unexpected result: {:?}", unexpected),
+        };
+    }
+
+    #[test]
+    fn append_changeset_truncates_invalid_bytes() {
+        // initial data to write to file (magic bytes + invalid data)
+        let mut data = [255_u8; 2000];
+        data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
+
+        let changeset = TestChangeSet {
+            changes: vec!["one".into(), "two".into(), "three!".into()],
+        };
+
+        let mut file = NamedTempFile::new().unwrap();
+        file.write_all(&data).expect("should write");
+
+        let mut store = Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap())
+            .expect("should open");
+        match store.iter_changesets().next() {
+            Some(Err(IterError::Bincode(_))) => {}
+            unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
+        }
+
+        store.append_changeset(&changeset).expect("should append");
+
+        drop(store);
+
+        let got_bytes = {
+            let mut buf = Vec::new();
+            file.reopen()
+                .unwrap()
+                .read_to_end(&mut buf)
+                .expect("should read");
+            buf
+        };
+
+        let expected_bytes = {
+            let mut buf = TEST_MAGIC_BYTES.to_vec();
+            DefaultOptions::new()
+                .with_varint_encoding()
+                .serialize_into(&mut buf, &changeset)
+                .expect("should encode");
+            buf
+        };
+
+        assert_eq!(got_bytes, expected_bytes);
+    }
+}