Skip to content

Commit

Permalink
reactor: properly reset registered wakers state when needed (async-rs#13
Browse files Browse the repository at this point in the history
)

If the list of pending wakers grew too big and we woke everything,
also drop the bit from wakers_registered

Signed-off-by: Marc-Antoine Perennou <[email protected]>
  • Loading branch information
Keruspe authored Aug 20, 2020
1 parent e91d306 commit defb205
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,10 @@ impl Source {
// Register the current task's waker if not present already.
if w.readers.iter().all(|w| !w.will_wake(cx.waker())) {
w.readers.push(cx.waker().clone());
limit_waker_list(&mut w.readers);
if limit_waker_list(&mut w.readers) {
self.wakers_registered
.fetch_and(!Self::READERS_REGISTERED, Ordering::SeqCst);
}
}

// Remember the current ticks.
Expand Down Expand Up @@ -527,7 +530,10 @@ impl Source {
// Register the current task's waker if not present already.
if w.writers.iter().all(|w| !w.will_wake(cx.waker())) {
w.writers.push(cx.waker().clone());
limit_waker_list(&mut w.writers);
if limit_waker_list(&mut w.writers) {
self.wakers_registered
.fetch_and(!Self::WRITERS_REGISTERED, Ordering::SeqCst);
}
}

// Remember the current ticks.
Expand All @@ -549,7 +555,7 @@ impl Source {
}
}

/// Wakes up all wakers in the list if it grew too big.
/// Wakes up all wakers in the list if it grew too big and returns whether it did.
///
/// The waker list keeps growing in pathological cases where a single async I/O handle has lots of
/// different reader or writer tasks. If the number of interested wakers crosses some threshold, we
Expand All @@ -562,11 +568,14 @@ impl Source {
/// However, we don't worry about such scenarios because it's very unlikely to have more than two
/// actually concurrent tasks operating on a single async I/O handle. If we happen to cross the
/// aforementioned threshold, we have bigger problems to worry about.
fn limit_waker_list(wakers: &mut Vec<Waker>) {
fn limit_waker_list(wakers: &mut Vec<Waker>) -> bool {
if wakers.len() > 50 {
for waker in wakers.drain(..) {
// Don't let a panicking waker blow everything up.
let _ = panic::catch_unwind(|| waker.wake());
}
true
} else {
false
}
}

0 comments on commit defb205

Please sign in to comment.