From d2a0893eb81d231be435a3feda3e73c6bc2e3784 Mon Sep 17 00:00:00 2001 From: SteveLauC Date: Wed, 23 Oct 2024 11:07:12 +0800 Subject: [PATCH] feat: implement DuplexStream (#2) --- Cargo.toml | 3 +- src/duplex.rs | 172 +++++++++++++++++++++++++++++++++++++++++++++++++ src/simplex.rs | 5 +- 3 files changed, 176 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b61a720..3d79da5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,9 @@ edition = "2021" description = "Port of Tokio's SimplexStream and DuplexStream for Monoio" [dependencies] +async-lock = "3.4.0" bytes = "1.7.2" monoio = "0.2.4" +futures = "0.3.31" [dev-dependencies] -futures = "0.3.31" diff --git a/src/duplex.rs b/src/duplex.rs index 76ec346..17dca08 100644 --- a/src/duplex.rs +++ b/src/duplex.rs @@ -1 +1,173 @@ //! Implementation of [`DuplexStream`]. + +use super::simplex::SimplexStream; + +use async_lock::Mutex; +use monoio::io::AsyncReadRent; +use monoio::io::AsyncWriteRent; +use std::rc::Rc; + +/// Create a new pair of `DuplexStream`s that act like a pair of connected sockets. +/// +/// The `max_buf_size` argument is the maximum amount of bytes that can be +/// written to a side before the write returns `Poll::Pending`. +pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { + let one = Rc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size))); + let two = Rc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size))); + + ( + DuplexStream { + read: one.clone(), + write: two.clone(), + }, + DuplexStream { + read: two, + write: one, + }, + ) +} + +/// A bidirectional pipe to read and write bytes in memory. +/// +/// A pair of `DuplexStream`s are created together, and they act as a "channel" +/// that can be used as in-memory IO types. Writing to one of the pairs will +/// allow that data to be read from the other, and vice versa. +/// +/// # Closing a `DuplexStream` +/// +/// If one end of the `DuplexStream` channel is dropped, any pending reads on +/// the other side will continue to read data until the buffer is drained, then +/// they will signal EOF by returning 0 bytes. Any writes to the other side, +/// including pending ones (that are waiting for free space in the buffer) will +/// return `Err(BrokenPipe)` immediately. +/// +/// # Example +/// +/// ``` +/// # use monoio::io::{AsyncReadRentExt, AsyncWriteRentExt}; +/// # use monoio_duplex::simplex::simplex; +/// # use monoio::RuntimeBuilder; +/// # use monoio::FusionDriver; +/// # use monoio_duplex::duplex::duplex; +/// # +/// # let mut rt = RuntimeBuilder::::new().enable_all().build().unwrap(); +/// # rt.block_on(async { +/// let (mut client, mut server) = duplex(64); +/// +/// let (write_result, _buf) = client.write_all(b"ping").await; +/// assert_eq!(write_result.unwrap(), 4); +/// +/// let mut buf = [0u8; 4]; +/// let (read_result, buf) = server.read_exact(vec![0_u8; 4]).await; +/// assert_eq!(read_result.unwrap(), 4); +/// assert_eq!(&buf, b"ping"); +/// +/// let (write_result, _buf) = server.write_all(b"poong").await; +/// assert_eq!(write_result.unwrap(), 5); +/// +/// let (read_result, buf) = client.read_exact(vec![0_u8; 5]).await; +/// assert_eq!(read_result.unwrap(), 5); +/// assert_eq!(&buf, b"poong"); +/// # }); +/// ``` +#[derive(Debug)] +pub struct DuplexStream { + read: Rc>, + write: Rc>, +} + +impl Drop for DuplexStream { + fn drop(&mut self) { + futures::executor::block_on(async { + // notify the other side of the closure + self.write.lock().await.close_write(); + self.read.lock().await.close_read(); + }) + } +} + +impl AsyncReadRent for DuplexStream { + async fn read(&mut self, buf: T) -> monoio::BufResult { + let mut read_simplex = self.read.lock().await; + ::read(&mut *read_simplex, buf).await + } + + async fn readv(&mut self, buf: T) -> monoio::BufResult { + let mut read_simplex = self.read.lock().await; + ::readv(&mut *read_simplex, buf).await + } +} + +impl AsyncWriteRent for DuplexStream { + async fn write(&mut self, buf: T) -> monoio::BufResult { + let mut write_simplex = self.write.lock().await; + ::write(&mut *write_simplex, buf).await + } + + async fn writev( + &mut self, + buf_vec: T, + ) -> monoio::BufResult { + let mut write_simplex = self.write.lock().await; + ::writev(&mut *write_simplex, buf_vec).await + } + + async fn flush(&mut self) -> std::io::Result<()> { + let mut write_simplex = self.write.lock().await; + ::flush(&mut *write_simplex).await + } + + async fn shutdown(&mut self) -> std::io::Result<()> { + let mut write_simplex = self.write.lock().await; + ::shutdown(&mut *write_simplex).await + } +} + +#[cfg(test)] +mod tests { + use monoio::io::AsyncReadRentExt; + + use super::*; + + /// To guard this behavior: + /// + /// > If one end of the `DuplexStream` channel is dropped, + /// > + /// > * Any pending reads on the other side will continue to read data until + /// > the buffer is drained, then they will signal EOF by returning 0 bytes + /// > + /// > * Any writes to the other side including pending ones (that are waiting + /// > for free space in the buffer) will return `Err(BrokenPipe)` immediately. + #[monoio::test(enable_timer = true)] + async fn close_one_end() { + let (mut drop_side, mut use_side) = duplex(64); + + // before dropping the `drop_side`, let's populate some data + let (write_result, _buf) = drop_side + .write("dropdropdropdrop".as_bytes().to_vec()) + .await; + assert_eq!(write_result.unwrap(), 16); + drop(drop_side); + + // test read + // First, let's read the messages out using a buffer of size 4 + for _ in 0..4 { + let (read_result, msg) = use_side.read_exact(vec![0_u8; 4]).await; + assert_eq!(read_result.unwrap(), 4); + assert_eq!(msg, b"drop"); + } + // Now the buffer should be empty, the next read should return `Ready(Ok(4))` + let read_buf = vec![0_u8; 10]; + let all_0s = read_buf.clone(); + let (read_result, should_be_all_0s) = use_side.read(read_buf).await; + assert_eq!(read_result.unwrap(), 0); + assert_eq!(should_be_all_0s, all_0s); + + // test write + let (write_result, _buf) = use_side.write(b"are you still there?").await; + assert_eq!( + write_result.unwrap_err().kind(), + std::io::ErrorKind::BrokenPipe + ); + } +} diff --git a/src/simplex.rs b/src/simplex.rs index 3ede305..1b36f29 100644 --- a/src/simplex.rs +++ b/src/simplex.rs @@ -92,7 +92,7 @@ impl SimplexStream { } /// Closes and notifies the reader tasks. - fn close_write(&mut self) { + pub(crate) fn close_write(&mut self) { self.is_closed = true; // needs to notify any readers that no more data will come if let Some(waker) = self.read_waker.take() { @@ -101,8 +101,7 @@ impl SimplexStream { } /// Closes and notifies the writer tasks. - #[allow(unused)] // TODO: remove this attribute once DuplexStream is implemented - fn close_read(&mut self) { + pub(crate) fn close_read(&mut self) { self.is_closed = true; // needs to notify any writers that they have to abort if let Some(waker) = self.write_waker.take() {