From 8eb44252cd3451b7f1b91d5d4128f569099269a0 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sat, 8 Apr 2023 17:21:51 -0700 Subject: [PATCH 1/9] Replace futures-util with async-task --- Cargo.toml | 6 +- src/sources/futures.rs | 245 ++++++++++++++++++++++++++++++++--------- 2 files changed, 193 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3056a655..a6524de6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,19 +22,19 @@ codecov = { repository = "Smithay/calloop" } io-lifetimes = "1.0.3" log = "0.4" nix = { version = "0.26", default-features = false, features = ["event", "fs", "signal", "socket", "time"] } -futures-util = { version = "0.3.5", optional = true, default-features = false, features = ["std"]} +async-task = { version = "4.4.0", optional = true } futures-io = { version = "0.3.5", optional = true } thiserror = "1.0" pin-utils = { version = "0.1.0", optional = true } +slab = "0.4.8" polling = "2.6.0" -slab = "0.4.7" [dev-dependencies] futures = "0.3.5" [features] block_on = ["pin-utils"] -executor = ["futures-util"] +executor = ["async-task"] [package.metadata.docs.rs] all-features = true diff --git a/src/sources/futures.rs b/src/sources/futures.rs index f44ffc95..75997839 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -17,16 +17,14 @@ //! **Note:** The futures must have their own means of being woken up, as this executor is, //! by itself, not I/O aware. See [`LoopHandle::adapt_io`](crate::LoopHandle#method.adapt_io) //! for that, or you can use some other mechanism if you prefer. -use std::{future::Future, pin::Pin, sync::Arc}; -use futures_util::{ - stream::{FuturesUnordered, Stream}, - task::{waker_ref, ArcWake, Context, LocalFutureObj, Poll as FutPoll}, -}; +use async_task::{Builder, Runnable}; +use slab::Slab; +use std::{cell::RefCell, collections::VecDeque, future::Future, rc::Rc, task::Waker}; use crate::{ sources::{ - channel::{channel, Channel, ChannelError, Event, Sender}, + channel::ChannelError, ping::{make_ping, Ping, PingError, PingSource}, EventSource, }, @@ -36,16 +34,53 @@ use crate::{ /// A future executor as an event source #[derive(Debug)] pub struct Executor { - futures: FuturesUnordered>, - new_futures: Channel>, - ready_futures: PingSource, - waker: Arc, + /// Shared state between the executor and the scheduler. + state: Rc>, + + /// The ping source to register into the poller. + ping_source: PingSource, } /// A scheduler to send futures to an executor #[derive(Clone, Debug)] pub struct Scheduler { - sender: Sender>, + state: Rc>, +} + +/// The inner state of the executor. +#[derive(Debug)] +struct State { + /// The incoming queue of futures to be executed. + incoming: RefCell>>, + + /// The list of currently active tasks. + /// + /// This is set to `None` when the executor is destroyed. + active: RefCell>>>, + + /// The ping to wake up the executor. + ping: Ping, +} + +/// An active future or its result. +#[derive(Debug)] +enum Active { + /// The future is currently being polled. + /// + /// Waking this waker will insert the runnable into `incoming`. + Future(Waker), + + /// The future has finished polling, and its result is stored here. + Finished(T), +} + +impl Active { + fn is_finished(&self) -> bool { + match self { + Active::Finished(_) => true, + _ => false, + } + } } impl Scheduler { @@ -56,8 +91,99 @@ impl Scheduler { where Fut: Future, { - let obj = LocalFutureObj::new(Box::new(future)); - self.sender.send(obj).map_err(|_| ExecutorDestroyed) + /// Store this future's result in the executor. + struct StoreOnDrop<'a, T> { + index: usize, + value: Option, + state: &'a State, + } + + impl Drop for StoreOnDrop<'_, T> { + fn drop(&mut self) { + let mut active = self.state.active.borrow_mut(); + if let Some(active) = active.as_mut() { + if let Some(value) = self.value.take() { + active[self.index] = Active::Finished(value); + } else { + // The future was dropped before it finished. + // Remove it from the active list. + active.remove(self.index); + } + } + } + } + + let mut active_guard = self.state.active.borrow_mut(); + let active = active_guard.as_mut().ok_or(ExecutorDestroyed)?; + + // Wrap the future in another future that polls it and stores the result. + let index = active.vacant_key(); + let future = { + let state = self.state.clone(); + async move { + let mut guard = StoreOnDrop { + index, + value: None, + state: &state, + }; + + // Get the value of the future. + let value = future.await; + + // Store it in the executor. + guard.value = Some(value); + } + }; + + // A schedule function that inserts the runnable into the incoming queue. + let schedule = { + let state = self.state.clone(); + move |runnable| { + let mut incoming = state.incoming.borrow_mut(); + incoming.push_back(runnable); + + // Wake up the executor. + state.ping.ping(); + } + }; + + // Spawn the future. + let (runnable, task) = { + let builder = Builder::new().metadata(index).propagate_panic(true); + + // SAFETY: todo + unsafe { builder.spawn_unchecked(move |_| future, schedule) } + }; + + // Insert the runnable into the set of active tasks. + active.insert(Active::Future(runnable.waker())); + drop(active_guard); + + // Schedule the runnable and detach the task so it isn't cancellable. + runnable.schedule(); + task.detach(); + + Ok(()) + } +} + +impl Drop for Executor { + fn drop(&mut self) { + let active = self.state.active.borrow_mut().take().unwrap(); + + // Wake all of the active tasks in order to destroy their runnables. + for (_, task) in active { + // Don't let a panicking waker blow everything up. + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + if let Active::Future(waker) = task { + waker.wake(); + } + })) + .ok(); + } + + // Drain the queue in order to drop all of the runnables. + self.state.incoming.borrow_mut().clear(); } } @@ -67,32 +193,24 @@ impl Scheduler { #[error("the executor was destroyed")] pub struct ExecutorDestroyed; -#[derive(Debug)] -struct ExecWaker { - ping: Ping, -} - -impl ArcWake for ExecWaker { - fn wake_by_ref(arc_self: &Arc) { - arc_self.ping.ping(); - } -} - /// Create a new executor, and its associated scheduler /// /// May fail due to OS errors preventing calloop to setup its internal pipes (if your /// process has reatched its file descriptor limit for example). pub fn executor() -> crate::Result<(Executor, Scheduler)> { - let (ping, ready_futures) = make_ping()?; - let (sender, new_futures) = channel(); + let (ping, ping_source) = make_ping()?; + let state = Rc::new(State { + incoming: RefCell::new(VecDeque::new()), + active: RefCell::new(Some(Slab::new())), + ping, + }); + Ok(( Executor { - futures: FuturesUnordered::new(), - new_futures, - ready_futures, - waker: Arc::new(ExecWaker { ping }), + state: state.clone(), + ping_source, }, - Scheduler { sender }, + Scheduler { state }, )) } @@ -111,34 +229,53 @@ impl EventSource for Executor { where F: FnMut(T, &mut ()), { - // fetch all newly inserted futures and push them to the container - let futures = &mut self.futures; - self.new_futures - .process_events(readiness, token, |evt, _| { - if let Event::Msg(fut) = evt { - futures.push(fut); - } - }) - .map_err(ExecutorError::NewFutureError)?; + // Process all of the newly inserted futures. + let clear_readiness = { + let mut incoming = self.state.incoming.borrow_mut(); + let mut clear_readiness = false; - // process ping events to make it non-ready again - self.ready_futures - .process_events(readiness, token, |(), _| {}) - .map_err(ExecutorError::WakeError)?; + // Only process a limited number of tasks at a time; better to move onto the next event soon. + for _ in 0..1024 { + if let Some(runnable) = incoming.pop_front() { + let index = *runnable.metadata(); + runnable.run(); - // advance all available futures as much as possible - let waker = waker_ref(&self.waker); - let mut cx = Context::from_waker(&waker); + // If the runnable finished with a result, call the callback. + let mut active_guard = self.state.active.borrow_mut(); + let active = active_guard.as_mut().unwrap(); - while let FutPoll::Ready(Some(ret)) = Pin::new(&mut self.futures).poll_next(&mut cx) { - callback(ret, &mut ()); + if let Some(state) = active.get(index) { + if state.is_finished() { + // Take out the state and provide it. + let result = match active.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; + + callback(result, &mut ()); + } + } + } else { + clear_readiness = true; + break; + } + } + + clear_readiness + }; + + // Clear the readiness of the ping event if we processed all of the incoming tasks. + if clear_readiness { + self.ping_source + .process_events(readiness, token, |(), &mut ()| {}) + .map_err(ExecutorError::WakeError)?; } + Ok(PostAction::Continue) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.new_futures.register(poll, token_factory)?; - self.ready_futures.register(poll, token_factory)?; + self.ping_source.register(poll, token_factory)?; Ok(()) } @@ -147,14 +284,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.new_futures.reregister(poll, token_factory)?; - self.ready_futures.reregister(poll, token_factory)?; + self.ping_source.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.new_futures.unregister(poll)?; - self.ready_futures.unregister(poll)?; + self.ping_source.unregister(poll)?; Ok(()) } } From 3de9130425c203fc6c4b9dfc1b2fa403e35f08f9 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 9 Apr 2023 09:39:58 -0700 Subject: [PATCH 2/9] Fix soundness mistakes --- src/sources/futures.rs | 143 +++++++++++++++++++++-------------------- 1 file changed, 74 insertions(+), 69 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 75997839..7abc2a28 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -20,15 +20,15 @@ use async_task::{Builder, Runnable}; use slab::Slab; -use std::{cell::RefCell, collections::VecDeque, future::Future, rc::Rc, task::Waker}; +use std::{cell::RefCell, future::Future, rc::Rc, task::Waker}; use crate::{ sources::{ - channel::ChannelError, - ping::{make_ping, Ping, PingError, PingSource}, + channel::{ChannelError, channel, Channel, Sender}, + ping::PingError, EventSource, }, - Poll, PostAction, Readiness, Token, TokenFactory, + Poll, PostAction, Readiness, Token, TokenFactory, channel::Event, }; /// A future executor as an event source @@ -37,29 +37,27 @@ pub struct Executor { /// Shared state between the executor and the scheduler. state: Rc>, - /// The ping source to register into the poller. - ping_source: PingSource, + /// The incoming queue of futures to be executed. + incoming: Channel>, } /// A scheduler to send futures to an executor #[derive(Clone, Debug)] pub struct Scheduler { + /// Shared state between the executor and the scheduler. state: Rc>, + + /// The sender used to send runnables to the executor. + sender: Sender>, } /// The inner state of the executor. #[derive(Debug)] struct State { - /// The incoming queue of futures to be executed. - incoming: RefCell>>, - /// The list of currently active tasks. /// /// This is set to `None` when the executor is destroyed. active: RefCell>>>, - - /// The ping to wake up the executor. - ping: Ping, } /// An active future or its result. @@ -137,13 +135,13 @@ impl Scheduler { // A schedule function that inserts the runnable into the incoming queue. let schedule = { - let state = self.state.clone(); + let sender = self.sender.clone(); move |runnable| { - let mut incoming = state.incoming.borrow_mut(); - incoming.push_back(runnable); - - // Wake up the executor. - state.ping.ping(); + if sender.send(runnable).is_err() { + // This shouldn't be able to happen since all of the tasks are destroyed before the + // executor is. This indicates a critical soundness bug. + std::process::abort(); + } } }; @@ -151,7 +149,33 @@ impl Scheduler { let (runnable, task) = { let builder = Builder::new().metadata(index).propagate_panic(true); - // SAFETY: todo + // SAFETY: spawn_unchecked has four safety requirements: + // + // - "If future is not Send, its Runnable must be used and dropped on the original thread." + // + // The runnable is created on the origin thread and sent to the origin thread, since both + // Scheduler and Executor are !Send and !Sync. The waker may be sent to another thread, + // which means that the scheduler function (and the Runnable it handles) can exist on + // another thread. However, the scheduler function immediately sends it back to the origin + // thread. The channel is always kept open in this case, since all tasks are destroyed + // once the Executor is dropped. This means that the runnable will always be sent back + // to the origin thread. + // + // - "If future is not 'static, borrowed variables must outlive its Runnable." + // + // `F` is `'static`, so we don't have to worry about this one. + // + // - "If schedule is not Send and Sync, the task’s Waker must be used and dropped on the + // original thread." + // + // The schedule function uses a thread-safe MPSC channel to send the runnable back to the + // origin thread. This means that the waker can be sent to another thread, which satisfies + // this requirement. + // + // - "If schedule is not 'static, borrowed variables must outlive the task’s Waker." + // + // `schedule` and the types it handles are `'static`, so we don't have to worry about + // this one. unsafe { builder.spawn_unchecked(move |_| future, schedule) } }; @@ -183,7 +207,7 @@ impl Drop for Executor { } // Drain the queue in order to drop all of the runnables. - self.state.incoming.borrow_mut().clear(); + while self.incoming.try_recv().is_ok() {} } } @@ -198,19 +222,17 @@ pub struct ExecutorDestroyed; /// May fail due to OS errors preventing calloop to setup its internal pipes (if your /// process has reatched its file descriptor limit for example). pub fn executor() -> crate::Result<(Executor, Scheduler)> { - let (ping, ping_source) = make_ping()?; + let (sender, channel) = channel(); let state = Rc::new(State { - incoming: RefCell::new(VecDeque::new()), active: RefCell::new(Some(Slab::new())), - ping, }); Ok(( Executor { state: state.clone(), - ping_source, + incoming: channel }, - Scheduler { state }, + Scheduler { state, sender }, )) } @@ -229,53 +251,36 @@ impl EventSource for Executor { where F: FnMut(T, &mut ()), { - // Process all of the newly inserted futures. - let clear_readiness = { - let mut incoming = self.state.incoming.borrow_mut(); - let mut clear_readiness = false; - - // Only process a limited number of tasks at a time; better to move onto the next event soon. - for _ in 0..1024 { - if let Some(runnable) = incoming.pop_front() { - let index = *runnable.metadata(); - runnable.run(); - - // If the runnable finished with a result, call the callback. - let mut active_guard = self.state.active.borrow_mut(); - let active = active_guard.as_mut().unwrap(); - - if let Some(state) = active.get(index) { - if state.is_finished() { - // Take out the state and provide it. - let result = match active.remove(index) { - Active::Finished(result) => result, - _ => unreachable!(), - }; - - callback(result, &mut ()); - } - } - } else { - clear_readiness = true; - break; + let state = &self.state; + self.incoming.process_events(readiness, token, |event, &mut ()| { + let runnable = match event { + Event::Msg(runnable) => runnable, + _ => return, + }; + + let index = *runnable.metadata(); + runnable.run(); + + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active.borrow_mut(); + let active = active_guard.as_mut().unwrap(); + + if let Some(state) = active.get(index) { + if state.is_finished() { + // Take out the state and provide it. + let result = match active.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; + + callback(result, &mut ()); } } - - clear_readiness - }; - - // Clear the readiness of the ping event if we processed all of the incoming tasks. - if clear_readiness { - self.ping_source - .process_events(readiness, token, |(), &mut ()| {}) - .map_err(ExecutorError::WakeError)?; - } - - Ok(PostAction::Continue) + }).map_err(ExecutorError::NewFutureError) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.ping_source.register(poll, token_factory)?; + self.incoming.register(poll, token_factory)?; Ok(()) } @@ -284,12 +289,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.ping_source.reregister(poll, token_factory)?; + self.incoming.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.ping_source.unregister(poll)?; + self.incoming.unregister(poll)?; Ok(()) } } From ad206c719160687522176e4bc87cce57b0b43094 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 9 Apr 2023 09:40:52 -0700 Subject: [PATCH 3/9] fmt all --- src/sources/futures.rs | 71 ++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 7abc2a28..0ce97162 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -23,12 +23,13 @@ use slab::Slab; use std::{cell::RefCell, future::Future, rc::Rc, task::Waker}; use crate::{ + channel::Event, sources::{ - channel::{ChannelError, channel, Channel, Sender}, + channel::{channel, Channel, ChannelError, Sender}, ping::PingError, EventSource, }, - Poll, PostAction, Readiness, Token, TokenFactory, channel::Event, + Poll, PostAction, Readiness, Token, TokenFactory, }; /// A future executor as an event source @@ -48,7 +49,7 @@ pub struct Scheduler { state: Rc>, /// The sender used to send runnables to the executor. - sender: Sender>, + sender: Sender>, } /// The inner state of the executor. @@ -150,7 +151,7 @@ impl Scheduler { let builder = Builder::new().metadata(index).propagate_panic(true); // SAFETY: spawn_unchecked has four safety requirements: - // + // // - "If future is not Send, its Runnable must be used and dropped on the original thread." // // The runnable is created on the origin thread and sent to the origin thread, since both @@ -162,18 +163,18 @@ impl Scheduler { // to the origin thread. // // - "If future is not 'static, borrowed variables must outlive its Runnable." - // + // // `F` is `'static`, so we don't have to worry about this one. - // + // // - "If schedule is not Send and Sync, the task’s Waker must be used and dropped on the // original thread." - // + // // The schedule function uses a thread-safe MPSC channel to send the runnable back to the // origin thread. This means that the waker can be sent to another thread, which satisfies // this requirement. - // + // // - "If schedule is not 'static, borrowed variables must outlive the task’s Waker." - // + // // `schedule` and the types it handles are `'static`, so we don't have to worry about // this one. unsafe { builder.spawn_unchecked(move |_| future, schedule) } @@ -230,7 +231,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { Ok(( Executor { state: state.clone(), - incoming: channel + incoming: channel, }, Scheduler { state, sender }, )) @@ -252,31 +253,33 @@ impl EventSource for Executor { F: FnMut(T, &mut ()), { let state = &self.state; - self.incoming.process_events(readiness, token, |event, &mut ()| { - let runnable = match event { - Event::Msg(runnable) => runnable, - _ => return, - }; - - let index = *runnable.metadata(); - runnable.run(); - - // If the runnable finished with a result, call the callback. - let mut active_guard = state.active.borrow_mut(); - let active = active_guard.as_mut().unwrap(); - - if let Some(state) = active.get(index) { - if state.is_finished() { - // Take out the state and provide it. - let result = match active.remove(index) { - Active::Finished(result) => result, - _ => unreachable!(), - }; - - callback(result, &mut ()); + self.incoming + .process_events(readiness, token, |event, &mut ()| { + let runnable = match event { + Event::Msg(runnable) => runnable, + _ => return, + }; + + let index = *runnable.metadata(); + runnable.run(); + + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active.borrow_mut(); + let active = active_guard.as_mut().unwrap(); + + if let Some(state) = active.get(index) { + if state.is_finished() { + // Take out the state and provide it. + let result = match active.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; + + callback(result, &mut ()); + } } - } - }).map_err(ExecutorError::NewFutureError) + }) + .map_err(ExecutorError::NewFutureError) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { From 0f9a7a5a296648c0eaf4c22be40b08b931b18031 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 9 Apr 2023 13:51:57 -0700 Subject: [PATCH 4/9] Fix failing tests --- src/sources/futures.rs | 149 +++++++++++++++++++++++++++++++---------- 1 file changed, 113 insertions(+), 36 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 0ce97162..02478f94 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -20,13 +20,21 @@ use async_task::{Builder, Runnable}; use slab::Slab; -use std::{cell::RefCell, future::Future, rc::Rc, task::Waker}; +use std::{ + cell::RefCell, + future::Future, + rc::Rc, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, + }, + task::Waker, +}; use crate::{ - channel::Event, sources::{ - channel::{channel, Channel, ChannelError, Sender}, - ping::PingError, + channel::ChannelError, + ping::{make_ping, Ping, PingError, PingSource}, EventSource, }, Poll, PostAction, Readiness, Token, TokenFactory, @@ -38,8 +46,8 @@ pub struct Executor { /// Shared state between the executor and the scheduler. state: Rc>, - /// The incoming queue of futures to be executed. - incoming: Channel>, + /// Notifies us when the executor is woken up. + ping: PingSource, } /// A scheduler to send futures to an executor @@ -47,20 +55,38 @@ pub struct Executor { pub struct Scheduler { /// Shared state between the executor and the scheduler. state: Rc>, - - /// The sender used to send runnables to the executor. - sender: Sender>, } /// The inner state of the executor. #[derive(Debug)] struct State { + /// The incoming queue of runnables to be executed. + incoming: mpsc::Receiver>, + + /// The sender corresponding to `incoming`. + sender: Arc, + /// The list of currently active tasks. /// /// This is set to `None` when the executor is destroyed. active: RefCell>>>, } +/// Send a future to an executor. +/// +/// This needs to be thread-safe, as it is called from a `Waker` that may be on a different thread. +#[derive(Debug)] +struct Sender { + /// The sender used to send runnables to the executor. + sender: mpsc::Sender>, + + /// The ping source used to wake up the executor. + wake_up: Ping, + + /// Whether the executor has already been woken. + notified: AtomicBool, +} + /// An active future or its result. #[derive(Debug)] enum Active { @@ -136,19 +162,13 @@ impl Scheduler { // A schedule function that inserts the runnable into the incoming queue. let schedule = { - let sender = self.sender.clone(); - move |runnable| { - if sender.send(runnable).is_err() { - // This shouldn't be able to happen since all of the tasks are destroyed before the - // executor is. This indicates a critical soundness bug. - std::process::abort(); - } - } + let sender = self.state.sender.clone(); + move |runnable| sender.send(runnable) }; // Spawn the future. let (runnable, task) = { - let builder = Builder::new().metadata(index).propagate_panic(true); + let builder = Builder::new().metadata(index); // SAFETY: spawn_unchecked has four safety requirements: // @@ -158,9 +178,12 @@ impl Scheduler { // Scheduler and Executor are !Send and !Sync. The waker may be sent to another thread, // which means that the scheduler function (and the Runnable it handles) can exist on // another thread. However, the scheduler function immediately sends it back to the origin - // thread. The channel is always kept open in this case, since all tasks are destroyed - // once the Executor is dropped. This means that the runnable will always be sent back - // to the origin thread. + // thread. + // + // The issue then becomes "is the Runnable dropped unsoundly when the channel is closed?" + // This problem is circumvented by immediately waking all runnables (pushes them into the + // queue) and then draining the queue. This means that, before the channel is closed, all + // runnables will be destroyed, preventing any unsoundness. // // - "If future is not 'static, borrowed variables must outlive its Runnable." // @@ -192,6 +215,29 @@ impl Scheduler { } } +impl Sender { + /// Send a runnable to the executor. + fn send(&self, runnable: Runnable) { + // Send on the channel. + if let Err(e) = self.sender.send(runnable) { + // Make sure the runnable is never dropped. + std::mem::forget(e); + + // This shouldn't be able to happen since all of the tasks are destroyed before the + // executor is. This indicates a critical soundness bug. + std::process::abort(); + } + + // If the executor is already awake, don't bother waking it up again. + if self.notified.swap(true, Ordering::SeqCst) { + return; + } + + // Wake the executor. + self.wake_up.ping(); + } +} + impl Drop for Executor { fn drop(&mut self) { let active = self.state.active.borrow_mut().take().unwrap(); @@ -208,7 +254,7 @@ impl Drop for Executor { } // Drain the queue in order to drop all of the runnables. - while self.incoming.try_recv().is_ok() {} + while self.state.incoming.try_recv().is_ok() {} } } @@ -223,17 +269,25 @@ pub struct ExecutorDestroyed; /// May fail due to OS errors preventing calloop to setup its internal pipes (if your /// process has reatched its file descriptor limit for example). pub fn executor() -> crate::Result<(Executor, Scheduler)> { - let (sender, channel) = channel(); + let (sender, incoming) = mpsc::channel(); + let (wake_up, ping) = make_ping()?; + let state = Rc::new(State { + incoming, active: RefCell::new(Some(Slab::new())), + sender: Arc::new(Sender { + sender, + wake_up, + notified: AtomicBool::new(false), + }), }); Ok(( Executor { state: state.clone(), - incoming: channel, + ping, }, - Scheduler { state, sender }, + Scheduler { state }, )) } @@ -253,13 +307,25 @@ impl EventSource for Executor { F: FnMut(T, &mut ()), { let state = &self.state; - self.incoming - .process_events(readiness, token, |event, &mut ()| { - let runnable = match event { - Event::Msg(runnable) => runnable, - _ => return, + + // Set to the unnotified state. + state.sender.notified.store(false, Ordering::SeqCst); + + let clear_readiness = { + let mut clear_readiness = false; + + // Process runnables, but not too many at a time; better to move onto the next event quickly! + for _ in 0..1024 { + let runnable = match state.incoming.try_recv() { + Ok(runnable) => runnable, + Err(_) => { + // Make sure to clear the readiness if there are no more runnables. + clear_readiness = true; + break; + } }; + // Run the runnable. let index = *runnable.metadata(); runnable.run(); @@ -269,7 +335,7 @@ impl EventSource for Executor { if let Some(state) = active.get(index) { if state.is_finished() { - // Take out the state and provide it. + // Take out the state and provide it to the caller. let result = match active.remove(index) { Active::Finished(result) => result, _ => unreachable!(), @@ -278,12 +344,23 @@ impl EventSource for Executor { callback(result, &mut ()); } } - }) - .map_err(ExecutorError::NewFutureError) + } + + clear_readiness + }; + + // Clear the readiness of the ping source if there are no more runnables. + if clear_readiness { + self.ping + .process_events(readiness, token, |(), &mut ()| {}) + .map_err(ExecutorError::WakeError)?; + } + + Ok(PostAction::Continue) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.incoming.register(poll, token_factory)?; + self.ping.register(poll, token_factory)?; Ok(()) } @@ -292,12 +369,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.incoming.reregister(poll, token_factory)?; + self.ping.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.incoming.unregister(poll)?; + self.ping.unregister(poll)?; Ok(()) } } From acb7448450e9da3461ca19270d57bad65bd597e9 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 9 Apr 2023 13:58:36 -0700 Subject: [PATCH 5/9] Make sure schedule is Send+Sync I keep forgetting that mpsc::Sender is Send but !Sync. Wrapping it in a mutex fixes that, at the cost of some overhead. Consider switching to crossbeam-queue or concurrent-queue in the future. --- src/sources/futures.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 02478f94..c0f2084f 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -26,7 +26,7 @@ use std::{ rc::Rc, sync::{ atomic::{AtomicBool, Ordering}, - mpsc, Arc, + mpsc, Arc, Mutex, }, task::Waker, }; @@ -78,7 +78,7 @@ struct State { #[derive(Debug)] struct Sender { /// The sender used to send runnables to the executor. - sender: mpsc::Sender>, + sender: Mutex>>, /// The ping source used to wake up the executor. wake_up: Ping, @@ -138,6 +138,8 @@ impl Scheduler { } } + fn assert_send_and_sync(_: &T) {} + let mut active_guard = self.state.active.borrow_mut(); let active = active_guard.as_mut().ok_or(ExecutorDestroyed)?; @@ -166,6 +168,8 @@ impl Scheduler { move |runnable| sender.send(runnable) }; + assert_send_and_sync(&schedule); + // Spawn the future. let (runnable, task) = { let builder = Builder::new().metadata(index); @@ -219,7 +223,12 @@ impl Sender { /// Send a runnable to the executor. fn send(&self, runnable: Runnable) { // Send on the channel. - if let Err(e) = self.sender.send(runnable) { + if let Err(e) = self + .sender + .lock() + .unwrap_or_else(|e| e.into_inner()) + .send(runnable) + { // Make sure the runnable is never dropped. std::mem::forget(e); @@ -276,7 +285,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { incoming, active: RefCell::new(Some(Slab::new())), sender: Arc::new(Sender { - sender, + sender: Mutex::new(sender), wake_up, notified: AtomicBool::new(false), }), From 611f0c41246c2f59de0ed989ae6b1dd07270c182 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 10 Apr 2023 09:58:21 -0700 Subject: [PATCH 6/9] Review comments --- src/sources/futures.rs | 59 +++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index c0f2084f..c0599e4a 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -69,7 +69,7 @@ struct State { /// The list of currently active tasks. /// /// This is set to `None` when the executor is destroyed. - active: RefCell>>>, + active_tasks: RefCell>>>, } /// Send a future to an executor. @@ -78,6 +78,8 @@ struct State { #[derive(Debug)] struct Sender { /// The sender used to send runnables to the executor. + /// + /// `mpsc::Sender` is `!Sync`, wrapping it in a `Mutex` makes it `Sync`. sender: Mutex>>, /// The ping source used to wake up the executor. @@ -125,14 +127,14 @@ impl Scheduler { impl Drop for StoreOnDrop<'_, T> { fn drop(&mut self) { - let mut active = self.state.active.borrow_mut(); - if let Some(active) = active.as_mut() { + let mut active_tasks = self.state.active_tasks.borrow_mut(); + if let Some(active_tasks) = active_tasks.as_mut() { if let Some(value) = self.value.take() { - active[self.index] = Active::Finished(value); + active_tasks[self.index] = Active::Finished(value); } else { // The future was dropped before it finished. // Remove it from the active list. - active.remove(self.index); + active_tasks.remove(self.index); } } } @@ -140,11 +142,11 @@ impl Scheduler { fn assert_send_and_sync(_: &T) {} - let mut active_guard = self.state.active.borrow_mut(); - let active = active_guard.as_mut().ok_or(ExecutorDestroyed)?; + let mut active_guard = self.state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().ok_or(ExecutorDestroyed)?; // Wrap the future in another future that polls it and stores the result. - let index = active.vacant_key(); + let index = active_tasks.vacant_key(); let future = { let state = self.state.clone(); async move { @@ -208,7 +210,7 @@ impl Scheduler { }; // Insert the runnable into the set of active tasks. - active.insert(Active::Future(runnable.waker())); + active_tasks.insert(Active::Future(runnable.waker())); drop(active_guard); // Schedule the runnable and detach the task so it isn't cancellable. @@ -223,17 +225,23 @@ impl Sender { /// Send a runnable to the executor. fn send(&self, runnable: Runnable) { // Send on the channel. + // + // All we do with the lock is call `send`, so there's no chance of any state being corrupted on + // panic. Therefore it's safe to ignore the mutex poison. if let Err(e) = self .sender .lock() .unwrap_or_else(|e| e.into_inner()) .send(runnable) { - // Make sure the runnable is never dropped. - std::mem::forget(e); + // The runnable must be dropped on its origin thread, since the original future might be + // !Send. This channel immediately sends it back to the Executor, which is pinned to the + // origin thread. The executor's Drop implementation will force all of the runnables to be + // dropped, therefore the channel should always be available. If we can't send the runnable, + // it indicates that the above behavior is broken and that unsoundness has occurred. The + // only option at this stage is to abort the process. - // This shouldn't be able to happen since all of the tasks are destroyed before the - // executor is. This indicates a critical soundness bug. + std::mem::forget(e); std::process::abort(); } @@ -249,17 +257,14 @@ impl Sender { impl Drop for Executor { fn drop(&mut self) { - let active = self.state.active.borrow_mut().take().unwrap(); + let active_tasks = self.state.active_tasks.borrow_mut().take().unwrap(); // Wake all of the active tasks in order to destroy their runnables. - for (_, task) in active { - // Don't let a panicking waker blow everything up. - std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - if let Active::Future(waker) = task { - waker.wake(); - } - })) - .ok(); + for (_, task) in active_tasks { + if let Active::Future(waker) = task { + // Don't let a panicking waker blow everything up. + std::panic::catch_unwind(|| waker.wake()).ok(); + } } // Drain the queue in order to drop all of the runnables. @@ -283,7 +288,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { let state = Rc::new(State { incoming, - active: RefCell::new(Some(Slab::new())), + active_tasks: RefCell::new(Some(Slab::new())), sender: Arc::new(Sender { sender: Mutex::new(sender), wake_up, @@ -339,13 +344,13 @@ impl EventSource for Executor { runnable.run(); // If the runnable finished with a result, call the callback. - let mut active_guard = state.active.borrow_mut(); - let active = active_guard.as_mut().unwrap(); + let mut active_guard = state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().unwrap(); - if let Some(state) = active.get(index) { + if let Some(state) = active_tasks.get(index) { if state.is_finished() { // Take out the state and provide it to the caller. - let result = match active.remove(index) { + let result = match active_tasks.remove(index) { Active::Finished(result) => result, _ => unreachable!(), }; From 202f9a7f4a51205ddf9d70dc199ecbc1d52ca127 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 17 Apr 2023 13:28:53 -0700 Subject: [PATCH 7/9] Remove unsafe code from futures.rs --- src/sources/futures.rs | 39 ++++----------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index c0599e4a..8b57cb7a 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -117,6 +117,7 @@ impl Scheduler { pub fn schedule(&self, future: Fut) -> Result<(), ExecutorDestroyed> where Fut: Future, + T: 'static, { /// Store this future's result in the executor. struct StoreOnDrop<'a, T> { @@ -173,41 +174,9 @@ impl Scheduler { assert_send_and_sync(&schedule); // Spawn the future. - let (runnable, task) = { - let builder = Builder::new().metadata(index); - - // SAFETY: spawn_unchecked has four safety requirements: - // - // - "If future is not Send, its Runnable must be used and dropped on the original thread." - // - // The runnable is created on the origin thread and sent to the origin thread, since both - // Scheduler and Executor are !Send and !Sync. The waker may be sent to another thread, - // which means that the scheduler function (and the Runnable it handles) can exist on - // another thread. However, the scheduler function immediately sends it back to the origin - // thread. - // - // The issue then becomes "is the Runnable dropped unsoundly when the channel is closed?" - // This problem is circumvented by immediately waking all runnables (pushes them into the - // queue) and then draining the queue. This means that, before the channel is closed, all - // runnables will be destroyed, preventing any unsoundness. - // - // - "If future is not 'static, borrowed variables must outlive its Runnable." - // - // `F` is `'static`, so we don't have to worry about this one. - // - // - "If schedule is not Send and Sync, the task’s Waker must be used and dropped on the - // original thread." - // - // The schedule function uses a thread-safe MPSC channel to send the runnable back to the - // origin thread. This means that the waker can be sent to another thread, which satisfies - // this requirement. - // - // - "If schedule is not 'static, borrowed variables must outlive the task’s Waker." - // - // `schedule` and the types it handles are `'static`, so we don't have to worry about - // this one. - unsafe { builder.spawn_unchecked(move |_| future, schedule) } - }; + let (runnable, task) = Builder::new() + .metadata(index) + .spawn_local(move |_| future, schedule); // Insert the runnable into the set of active tasks. active_tasks.insert(Active::Future(runnable.waker())); From 3ae9561d397759a009ef0132149a398c8f6fb104 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Wed, 19 Apr 2023 13:03:50 -0700 Subject: [PATCH 8/9] Add unreachable instead of abort --- src/sources/futures.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 8b57cb7a..bcf12f5c 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -208,10 +208,10 @@ impl Sender { // origin thread. The executor's Drop implementation will force all of the runnables to be // dropped, therefore the channel should always be available. If we can't send the runnable, // it indicates that the above behavior is broken and that unsoundness has occurred. The - // only option at this stage is to abort the process. + // only option at this stage is to forget the runnable and leak the future. std::mem::forget(e); - std::process::abort(); + unreachable!("Attempted to send runnable to a stopped executor"); } // If the executor is already awake, don't bother waking it up again. From d9f48fa7a0a43b97b4a7155eb4d32d97871520d5 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 21 Apr 2023 08:57:40 -0700 Subject: [PATCH 9/9] Add comment explaining waker panic drop --- src/sources/futures.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index bcf12f5c..7c5bd30d 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -232,6 +232,14 @@ impl Drop for Executor { for (_, task) in active_tasks { if let Active::Future(waker) = task { // Don't let a panicking waker blow everything up. + // + // There is a chance that a future will panic and, during the unwinding process, + // drop this executor. However, since the future panicked, there is a possibility + // that the internal state of the waker will be invalid in such a way that the waker + // panics as well. Since this would be a panic during a panic, Rust will upgrade it + // into an abort. + // + // In the interest of not aborting without a good reason, we just drop the panic here. std::panic::catch_unwind(|| waker.wake()).ok(); } }