From ba7483f70704112701ea4cb173703af6078b8522 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 29 Feb 2024 11:34:58 +0000 Subject: [PATCH 1/6] Add example poll_recv impl --- src/lib.rs | 99 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 37 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d6c9d2f..5d77f7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1298,6 +1298,62 @@ impl Receiver { listener: None, } } + + /// A low level poll function that returns items of the same format handed back by + /// [`Receiver::recv()`] and [`Receiver::recv_direct()`]. This can be useful for building + /// stream implementations which use a [`Receiver`] under the hood and want to know if the + /// stream has overflowed. + /// + /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible. + pub fn poll_recv( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + // If this stream is listening for events, first wait for a notification. + if let Some(listener) = self.listener.as_mut() { + ready!(Pin::new(listener).poll(cx)); + self.listener = None; + } + + loop { + // Attempt to receive a message. + match self.try_recv() { + Ok(msg) => { + // The stream is not blocked on an event - drop the listener. + self.listener = None; + return Poll::Ready(Some(Ok(msg))); + } + Err(TryRecvError::Closed) => { + // The stream is not blocked on an event - drop the listener. + self.listener = None; + return Poll::Ready(None); + } + Err(TryRecvError::Overflowed(n)) => { + // The stream is not blocked on an event - drop the listener. + self.listener = None; + return Poll::Ready(Some(Err(RecvError::Overflowed(n)))); + } + Err(TryRecvError::Empty) => {} + } + + // Receiving failed - now start listening for notifications or wait for one. + match self.listener.as_mut() { + None => { + // Start listening and then try receiving again. + self.listener = { + let inner = self.inner.write().unwrap(); + Some(inner.recv_ops.listen()) + }; + } + Some(_) => { + // Go back to the outer loop to poll the listener. + break; + } + } + } + } + } } impl Drop for Receiver { @@ -1363,43 +1419,12 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - // If this stream is listening for events, first wait for a notification. - if let Some(listener) = self.listener.as_mut() { - ready!(Pin::new(listener).poll(cx)); - self.listener = None; - } - - loop { - // Attempt to receive a message. - match self.try_recv() { - Ok(msg) => { - // The stream is not blocked on an event - drop the listener. - self.listener = None; - return Poll::Ready(Some(msg)); - } - Err(TryRecvError::Closed) => { - // The stream is not blocked on an event - drop the listener. - self.listener = None; - return Poll::Ready(None); - } - Err(TryRecvError::Overflowed(_)) => continue, - Err(TryRecvError::Empty) => {} - } - - // Receiving failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Start listening and then try receiving again. - self.listener = { - let inner = self.inner.write().unwrap(); - Some(inner.recv_ops.listen()) - }; - } - Some(_) => { - // Go back to the outer loop to poll the listener. - break; - } - } + match ready!(self.as_mut().poll_recv(cx)) { + Some(Ok(val)) => return Poll::Ready(Some(val)), + // RecvError::Closed is handled upstream, so shouldn't be seen here. + None | Some(Err(RecvError::Closed)) => return Poll::Ready(None), + // If overflowed, we expect future operations to succeed so try again. + Some(Err(RecvError::Overflowed(_))) => continue, } } } From 606d263ea4de7023f01972e899760a098fa59c33 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 29 Feb 2024 14:57:29 +0000 Subject: [PATCH 2/6] Quick test for poll_recv and funtcion=>method --- src/lib.rs | 2 +- tests/test.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 5d77f7c..6d0da38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1299,7 +1299,7 @@ impl Receiver { } } - /// A low level poll function that returns items of the same format handed back by + /// A low level poll method that returns items of the same format handed back by /// [`Receiver::recv()`] and [`Receiver::recv_direct()`]. This can be useful for building /// stream implementations which use a [`Receiver`] under the hood and want to know if the /// stream has overflowed. diff --git a/tests/test.rs b/tests/test.rs index 4416909..79718cb 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -288,3 +288,38 @@ fn inactive_drop() { assert!(s.is_closed()) } + +#[test] +fn poll_recv() { + let (s, mut r) = broadcast::(2); + r.set_overflow(true); + + // A quick custom stream impl to demonstrate/test `poll_recv`. + struct MyStream(Receiver); + impl futures_core::Stream for MyStream { + type Item = Result; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.0).poll_recv(cx) + } + } + + block_on(async move { + let mut stream = MyStream(r); + + s.broadcast(1).await.unwrap(); + s.broadcast(2).await.unwrap(); + s.broadcast(3).await.unwrap(); + s.broadcast(4).await.unwrap(); + + assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2))); + assert_eq!(stream.next().await.unwrap(), Ok(3)); + assert_eq!(stream.next().await.unwrap(), Ok(4)); + + drop(s); + + assert_eq!(stream.next().await, None); + }) +} From 5a909c47b3228219ff7a1ee6cba1a06457dbd529 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 29 Feb 2024 15:46:57 +0000 Subject: [PATCH 3/6] Add OverflowError and example to poll_recv --- src/lib.rs | 45 ++++++++++++++++++++++++++++++++++++--------- tests/test.rs | 4 ++-- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6d0da38..def6dd4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1299,16 +1299,32 @@ impl Receiver { } } - /// A low level poll method that returns items of the same format handed back by - /// [`Receiver::recv()`] and [`Receiver::recv_direct()`]. This can be useful for building - /// stream implementations which use a [`Receiver`] under the hood and want to know if the - /// stream has overflowed. + /// A low level poll method that is similar to [`Receiver::recv()`] or + /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which + /// use a [`Receiver`] under the hood and want to know if the stream has overflowed. /// /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible. + /// + /// # Examples + /// + /// ``` + /// use futures_core::Stream; + /// use async_broadcast::{Receiver, OverflowError}; + /// use std::{pin::Pin, task::{Poll, Context}}; + /// + /// struct MyStream(Receiver); + /// + /// impl futures_core::Stream for MyStream { + /// type Item = Result; + /// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + /// Pin::new(&mut self.0).poll_recv(cx) + /// } + /// } + /// ``` pub fn poll_recv( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { loop { // If this stream is listening for events, first wait for a notification. if let Some(listener) = self.listener.as_mut() { @@ -1332,7 +1348,7 @@ impl Receiver { Err(TryRecvError::Overflowed(n)) => { // The stream is not blocked on an event - drop the listener. self.listener = None; - return Poll::Ready(Some(Err(RecvError::Overflowed(n)))); + return Poll::Ready(Some(Err(OverflowError(n)))); } Err(TryRecvError::Empty) => {} } @@ -1421,10 +1437,9 @@ impl Stream for Receiver { loop { match ready!(self.as_mut().poll_recv(cx)) { Some(Ok(val)) => return Poll::Ready(Some(val)), - // RecvError::Closed is handled upstream, so shouldn't be seen here. - None | Some(Err(RecvError::Closed)) => return Poll::Ready(None), // If overflowed, we expect future operations to succeed so try again. - Some(Err(RecvError::Overflowed(_))) => continue, + Some(Err(OverflowError(_))) => continue, + None => return Poll::Ready(None), } } } @@ -1560,6 +1575,18 @@ impl fmt::Display for RecvError { } } +/// An error returned from [`Receiver::poll_recv()`]. +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct OverflowError(pub u64); + +impl error::Error for OverflowError {} + +impl fmt::Display for OverflowError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "receiving skipped {} messages", self.0) + } +} + /// An error returned from [`Receiver::try_recv()`]. #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum TryRecvError { diff --git a/tests/test.rs b/tests/test.rs index 79718cb..ed8daa4 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -297,7 +297,7 @@ fn poll_recv() { // A quick custom stream impl to demonstrate/test `poll_recv`. struct MyStream(Receiver); impl futures_core::Stream for MyStream { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -314,7 +314,7 @@ fn poll_recv() { s.broadcast(3).await.unwrap(); s.broadcast(4).await.unwrap(); - assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2))); + assert_eq!(stream.next().await.unwrap(), Err(OverflowError(2))); assert_eq!(stream.next().await.unwrap(), Ok(3)); assert_eq!(stream.next().await.unwrap(), Ok(4)); From 4d44bcd09743256caa49a7c42e8f87ceb68dd71d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 29 Feb 2024 16:51:02 +0000 Subject: [PATCH 4/6] Add description of error condition to try_recv too --- src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index def6dd4..ef273eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1303,6 +1303,9 @@ impl Receiver { /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which /// use a [`Receiver`] under the hood and want to know if the stream has overflowed. /// + /// If the number of messages that have been sent has overflowed the channel capacity, an + /// [`OverflowError`] is returned containing the number of items that overflowed and were lost. + /// /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible. /// /// # Examples From 54d080f09c58f902155197cf9417ceae46bcf6fe Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 29 Feb 2024 18:08:27 +0000 Subject: [PATCH 5/6] revert to RecvError, Errors section, example desc --- src/lib.rs | 37 ++++++++++++++++--------------------- tests/test.rs | 4 ++-- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ef273eb..a62079d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1303,22 +1303,28 @@ impl Receiver { /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which /// use a [`Receiver`] under the hood and want to know if the stream has overflowed. /// - /// If the number of messages that have been sent has overflowed the channel capacity, an - /// [`OverflowError`] is returned containing the number of items that overflowed and were lost. - /// /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible. /// + /// # Errors + /// + /// If the number of messages that have been sent has overflowed the channel capacity, a + /// [`RecvError::Overflowed`] variant is returned containing the number of items that + /// overflowed and were lost. + /// /// # Examples /// + /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom + /// stream implementation to internally make use of a [`Receiver`]. + /// /// ``` /// use futures_core::Stream; - /// use async_broadcast::{Receiver, OverflowError}; + /// use async_broadcast::{Receiver, RecvError}; /// use std::{pin::Pin, task::{Poll, Context}}; /// /// struct MyStream(Receiver); /// /// impl futures_core::Stream for MyStream { - /// type Item = Result; + /// type Item = Result; /// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { /// Pin::new(&mut self.0).poll_recv(cx) /// } @@ -1327,7 +1333,7 @@ impl Receiver { pub fn poll_recv( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { loop { // If this stream is listening for events, first wait for a notification. if let Some(listener) = self.listener.as_mut() { @@ -1351,7 +1357,7 @@ impl Receiver { Err(TryRecvError::Overflowed(n)) => { // The stream is not blocked on an event - drop the listener. self.listener = None; - return Poll::Ready(Some(Err(OverflowError(n)))); + return Poll::Ready(Some(Err(RecvError::Overflowed(n)))); } Err(TryRecvError::Empty) => {} } @@ -1441,8 +1447,9 @@ impl Stream for Receiver { match ready!(self.as_mut().poll_recv(cx)) { Some(Ok(val)) => return Poll::Ready(Some(val)), // If overflowed, we expect future operations to succeed so try again. - Some(Err(OverflowError(_))) => continue, - None => return Poll::Ready(None), + Some(Err(RecvError::Overflowed(_))) => continue, + // RecvError::Closed should never appear here, but handle it anyway. + None | Some(Err(RecvError::Closed)) => return Poll::Ready(None), } } } @@ -1578,18 +1585,6 @@ impl fmt::Display for RecvError { } } -/// An error returned from [`Receiver::poll_recv()`]. -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub struct OverflowError(pub u64); - -impl error::Error for OverflowError {} - -impl fmt::Display for OverflowError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "receiving skipped {} messages", self.0) - } -} - /// An error returned from [`Receiver::try_recv()`]. #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum TryRecvError { diff --git a/tests/test.rs b/tests/test.rs index ed8daa4..79718cb 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -297,7 +297,7 @@ fn poll_recv() { // A quick custom stream impl to demonstrate/test `poll_recv`. struct MyStream(Receiver); impl futures_core::Stream for MyStream { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -314,7 +314,7 @@ fn poll_recv() { s.broadcast(3).await.unwrap(); s.broadcast(4).await.unwrap(); - assert_eq!(stream.next().await.unwrap(), Err(OverflowError(2))); + assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2))); assert_eq!(stream.next().await.unwrap(), Ok(3)); assert_eq!(stream.next().await.unwrap(), Ok(4)); From 694a71056e30a309b8d412934f709e27862f4957 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 1 Mar 2024 09:46:19 +0000 Subject: [PATCH 6/6] Note difference in example --- src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index a62079d..90334b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1314,7 +1314,9 @@ impl Receiver { /// # Examples /// /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom - /// stream implementation to internally make use of a [`Receiver`]. + /// stream implementation to internally make use of a [`Receiver`]. This example implementation + /// differs from the stream implementation of [`Receiver`] because it returns an error if + /// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do. /// /// ``` /// use futures_core::Stream;