From b2b333bba88ad750c350b82aa436c186506fdd48 Mon Sep 17 00:00:00 2001 From: SteveLauC Date: Thu, 24 Oct 2024 16:50:17 +0800 Subject: [PATCH] fix: drop mutex guard when DuplexStream I/O op is pending (#8) * fix: drop mutex guard when DuplexStream I/O op is pending * chore: remove a println from test --- Cargo.toml | 2 +- src/duplex.rs | 117 ++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2434ef5..4994de8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ license = "MIT" repository = "https://github.com/SteveLauC/monoio-duplex" [dependencies] -async-lock = "3.4.0" bytes = "1.7.2" monoio = "0.2.4" futures = "0.3.31" [dev-dependencies] +tokio = { version = "1.41.0", features = ["sync"] } diff --git a/src/duplex.rs b/src/duplex.rs index 5f682cb..9e164c3 100644 --- a/src/duplex.rs +++ b/src/duplex.rs @@ -2,10 +2,11 @@ use super::simplex::SimplexStream; -use async_lock::Mutex; +use futures::FutureExt; use monoio::io::AsyncReadRent; use monoio::io::AsyncWriteRent; use std::rc::Rc; +use std::sync::Mutex; /// Create a new pair of `DuplexStream`s that act like a pair of connected sockets. /// @@ -78,48 +79,90 @@ pub struct DuplexStream { impl Drop for DuplexStream { fn drop(&mut self) { - futures::executor::block_on(async { - // notify the other side of the closure - self.write.lock().await.close_write(); - self.read.lock().await.close_read(); - }) + // notify the other side of the closure + self.write.lock().unwrap().close_write(); + self.read.lock().unwrap().close_read(); } } +/// A helper macro to `.await` the I/O function, used in our I/O traits +/// implementations. +/// +/// Different from a plain `function().await`, it drops the lock guard if future +/// `function()` is pending. +macro_rules! await_io_future { + // For functions that do not have arugments: flush/shutdown + ($trait:ident, $function:ident, $guard:expr) => {{ + let opt_read_ready = ::$function(&mut *$guard).now_or_never(); + + match opt_read_ready { + Some(result) => result, + None => { + // drop the Mutex guard or it could deadlock + // https://github.com/SteveLauC/monoio-duplex/issues/7 + drop($guard); + std::future::pending().await + } + } + }}; + + // For functions with a `buf` arugment: read/readv/write/writev + ($trait:ident, $function:ident, $guard:expr, $buf:expr) => {{ + let opt_read_ready = + ::$function(&mut *$guard, $buf).now_or_never(); + + match opt_read_ready { + Some(result) => result, + None => { + // drop the Mutex guard or it could deadlock + // https://github.com/SteveLauC/monoio-duplex/issues/7 + drop($guard); + std::future::pending().await + } + } + }}; +} + impl AsyncReadRent for DuplexStream { + #[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point. async fn read(&mut self, buf: T) -> monoio::BufResult { - let mut read_simplex = self.read.lock().await; - ::read(&mut *read_simplex, buf).await + let mut read_simplex = self.read.lock().unwrap(); + await_io_future!(AsyncReadRent, read, read_simplex, buf) } + #[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point. async fn readv(&mut self, buf: T) -> monoio::BufResult { - let mut read_simplex = self.read.lock().await; - ::readv(&mut *read_simplex, buf).await + let mut read_simplex = self.read.lock().unwrap(); + await_io_future!(AsyncReadRent, readv, read_simplex, buf) } } impl AsyncWriteRent for DuplexStream { + #[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point. async fn write(&mut self, buf: T) -> monoio::BufResult { - let mut write_simplex = self.write.lock().await; - ::write(&mut *write_simplex, buf).await + let mut write_simplex = self.write.lock().unwrap(); + await_io_future!(AsyncWriteRent, write, write_simplex, buf) } + #[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point. async fn writev( &mut self, buf_vec: T, ) -> monoio::BufResult { - let mut write_simplex = self.write.lock().await; - ::writev(&mut *write_simplex, buf_vec).await + let mut write_simplex = self.write.lock().unwrap(); + await_io_future!(AsyncWriteRent, writev, write_simplex, buf_vec) } + #[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point. async fn flush(&mut self) -> std::io::Result<()> { - let mut write_simplex = self.write.lock().await; - ::flush(&mut *write_simplex).await + let mut write_simplex = self.write.lock().unwrap(); + await_io_future!(AsyncWriteRent, flush, write_simplex) } + #[allow(clippy::await_holding_lock)] // false-positive, we explicitly dropped the guard before the await point. async fn shutdown(&mut self) -> std::io::Result<()> { - let mut write_simplex = self.write.lock().await; - ::shutdown(&mut *write_simplex).await + let mut write_simplex = self.write.lock().unwrap(); + await_io_future!(AsyncWriteRent, shutdown, write_simplex) } } @@ -180,4 +223,42 @@ mod tests { std::io::ErrorKind::BrokenPipe ); } + + #[monoio::test(enable_timer = true)] + async fn pending_read_will_not_hold_mutex_gaurd() { + let (mut client, server) = duplex(100); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + + monoio::spawn(async move { + // send task start signal + tx.send(()).unwrap(); + // this line should block + let (_result, _buf) = client.read(vec![0_u8; 10]).await; + }); + + rx.await.unwrap(); + + drop(server); + } + + #[monoio::test(enable_timer = true)] + async fn pending_write_will_not_hold_mutex_gaurd() { + let (mut client, server) = duplex(10); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + + // write 10 bytes to make future writes pending + let (write_result, _buf) = client.write(vec![0_u8; 10]).await; + assert_eq!(write_result.unwrap(), 10); + + monoio::spawn(async move { + // send task start signal + tx.send(()).unwrap(); + // this line should block + let (_result, _buf) = client.write(vec![0_u8; 10]).await; + }); + + rx.await.unwrap(); + + drop(server); + } }