Skip to content

Commit

Permalink
sync: handle panic during mpsc drop (#7094)
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 authored Jan 13, 2025
1 parent 435e390 commit a82bdee
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
28 changes: 26 additions & 2 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,34 @@ impl<T, S: Semaphore> Drop for Rx<T, S> {

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
struct Guard<'a, T, S: Semaphore> {
list: &'a mut list::Rx<T>,
tx: &'a list::Tx<T>,
sem: &'a S,
}

impl<'a, T, S: Semaphore> Guard<'a, T, S> {
fn drain(&mut self) {
// call T's destructor.
while let Some(Value(_)) = self.list.pop(self.tx) {
self.sem.add_permit();
}
}
}

while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
self.inner.semaphore.add_permit();
impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> {
fn drop(&mut self) {
self.drain();
}
}

let mut guard = Guard {
list: &mut rx_fields.list,
tx: &self.inner.tx,
sem: &self.inner.semaphore,
};

guard.drain();
});
}
}
Expand Down
46 changes: 46 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,4 +1454,50 @@ async fn test_is_empty_32_msgs() {
}
}

#[test]
#[cfg(not(panic = "abort"))]
fn drop_all_elements_during_panic() {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;

static COUNTER: AtomicUsize = AtomicUsize::new(0);

struct A(bool);
impl Drop for A {
// cause a panic when inner value is `true`.
fn drop(&mut self) {
COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if self.0 {
panic!("panic!")
}
}
}

fn func(tx: UnboundedSender<A>, rx: UnboundedReceiver<A>) {
tx.send(A(true)).unwrap();
tx.send(A(false)).unwrap();
tx.send(A(false)).unwrap();

drop(rx);

// `mpsc::Rx`'s drop is called and gets panicked while dropping the first value,
// but will keep dropping following elements.
}

let (tx, rx) = mpsc::unbounded_channel();

let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(tx.clone(), rx);
}));

// all A's destructor should be called at this point, even before `mpsc::Chan`'s
// drop gets called.
assert_eq!(COUNTER.load(Relaxed), 3);

drop(tx);
// `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation.
}

fn is_debug<T: fmt::Debug>(_: &T) {}

0 comments on commit a82bdee

Please sign in to comment.