diff --git a/Cargo.toml b/Cargo.toml index 3056a655..a6524de6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,19 +22,19 @@ codecov = { repository = "Smithay/calloop" } io-lifetimes = "1.0.3" log = "0.4" nix = { version = "0.26", default-features = false, features = ["event", "fs", "signal", "socket", "time"] } -futures-util = { version = "0.3.5", optional = true, default-features = false, features = ["std"]} +async-task = { version = "4.4.0", optional = true } futures-io = { version = "0.3.5", optional = true } thiserror = "1.0" pin-utils = { version = "0.1.0", optional = true } +slab = "0.4.8" polling = "2.6.0" -slab = "0.4.7" [dev-dependencies] futures = "0.3.5" [features] block_on = ["pin-utils"] -executor = ["futures-util"] +executor = ["async-task"] [package.metadata.docs.rs] all-features = true diff --git a/src/sources/futures.rs b/src/sources/futures.rs index f44ffc95..7c5bd30d 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -17,16 +17,23 @@ //! **Note:** The futures must have their own means of being woken up, as this executor is, //! by itself, not I/O aware. See [`LoopHandle::adapt_io`](crate::LoopHandle#method.adapt_io) //! for that, or you can use some other mechanism if you prefer. -use std::{future::Future, pin::Pin, sync::Arc}; -use futures_util::{ - stream::{FuturesUnordered, Stream}, - task::{waker_ref, ArcWake, Context, LocalFutureObj, Poll as FutPoll}, +use async_task::{Builder, Runnable}; +use slab::Slab; +use std::{ + cell::RefCell, + future::Future, + rc::Rc, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, Mutex, + }, + task::Waker, }; use crate::{ sources::{ - channel::{channel, Channel, ChannelError, Event, Sender}, + channel::ChannelError, ping::{make_ping, Ping, PingError, PingSource}, EventSource, }, @@ -36,16 +43,71 @@ use crate::{ /// A future executor as an event source #[derive(Debug)] pub struct Executor { - futures: FuturesUnordered>, - new_futures: Channel>, - ready_futures: PingSource, - waker: Arc, + /// Shared state between the executor and the scheduler. + state: Rc>, + + /// Notifies us when the executor is woken up. + ping: PingSource, } /// A scheduler to send futures to an executor #[derive(Clone, Debug)] pub struct Scheduler { - sender: Sender>, + /// Shared state between the executor and the scheduler. + state: Rc>, +} + +/// The inner state of the executor. +#[derive(Debug)] +struct State { + /// The incoming queue of runnables to be executed. + incoming: mpsc::Receiver>, + + /// The sender corresponding to `incoming`. + sender: Arc, + + /// The list of currently active tasks. + /// + /// This is set to `None` when the executor is destroyed. + active_tasks: RefCell>>>, +} + +/// Send a future to an executor. +/// +/// This needs to be thread-safe, as it is called from a `Waker` that may be on a different thread. +#[derive(Debug)] +struct Sender { + /// The sender used to send runnables to the executor. + /// + /// `mpsc::Sender` is `!Sync`, wrapping it in a `Mutex` makes it `Sync`. + sender: Mutex>>, + + /// The ping source used to wake up the executor. + wake_up: Ping, + + /// Whether the executor has already been woken. + notified: AtomicBool, +} + +/// An active future or its result. +#[derive(Debug)] +enum Active { + /// The future is currently being polled. + /// + /// Waking this waker will insert the runnable into `incoming`. + Future(Waker), + + /// The future has finished polling, and its result is stored here. + Finished(T), +} + +impl Active { + fn is_finished(&self) -> bool { + match self { + Active::Finished(_) => true, + _ => false, + } + } } impl Scheduler { @@ -55,44 +117,168 @@ impl Scheduler { pub fn schedule(&self, future: Fut) -> Result<(), ExecutorDestroyed> where Fut: Future, + T: 'static, { - let obj = LocalFutureObj::new(Box::new(future)); - self.sender.send(obj).map_err(|_| ExecutorDestroyed) + /// Store this future's result in the executor. + struct StoreOnDrop<'a, T> { + index: usize, + value: Option, + state: &'a State, + } + + impl Drop for StoreOnDrop<'_, T> { + fn drop(&mut self) { + let mut active_tasks = self.state.active_tasks.borrow_mut(); + if let Some(active_tasks) = active_tasks.as_mut() { + if let Some(value) = self.value.take() { + active_tasks[self.index] = Active::Finished(value); + } else { + // The future was dropped before it finished. + // Remove it from the active list. + active_tasks.remove(self.index); + } + } + } + } + + fn assert_send_and_sync(_: &T) {} + + let mut active_guard = self.state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().ok_or(ExecutorDestroyed)?; + + // Wrap the future in another future that polls it and stores the result. + let index = active_tasks.vacant_key(); + let future = { + let state = self.state.clone(); + async move { + let mut guard = StoreOnDrop { + index, + value: None, + state: &state, + }; + + // Get the value of the future. + let value = future.await; + + // Store it in the executor. + guard.value = Some(value); + } + }; + + // A schedule function that inserts the runnable into the incoming queue. + let schedule = { + let sender = self.state.sender.clone(); + move |runnable| sender.send(runnable) + }; + + assert_send_and_sync(&schedule); + + // Spawn the future. + let (runnable, task) = Builder::new() + .metadata(index) + .spawn_local(move |_| future, schedule); + + // Insert the runnable into the set of active tasks. + active_tasks.insert(Active::Future(runnable.waker())); + drop(active_guard); + + // Schedule the runnable and detach the task so it isn't cancellable. + runnable.schedule(); + task.detach(); + + Ok(()) } } -/// Error generated when trying to schedule a future after the -/// executor was destroyed. -#[derive(thiserror::Error, Debug)] -#[error("the executor was destroyed")] -pub struct ExecutorDestroyed; +impl Sender { + /// Send a runnable to the executor. + fn send(&self, runnable: Runnable) { + // Send on the channel. + // + // All we do with the lock is call `send`, so there's no chance of any state being corrupted on + // panic. Therefore it's safe to ignore the mutex poison. + if let Err(e) = self + .sender + .lock() + .unwrap_or_else(|e| e.into_inner()) + .send(runnable) + { + // The runnable must be dropped on its origin thread, since the original future might be + // !Send. This channel immediately sends it back to the Executor, which is pinned to the + // origin thread. The executor's Drop implementation will force all of the runnables to be + // dropped, therefore the channel should always be available. If we can't send the runnable, + // it indicates that the above behavior is broken and that unsoundness has occurred. The + // only option at this stage is to forget the runnable and leak the future. -#[derive(Debug)] -struct ExecWaker { - ping: Ping, + std::mem::forget(e); + unreachable!("Attempted to send runnable to a stopped executor"); + } + + // If the executor is already awake, don't bother waking it up again. + if self.notified.swap(true, Ordering::SeqCst) { + return; + } + + // Wake the executor. + self.wake_up.ping(); + } } -impl ArcWake for ExecWaker { - fn wake_by_ref(arc_self: &Arc) { - arc_self.ping.ping(); +impl Drop for Executor { + fn drop(&mut self) { + let active_tasks = self.state.active_tasks.borrow_mut().take().unwrap(); + + // Wake all of the active tasks in order to destroy their runnables. + for (_, task) in active_tasks { + if let Active::Future(waker) = task { + // Don't let a panicking waker blow everything up. + // + // There is a chance that a future will panic and, during the unwinding process, + // drop this executor. However, since the future panicked, there is a possibility + // that the internal state of the waker will be invalid in such a way that the waker + // panics as well. Since this would be a panic during a panic, Rust will upgrade it + // into an abort. + // + // In the interest of not aborting without a good reason, we just drop the panic here. + std::panic::catch_unwind(|| waker.wake()).ok(); + } + } + + // Drain the queue in order to drop all of the runnables. + while self.state.incoming.try_recv().is_ok() {} } } +/// Error generated when trying to schedule a future after the +/// executor was destroyed. +#[derive(thiserror::Error, Debug)] +#[error("the executor was destroyed")] +pub struct ExecutorDestroyed; + /// Create a new executor, and its associated scheduler /// /// May fail due to OS errors preventing calloop to setup its internal pipes (if your /// process has reatched its file descriptor limit for example). pub fn executor() -> crate::Result<(Executor, Scheduler)> { - let (ping, ready_futures) = make_ping()?; - let (sender, new_futures) = channel(); + let (sender, incoming) = mpsc::channel(); + let (wake_up, ping) = make_ping()?; + + let state = Rc::new(State { + incoming, + active_tasks: RefCell::new(Some(Slab::new())), + sender: Arc::new(Sender { + sender: Mutex::new(sender), + wake_up, + notified: AtomicBool::new(false), + }), + }); + Ok(( Executor { - futures: FuturesUnordered::new(), - new_futures, - ready_futures, - waker: Arc::new(ExecWaker { ping }), + state: state.clone(), + ping, }, - Scheduler { sender }, + Scheduler { state }, )) } @@ -111,34 +297,61 @@ impl EventSource for Executor { where F: FnMut(T, &mut ()), { - // fetch all newly inserted futures and push them to the container - let futures = &mut self.futures; - self.new_futures - .process_events(readiness, token, |evt, _| { - if let Event::Msg(fut) = evt { - futures.push(fut); - } - }) - .map_err(ExecutorError::NewFutureError)?; + let state = &self.state; + + // Set to the unnotified state. + state.sender.notified.store(false, Ordering::SeqCst); + + let clear_readiness = { + let mut clear_readiness = false; + + // Process runnables, but not too many at a time; better to move onto the next event quickly! + for _ in 0..1024 { + let runnable = match state.incoming.try_recv() { + Ok(runnable) => runnable, + Err(_) => { + // Make sure to clear the readiness if there are no more runnables. + clear_readiness = true; + break; + } + }; + + // Run the runnable. + let index = *runnable.metadata(); + runnable.run(); - // process ping events to make it non-ready again - self.ready_futures - .process_events(readiness, token, |(), _| {}) - .map_err(ExecutorError::WakeError)?; + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().unwrap(); - // advance all available futures as much as possible - let waker = waker_ref(&self.waker); - let mut cx = Context::from_waker(&waker); + if let Some(state) = active_tasks.get(index) { + if state.is_finished() { + // Take out the state and provide it to the caller. + let result = match active_tasks.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; - while let FutPoll::Ready(Some(ret)) = Pin::new(&mut self.futures).poll_next(&mut cx) { - callback(ret, &mut ()); + callback(result, &mut ()); + } + } + } + + clear_readiness + }; + + // Clear the readiness of the ping source if there are no more runnables. + if clear_readiness { + self.ping + .process_events(readiness, token, |(), &mut ()| {}) + .map_err(ExecutorError::WakeError)?; } + Ok(PostAction::Continue) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.new_futures.register(poll, token_factory)?; - self.ready_futures.register(poll, token_factory)?; + self.ping.register(poll, token_factory)?; Ok(()) } @@ -147,14 +360,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.new_futures.reregister(poll, token_factory)?; - self.ready_futures.reregister(poll, token_factory)?; + self.ping.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.new_futures.unregister(poll)?; - self.ready_futures.unregister(poll)?; + self.ping.unregister(poll)?; Ok(()) } }