Skip to content

Commit

Permalink
fmt all
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Apr 9, 2023
1 parent af6b73f commit eac70a5
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions src/sources/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use slab::Slab;
use std::{cell::RefCell, future::Future, rc::Rc, task::Waker};

use crate::{
channel::Event,
sources::{
channel::{ChannelError, channel, Channel, Sender},
channel::{channel, Channel, ChannelError, Sender},
ping::PingError,
EventSource,
},
Poll, PostAction, Readiness, Token, TokenFactory, channel::Event,
Poll, PostAction, Readiness, Token, TokenFactory,
};

/// A future executor as an event source
Expand All @@ -48,7 +49,7 @@ pub struct Scheduler<T> {
state: Rc<State<T>>,

/// The sender used to send runnables to the executor.
sender: Sender<Runnable<usize>>,
sender: Sender<Runnable<usize>>,
}

/// The inner state of the executor.
Expand Down Expand Up @@ -150,7 +151,7 @@ impl<T> Scheduler<T> {
let builder = Builder::new().metadata(index).propagate_panic(true);

// SAFETY: spawn_unchecked has four safety requirements:
//
//
// - "If future is not Send, its Runnable must be used and dropped on the original thread."
//
// The runnable is created on the origin thread and sent to the origin thread, since both
Expand All @@ -162,18 +163,18 @@ impl<T> Scheduler<T> {
// to the origin thread.
//
// - "If future is not 'static, borrowed variables must outlive its Runnable."
//
//
// `F` is `'static`, so we don't have to worry about this one.
//
//
// - "If schedule is not Send and Sync, the task’s Waker must be used and dropped on the
// original thread."
//
//
// The schedule function uses a thread-safe MPSC channel to send the runnable back to the
// origin thread. This means that the waker can be sent to another thread, which satisfies
// this requirement.
//
//
// - "If schedule is not 'static, borrowed variables must outlive the task’s Waker."
//
//
// `schedule` and the types it handles are `'static`, so we don't have to worry about
// this one.
unsafe { builder.spawn_unchecked(move |_| future, schedule) }
Expand Down Expand Up @@ -230,7 +231,7 @@ pub fn executor<T>() -> crate::Result<(Executor<T>, Scheduler<T>)> {
Ok((
Executor {
state: state.clone(),
incoming: channel
incoming: channel,
},
Scheduler { state, sender },
))
Expand All @@ -252,31 +253,33 @@ impl<T> EventSource for Executor<T> {
F: FnMut(T, &mut ()),
{
let state = &self.state;
self.incoming.process_events(readiness, token, |event, &mut ()| {
let runnable = match event {
Event::Msg(runnable) => runnable,
_ => return,
};

let index = *runnable.metadata();
runnable.run();

// If the runnable finished with a result, call the callback.
let mut active_guard = state.active.borrow_mut();
let active = active_guard.as_mut().unwrap();

if let Some(state) = active.get(index) {
if state.is_finished() {
// Take out the state and provide it.
let result = match active.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),
};

callback(result, &mut ());
self.incoming
.process_events(readiness, token, |event, &mut ()| {
let runnable = match event {
Event::Msg(runnable) => runnable,
_ => return,
};

let index = *runnable.metadata();
runnable.run();

// If the runnable finished with a result, call the callback.
let mut active_guard = state.active.borrow_mut();
let active = active_guard.as_mut().unwrap();

if let Some(state) = active.get(index) {
if state.is_finished() {
// Take out the state and provide it.
let result = match active.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),
};

callback(result, &mut ());
}
}
}
}).map_err(ExecutorError::NewFutureError)
})
.map_err(ExecutorError::NewFutureError)
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
Expand Down

0 comments on commit eac70a5

Please sign in to comment.