}
}
+impl<'t, T> Drop for EntryIter<'t, T> {
+ fn drop(&mut self) {
+ // This syncs the underlying file's offset with the buffer's position. This way, we
+ // maintain the correct position to start the next read/write.
+ if let Ok(pos) = self.db_file.stream_position() {
+ let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
+ }
+ }
+}
+
/// Error type for [`EntryIter`].
#[derive(Debug)]
pub enum IterError {
}
}
}
+
+ #[test]
+ fn write_after_short_read() {
+ let temp_dir = tempfile::tempdir().unwrap();
+
+ let changesets = (0..20)
+ .map(|n| TestChangeSet::from([format!("{}", n)]))
+ .collect::<Vec<_>>();
+ let last_changeset = TestChangeSet::from(["last".into()]);
+
+ for read_count in 0..changesets.len() {
+ let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+ println!("Test file: {:?}", file_path);
+
+ // First, we create the file with all the changesets!
+ let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+ for changeset in &changesets {
+ db.append_changeset(changeset).unwrap();
+ }
+ drop(db);
+
+ // We re-open the file and read `read_count` number of changesets.
+ let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+ let mut exp_aggregation = db
+ .iter_changesets()
+ .take(read_count)
+ .map(|r| r.expect("must read valid changeset"))
+ .fold(TestChangeSet::default(), |mut acc, v| {
+ Append::append(&mut acc, v);
+ acc
+ });
+ // We write after a short read.
+ db.write_changes(&last_changeset)
+ .expect("last write must succeed");
+ Append::append(&mut exp_aggregation, last_changeset.clone());
+ drop(db);
+
+ // We open the file again and check whether aggregate changeset is expected.
+ let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+ .unwrap()
+ .aggregate_changesets()
+ .expect("must aggregate changesets")
+ .unwrap_or_default();
+ assert_eq!(aggregation, exp_aggregation);
+ }
+ }
}