From d182d1b26b7e7156cefc5588a01bdf22f6a23a5d Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Fri, 29 Jan 2021 16:11:17 -0800 Subject: [PATCH] Add FileExt traits Addresses issue #576 to add pwrite/pread support to async_std for parity with std & tokio. --- Cargo.toml | 3 + src/fs/file.rs | 31 +++++++ src/os/unix/fs.rs | 194 ++++++++++++++++++++++++++++++++++++++++++- src/os/windows/fs.rs | 72 +++++++++++++++- 4 files changed, 298 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9d7ed23a0..a31a3b130 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ unstable = [ "std", "async-io", "async-process", + "file-ext", ] attributes = ["async-attributes"] std = [ @@ -61,6 +62,7 @@ alloc = [ tokio1 = ["async-global-executor/tokio"] tokio02 = ["async-global-executor/tokio02"] tokio03 = ["async-global-executor/tokio03"] +file-ext = ["async-trait"] [dependencies] async-attributes = { version = "1.1.1", optional = true } @@ -77,6 +79,7 @@ pin-project-lite = { version = "0.2.0", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } async-channel = { version = "1.5.1", optional = true } +async-trait = { version = "0.1.42", optional = true } # Devdepencency, but they are not allowed to be optional :/ surf = { version = "2.0.0", optional = true } diff --git a/src/fs/file.rs b/src/fs/file.rs index 157757e43..d32d4b547 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -938,4 +938,35 @@ mod tests { .unwrap(); }); } + + #[cfg(target_os = "windows")] + #[test] + fn async_file_win_positional_io() { + use super::os::windows::fs::FileExt; + + crate::task::block_on(async move { + let file = File::open(file!()).await.unwrap(); + assert_eq!(10u64, file.seek_write(&[5u8; 10], 10u64).await.unwrap()); + + let mut buf: [u8; 20]; + assert_eq!(20u64, file.seek_read(&buf, 0)).await.unwrap(); + assert_eq!(buf.iter(), [0u8; 10].iter().chain([5u8; 10].iter())); + }); + } + + #[cfg(target_os = "unix")] + #[test] + fn async_file_unix_positional_io() { + use super::os::unix::fs::FileExt; + + crate::task::block_on(async move { + let file = File::open(file!()).await.unwrap(); + assert_eq!(10u64, file.write_all_at(&[5u8; 10], 10u64).await.unwrap()); + + let mut buf: [u8; 20]; + assert_eq!(20u64, file.read_exact_at(&buf, 0)).await.unwrap(); + assert_eq!(buf.iter(), [0u8; 10].iter().chain([5u8; 10].iter())); + }); + + } } diff --git a/src/os/unix/fs.rs b/src/os/unix/fs.rs index 498b3a97f..3c1e22fab 100644 --- a/src/os/unix/fs.rs +++ b/src/os/unix/fs.rs @@ -30,7 +30,7 @@ pub async fn symlink, Q: AsRef>(src: P, dst: Q) -> io::Resu } cfg_not_docs! { - pub use std::os::unix::fs::{DirBuilderExt, DirEntryExt, OpenOptionsExt}; + pub use std::os::unix::fs::{DirBuilderExt, DirEntryExt, OpenOptionsExt, FileExt}; } cfg_docs! { @@ -68,4 +68,196 @@ cfg_docs! { /// This options overwrites any previously set custom flags. fn custom_flags(&mut self, flags: i32) -> &mut Self; } + + /// Unix-specific extensions to [`fs::File`]. + #[async_trait] + pub trait FileExt { + /// Reads a number of bytes starting from a given offset. + /// + /// Returns the number of bytes read. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. + /// + /// The current file cursor is not affected by this function. + /// + /// Note that similar to [`File::read`], it is not an error to return with a + /// short read. + /// + /// [`File::read`]: fs::File::read + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::fs::File; + /// use async_std::os::unix::prelude::FileExt; + /// + /// async fn main() -> io::Result<()> { + /// let mut buf = [0u8; 8]; + /// let file = File::open("foo.txt").await?; + /// + /// // We now read 8 bytes from the offset 10. + /// let num_bytes_read = file.read_at(&mut buf, 10).await?; + /// println!("read {} bytes: {:?}", num_bytes_read, buf); + /// Ok(()) + /// } + /// ``` + async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result; + + /// Reads the exact number of byte required to fill `buf` from the given offset. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. + /// + /// The current file cursor is not affected by this function. + /// + /// Similar to [`io::Read::read_exact`] but uses [`read_at`] instead of `read`. + /// + /// [`read_at`]: FileExt::read_at + /// + /// # Errors + /// + /// If this function encounters an error of the kind + /// [`io::ErrorKind::Interrupted`] then the error is ignored and the operation + /// will continue. + /// + /// If this function encounters an "end of file" before completely filling + /// the buffer, it returns an error of the kind [`io::ErrorKind::UnexpectedEof`]. + /// The contents of `buf` are unspecified in this case. + /// + /// If any other read error is encountered then this function immediately + /// returns. The contents of `buf` are unspecified in this case. + /// + /// If this function returns an error, it is unspecified how many bytes it + /// has read, but it will never read more than would be necessary to + /// completely fill the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::fs::File; + /// use async_std::os::unix::prelude::FileExt; + /// + /// async fn main() -> io::Result<()> { + /// let mut buf = [0u8; 8]; + /// let file = File::open("foo.txt").await?; + /// + /// // We now read exactly 8 bytes from the offset 10. + /// file.read_exact_at(&mut buf, 10).await?; + /// println!("read {} bytes: {:?}", buf.len(), buf); + /// Ok(()) + /// } + /// ``` + async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> { + while !buf.is_empty() { + match self.read_at(buf, offset).await { + Ok(0) => break, + Ok(n) => { + let tmp = buf; + buf = &mut tmp[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + if !buf.is_empty() { + Err(io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer")) + } else { + Ok(()) + } + } + + /// Writes a number of bytes starting from a given offset. + /// + /// Returns the number of bytes written. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. + /// + /// The current file cursor is not affected by this function. + /// + /// When writing beyond the end of the file, the file is appropriately + /// extended and the intermediate bytes are initialized with the value 0. + /// + /// Note that similar to [`File::write`], it is not an error to return a + /// short write. + /// + /// [`File::write`]: fs::File::write + /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::io; + /// use async_std::os::unix::prelude::FileExt; + /// + /// async fn main() -> io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// + /// // We now write at the offset 10. + /// file.write_at(b"sushi", 10).await?; + /// Ok(()) + /// } + /// ``` + async fn write_at(&self, buf: &[u8], offset: u64) -> io::Result; + + /// Attempts to write an entire buffer starting from a given offset. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. + /// + /// The current file cursor is not affected by this function. + /// + /// This method will continuously call [`write_at`] until there is no more data + /// to be written or an error of non-[`io::ErrorKind::Interrupted`] kind is + /// returned. This method will not return until the entire buffer has been + /// successfully written or such an error occurs. The first error that is + /// not of [`io::ErrorKind::Interrupted`] kind generated from this method will be + /// returned. + /// + /// # Errors + /// + /// This function will return the first error of + /// non-[`io::ErrorKind::Interrupted`] kind that [`write_at`] returns. + /// + /// [`write_at`]: FileExt::write_at + /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::io; + /// use async_std::os::unix::prelude::FileExt; + /// + /// async fn main() -> io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// + /// // We now write at the offset 10. + /// file.write_all_at(b"sushi", 10).await?; + /// Ok(()) + /// } + /// ``` + async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> io::Result<()> { + while !buf.is_empty() { + match self.write_at(buf, offset).await { + Ok(0) => { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write whole buffer", + )); + } + Ok(n) => { + buf = &buf[n..]; + offset += n as u64 + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + } } diff --git a/src/os/windows/fs.rs b/src/os/windows/fs.rs index 2177d698d..2238b05c4 100644 --- a/src/os/windows/fs.rs +++ b/src/os/windows/fs.rs @@ -55,7 +55,7 @@ pub async fn symlink_file, Q: AsRef>(src: P, dst: Q) -> io: } cfg_not_docs! { - pub use std::os::windows::fs::{OpenOptionsExt}; + pub use std::os::windows::fs::{OpenOptionsExt, FileExt}; } cfg_docs! { @@ -227,4 +227,74 @@ cfg_docs! { #[stable(feature = "open_options_ext", since = "1.10.0")] fn security_qos_flags(&mut self, flags: u32) -> &mut Self; } + + /// Windows-specific extensions to [`fs::File`]. + #[async_trait] + pub trait FileExt { + /// Seeks to a given position and reads a number of bytes. + /// + /// Returns the number of bytes read. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. The current cursor **is** affected by this + /// function, it is set to the end of the read. + /// + /// Reading beyond the end of the file will always return with a length of + /// 0\. + /// + /// Note that similar to `File::read`, it is not an error to return with a + /// short read. When returning from such a short read, the file pointer is + /// still updated. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::fs::File; + /// use async_std::os::windows::prelude::*; + /// + /// fn main() -> io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// let mut buffer = [0; 10]; + /// + /// // Read 10 bytes, starting 72 bytes from the + /// // start of the file. + /// file.seek_read(&mut buffer[..], 72).await?; + /// Ok(()) + /// } + /// ``` + async fn seek_read(&self, buf: &mut [u8], offset: u64) -> io::Result; + + /// Seeks to a given position and writes a number of bytes. + /// + /// Returns the number of bytes written. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. The current cursor **is** affected by this + /// function, it is set to the end of the write. + /// + /// When writing beyond the end of the file, the file is appropriately + /// extended and the intermediate bytes are left uninitialized. + /// + /// Note that similar to `File::write`, it is not an error to return a + /// short write. When returning from such a short write, the file pointer + /// is still updated. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::os::windows::prelude::*; + /// + /// fn main() -> std::io::Result<()> { + /// let mut buffer = File::create("foo.txt").await?; + /// + /// // Write a byte string starting 72 bytes from + /// // the start of the file. + /// buffer.seek_write(b"some bytes", 72).await?; + /// Ok(()) + /// } + /// ``` + async fn seek_write(&self, buf: &[u8], offset: u64) -> io::Result; + } }