]> Untitled Git - bdk/commitdiff
refactor(file_store): Use BufReader but simplify
authorLLFourn <lloyd.fourn@gmail.com>
Mon, 22 Jan 2024 02:48:48 +0000 (13:48 +1100)
committer志宇 <hello@evanlinjin.me>
Tue, 23 Jan 2024 03:41:00 +0000 (11:41 +0800)
crates/file_store/src/entry_iter.rs

index d95a67f8e7f09623a4956b0338995d92a2960858..e5e70b3b17741d410c30128c99cb17a7b9e8a3ac 100644 (file)
@@ -7,8 +7,6 @@ use std::{
 
 use crate::bincode_options;
 
-type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
-
 /// Iterator over entries in a file store.
 ///
 /// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
@@ -16,8 +14,9 @@ type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
 ///
 /// [`next`]: Self::next
 pub struct EntryIter<'t, T> {
-    db_file: Option<EntryReader<'t>>,
-
+    /// Buffered reader around the file
+    db_file: BufReader<&'t mut File>,
+    finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
     types: PhantomData<T>,
@@ -26,8 +25,9 @@ pub struct EntryIter<'t, T> {
 impl<'t, T> EntryIter<'t, T> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
-            db_file: Some(CountingReader::new(BufReader::new(db_file))),
+            db_file: BufReader::new(db_file),
             start_pos: Some(start_pos),
+            finished: false,
             types: PhantomData,
         }
     }
@@ -40,45 +40,34 @@ where
     type Item = Result<T, IterError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // closure which reads a single entry starting from `self.pos`
-        let read_one =
-            |f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
-                if let Some(pos) = start_pos {
-                    f.seek(io::SeekFrom::Start(pos))?;
-                }
-                match bincode_options().deserialize_from(&mut *f) {
-                    Ok(changeset) => {
-                        f.clear_count();
-                        Ok(Some(changeset))
-                    }
-                    Err(e) => {
-                        // allow unexpected EOF if 0 bytes were read
-                        if let bincode::ErrorKind::Io(inner) = &*e {
-                            if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
-                                f.clear_count();
-                                return Ok(None);
-                            }
+        if self.finished {
+            return None;
+        }
+        (|| {
+            if let Some(start) = self.start_pos.take() {
+                self.db_file.seek(io::SeekFrom::Start(start))?;
+            }
+
+            let pos_before_read = self.db_file.stream_position()?;
+            match bincode_options().deserialize_from(&mut self.db_file) {
+                Ok(changeset) => Ok(Some(changeset)),
+                Err(e) => {
+                    self.finished = true;
+                    let pos_after_read = self.db_file.stream_position()?;
+                    // allow unexpected EOF if 0 bytes were read
+                    if let bincode::ErrorKind::Io(inner) = &*e {
+                        if inner.kind() == io::ErrorKind::UnexpectedEof
+                            && pos_after_read == pos_before_read
+                        {
+                            return Ok(None);
                         }
-                        f.rewind()?;
-                        Err(IterError::Bincode(*e))
                     }
+                    self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
+                    Err(IterError::Bincode(*e))
                 }
-            };
-        let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
-        if result.is_err() {
-            self.db_file = None;
-        }
-        result.transpose()
-    }
-}
-
-impl<'t, T> Drop for EntryIter<'t, T> {
-    fn drop(&mut self) {
-        if let Some(r) = self.db_file.as_mut() {
-            // This syncs the underlying file's offset with the buffer's position. This way, no data
-            // is lost with future reads.
-            let _ = r.stream_position();
-        }
+            }
+        })()
+        .transpose()
     }
 }
 
@@ -107,51 +96,3 @@ impl From<io::Error> for IterError {
 }
 
 impl std::error::Error for IterError {}
-
-/// A wrapped [`Reader`] which counts total bytes read.
-struct CountingReader<R> {
-    r: R,
-    n: u64,
-}
-
-impl<R> CountingReader<R> {
-    fn new(file: R) -> Self {
-        Self { r: file, n: 0 }
-    }
-
-    /// Counted bytes read.
-    fn count(&self) -> u64 {
-        self.n
-    }
-
-    /// Clear read count.
-    fn clear_count(&mut self) {
-        self.n = 0;
-    }
-}
-
-impl<R: io::Seek> CountingReader<R> {
-    /// Rewind file descriptor offset to before all counted read operations. Then clear the read
-    /// count.
-    fn rewind(&mut self) -> io::Result<u64> {
-        let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
-        self.n = 0;
-        Ok(read)
-    }
-}
-
-impl<R: io::Read> io::Read for CountingReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let read = self.r.read(&mut *buf)?;
-        self.n += read as u64;
-        Ok(read)
-    }
-}
-
-impl<R: io::Seek> io::Seek for CountingReader<R> {
-    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-        let res = self.r.seek(pos);
-        self.n = 0;
-        res
-    }
-}