]> Untitled Git - bdk/commitdiff
feat(persist): add PersistAsync trait and StagedPersistAsync struct
authorSteve Myers <steve@notmandatory.org>
Sun, 2 Jun 2024 03:45:11 +0000 (22:45 -0500)
committerSteve Myers <steve@notmandatory.org>
Thu, 13 Jun 2024 17:40:50 +0000 (12:40 -0500)
crates/chain/Cargo.toml
crates/chain/src/persist.rs

index a2f355df4d2a9cfa18ae37ad787e5e00a7cd6bfc..e6adfe5e164091ec60fc1cd1abe95e3045de8c74 100644 (file)
@@ -19,6 +19,7 @@ serde_crate = { package = "serde", version = "1", optional = true, features = ["
 # Use hashbrown as a feature flag to have HashSet and HashMap from it.
 hashbrown = { version = "0.9.1", optional = true, features = ["serde"] }
 miniscript = { version = "12.0.0", optional = true, default-features = false }
+async-trait = {  version = "0.1.80", optional = true }
 
 [dev-dependencies]
 rand = "0.8"
@@ -29,3 +30,4 @@ default = ["std", "miniscript", "persist"]
 std = ["bitcoin/std", "miniscript?/std"]
 serde = ["serde_crate", "bitcoin/serde", "miniscript?/serde"]
 persist = ["miniscript"]
+async = ["async-trait"]
index d52ebdf59d0efd70d5bcb39ef11db6b67b5588f1..a953e724f5fac78521fab4cab61abadb29802ec0 100644 (file)
@@ -8,6 +8,10 @@
 //! typically persisted together.
 
 use crate::{indexed_tx_graph, keychain, local_chain, Anchor, Append};
+#[cfg(feature = "async")]
+use alloc::boxed::Box;
+#[cfg(feature = "async")]
+use async_trait::async_trait;
 use bitcoin::Network;
 use core::convert::Infallible;
 use core::default::Default;
@@ -122,6 +126,48 @@ impl<C> Persist<C> for () {
     }
 }
 
+#[cfg(feature = "async")]
+/// An async persistence backend for writing and loading changesets.
+///
+/// `C` represents the changeset; a datatype that records changes made to in-memory data structures
+/// that are to be persisted, or retrieved from persistence.
+#[async_trait]
+pub trait PersistAsync<C> {
+    /// The error the backend returns when it fails to write.
+    type WriteError: Debug + Display;
+
+    /// The error the backend returns when it fails to load changesets `C`.
+    type LoadError: Debug + Display;
+
+    /// Writes a changeset to the persistence backend.
+    ///
+    /// It is up to the backend what it does with this. It could store every changeset in a list or
+    /// it inserts the actual changes into a more structured database. All it needs to guarantee is
+    /// that [`load_from_persistence`] restores a keychain tracker to what it should be if all
+    /// changesets had been applied sequentially.
+    ///
+    /// [`load_from_persistence`]: Self::load_changes
+    async fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;
+
+    /// Return the aggregate changeset `C` from persistence.
+    async fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError>;
+}
+
+#[cfg(feature = "async")]
+#[async_trait]
+impl<C> PersistAsync<C> for () {
+    type WriteError = Infallible;
+    type LoadError = Infallible;
+
+    async fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> {
+        Ok(())
+    }
+
+    async fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError> {
+        Ok(None)
+    }
+}
+
 /// `StagedPersist` adds a convenient staging area for changesets before they are persisted.
 ///
 /// Not all changes to the in-memory representation needs to be written to disk right away, so
@@ -208,6 +254,97 @@ where
     }
 }
 
+#[cfg(feature = "async")]
+/// `StagedPersistAsync` adds a convenient async staging area for changesets before they are persisted.
+///
+/// Not all changes to the in-memory representation needs to be written to disk right away, so
+/// [`StagedPersistAsync::stage`] can be used to *stage* changes first and then
+/// [`StagedPersistAsync::commit`] can be used to write changes to disk.
+pub struct StagedPersistAsync<C, P: PersistAsync<C>> {
+    inner: P,
+    staged: C,
+}
+
+#[cfg(feature = "async")]
+#[async_trait]
+impl<C: Send + Sync, P: PersistAsync<C> + Send> PersistAsync<C> for StagedPersistAsync<C, P> {
+    type WriteError = P::WriteError;
+    type LoadError = P::LoadError;
+
+    async fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
+        self.inner.write_changes(changeset).await
+    }
+
+    async fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError> {
+        self.inner.load_changes().await
+    }
+}
+
+#[cfg(feature = "async")]
+impl<C, P> StagedPersistAsync<C, P>
+where
+    C: Default + Append + Send + Sync,
+    P: PersistAsync<C> + Send,
+{
+    /// Create a new [`StagedPersistAsync`] adding staging to an inner data store that implements
+    /// [`PersistAsync`].
+    pub fn new(persist: P) -> Self {
+        Self {
+            inner: persist,
+            staged: Default::default(),
+        }
+    }
+
+    /// Stage a `changeset` to be committed later with [`commit`].
+    ///
+    /// [`commit`]: Self::commit
+    pub fn stage(&mut self, changeset: C) {
+        self.staged.append(changeset)
+    }
+
+    /// Get the changes that have not been committed yet.
+    pub fn staged(&self) -> &C {
+        &self.staged
+    }
+
+    /// Take the changes that have not been committed yet.
+    ///
+    /// New staged is set to default;
+    pub fn take_staged(&mut self) -> C {
+        mem::take(&mut self.staged)
+    }
+
+    /// Commit the staged changes to the underlying persistence backend.
+    ///
+    /// Changes that are committed (if any) are returned.
+    ///
+    /// # Error
+    ///
+    /// Returns a backend-defined error if this fails.
+    pub async fn commit(&mut self) -> Result<Option<C>, P::WriteError> {
+        if self.staged().is_empty() {
+            return Ok(None);
+        }
+        let staged = self.take_staged();
+        self.write_changes(&staged)
+            .await
+            // if written successfully, take and return `self.stage`
+            .map(|_| Some(staged))
+    }
+
+    /// Stages a new changeset and commits it (along with any other previously staged changes) to
+    /// the persistence backend
+    ///
+    /// Convenience method for calling [`stage`] and then [`commit`].
+    ///
+    /// [`stage`]: Self::stage
+    /// [`commit`]: Self::commit
+    pub async fn stage_and_commit(&mut self, changeset: C) -> Result<Option<C>, P::WriteError> {
+        self.stage(changeset);
+        self.commit().await
+    }
+}
+
 #[cfg(test)]
 mod test {
     extern crate core;