Skip to content

Commit

Permalink
Add FileExt traits
Browse files Browse the repository at this point in the history
Addresses issue #576 to add pwrite/pread support to async_std for
parity with std & tokio.
  • Loading branch information
vlovich committed Feb 2, 2021
1 parent 5ee022b commit 820cc81
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ std = [
"futures-channel",
"async-channel",
"async-lock",
"async-trait",
]
alloc = [
"futures-core/alloc",
Expand All @@ -77,6 +78,7 @@ pin-project-lite = { version = "0.2.0", optional = true }
pin-utils = { version = "0.1.0-alpha.4", optional = true }
slab = { version = "0.4.2", optional = true }
async-channel = { version = "1.5.1", optional = true }
async-trait = { version = "0.1.42", optional = true }

# Devdepencency, but they are not allowed to be optional :/
surf = { version = "2.0.0", optional = true }
Expand Down
31 changes: 31 additions & 0 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,4 +938,35 @@ mod tests {
.unwrap();
});
}

#[cfg(target_os = "windows")]
#[test]
fn async_file_win_positional_io() {
use super::os::windows::fs::FileExt;

crate::task::block_on(async move {
let file = File::open(file!()).await.unwrap();
assert_eq!(10u64, file.seek_write(&[5u8; 10], 10u64).await.unwrap());

let mut buf: [u8; 20];
assert_eq!(20u64, file.seek_read(&buf, 0)).await.unwrap();
assert_eq!(buf.iter(), [0u8; 10].iter().chain([5u8; 10].iter()));
});
}

#[cfg(target_os = "unix")]
#[test]
fn async_file_unix_positional_io() {
use super::os::unix::fs::FileExt;

crate::task::block_on(async move {
let file = File::open(file!()).await.unwrap();
assert_eq!(10u64, file.write_all_at(&[5u8; 10], 10u64).await.unwrap());

let mut buf: [u8; 20];
assert_eq!(20u64, file.read_exact_at(&buf, 0)).await.unwrap();
assert_eq!(buf.iter(), [0u8; 10].iter().chain([5u8; 10].iter()));
});

}
}
196 changes: 195 additions & 1 deletion src/os/unix/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Resu
}

cfg_not_docs! {
pub use std::os::unix::fs::{DirBuilderExt, DirEntryExt, OpenOptionsExt};
pub use std::os::unix::fs::{DirBuilderExt, DirEntryExt, OpenOptionsExt, FileExt};
}

cfg_docs! {
use async_trait::async_trait;

/// Unix-specific extensions to `DirBuilder`.
pub trait DirBuilderExt {
/// Sets the mode to create new directories with. This option defaults to
Expand Down Expand Up @@ -68,4 +70,196 @@ cfg_docs! {
/// This options overwrites any previously set custom flags.
fn custom_flags(&mut self, flags: i32) -> &mut Self;
}

/// Unix-specific extensions to [`fs::File`].
#[async_trait]
pub trait FileExt {
/// Reads a number of bytes starting from a given offset.
///
/// Returns the number of bytes read.
///
/// The offset is relative to the start of the file and thus independent
/// from the current cursor.
///
/// The current file cursor is not affected by this function.
///
/// Note that similar to [`File::read`], it is not an error to return with a
/// short read.
///
/// [`File::read`]: fs::File::read
///
/// # Examples
///
/// ```no_run
/// use async_std::io;
/// use async_std::fs::File;
/// use async_std::os::unix::prelude::FileExt;
///
/// async fn main() -> io::Result<()> {
/// let mut buf = [0u8; 8];
/// let file = File::open("foo.txt").await?;
///
/// // We now read 8 bytes from the offset 10.
/// let num_bytes_read = file.read_at(&mut buf, 10).await?;
/// println!("read {} bytes: {:?}", num_bytes_read, buf);
/// Ok(())
/// }
/// ```
async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize>;

/// Reads the exact number of byte required to fill `buf` from the given offset.
///
/// The offset is relative to the start of the file and thus independent
/// from the current cursor.
///
/// The current file cursor is not affected by this function.
///
/// Similar to [`io::Read::read_exact`] but uses [`read_at`] instead of `read`.
///
/// [`read_at`]: FileExt::read_at
///
/// # Errors
///
/// If this function encounters an error of the kind
/// [`io::ErrorKind::Interrupted`] then the error is ignored and the operation
/// will continue.
///
/// If this function encounters an "end of file" before completely filling
/// the buffer, it returns an error of the kind [`io::ErrorKind::UnexpectedEof`].
/// The contents of `buf` are unspecified in this case.
///
/// If any other read error is encountered then this function immediately
/// returns. The contents of `buf` are unspecified in this case.
///
/// If this function returns an error, it is unspecified how many bytes it
/// has read, but it will never read more than would be necessary to
/// completely fill the buffer.
///
/// # Examples
///
/// ```no_run
/// use async_std::io;
/// use async_std::fs::File;
/// use async_std::os::unix::prelude::FileExt;
///
/// async fn main() -> io::Result<()> {
/// let mut buf = [0u8; 8];
/// let file = File::open("foo.txt").await?;
///
/// // We now read exactly 8 bytes from the offset 10.
/// file.read_exact_at(&mut buf, 10).await?;
/// println!("read {} bytes: {:?}", buf.len(), buf);
/// Ok(())
/// }
/// ```
async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => break,
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
if !buf.is_empty() {
Err(io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"))
} else {
Ok(())
}
}

/// Writes a number of bytes starting from a given offset.
///
/// Returns the number of bytes written.
///
/// The offset is relative to the start of the file and thus independent
/// from the current cursor.
///
/// The current file cursor is not affected by this function.
///
/// When writing beyond the end of the file, the file is appropriately
/// extended and the intermediate bytes are initialized with the value 0.
///
/// Note that similar to [`File::write`], it is not an error to return a
/// short write.
///
/// [`File::write`]: fs::File::write
///
/// # Examples
///
/// ```no_run
/// use async_std::fs::File;
/// use async_std::io;
/// use async_std::os::unix::prelude::FileExt;
///
/// async fn main() -> io::Result<()> {
/// let file = File::open("foo.txt").await?;
///
/// // We now write at the offset 10.
/// file.write_at(b"sushi", 10).await?;
/// Ok(())
/// }
/// ```
async fn write_at(&self, buf: &[u8], offset: u64) -> io::Result<usize>;

/// Attempts to write an entire buffer starting from a given offset.
///
/// The offset is relative to the start of the file and thus independent
/// from the current cursor.
///
/// The current file cursor is not affected by this function.
///
/// This method will continuously call [`write_at`] until there is no more data
/// to be written or an error of non-[`io::ErrorKind::Interrupted`] kind is
/// returned. This method will not return until the entire buffer has been
/// successfully written or such an error occurs. The first error that is
/// not of [`io::ErrorKind::Interrupted`] kind generated from this method will be
/// returned.
///
/// # Errors
///
/// This function will return the first error of
/// non-[`io::ErrorKind::Interrupted`] kind that [`write_at`] returns.
///
/// [`write_at`]: FileExt::write_at
///
/// # Examples
///
/// ```no_run
/// use async_std::fs::File;
/// use async_std::io;
/// use async_std::os::unix::prelude::FileExt;
///
/// async fn main() -> io::Result<()> {
/// let file = File::open("foo.txt").await?;
///
/// // We now write at the offset 10.
/// file.write_all_at(b"sushi", 10).await?;
/// Ok(())
/// }
/// ```
async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> io::Result<()> {
while !buf.is_empty() {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = &buf[n..];
offset += n as u64
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
}
}
74 changes: 73 additions & 1 deletion src/os/windows/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ pub async fn symlink_file<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io:
}

cfg_not_docs! {
pub use std::os::windows::fs::{OpenOptionsExt};
pub use std::os::windows::fs::{OpenOptionsExt, FileExt};
}

cfg_docs! {
use async_trait::async_trait;

/// Windows-specific extensions to `OpenOptions`.
pub trait OpenOptionsExt {
/// Overrides the `dwDesiredAccess` argument to the call to [`CreateFile`]
Expand Down Expand Up @@ -223,4 +225,74 @@ cfg_docs! {
/// https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
fn security_qos_flags(&mut self, flags: u32) -> &mut Self;
}

/// Windows-specific extensions to [`fs::File`].
#[async_trait]
pub trait FileExt {
/// Seeks to a given position and reads a number of bytes.
///
/// Returns the number of bytes read.
///
/// The offset is relative to the start of the file and thus independent
/// from the current cursor. The current cursor **is** affected by this
/// function, it is set to the end of the read.
///
/// Reading beyond the end of the file will always return with a length of
/// 0\.
///
/// Note that similar to `File::read`, it is not an error to return with a
/// short read. When returning from such a short read, the file pointer is
/// still updated.
///
/// # Examples
///
/// ```no_run
/// use async_std::io;
/// use async_std::fs::File;
/// use async_std::os::windows::prelude::*;
///
/// fn main() -> io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
/// let mut buffer = [0; 10];
///
/// // Read 10 bytes, starting 72 bytes from the
/// // start of the file.
/// file.seek_read(&mut buffer[..], 72).await?;
/// Ok(())
/// }
/// ```
async fn seek_read(&self, buf: &mut [u8], offset: u64) -> io::Result<usize>;

/// Seeks to a given position and writes a number of bytes.
///
/// Returns the number of bytes written.
///
/// The offset is relative to the start of the file and thus independent
/// from the current cursor. The current cursor **is** affected by this
/// function, it is set to the end of the write.
///
/// When writing beyond the end of the file, the file is appropriately
/// extended and the intermediate bytes are left uninitialized.
///
/// Note that similar to `File::write`, it is not an error to return a
/// short write. When returning from such a short write, the file pointer
/// is still updated.
///
/// # Examples
///
/// ```no_run
/// use async_std::fs::File;
/// use async_std::os::windows::prelude::*;
///
/// fn main() -> std::io::Result<()> {
/// let mut buffer = File::create("foo.txt").await?;
///
/// // Write a byte string starting 72 bytes from
/// // the start of the file.
/// buffer.seek_write(b"some bytes", 72).await?;
/// Ok(())
/// }
/// ```
async fn seek_write(&self, buf: &[u8], offset: u64) -> io::Result<usize>;
}
}

0 comments on commit 820cc81

Please sign in to comment.