diff --git a/examples/cat.rs b/examples/cat.rs index d41b7ab1..48c530a6 100644 --- a/examples/cat.rs +++ b/examples/cat.rs @@ -29,8 +29,7 @@ fn main() { loop { // Read a chunk - let (res, b) = file.read_at(buf, pos).await; - let n = res.unwrap(); + let (n, b) = file.read_at(buf, pos).await.unwrap(); if n == 0 { break; diff --git a/examples/mix.rs b/examples/mix.rs index 4e094019..49f1e9d8 100644 --- a/examples/mix.rs +++ b/examples/mix.rs @@ -34,15 +34,14 @@ fn main() { loop { // Read a chunk - let (res, b) = file.read_at(buf, pos).await; - let n = res.unwrap(); + let (n, b) = file.read_at(buf, pos).await.unwrap(); if n == 0 { break; } - let (res, b) = socket.write(b).submit().await; - pos += res.unwrap() as u64; + let (written, b) = socket.write(b).submit().await.unwrap(); + pos += written as u64; buf = b; } diff --git a/examples/tcp_listener.rs b/examples/tcp_listener.rs index 918503ca..45175f16 100644 --- a/examples/tcp_listener.rs +++ b/examples/tcp_listener.rs @@ -29,16 +29,14 @@ fn main() { let mut buf = vec![0u8; 4096]; loop { - let (result, nbuf) = stream.read(buf).await; + let (read, nbuf) = stream.read(buf).await.unwrap(); buf = nbuf; - let read = result.unwrap(); if read == 0 { println!("{} closed, {} total ping-ponged", socket_addr, n); break; } - let (res, slice) = stream.write_all(buf.slice(..read)).await; - let _ = res.unwrap(); + let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); buf = slice.into_inner(); println!("{} all {} bytes ping-ponged", socket_addr, read); n += read; diff --git a/examples/tcp_listener_fixed_buffers.rs b/examples/tcp_listener_fixed_buffers.rs index 69db2c8e..e49df810 100644 --- a/examples/tcp_listener_fixed_buffers.rs +++ b/examples/tcp_listener_fixed_buffers.rs @@ -79,17 +79,15 @@ async fn echo_handler( // Each time through the loop, use fbuf and then get it back for the next // iteration. - let (result, fbuf1) = stream.read_fixed(fbuf).await; + let (read, fbuf1) = stream.read_fixed(fbuf).await.unwrap(); fbuf = { - let read = result.unwrap(); if read == 0 { break; } assert_eq!(4096, fbuf1.len()); // To prove a point. - let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await; + let (_, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await.unwrap(); - let _ = res.unwrap(); println!("peer {} all {} bytes ping-ponged", peer, read); n += read; diff --git a/examples/tcp_stream.rs b/examples/tcp_stream.rs index 4983ee4c..29768ab4 100644 --- a/examples/tcp_stream.rs +++ b/examples/tcp_stream.rs @@ -15,11 +15,10 @@ fn main() { let stream = TcpStream::connect(socket_addr).await.unwrap(); let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written: {}", result.unwrap()); + let (written, buf) = stream.write(buf).submit().await.unwrap(); + println!("written: {}", written); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read: {:?}", &buf[..read]); }); } diff --git a/examples/udp_socket.rs b/examples/udp_socket.rs index 01eb9b32..2e049406 100644 --- a/examples/udp_socket.rs +++ b/examples/udp_socket.rs @@ -15,12 +15,11 @@ fn main() { let buf = vec![0u8; 128]; - let (result, mut buf) = socket.recv_from(buf).await; - let (read, socket_addr) = result.unwrap(); + let ((read, socket_addr), mut buf) = socket.recv_from(buf).await.unwrap(); buf.resize(read, 0); println!("received from {}: {:?}", socket_addr, &buf[..]); - let (result, _buf) = socket.send_to(buf, socket_addr).await; - println!("sent to {}: {}", socket_addr, result.unwrap()); + let (sent, _buf) = socket.send_to(buf, socket_addr).await.unwrap(); + println!("sent to {}: {}", socket_addr, sent); }); } diff --git a/examples/unix_listener.rs b/examples/unix_listener.rs index 9e10496d..0273e7b3 100644 --- a/examples/unix_listener.rs +++ b/examples/unix_listener.rs @@ -20,11 +20,10 @@ fn main() { tokio_uring::spawn(async move { let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written to {}: {}", &socket_addr, result.unwrap()); + let (written, buf) = stream.write(buf).submit().await.unwrap(); + println!("written to {}: {}", &socket_addr, written); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read from {}: {:?}", &socket_addr, &buf[..read]); }); } diff --git a/examples/unix_stream.rs b/examples/unix_stream.rs index 7caf06f9..ef04577c 100644 --- a/examples/unix_stream.rs +++ b/examples/unix_stream.rs @@ -15,11 +15,10 @@ fn main() { let stream = UnixStream::connect(socket_addr).await.unwrap(); let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written: {}", result.unwrap()); + let (written, buf) = stream.write(buf).submit().await.unwrap(); + println!("written: {}", written); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read: {:?}", &buf[..read]); }); } diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs index 222df76a..cbe63828 100644 --- a/examples/wrk-bench.rs +++ b/examples/wrk-bench.rs @@ -21,10 +21,10 @@ fn main() -> io::Result<()> { let (stream, _) = listener.accept().await?; tokio_uring::spawn(async move { - let (result, _) = stream.write(RESPONSE).submit().await; + let result = stream.write(RESPONSE).submit().await; if let Err(err) = result { - eprintln!("Client connection failed: {}", err); + eprintln!("Client connection failed: {}", err.0); } }); } diff --git a/src/buf/slice.rs b/src/buf/slice.rs index 287ed322..bb0ffd30 100644 --- a/src/buf/slice.rs +++ b/src/buf/slice.rs @@ -23,6 +23,7 @@ use std::ops; /// /// assert_eq!(&slice[..], b"hello"); /// ``` +#[derive(Debug)] pub struct Slice { buf: T, begin: usize, diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..2a92381b 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,7 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::{UnsubmittedOneshot, UnsubmittedWrite}; +use crate::{map_buf, BufError, UnsubmittedOneshot, UnsubmittedWrite}; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -397,8 +397,9 @@ impl File { T: BoundedBufMut, { let orig_bounds = buf.bounds(); - let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + let buf_res = self.read_exact_slice_at(buf.slice_full(), pos).await; + // (res, T::from_buf_bounds(buf, orig_bounds)) + map_buf(buf_res, |buf| T::from_buf_bounds(buf, orig_bounds)) } async fn read_exact_slice_at( @@ -407,28 +408,22 @@ impl File { mut pos: u64, ) -> crate::BufResult<(), T> { if pos.checked_add(buf.bytes_total() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(BufError( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_total() != 0 { - let (res, slice) = self.read_at(buf, pos).await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), + let buf_result = self.read_at(buf, pos).await; + match buf_result { + Ok((0, slice)) => { + return Err(BufError( + io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -437,11 +432,10 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map_buf(|slice| slice.into_inner())), }; } - - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Like [`read_at`], but using a pre-mapped buffer @@ -589,8 +583,8 @@ impl File { T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + let buf_result = self.write_all_slice_at(buf.slice_full(), pos).await; + map_buf(buf_result, |buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_all_slice_at( @@ -599,28 +593,22 @@ impl File { mut pos: u64, ) -> crate::BufResult<(), T> { if pos.checked_add(buf.bytes_init() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(BufError( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_init() != 0 { - let (res, slice) = self.write_at(buf, pos).submit().await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), + let buf_result = self.write_at(buf, pos).submit().await; + match buf_result { + Ok((0, slice)) => { + return Err(BufError( + io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -629,11 +617,10 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map_buf(|slice| slice.into_inner())), }; } - - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Like [`write_at`], but using a pre-mapped buffer @@ -709,8 +696,8 @@ impl File { T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + let buf_result = self.write_fixed_all_at_slice(buf.slice_full(), pos).await; + map_buf(buf_result, |buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_fixed_all_at_slice( @@ -719,28 +706,22 @@ impl File { mut pos: u64, ) -> crate::BufResult<(), FixedBuf> { if pos.checked_add(buf.bytes_init() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(BufError( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_init() != 0 { - let (res, slice) = self.write_fixed_at(buf, pos).await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), + let buf_result = self.write_fixed_at(buf, pos).await; + match buf_result { + Ok((0, slice)) => { + return Err(BufError( + io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -749,11 +730,10 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map_buf(|slice| slice.into_inner())), }; } - - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Attempts to sync all OS-internal metadata to disk. diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..c3ff3a38 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,6 +1,6 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; -use crate::BufResult; +use crate::{BufError, BufResult}; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; @@ -59,6 +59,9 @@ where } } - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index 3cb96cdb..e91b3079 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -2,7 +2,7 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBufMut; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::BufResult; +use crate::{BufError, BufResult}; use crate::runtime::CONTEXT; use std::io; @@ -68,6 +68,9 @@ where } } - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/readv.rs b/src/io/readv.rs index ff71dc79..4adfb4e4 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -1,5 +1,5 @@ use crate::buf::BoundedBufMut; -use crate::BufResult; +use crate::{BufError, BufResult}; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; @@ -87,6 +87,9 @@ where assert_eq!(count, 0); } - (res, bufs) + match res { + Ok(n) => Ok((n, bufs)), + Err(e) => Err(BufError(e, bufs)), + } } } diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index e9b360ca..54734593 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::BufError; use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; use socket2::SockAddr; use std::{ @@ -78,6 +79,9 @@ where (n, socket_addr) }); - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 3cae2e50..247b22e9 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::BufError; use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; use socket2::SockAddr; use std::{ @@ -92,6 +93,9 @@ where (n, socket_addr) }); - (res, bufs) + match res { + Ok(n) => Ok((n, bufs)), + Err(e) => Err(BufError(e, bufs)), + } } } diff --git a/src/io/send_to.rs b/src/io/send_to.rs index 8895f5fa..027421ef 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -2,7 +2,7 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::BufResult; +use crate::{BufError, BufResult}; use socket2::SockAddr; use std::io::IoSlice; use std::{boxed::Box, io, net::SocketAddr}; @@ -78,6 +78,9 @@ impl Completable for SendTo { // Recover the buffer let buf = self.buf; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index df37722b..5a73597b 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; +use crate::BufError; use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; use std::io; @@ -46,7 +47,11 @@ impl Completable for SendZc { let res = cqe.result.map(|v| self.bytes + v as usize); // Recover the buffer let buf = self.buf; - (res, buf) + + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..0f110820 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -6,6 +6,7 @@ use crate::{ io::SharedFd, UnsubmittedOneshot, }; +use crate::{map_buf, BufError}; use std::{ io, net::SocketAddr, @@ -49,24 +50,24 @@ impl Socket { pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice(buf.slice_full()).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + let buf_result = self.write_all_slice(buf.slice_full()).await; + map_buf(buf_result, |buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_all_slice(&self, mut buf: Slice) -> crate::BufResult<(), T> { while buf.bytes_init() != 0 { - let res = self.write(buf).submit().await; - match res { - (Ok(0), slice) => { - return ( - Err(std::io::Error::new( + let buf_result = self.write(buf).submit().await; + match buf_result { + Ok((0, slice)) => { + return Err(BufError( + std::io::Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", - )), + ), slice.into_inner(), - ) + )) } - (Ok(n), slice) => { + Ok((n, slice)) => { buf = slice.slice(n..); } @@ -74,11 +75,10 @@ impl Socket { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - (Err(e), slice) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map_buf(|slice| slice.into_inner())), } } - - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } pub(crate) async fn write_fixed(&self, buf: T) -> crate::BufResult @@ -94,8 +94,8 @@ impl Socket { T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_slice(buf.slice_full()).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + let buf_result = self.write_fixed_all_slice(buf.slice_full()).await; + map_buf(buf_result, |buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_fixed_all_slice( @@ -103,18 +103,18 @@ impl Socket { mut buf: Slice, ) -> crate::BufResult<(), FixedBuf> { while buf.bytes_init() != 0 { - let res = self.write_fixed(buf).await; - match res { - (Ok(0), slice) => { - return ( - Err(std::io::Error::new( + let buf_result = self.write_fixed(buf).await; + match buf_result { + Ok((0, slice)) => { + return Err(BufError( + std::io::Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", - )), + ), slice.into_inner(), - ) + )) } - (Ok(n), slice) => { + Ok((n, slice)) => { buf = slice.slice(n..); } @@ -122,11 +122,10 @@ impl Socket { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - (Err(e), slice) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map_buf(|slice| slice.into_inner())), } } - - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } pub async fn writev(&self, buf: Vec) -> crate::BufResult> { diff --git a/src/io/write.rs b/src/io/write.rs index 6c607f75..1ed21ba3 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,3 +1,4 @@ +use crate::BufError; use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot}; use io_uring::cqueue::Entry; use std::io; @@ -31,7 +32,10 @@ impl OneshotOutputTransform for WriteTransform { Err(io::Error::from_raw_os_error(-cqe.result())) }; - (res, data.buf) + match res { + Ok(n) => Ok((n, data.buf)), + Err(e) => Err(BufError(e, data.buf)), + } } } diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index 1d2c3e38..226dbbcc 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -2,7 +2,7 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::BufResult; +use crate::{BufError, BufResult}; use crate::runtime::CONTEXT; use std::io; @@ -56,6 +56,9 @@ impl Completable for WriteFixed { // Recover the buffer let buf = self.buf; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/writev.rs b/src/io/writev.rs index 86236ebc..ca21001c 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::BufError; use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; use libc::iovec; use std::io; @@ -66,6 +67,9 @@ where // Recover the buffer let buf = self.bufs; - (res, buf) + match res { + Ok(n) => Ok((n, buf)), + Err(e) => Err(BufError(e, buf)), + } } } diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index ef5b9d40..3a1e4c56 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -1,5 +1,6 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use crate::BufError; use crate::{buf::BoundedBuf, io::SharedFd}; use libc::iovec; use std::io; @@ -59,7 +60,7 @@ pub(crate) async fn writev_at_all( // On error, there is no indication how many bytes were written. This is standard. // The device doesn't tell us that either. - Err(e) => return (Err(e), bufs), + Err(e) => return Err(BufError(e, bufs)), }; // TODO if n is zero, while there was more data to be written, should this be interpreted @@ -101,7 +102,7 @@ pub(crate) async fn writev_at_all( break; } } - (Ok(total), bufs) + Ok((total, bufs)) } struct WritevAll { diff --git a/src/lib.rs b/src/lib.rs index d1cc6e02..957b5b25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,6 +84,8 @@ pub use runtime::spawn; pub use runtime::Runtime; use crate::runtime::driver::op::Op; +use std::error::Error; +use std::fmt::{Debug, Display}; use std::future::Future; /// Starts an `io_uring` enabled Tokio runtime. @@ -237,8 +239,7 @@ impl Builder { /// /// This type is used as a return value for asynchronous `io-uring` methods that /// require passing ownership of a buffer to the runtime. When the operation -/// completes, the buffer is returned whether or not the operation completed -/// successfully. +/// completes, the buffer is returned both in the success tuple and as part of the error /// /// # Examples /// @@ -254,8 +255,7 @@ impl Builder { /// // Read some data, the buffer is passed by ownership and /// // submitted to the kernel. When the operation completes, /// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; +/// let (n, buf) = file.read_at(buf, 0).await?; /// /// // Display the contents /// println!("{:?}", &buf[..n]); @@ -264,7 +264,44 @@ impl Builder { /// }) /// } /// ``` -pub type BufResult = (std::io::Result, B); +pub type BufResult = std::result::Result<(T, B), BufError>; + +/// A specialized `Error` type for `io-uring` operations with buffers. +#[derive(Debug)] +pub struct BufError(pub std::io::Error, pub B); + +impl Display for BufError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self.0, f) + } +} + +impl Error for BufError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.0) + } +} + +impl BufError { + /// Applies a function to the contained buffer, returning a new `BufError`. + pub fn map_buf(self, f: F) -> BufError + where + F: FnOnce(B) -> U, + { + BufError(self.0, f(self.1)) + } +} + +/// Applies a function to the contained buffer, returning a new `BufResult`. +pub fn map_buf(buf_result: BufResult, f: F) -> BufResult +where + F: FnOnce(B) -> U, +{ + match buf_result { + Ok((t, b)) => Ok((t, f(b))), + Err(e) => Err(e.map_buf(f)), + } +} /// The simplest possible operation. Just posts a completion event, nothing else. /// diff --git a/tests/driver.rs b/tests/driver.rs index f4381dd5..7923b51e 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -10,6 +10,7 @@ mod future; fn complete_ops_on_drop() { use std::sync::Arc; + #[derive(Debug)] struct MyBuf { data: Vec, _ref_cnt: Arc<()>, @@ -59,7 +60,6 @@ fn complete_ops_on_drop() { 25 * 1024 * 1024, ) .await - .0 .unwrap(); }) .await; @@ -86,7 +86,6 @@ fn too_many_submissions() { file.write_at(b"hello world".to_vec(), 0) .submit() .await - .0 .unwrap(); }) .await; diff --git a/tests/fixed_buf.rs b/tests/fixed_buf.rs index a5442217..3e0562e2 100644 --- a/tests/fixed_buf.rs +++ b/tests/fixed_buf.rs @@ -42,8 +42,7 @@ fn fixed_buf_turnaround() { // for another instance. assert!(buffers.check_out(0).is_none()); - let (res, buf) = op.await; - let n = res.unwrap(); + let (n, buf) = op.await.unwrap(); assert_eq!(n, HELLO.len()); // The buffer is owned by `buf`, can't check it out @@ -81,12 +80,11 @@ fn unregister_invalidates_checked_out_buffers() { // The old buffer's index no longer matches the memory area of the // currently registered buffer, so the read operation using the old // buffer's memory should fail. - let (res, _) = file.read_fixed_at(fixed_buf, 0).await; + let res = file.read_fixed_at(fixed_buf, 0).await; assert_err!(res); let fixed_buf = buffers.check_out(0).unwrap(); - let (res, buf) = file.read_fixed_at(fixed_buf, 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(&buf[..], HELLO); }); @@ -112,18 +110,17 @@ fn slicing() { let fixed_buf = buffers.check_out(0).unwrap(); // Read no more than 8 bytes into the fixed buffer. - let (res, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await; - let n = res.unwrap(); + let (n, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await.unwrap(); assert_eq!(n, 8); assert_eq!(slice[..], HELLO[3..11]); let fixed_buf = slice.into_inner(); // Write from the fixed buffer, starting at offset 1, // up to the end of the initialized bytes in the buffer. - let (res, slice) = file + let (n, slice) = file .write_fixed_at(fixed_buf.slice(1..), HELLO.len() as u64) - .await; - let n = res.unwrap(); + .await + .unwrap(); assert_eq!(n, 7); assert_eq!(slice[..], HELLO[4..11]); let fixed_buf = slice.into_inner(); @@ -131,8 +128,7 @@ fn slicing() { // Read into the fixed buffer, overwriting bytes starting from offset 3 // and then extending the initialized part with as many bytes as // the operation can read. - let (res, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await; - let n = res.unwrap(); + let (n, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await.unwrap(); assert_eq!(n, HELLO.len() + 7); assert_eq!(slice[..HELLO.len()], HELLO[..]); assert_eq!(slice[HELLO.len()..], HELLO[4..11]); @@ -167,8 +163,10 @@ fn pool_next_as_concurrency_limit() { let file = File::from_std(cloned_file); let data = [b'0' + i as u8; BUF_SIZE]; buf.put_slice(&data); - let (res, buf) = file.write_fixed_all_at(buf, BUF_SIZE as u64 * i).await; - res.unwrap(); + let (_, buf) = file + .write_fixed_all_at(buf, BUF_SIZE as u64 * i) + .await + .unwrap(); println!("[worker {}]: dropping buffer {}", i, buf.buf_index()); }); diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 6ec14d43..d5170024 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -19,8 +19,7 @@ const HELLO: &[u8] = b"hello world..."; async fn read_hello(file: &File) { let buf = Vec::with_capacity(1024); - let (res, buf) = file.read_at(buf, 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_at(buf, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(&buf[..n], HELLO); @@ -47,8 +46,7 @@ fn basic_read_exact() { tempfile.write_all(&data).unwrap(); let file = File::open(tempfile.path()).await.unwrap(); - let (res, buf) = file.read_exact_at(buf, 0).await; - res.unwrap(); + let (_, buf) = file.read_exact_at(buf, 0).await.unwrap(); assert_eq!(buf, data); }); } @@ -60,7 +58,7 @@ fn basic_write() { let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.0.unwrap(); + file.write_at(HELLO, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -75,8 +73,7 @@ fn vectored_read() { let file = File::open(tempfile.path()).await.unwrap(); let bufs = vec![Vec::::with_capacity(5), Vec::::with_capacity(9)]; - let (res, bufs) = file.readv_at(bufs, 0).await; - let n = res.unwrap(); + let (n, bufs) = file.readv_at(bufs, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(bufs[1][0], b' '); @@ -93,7 +90,7 @@ fn vectored_write() { let buf2 = " world...".to_owned().into_bytes(); let bufs = vec![buf1, buf2]; - file.writev_at(bufs, 0).await.0.unwrap(); + file.writev_at(bufs, 0).await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -108,8 +105,7 @@ fn basic_write_all() { let tempfile = tempfile(); let file = File::create(tempfile.path()).await.unwrap(); - let (ret, data) = file.write_all_at(data, 0).await; - ret.unwrap(); + let (_, data) = file.write_all_at(data, 0).await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, data); @@ -155,7 +151,7 @@ fn drop_open() { // Do something else let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.0.unwrap(); + file.write_at(HELLO, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -183,7 +179,7 @@ fn sync_doesnt_kill_anything() { let file = File::create(tempfile.path()).await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); - file.write_at(&b"foo"[..], 0).submit().await.0.unwrap(); + file.write_at(&b"foo"[..], 0).submit().await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); }); @@ -236,16 +232,14 @@ fn read_fixed() { let fixed_buf = buffers.check_out(0).unwrap(); assert_eq!(fixed_buf.bytes_total(), 6); - let (res, buf) = file.read_fixed_at(fixed_buf.slice(..), 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf.slice(..), 0).await.unwrap(); assert_eq!(n, 6); assert_eq!(&buf[..], &HELLO[..6]); let fixed_buf = buffers.check_out(1).unwrap(); assert_eq!(fixed_buf.bytes_total(), 1024); - let (res, buf) = file.read_fixed_at(fixed_buf.slice(..), 6).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf.slice(..), 6).await.unwrap(); assert_eq!(n, HELLO.len() - 6); assert_eq!(&buf[..], &HELLO[6..]); @@ -266,16 +260,14 @@ fn write_fixed() { let mut buf = fixed_buf; buf.put_slice(&HELLO[..6]); - let (res, _) = file.write_fixed_at(buf, 0).await; - let n = res.unwrap(); + let (n, _) = file.write_fixed_at(buf, 0).await.unwrap(); assert_eq!(n, 6); let fixed_buf = buffers.check_out(1).unwrap(); let mut buf = fixed_buf; buf.put_slice(&HELLO[6..]); - let (res, _) = file.write_fixed_at(buf, 6).await; - let n = res.unwrap(); + let (n, _) = file.write_fixed_at(buf, 6).await.unwrap(); assert_eq!(n, HELLO.len() - 6); let file = std::fs::read(tempfile.path()).unwrap();