]> Untitled Git - bdk/commitdiff
fix(file_store): recover file offset after read
author志宇 <hello@evanlinjin.me>
Tue, 23 Jan 2024 04:32:13 +0000 (12:32 +0800)
committer志宇 <hello@evanlinjin.me>
Thu, 25 Jan 2024 08:19:42 +0000 (17:19 +0900)
Because we use wrap the file with `BufReader` with the `EntryIter`, we
need to sync the `BufReader`'s position with the file's offset when we
drop the `EntryIter`. Therefore we have a custom drop impl for
`EntryIter`.

crates/file_store/src/entry_iter.rs
crates/file_store/src/store.rs

index e5e70b3b17741d410c30128c99cb17a7b9e8a3ac..6be3fd03458ce2edcf75ace07b1b0234573ad0aa 100644 (file)
@@ -71,6 +71,16 @@ where
     }
 }
 
+impl<'t, T> Drop for EntryIter<'t, T> {
+    fn drop(&mut self) {
+        // This syncs the underlying file's offset with the buffer's position. This way, we
+        // maintain the correct position to start the next read/write.
+        if let Ok(pos) = self.db_file.stream_position() {
+            let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
+        }
+    }
+}
+
 /// Error type for [`EntryIter`].
 #[derive(Debug)]
 pub enum IterError {
index 83e5272fda7f760f4394154b12b17da5623c420d..0dc45d28c19b2dfb5481855af4b7b555ba8eb8a6 100644 (file)
@@ -410,4 +410,50 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn write_after_short_read() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = (0..20)
+            .map(|n| TestChangeSet::from([format!("{}", n)]))
+            .collect::<Vec<_>>();
+        let last_changeset = TestChangeSet::from(["last".into()]);
+
+        for read_count in 0..changesets.len() {
+            let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+            println!("Test file: {:?}", file_path);
+
+            // First, we create the file with all the changesets!
+            let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            for changeset in &changesets {
+                db.append_changeset(changeset).unwrap();
+            }
+            drop(db);
+
+            // We re-open the file and read `read_count` number of changesets.
+            let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            let mut exp_aggregation = db
+                .iter_changesets()
+                .take(read_count)
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSet::default(), |mut acc, v| {
+                    Append::append(&mut acc, v);
+                    acc
+                });
+            // We write after a short read.
+            db.write_changes(&last_changeset)
+                .expect("last write must succeed");
+            Append::append(&mut exp_aggregation, last_changeset.clone());
+            drop(db);
+
+            // We open the file again and check whether aggregate changeset is expected.
+            let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+                .unwrap()
+                .aggregate_changesets()
+                .expect("must aggregate changesets")
+                .unwrap_or_default();
+            assert_eq!(aggregation, exp_aggregation);
+        }
+    }
 }