Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to acquire BufReader #50

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ async-std = { version = "1.12.0", features = ["attributes"] }
tokio = { version = "1.27.0", features = ["macros", "rt", "rt-multi-thread"] }
criterion = { version = "0.4", features = ["async_std", "async_tokio"] }
tokio-test = "0.4"
tokio-stream = { version="0.1.14", features = ["io-util"] }

[features]
default = ["sparse", "async-std"]
Expand Down
74 changes: 57 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ compile_error!("features `random-access-disk/async-std` and `random-access-disk/
use async_std::{
fs::{self, OpenOptions},
io::prelude::{SeekExt, WriteExt},
io::{ReadExt, SeekFrom},
io::{BufReader, ReadExt, SeekFrom},
};
use random_access_storage::{RandomAccess, RandomAccessError};
use std::ops::Drop;
Expand All @@ -120,7 +120,7 @@ use std::io::SeekFrom;
#[cfg(feature = "tokio")]
use tokio::{
fs::{self, OpenOptions},
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader},
};

#[cfg(all(
Expand Down Expand Up @@ -178,7 +178,7 @@ use default::{get_length_and_block_size, set_sparse, trim};
pub struct RandomAccessDisk {
#[allow(dead_code)]
filename: path::PathBuf,
file: Option<fs::File>,
file: fs::File,
length: u64,
block_size: u64,
auto_sync: bool,
Expand All @@ -197,6 +197,48 @@ impl RandomAccessDisk {
pub fn builder(filename: impl AsRef<path::Path>) -> Builder {
Builder::new(filename)
}

/// Acquire buffered reader.
/// # Examples
/// Read each lines to delete at specific line/column
///
/// ```no_run
/// # use random_access_disk::RandomAccessDisk;
/// # use random_access_storage::RandomAccess;
/// #
/// # #[cfg(feature = "async-std")]
/// # use async_std::{io::prelude::BufReadExt, stream::StreamExt};
/// #
/// # #[cfg(feature = "tokio")]
/// # use tokio::io::{AsyncBufReadExt};
/// # #[cfg(feature = "tokio")]
/// # use tokio_stream::{StreamExt, wrappers::LinesStream};
/// #
/// # #[async_std::main]
/// # async fn main() {
/// let line = 10;
/// let column = 10;
/// let length = 5;
///
/// let mut file = RandomAccessDisk::open("text.db").await.unwrap();
///
/// #[cfg(feature = "tokio")]
/// let lines = LinesStream::new(file.reader().lines());
///
/// #[cfg(not(feature = "tokio"))]
/// let lines = file.reader().lines();
///
/// let offset = column + lines
/// .take(line - 1)
/// .fold(column, |acc, line| acc + line.unwrap().len() as u64)
/// .await;
///
/// let _ = file.del(offset, length).await;
/// # }
/// ```
pub fn reader(&mut self) -> BufReader<&mut fs::File> {
BufReader::new(&mut self.file) // currently BufReader<&fs::File> didn't work on tokio
}
}

#[async_trait::async_trait]
Expand All @@ -206,7 +248,7 @@ impl RandomAccess for RandomAccessDisk {
offset: u64,
data: &[u8],
) -> Result<(), RandomAccessError> {
let file = self.file.as_mut().expect("self.file was None.");
let ref mut file = self.file;
file.seek(SeekFrom::Start(offset)).await?;
file.write_all(data).await?;
if self.auto_sync {
Expand Down Expand Up @@ -243,7 +285,7 @@ impl RandomAccess for RandomAccessDisk {
});
}

let file = self.file.as_mut().expect("self.file was None.");
let ref mut file = self.file;
let mut buffer = vec![0; length as usize];
file.seek(SeekFrom::Start(offset)).await?;
let _bytes_read = file.read(&mut buffer[..]).await?;
Expand Down Expand Up @@ -273,7 +315,7 @@ impl RandomAccess for RandomAccessDisk {
return self.truncate(offset).await;
}

let file = self.file.as_mut().expect("self.file was None.");
let ref mut file = self.file;
trim(file, offset, length, self.block_size).await?;
if self.auto_sync {
file.sync_all().await?;
Expand All @@ -282,7 +324,7 @@ impl RandomAccess for RandomAccessDisk {
}

async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
let file = self.file.as_ref().expect("self.file was None.");
let ref file = self.file;
self.length = length;
file.set_len(self.length).await?;
if self.auto_sync {
Expand All @@ -301,7 +343,7 @@ impl RandomAccess for RandomAccessDisk {

async fn sync_all(&mut self) -> Result<(), RandomAccessError> {
if !self.auto_sync {
let file = self.file.as_ref().expect("self.file was None.");
let ref file = self.file;
file.sync_all().await?;
}
Ok(())
Expand All @@ -310,15 +352,15 @@ impl RandomAccess for RandomAccessDisk {

impl Drop for RandomAccessDisk {
fn drop(&mut self) {
#[cfg(feature = "async-std")]
let ref file = self.file;
// We need to flush the file on drop. Unfortunately, that is not possible to do in a
// non-blocking fashion, but our only other option here is losing data remaining in the
// write cache. Good task schedulers should be resilient to occasional blocking hiccups in
// file destructors so we don't expect this to be a common problem in practice.
// (from async_std::fs::File::drop)
#[cfg(feature = "async-std")]
if let Some(file) = &self.file {
let _ = async_std::task::block_on(file.sync_all());
}
let _ = async_std::task::block_on(file.sync_all());
// For tokio, the below errors with:
//
// "Cannot start a runtime from within a runtime. This happens because a function (like
Expand All @@ -329,11 +371,9 @@ impl Drop for RandomAccessDisk {
// in a sync drop(), so for tokio, we'll need to wait for a real AsyncDrop to arrive.
//
// #[cfg(feature = "tokio")]
// if let Some(file) = &self.file {
// tokio::runtime::Handle::current()
// .block_on(file.sync_all())
// .expect("Could not sync file changes on drop.");
// }
// tokio::runtime::Handle::current()
// .block_on(file.sync_all())
// .expect("Could not sync file changes on drop.");
}
}

Expand Down Expand Up @@ -379,7 +419,7 @@ impl Builder {
let (length, block_size) = get_length_and_block_size(&file).await?;
Ok(RandomAccessDisk {
filename: self.filename,
file: Some(file),
file,
length,
auto_sync: self.auto_sync,
block_size,
Expand Down
Loading