From cfd9439e18e303700cc680a8d04ef9c13a63b5b9 Mon Sep 17 00:00:00 2001 From: Joel Crevier Date: Tue, 4 Oct 2022 19:09:39 -0500 Subject: [PATCH 1/9] feat(mpsc): add `len`, `capacity`, and related methods to mpsc (#71) --- src/mpsc/async_impl.rs | 472 +++++++++++++++++++++++++++++++++++++++++ src/mpsc/blocking.rs | 471 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 943 insertions(+) diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 9f3431b..ca6a6ab 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -257,6 +257,119 @@ feature! { .core .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle) } + + /// Returns the *total* capacity of the channel for this [`Sender`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::channel; + /// + /// let (tx, _) = channel::(100); + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.inner.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Sender`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(tx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Sender`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(tx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.inner.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Sender`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::channel; + /// let (tx, rx) = channel::(100); + /// assert!(tx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!tx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Clone for Sender { @@ -581,6 +694,118 @@ feature! { pub fn is_closed(&self) -> bool { test_dbg!(self.inner.core.tx_count.load(Ordering::SeqCst)) <= 1 } + + /// Returns the *total* capacity of the channel for this [`Receiver`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.inner.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Receiver`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(rx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Receiver`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(rx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.inner.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Receiver`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::channel; + /// let (tx, rx) = channel::(100); + /// assert!(rx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!rx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Drop for Receiver { @@ -942,6 +1167,130 @@ feature! { pub fn try_send(&self, val: T) -> Result<(), TrySendError> { self.core.try_send(self.slots, val, self.recycle) } + + /// Returns the *total* capacity of the channel for this [`Sender`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, _) = CHANNEL.split(); + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::StaticChannel; + /// + /// # static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Sender`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(tx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Sender`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(tx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Sender`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert!(tx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!tx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Clone for StaticSender { @@ -1286,6 +1635,129 @@ feature! { pub fn is_closed(&self) -> bool { test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1 } + + /// Returns the *total* capacity of the channel for this [`Receiver`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (_, rx) = CHANNEL.split(); + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::StaticChannel; + /// + /// # static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Receiver`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(rx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Receiver`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(rx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Receiver`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert!(rx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!rx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Drop for StaticReceiver { diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index b46f47d..09b5996 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -410,6 +410,129 @@ feature! { pub fn try_send(&self, val: T) -> Result<(), TrySendError> { self.core.try_send(self.slots, val, self.recycle) } + /// Returns the *total* capacity of the channel for this [`Sender`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, _) = CHANNEL.split(); + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::blocking::StaticChannel; + /// + /// # static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Sender`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(tx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Sender`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(tx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Sender`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, _) = CHANNEL.split(); + /// assert!(tx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!tx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Clone for StaticSender { @@ -650,6 +773,129 @@ feature! { pub fn is_closed(&self) -> bool { test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1 } + + /// Returns the *total* capacity of the channel for this [`Receiver`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (_, rx) = CHANNEL.split(); + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::blocking::StaticChannel; + /// + /// # static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Receiver`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(rx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Receiver`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert_eq!(rx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Receiver`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::blocking::StaticChannel; + /// + /// static CHANNEL: StaticChannel = StaticChannel::new(); + /// + /// let (tx, rx) = CHANNEL.split(); + /// assert!(rx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!rx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl<'a, T, R> Iterator for &'a StaticReceiver { @@ -885,6 +1131,119 @@ where .core .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle) } + + /// Returns the *total* capacity of the channel for this [`Sender`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, _) = channel::(100); + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, rx) = channel::(100); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(tx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.inner.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Sender`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(tx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Sender`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(tx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(tx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(tx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.inner.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Sender`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// let (tx, _) = channel::(100); + /// assert!(tx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!tx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Clone for Sender { @@ -1111,6 +1470,118 @@ impl Receiver { pub fn is_closed(&self) -> bool { test_dbg!(self.inner.core.tx_count.load(Ordering::SeqCst)) <= 1 } + + /// Returns the *total* capacity of the channel for this [`Receiver`]. + /// This includes both occupied and unoccupied entries. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// + /// let (_, rx) = channel::(100); + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// Even after sending several messages, the capacity remains + /// the same: + /// ``` + /// # use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, rx) = channel::(100); + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// + /// assert_eq!(rx.capacity(), 100); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn capacity(&self) -> usize { + self.inner.core.core.capacity() + } + + /// Returns the unoccupied capacity of the channel for this [`Receiver`] + /// (i.e., how many additional elements can be sent before the channel + /// will be full). + /// + /// This is equivalent to subtracting the channel's [`len`] from its [`capacity`]. + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(rx.remaining(), 100); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.remaining(), 97); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.remaining(), 98) + /// ``` + /// + /// [`len`]: Self::len + /// [`capacity`]: Self::capacity + #[must_use] + pub fn remaining(&self) -> usize { + self.capacity() - self.len() + } + + /// Returns the number of elements in the channel of this [`Receiver`]. + /// + /// To determine the channel's remaining *unoccupied* capacity, use + /// [`remaining`] instead. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::blocking::channel; + /// + /// let (tx, rx) = channel::(100); + /// assert_eq!(rx.len(), 0); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// *tx.try_send_ref().unwrap() = 2; + /// *tx.try_send_ref().unwrap() = 3; + /// assert_eq!(rx.len(), 3); + /// + /// let _ = rx.try_recv_ref().unwrap(); + /// assert_eq!(rx.len(), 2); + /// ``` + /// + /// [`remaining`]: Self::remaining + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.inner.core.core.len() + } + + /// Returns whether the number of elements in the channel of this [`Receiver`] is 0. + /// + /// # Examples + /// ``` + /// use thingbuf::mpsc::channel; + /// let (tx, rx) = channel::(100); + /// assert!(rx.is_empty()); + /// + /// *tx.try_send_ref().unwrap() = 1; + /// + /// assert!(!rx.is_empty()); + /// ``` + /// + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl<'a, T, R> Iterator for &'a Receiver { From bcfc458fcebd86897d31bfbf19f7344ab7f095d9 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 6 Oct 2022 13:36:31 -0700 Subject: [PATCH 2/9] Apply suggestions from code review --- src/mpsc/async_impl.rs | 18 ++++++++++-------- src/mpsc/blocking.rs | 14 +++++++------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index ca6a6ab..ed896db 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -319,6 +319,7 @@ feature! { /// [`len`]: Self::len /// [`capacity`]: Self::capacity #[must_use] + #[must_use] pub fn remaining(&self) -> usize { self.capacity() - self.len() } @@ -755,6 +756,7 @@ feature! { /// [`len`]: Self::len /// [`capacity`]: Self::capacity #[must_use] + #[must_use] pub fn remaining(&self) -> usize { self.capacity() - self.len() } @@ -1168,7 +1170,7 @@ feature! { self.core.try_send(self.slots, val, self.recycle) } - /// Returns the *total* capacity of the channel for this [`Sender`]. + /// Returns the *total* capacity of the channel for this [`StaticSender`]. /// This includes both occupied and unoccupied entries. /// /// To determine the channel's remaining *unoccupied* capacity, use @@ -1207,7 +1209,7 @@ feature! { self.core.core.capacity() } - /// Returns the unoccupied capacity of the channel for this [`Sender`] + /// Returns the unoccupied capacity of the channel for this [`StaticSender`] /// (i.e., how many additional elements can be sent before the channel /// will be full). /// @@ -1239,7 +1241,7 @@ feature! { self.capacity() - self.len() } - /// Returns the number of elements in the channel of this [`Sender`]. + /// Returns the number of elements in the channel of this [`StaticSender`]. /// /// To determine the channel's remaining *unoccupied* capacity, use /// [`remaining`] instead. @@ -1270,7 +1272,7 @@ feature! { self.core.core.len() } - /// Returns whether the number of elements in the channel of this [`Sender`] is 0. + /// Returns whether the number of elements in the channel of this [`StaticSender`] is 0. /// /// # Examples /// ``` @@ -1636,7 +1638,7 @@ feature! { test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1 } - /// Returns the *total* capacity of the channel for this [`Receiver`]. + /// Returns the *total* capacity of the channel for this [`StaticReceiver`]. /// This includes both occupied and unoccupied entries. /// /// To determine the channel's remaining *unoccupied* capacity, use @@ -1675,7 +1677,7 @@ feature! { self.core.core.capacity() } - /// Returns the unoccupied capacity of the channel for this [`Receiver`] + /// Returns the unoccupied capacity of the channel for this [`StaticReceiver`] /// (i.e., how many additional elements can be sent before the channel /// will be full). /// @@ -1706,7 +1708,7 @@ feature! { self.capacity() - self.len() } - /// Returns the number of elements in the channel of this [`Receiver`]. + /// Returns the number of elements in the channel of this [`StaticReceiver`]. /// /// To determine the channel's remaining *unoccupied* capacity, use /// [`remaining`] instead. @@ -1737,7 +1739,7 @@ feature! { self.core.core.len() } - /// Returns whether the number of elements in the channel of this [`Receiver`] is 0. + /// Returns whether the number of elements in the channel of this [`StaticReceiver`] is 0. /// /// # Examples /// ``` diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index 09b5996..d9205d9 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -410,7 +410,7 @@ feature! { pub fn try_send(&self, val: T) -> Result<(), TrySendError> { self.core.try_send(self.slots, val, self.recycle) } - /// Returns the *total* capacity of the channel for this [`Sender`]. + /// Returns the *total* capacity of the channel for this [`StaticSender`]. /// This includes both occupied and unoccupied entries. /// /// To determine the channel's remaining *unoccupied* capacity, use @@ -481,7 +481,7 @@ feature! { self.capacity() - self.len() } - /// Returns the number of elements in the channel of this [`Sender`]. + /// Returns the number of elements in the channel of this [`StaticSender`]. /// /// To determine the channel's remaining *unoccupied* capacity, use /// [`remaining`] instead. @@ -512,7 +512,7 @@ feature! { self.core.core.len() } - /// Returns whether the number of elements in the channel of this [`Sender`] is 0. + /// Returns whether the number of elements in the channel of this [`StaticSender`] is 0. /// /// # Examples /// ``` @@ -774,7 +774,7 @@ feature! { test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1 } - /// Returns the *total* capacity of the channel for this [`Receiver`]. + /// Returns the *total* capacity of the channel for this [`StaticReceiver`]. /// This includes both occupied and unoccupied entries. /// /// To determine the channel's remaining *unoccupied* capacity, use @@ -813,7 +813,7 @@ feature! { self.core.core.capacity() } - /// Returns the unoccupied capacity of the channel for this [`Receiver`] + /// Returns the unoccupied capacity of the channel for this [`StaticReceiver`] /// (i.e., how many additional elements can be sent before the channel /// will be full). /// @@ -844,7 +844,7 @@ feature! { self.capacity() - self.len() } - /// Returns the number of elements in the channel of this [`Receiver`]. + /// Returns the number of elements in the channel of this [`StaticReceiver`]. /// /// To determine the channel's remaining *unoccupied* capacity, use /// [`remaining`] instead. @@ -875,7 +875,7 @@ feature! { self.core.core.len() } - /// Returns whether the number of elements in the channel of this [`Receiver`] is 0. + /// Returns whether the number of elements in the channel of this [`StatucReceiver`] is 0. /// /// # Examples /// ``` From 2b1afa13458c2c0046de64be0bd86523e90e609d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 6 Oct 2022 13:44:40 -0700 Subject: [PATCH 3/9] Update src/mpsc/blocking.rs --- src/mpsc/blocking.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index d9205d9..1deb1e3 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -410,6 +410,7 @@ feature! { pub fn try_send(&self, val: T) -> Result<(), TrySendError> { self.core.try_send(self.slots, val, self.recycle) } + /// Returns the *total* capacity of the channel for this [`StaticSender`]. /// This includes both occupied and unoccupied entries. /// From b8772181584aeabe55ba9b5603992be33b91edcc Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 28 Apr 2023 10:36:46 -0700 Subject: [PATCH 4/9] remove duplicate `must_use` --- src/mpsc/async_impl.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index ed896db..dcf2a8e 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -319,7 +319,6 @@ feature! { /// [`len`]: Self::len /// [`capacity`]: Self::capacity #[must_use] - #[must_use] pub fn remaining(&self) -> usize { self.capacity() - self.len() } From b425fb28377a9e08aa10d7a807d0536853b17035 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 28 Apr 2023 10:37:11 -0700 Subject: [PATCH 5/9] Apply suggestions from code review --- src/mpsc/blocking.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index 9bfff78..cec6faa 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -569,7 +569,7 @@ feature! { self.core.core.capacity() } - /// Returns the unoccupied capacity of the channel for this [`Sender`] + /// Returns the unoccupied capacity of the channel for this [`StaticSender`] /// (i.e., how many additional elements can be sent before the channel /// will be full). /// From 7bc89ac07d69d30826e6bde86d724b86469235a6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 6 Apr 2024 10:04:03 -0700 Subject: [PATCH 6/9] Update src/mpsc/async_impl.rs --- src/mpsc/async_impl.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 4039428..f00739c 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -762,7 +762,6 @@ feature! { /// [`len`]: Self::len /// [`capacity`]: Self::capacity #[must_use] - #[must_use] pub fn remaining(&self) -> usize { self.capacity() - self.len() } From 419621783bf34f81399a0400e371677351aa370f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 6 Apr 2024 10:24:19 -0700 Subject: [PATCH 7/9] fix `len` on closed channels --- src/lib.rs | 34 +++++++++++++++++++++++++++++++++- src/mpsc/blocking.rs | 16 +++++++++++----- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e3b2eee..f702ef2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -432,7 +432,11 @@ impl Core { return match head_idx.cmp(&tail_idx) { cmp::Ordering::Less => tail_idx - head_idx, cmp::Ordering::Greater => self.capacity - head_idx + tail_idx, - _ if tail == head => 0, + // Ignore the closed bit when comparing head and tail here, + // since it's not relevant to the length of the channel. If + // both indices point at the same slot and lap, the length + // is zero, even if the channel has been closed. + _ if (tail & !self.closed) == (head & !self.closed) => 0, _ => self.capacity, }; } @@ -665,3 +669,31 @@ impl fmt::Display for Full { #[cfg(feature = "std")] impl std::error::Error for Full {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn zero_len() { + const CAP: usize = 16; + let mut core = Core::new(CAP); + assert_eq!(core.len(), 0); + assert_eq!(core.capacity(), CAP); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } + + #[test] + fn closed_channel_len() { + const CAP: usize = 16; + let mut core = Core::new(CAP); + core.close(); + assert_eq!(core.len(), 0); + assert_eq!(core.capacity(), CAP); + + // don't panic in drop impl. + core.has_dropped_slots = true; + } +} diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index 8a0efa1..e4ad3a8 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -4,11 +4,17 @@ //! [`Receiver`] types in this module wait by blocking the current thread, //! rather than asynchronously yielding. use super::*; -use crate::{loom::{ - atomic::{self, Ordering}, - sync::Arc, - thread::{self, Thread}, -}, MAX_CAPACITY, recycling::{self, Recycle}, util::Backoff, wait::queue}; +use crate::{ + loom::{ + atomic::{self, Ordering}, + sync::Arc, + thread::{self, Thread}, + }, + recycling::{self, Recycle}, + util::Backoff, + wait::queue, + MAX_CAPACITY, +}; use core::{fmt, pin::Pin}; use errors::*; use std::time::{Duration, Instant}; From a2a33c6d4776d53e64d5d86b99e07df73ef46533 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 6 Apr 2024 10:26:28 -0700 Subject: [PATCH 8/9] fix blocking `Sender::is_empty` doctests failing these tests drop the rx, closing the channel. thus, the test fails. --- src/mpsc/blocking.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index e4ad3a8..a066752 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -647,12 +647,15 @@ feature! { /// /// static CHANNEL: StaticChannel = StaticChannel::new(); /// - /// let (tx, _) = CHANNEL.split(); + /// let (tx, rx) = CHANNEL.split(); /// assert!(tx.is_empty()); /// /// *tx.try_send_ref().unwrap() = 1; /// /// assert!(!tx.is_empty()); + /// # // keep rx alive so that the send call succeeds --- otherwise, + /// # // the channel will close and the send will fail. + /// # drop(rx); /// ``` /// #[inline] @@ -1585,12 +1588,15 @@ where /// # Examples /// ``` /// use thingbuf::mpsc::blocking::channel; - /// let (tx, _) = channel::(100); + /// let (tx, rx) = channel::(100); /// assert!(tx.is_empty()); /// /// *tx.try_send_ref().unwrap() = 1; /// /// assert!(!tx.is_empty()); + /// # // keep rx alive so that the send call succeeds --- otherwise, + /// # // the channel will close and the send will fail. + /// # drop(rx); /// ``` /// #[inline] From cbb6bff1744f7f8c6a02813812f0fe2cf11c3a03 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 6 Apr 2024 10:53:02 -0700 Subject: [PATCH 9/9] fix docs typo --- src/mpsc/blocking.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mpsc/blocking.rs b/src/mpsc/blocking.rs index a066752..49a8aed 100644 --- a/src/mpsc/blocking.rs +++ b/src/mpsc/blocking.rs @@ -1111,7 +1111,8 @@ feature! { self.core.core.len() } - /// Returns whether the number of elements in the channel of this [`StatucReceiver`] is 0. + /// Returns whether the number of elements in the channel of this + /// [`StaticReceiver`] is 0. /// /// # Examples /// ```