Skip to content

Commit

Permalink
Fix soundness mistakes
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Apr 9, 2023
1 parent 575abec commit af6b73f
Showing 1 changed file with 74 additions and 69 deletions.
143 changes: 74 additions & 69 deletions src/sources/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
use async_task::{Builder, Runnable};
use slab::Slab;
use std::{cell::RefCell, collections::VecDeque, future::Future, rc::Rc, task::Waker};
use std::{cell::RefCell, future::Future, rc::Rc, task::Waker};

use crate::{
sources::{
channel::ChannelError,
ping::{make_ping, Ping, PingError, PingSource},
channel::{ChannelError, channel, Channel, Sender},
ping::PingError,
EventSource,
},
Poll, PostAction, Readiness, Token, TokenFactory,
Poll, PostAction, Readiness, Token, TokenFactory, channel::Event,
};

/// A future executor as an event source
Expand All @@ -37,29 +37,27 @@ pub struct Executor<T> {
/// Shared state between the executor and the scheduler.
state: Rc<State<T>>,

/// The ping source to register into the poller.
ping_source: PingSource,
/// The incoming queue of futures to be executed.
incoming: Channel<Runnable<usize>>,
}

/// A scheduler to send futures to an executor
#[derive(Clone, Debug)]
pub struct Scheduler<T> {
/// Shared state between the executor and the scheduler.
state: Rc<State<T>>,

/// The sender used to send runnables to the executor.
sender: Sender<Runnable<usize>>,
}

/// The inner state of the executor.
#[derive(Debug)]
struct State<T> {
/// The incoming queue of futures to be executed.
incoming: RefCell<VecDeque<Runnable<usize>>>,

/// The list of currently active tasks.
///
/// This is set to `None` when the executor is destroyed.
active: RefCell<Option<Slab<Active<T>>>>,

/// The ping to wake up the executor.
ping: Ping,
}

/// An active future or its result.
Expand Down Expand Up @@ -137,21 +135,47 @@ impl<T> Scheduler<T> {

// A schedule function that inserts the runnable into the incoming queue.
let schedule = {
let state = self.state.clone();
let sender = self.sender.clone();
move |runnable| {
let mut incoming = state.incoming.borrow_mut();
incoming.push_back(runnable);

// Wake up the executor.
state.ping.ping();
if sender.send(runnable).is_err() {
// This shouldn't be able to happen since all of the tasks are destroyed before the
// executor is. This indicates a critical soundness bug.
std::process::abort();
}
}
};

// Spawn the future.
let (runnable, task) = {
let builder = Builder::new().metadata(index).propagate_panic(true);

// SAFETY: todo
// SAFETY: spawn_unchecked has four safety requirements:
//
// - "If future is not Send, its Runnable must be used and dropped on the original thread."
//
// The runnable is created on the origin thread and sent to the origin thread, since both
// Scheduler and Executor are !Send and !Sync. The waker may be sent to another thread,
// which means that the scheduler function (and the Runnable it handles) can exist on
// another thread. However, the scheduler function immediately sends it back to the origin
// thread. The channel is always kept open in this case, since all tasks are destroyed
// once the Executor is dropped. This means that the runnable will always be sent back
// to the origin thread.
//
// - "If future is not 'static, borrowed variables must outlive its Runnable."
//
// `F` is `'static`, so we don't have to worry about this one.
//
// - "If schedule is not Send and Sync, the task’s Waker must be used and dropped on the
// original thread."
//
// The schedule function uses a thread-safe MPSC channel to send the runnable back to the
// origin thread. This means that the waker can be sent to another thread, which satisfies
// this requirement.
//
// - "If schedule is not 'static, borrowed variables must outlive the task’s Waker."
//
// `schedule` and the types it handles are `'static`, so we don't have to worry about
// this one.
unsafe { builder.spawn_unchecked(move |_| future, schedule) }
};

Expand Down Expand Up @@ -183,7 +207,7 @@ impl<T> Drop for Executor<T> {
}

// Drain the queue in order to drop all of the runnables.
self.state.incoming.borrow_mut().clear();
while self.incoming.try_recv().is_ok() {}
}
}

Expand All @@ -198,19 +222,17 @@ pub struct ExecutorDestroyed;
/// May fail due to OS errors preventing calloop to setup its internal pipes (if your
/// process has reatched its file descriptor limit for example).
pub fn executor<T>() -> crate::Result<(Executor<T>, Scheduler<T>)> {
let (ping, ping_source) = make_ping()?;
let (sender, channel) = channel();
let state = Rc::new(State {
incoming: RefCell::new(VecDeque::new()),
active: RefCell::new(Some(Slab::new())),
ping,
});

Ok((
Executor {
state: state.clone(),
ping_source,
incoming: channel
},
Scheduler { state },
Scheduler { state, sender },
))
}

Expand All @@ -229,53 +251,36 @@ impl<T> EventSource for Executor<T> {
where
F: FnMut(T, &mut ()),
{
// Process all of the newly inserted futures.
let clear_readiness = {
let mut incoming = self.state.incoming.borrow_mut();
let mut clear_readiness = false;

// Only process a limited number of tasks at a time; better to move onto the next event soon.
for _ in 0..1024 {
if let Some(runnable) = incoming.pop_front() {
let index = *runnable.metadata();
runnable.run();

// If the runnable finished with a result, call the callback.
let mut active_guard = self.state.active.borrow_mut();
let active = active_guard.as_mut().unwrap();

if let Some(state) = active.get(index) {
if state.is_finished() {
// Take out the state and provide it.
let result = match active.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),
};

callback(result, &mut ());
}
}
} else {
clear_readiness = true;
break;
let state = &self.state;
self.incoming.process_events(readiness, token, |event, &mut ()| {
let runnable = match event {
Event::Msg(runnable) => runnable,
_ => return,
};

let index = *runnable.metadata();
runnable.run();

// If the runnable finished with a result, call the callback.
let mut active_guard = state.active.borrow_mut();
let active = active_guard.as_mut().unwrap();

if let Some(state) = active.get(index) {
if state.is_finished() {
// Take out the state and provide it.
let result = match active.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),
};

callback(result, &mut ());
}
}

clear_readiness
};

// Clear the readiness of the ping event if we processed all of the incoming tasks.
if clear_readiness {
self.ping_source
.process_events(readiness, token, |(), &mut ()| {})
.map_err(ExecutorError::WakeError)?;
}

Ok(PostAction::Continue)
}).map_err(ExecutorError::NewFutureError)
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
self.ping_source.register(poll, token_factory)?;
self.incoming.register(poll, token_factory)?;
Ok(())
}

Expand All @@ -284,12 +289,12 @@ impl<T> EventSource for Executor<T> {
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.ping_source.reregister(poll, token_factory)?;
self.incoming.reregister(poll, token_factory)?;
Ok(())
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.ping_source.unregister(poll)?;
self.incoming.unregister(poll)?;
Ok(())
}
}
Expand Down

0 comments on commit af6b73f

Please sign in to comment.