From eac70a5c4d57cad974bc2deb15e42530c681b3fa Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 9 Apr 2023 09:40:52 -0700 Subject: [PATCH] fmt all --- src/sources/futures.rs | 71 ++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 7abc2a28..0ce97162 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -23,12 +23,13 @@ use slab::Slab; use std::{cell::RefCell, future::Future, rc::Rc, task::Waker}; use crate::{ + channel::Event, sources::{ - channel::{ChannelError, channel, Channel, Sender}, + channel::{channel, Channel, ChannelError, Sender}, ping::PingError, EventSource, }, - Poll, PostAction, Readiness, Token, TokenFactory, channel::Event, + Poll, PostAction, Readiness, Token, TokenFactory, }; /// A future executor as an event source @@ -48,7 +49,7 @@ pub struct Scheduler { state: Rc>, /// The sender used to send runnables to the executor. - sender: Sender>, + sender: Sender>, } /// The inner state of the executor. @@ -150,7 +151,7 @@ impl Scheduler { let builder = Builder::new().metadata(index).propagate_panic(true); // SAFETY: spawn_unchecked has four safety requirements: - // + // // - "If future is not Send, its Runnable must be used and dropped on the original thread." // // The runnable is created on the origin thread and sent to the origin thread, since both @@ -162,18 +163,18 @@ impl Scheduler { // to the origin thread. // // - "If future is not 'static, borrowed variables must outlive its Runnable." - // + // // `F` is `'static`, so we don't have to worry about this one. - // + // // - "If schedule is not Send and Sync, the task’s Waker must be used and dropped on the // original thread." - // + // // The schedule function uses a thread-safe MPSC channel to send the runnable back to the // origin thread. This means that the waker can be sent to another thread, which satisfies // this requirement. - // + // // - "If schedule is not 'static, borrowed variables must outlive the task’s Waker." - // + // // `schedule` and the types it handles are `'static`, so we don't have to worry about // this one. unsafe { builder.spawn_unchecked(move |_| future, schedule) } @@ -230,7 +231,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { Ok(( Executor { state: state.clone(), - incoming: channel + incoming: channel, }, Scheduler { state, sender }, )) @@ -252,31 +253,33 @@ impl EventSource for Executor { F: FnMut(T, &mut ()), { let state = &self.state; - self.incoming.process_events(readiness, token, |event, &mut ()| { - let runnable = match event { - Event::Msg(runnable) => runnable, - _ => return, - }; - - let index = *runnable.metadata(); - runnable.run(); - - // If the runnable finished with a result, call the callback. - let mut active_guard = state.active.borrow_mut(); - let active = active_guard.as_mut().unwrap(); - - if let Some(state) = active.get(index) { - if state.is_finished() { - // Take out the state and provide it. - let result = match active.remove(index) { - Active::Finished(result) => result, - _ => unreachable!(), - }; - - callback(result, &mut ()); + self.incoming + .process_events(readiness, token, |event, &mut ()| { + let runnable = match event { + Event::Msg(runnable) => runnable, + _ => return, + }; + + let index = *runnable.metadata(); + runnable.run(); + + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active.borrow_mut(); + let active = active_guard.as_mut().unwrap(); + + if let Some(state) = active.get(index) { + if state.is_finished() { + // Take out the state and provide it. + let result = match active.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; + + callback(result, &mut ()); + } } - } - }).map_err(ExecutorError::NewFutureError) + }) + .map_err(ExecutorError::NewFutureError) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {