diff --git a/Cargo.toml b/Cargo.toml index 9d8e1bf..e660b9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ async-std = { version = "1.12.0", features = ["attributes"] } tokio = { version = "1.27.0", features = ["macros", "rt", "rt-multi-thread"] } criterion = { version = "0.4", features = ["async_std", "async_tokio"] } tokio-test = "0.4" +tokio-stream = { version="0.1.14", features = ["io-util"] } [features] default = ["sparse", "async-std"] diff --git a/src/lib.rs b/src/lib.rs index 7c978c1..ae25ef0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,7 +109,7 @@ compile_error!("features `random-access-disk/async-std` and `random-access-disk/ use async_std::{ fs::{self, OpenOptions}, io::prelude::{SeekExt, WriteExt}, - io::{ReadExt, SeekFrom}, + io::{BufReader, ReadExt, SeekFrom}, }; use random_access_storage::{RandomAccess, RandomAccessError}; use std::ops::Drop; @@ -120,7 +120,7 @@ use std::io::SeekFrom; #[cfg(feature = "tokio")] use tokio::{ fs::{self, OpenOptions}, - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}, }; #[cfg(all( @@ -178,7 +178,7 @@ use default::{get_length_and_block_size, set_sparse, trim}; pub struct RandomAccessDisk { #[allow(dead_code)] filename: path::PathBuf, - file: Option, + file: fs::File, length: u64, block_size: u64, auto_sync: bool, @@ -197,6 +197,48 @@ impl RandomAccessDisk { pub fn builder(filename: impl AsRef) -> Builder { Builder::new(filename) } + + /// Acquire buffered reader. + /// # Examples + /// Read each lines to delete at specific line/column + /// + /// ```no_run + /// # use random_access_disk::RandomAccessDisk; + /// # use random_access_storage::RandomAccess; + /// # + /// # #[cfg(feature = "async-std")] + /// # use async_std::{io::prelude::BufReadExt, stream::StreamExt}; + /// # + /// # #[cfg(feature = "tokio")] + /// # use tokio::io::{AsyncBufReadExt}; + /// # #[cfg(feature = "tokio")] + /// # use tokio_stream::{StreamExt, wrappers::LinesStream}; + /// # + /// # #[async_std::main] + /// # async fn main() { + /// let line = 10; + /// let column = 10; + /// let length = 5; + /// + /// let mut file = RandomAccessDisk::open("text.db").await.unwrap(); + /// + /// #[cfg(feature = "tokio")] + /// let lines = LinesStream::new(file.reader().lines()); + /// + /// #[cfg(not(feature = "tokio"))] + /// let lines = file.reader().lines(); + /// + /// let offset = column + lines + /// .take(line - 1) + /// .fold(column, |acc, line| acc + line.unwrap().len() as u64) + /// .await; + /// + /// let _ = file.del(offset, length).await; + /// # } + /// ``` + pub fn reader(&mut self) -> BufReader<&mut fs::File> { + BufReader::new(&mut self.file) // currently BufReader<&fs::File> didn't work on tokio + } } #[async_trait::async_trait] @@ -206,7 +248,7 @@ impl RandomAccess for RandomAccessDisk { offset: u64, data: &[u8], ) -> Result<(), RandomAccessError> { - let file = self.file.as_mut().expect("self.file was None."); + let ref mut file = self.file; file.seek(SeekFrom::Start(offset)).await?; file.write_all(data).await?; if self.auto_sync { @@ -243,7 +285,7 @@ impl RandomAccess for RandomAccessDisk { }); } - let file = self.file.as_mut().expect("self.file was None."); + let ref mut file = self.file; let mut buffer = vec![0; length as usize]; file.seek(SeekFrom::Start(offset)).await?; let _bytes_read = file.read(&mut buffer[..]).await?; @@ -273,7 +315,7 @@ impl RandomAccess for RandomAccessDisk { return self.truncate(offset).await; } - let file = self.file.as_mut().expect("self.file was None."); + let ref mut file = self.file; trim(file, offset, length, self.block_size).await?; if self.auto_sync { file.sync_all().await?; @@ -282,7 +324,7 @@ impl RandomAccess for RandomAccessDisk { } async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> { - let file = self.file.as_ref().expect("self.file was None."); + let ref file = self.file; self.length = length; file.set_len(self.length).await?; if self.auto_sync { @@ -301,7 +343,7 @@ impl RandomAccess for RandomAccessDisk { async fn sync_all(&mut self) -> Result<(), RandomAccessError> { if !self.auto_sync { - let file = self.file.as_ref().expect("self.file was None."); + let ref file = self.file; file.sync_all().await?; } Ok(()) @@ -310,15 +352,15 @@ impl RandomAccess for RandomAccessDisk { impl Drop for RandomAccessDisk { fn drop(&mut self) { + #[cfg(feature = "async-std")] + let ref file = self.file; // We need to flush the file on drop. Unfortunately, that is not possible to do in a // non-blocking fashion, but our only other option here is losing data remaining in the // write cache. Good task schedulers should be resilient to occasional blocking hiccups in // file destructors so we don't expect this to be a common problem in practice. // (from async_std::fs::File::drop) #[cfg(feature = "async-std")] - if let Some(file) = &self.file { - let _ = async_std::task::block_on(file.sync_all()); - } + let _ = async_std::task::block_on(file.sync_all()); // For tokio, the below errors with: // // "Cannot start a runtime from within a runtime. This happens because a function (like @@ -329,11 +371,9 @@ impl Drop for RandomAccessDisk { // in a sync drop(), so for tokio, we'll need to wait for a real AsyncDrop to arrive. // // #[cfg(feature = "tokio")] - // if let Some(file) = &self.file { - // tokio::runtime::Handle::current() - // .block_on(file.sync_all()) - // .expect("Could not sync file changes on drop."); - // } + // tokio::runtime::Handle::current() + // .block_on(file.sync_all()) + // .expect("Could not sync file changes on drop."); } } @@ -379,7 +419,7 @@ impl Builder { let (length, block_size) = get_length_and_block_size(&file).await?; Ok(RandomAccessDisk { filename: self.filename, - file: Some(file), + file, length, auto_sync: self.auto_sync, block_size,