Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add _timeout variants to blocking executions #2912

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion futures-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ extern crate std;
#[cfg(feature = "std")]
mod local_pool;
#[cfg(feature = "std")]
pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner};
pub use crate::local_pool::{
block_on, block_on_stream, block_on_timeout, BlockingStream, LocalPool, LocalSpawner,
TimeoutError,
};

#[cfg(feature = "thread-pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
Expand Down
163 changes: 140 additions & 23 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{waker_ref, ArcWake};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_task::{waker_ref, ArcWake, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use futures_util::stream::{FuturesUnordered, StreamExt};
use std::cell::RefCell;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread};
use std::time::{Duration, Instant};
use std::vec::Vec;

/// A single-threaded task pool for polling futures to completion.
Expand Down Expand Up @@ -75,9 +75,29 @@ impl ArcWake for ThreadNotify {
}
}

/// An error returned when a blocking execution times out.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct TimeoutError;

impl fmt::Display for TimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "blocking execution has timed out")
}
}

impl Error for TimeoutError {}

// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
fn run_executor_impl<
T,
F: FnMut(&mut Context<'_>) -> Poll<T>,
P: FnMut() -> Result<(), TimeoutError>,
>(
mut f: F,
mut park: P,
) -> Result<T, TimeoutError> {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
Expand All @@ -88,20 +108,45 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f(&mut cx) {
return t;
return Ok(t);
}

// Wait for a wakeup.
while !thread_notify.unparked.swap(false, Ordering::Acquire) {
// No wakeup occurred. It may occur now, right before parking,
// but in that case the token made available by `unpark()`
// is guaranteed to still be available and `park()` is a no-op.
thread::park();
park()?;
}
}
})
}

fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(f: F) -> T {
let park = || {
thread::park();
Ok(())
};
run_executor_impl(f, park).unwrap()
}

fn run_executor_timeout<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(
f: F,
mut timeout: Duration,
) -> Result<T, TimeoutError> {
let start = Instant::now();
let park = || {
thread::park_timeout(timeout);
let elapsed = start.elapsed();
if elapsed >= timeout {
return Err(TimeoutError);
}
timeout -= elapsed;
Ok(())
};
run_executor_impl(f, park)
}

/// Check for a wakeup, but don't consume it.
fn woken() -> bool {
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
Expand Down Expand Up @@ -137,6 +182,32 @@ impl LocalPool {
run_executor(|cx| self.poll_pool(cx))
}

/// Run all tasks in the pool to completion, or return an error if the execution
/// times out.
/// ```
/// use std::time::Duration;
/// use futures::executor::LocalPool;
///
/// let mut pool = LocalPool::new();
///
/// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
///
/// // run *all* tasks in the pool to completion, including any newly-spawned ones.
/// if let Err(err) = pool.run_timeout(Duration::from_secs(10)) {
/// println!("{}", err);
/// }
/// ```
///
/// The function will block the calling thread until *all* tasks in the pool
/// are complete, including any spawned while running existing tasks, or until
/// the given timeout is reached.
/// In this case, there may still be incomplete tasks in the pool, which will
/// be inert after the call completes, but can continue with further use of
/// one of the pool's run or poll methods.
pub fn run_timeout(&mut self, timeout: Duration) -> Result<(), TimeoutError> {
run_executor_timeout(|cx| self.poll_pool(cx), timeout)
}

/// Runs all the tasks in the pool until the given future completes.
///
/// ```
Expand All @@ -156,19 +227,39 @@ impl LocalPool {
/// however, all tasks in the pool will try to make progress.
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
pin_mut!(future);
run_executor(|cx| self.poll_pool_until(cx, &mut future))
}

run_executor(|cx| {
{
// if our main task is done, so are we
let result = future.as_mut().poll(cx);
if let Poll::Ready(output) = result {
return Poll::Ready(output);
}
}

let _ = self.poll_pool(cx);
Poll::Pending
})
/// Runs all the tasks in the pool until the given future completes, or return
/// an error if the execution times out.
///
/// ```
/// use std::time::Duration;
/// use futures::executor::LocalPool;
///
/// let mut pool = LocalPool::new();
/// # let my_app = async {};
///
/// // run tasks in the pool until `my_app` completes or times out
/// match pool.run_until_timeout(my_app, Duration::from_secs(10)) {
/// Ok(res) => { /*...*/ }
/// Err(err) => println!("{}", err)
/// }
/// ```
///
/// The function will block the calling thread *only* until the future `f`
/// completes, or until the given timeout is reached; there may still be
/// incomplete tasks in the pool, which will be inert after the call
/// completes, but can continue with further use of one of the pool's run
/// or poll methods. While the function is running, however, all tasks in
/// the pool will try to make progress.
pub fn run_until_timeout<F: Future>(
&mut self,
future: F,
timeout: Duration,
) -> Result<F::Output, TimeoutError> {
pin_mut!(future);
run_executor_timeout(|cx| self.poll_pool_until(cx, &mut future), timeout)
}

/// Runs all tasks and returns after completing one future or until no more progress
Expand Down Expand Up @@ -291,6 +382,20 @@ impl LocalPool {
}
}

/// Try to poll the given future, and poll the pool in case it's pending.
fn poll_pool_until<F: Future>(
&mut self,
cx: &mut Context<'_>,
future: &mut Pin<&mut F>,
) -> Poll<F::Output> {
// if our main task is done, so are we
if let Poll::Ready(output) = future.as_mut().poll(cx) {
return Poll::Ready(output);
}
let _ = self.poll_pool(cx);
Poll::Pending
}

/// Empty the incoming queue of newly-spawned tasks.
fn drain_incoming(&mut self) {
let mut incoming = self.incoming.borrow_mut();
Expand All @@ -316,6 +421,18 @@ pub fn block_on<F: Future>(f: F) -> F::Output {
run_executor(|cx| f.as_mut().poll(cx))
}

/// Run a future to completion on the current thread, or return an error
/// if the execution times out.
///
/// This function will block the caller until the given future has completed,
/// or until the given timeout is reached.
///
/// Use a [`LocalPool`] if you need finer-grained control over spawned tasks.
pub fn block_on_timeout<F: Future>(f: F, timeout: Duration) -> Result<F::Output, TimeoutError> {
pin_mut!(f);
run_executor_timeout(|cx| f.as_mut().poll(cx), timeout)
}

/// Turn a stream into a blocking iterator.
///
/// When `next` is called on the resulting `BlockingStream`, the caller
Expand Down
13 changes: 13 additions & 0 deletions futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ fn run_until_executes_spawned() {
pool.run_until(rx).unwrap();
}

#[test]
fn run_until_timeout() {
let mut pool = LocalPool::new();
assert!(pool.run_until_timeout(pending(), Duration::from_millis(1)).is_err())
}

#[test]
fn run_timeout() {
let mut pool = LocalPool::new();
pool.spawner().spawn_local(pending()).unwrap();
assert!(pool.run_timeout(Duration::from_millis(1)).is_err())
}

#[test]
fn run_returns_if_empty() {
let mut pool = LocalPool::new();
Expand Down
4 changes: 2 additions & 2 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ pub mod executor {
//! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj

pub use futures_executor::{
block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
LocalSpawner,
block_on, block_on_stream, block_on_timeout, enter, BlockingStream, Enter, EnterError,
LocalPool, LocalSpawner, TimeoutError,
};

#[cfg(feature = "thread-pool")]
Expand Down
Loading