From a82bdeebe9560d22a0179ae7ff8ce3986202e24d Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Tue, 14 Jan 2025 02:36:51 +0900 Subject: [PATCH] sync: handle panic during mpsc drop (#7094) --- tokio/src/sync/mpsc/chan.rs | 28 ++++++++++++++++++++-- tokio/tests/sync_mpsc.rs | 46 +++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f4cedf0d4dd..1e6eaab1798 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -490,10 +490,34 @@ impl Drop for Rx { self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; + struct Guard<'a, T, S: Semaphore> { + list: &'a mut list::Rx, + tx: &'a list::Tx, + sem: &'a S, + } + + impl<'a, T, S: Semaphore> Guard<'a, T, S> { + fn drain(&mut self) { + // call T's destructor. + while let Some(Value(_)) = self.list.pop(self.tx) { + self.sem.add_permit(); + } + } + } - while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { - self.inner.semaphore.add_permit(); + impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> { + fn drop(&mut self) { + self.drain(); + } } + + let mut guard = Guard { + list: &mut rx_fields.list, + tx: &self.inner.tx, + sem: &self.inner.semaphore, + }; + + guard.drain(); }); } } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 638ced588ce..577e9c35faa 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1454,4 +1454,50 @@ async fn test_is_empty_32_msgs() { } } +#[test] +#[cfg(not(panic = "abort"))] +fn drop_all_elements_during_panic() { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::Relaxed; + use tokio::sync::mpsc::UnboundedReceiver; + use tokio::sync::mpsc::UnboundedSender; + + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + struct A(bool); + impl Drop for A { + // cause a panic when inner value is `true`. + fn drop(&mut self) { + COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if self.0 { + panic!("panic!") + } + } + } + + fn func(tx: UnboundedSender, rx: UnboundedReceiver) { + tx.send(A(true)).unwrap(); + tx.send(A(false)).unwrap(); + tx.send(A(false)).unwrap(); + + drop(rx); + + // `mpsc::Rx`'s drop is called and gets panicked while dropping the first value, + // but will keep dropping following elements. + } + + let (tx, rx) = mpsc::unbounded_channel(); + + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + func(tx.clone(), rx); + })); + + // all A's destructor should be called at this point, even before `mpsc::Chan`'s + // drop gets called. + assert_eq!(COUNTER.load(Relaxed), 3); + + drop(tx); + // `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation. +} + fn is_debug(_: &T) {}