Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Waker optimization + O(woken) polling for every combinator except chain #115

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ repository = "https://github.com/yoshuawuyts/futures-concurrency"
documentation = "https://docs.rs/futures-concurrency"
description = "Structured concurrency operations for async Rust"
readme = "README.md"
edition = "2018"
edition = "2021"
keywords = []
categories = []
authors = [
Expand Down Expand Up @@ -35,3 +35,4 @@ futures-lite = "1.12.0"
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
async-std = { version = "1.12.0", features = ["attributes"] }
futures-time = "3.0.0"
rand = "0.8.5"
2 changes: 1 addition & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,6 @@ mod race {
async fn tuple_race() {
let futures = futures_tuple();
let output = futures.race().await;
assert_eq!(output, ());
assert_eq!(output.any(), ());
}
}
149 changes: 87 additions & 62 deletions benches/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,36 @@ use futures_core::Stream;
use futures_lite::prelude::*;
use pin_project::pin_project;

use std::cell::RefCell;
use std::collections::VecDeque;
use std::cell::{Cell, RefCell};
use std::collections::{BinaryHeap, VecDeque};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

fn shuffle<T>(slice: &mut [T]) {
use rand::seq::SliceRandom;
use rand::SeedableRng;
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
slice.shuffle(&mut rng);
}

pub fn futures_vec(len: usize) -> Vec<CountdownFuture> {
let wakers = Rc::new(RefCell::new(VecDeque::new()));
let completed = Rc::new(RefCell::new(0));
let futures: Vec<_> = (0..len)
let wakers = Rc::new(RefCell::new(BinaryHeap::new()));
let completed = Rc::new(Cell::new(0));
let mut futures: Vec<_> = (0..len)
.map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone()))
.collect();
shuffle(&mut futures);
futures
}

pub fn futures_array<const N: usize>() -> [CountdownFuture; N] {
let wakers = Rc::new(RefCell::new(VecDeque::new()));
let completed = Rc::new(RefCell::new(0));
std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone()))
let wakers = Rc::new(RefCell::new(BinaryHeap::new()));
let completed = Rc::new(Cell::new(0));
let mut futures =
std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone()));
shuffle(&mut futures);
futures
}

pub fn futures_tuple() -> (
Expand All @@ -37,36 +48,27 @@ pub fn futures_tuple() -> (
CountdownFuture,
CountdownFuture,
) {
let len = 10;
let wakers = Rc::new(RefCell::new(VecDeque::new()));
let completed = Rc::new(RefCell::new(0));
(
CountdownFuture::new(0, len, wakers.clone(), completed.clone()),
CountdownFuture::new(1, len, wakers.clone(), completed.clone()),
CountdownFuture::new(2, len, wakers.clone(), completed.clone()),
CountdownFuture::new(3, len, wakers.clone(), completed.clone()),
CountdownFuture::new(4, len, wakers.clone(), completed.clone()),
CountdownFuture::new(5, len, wakers.clone(), completed.clone()),
CountdownFuture::new(6, len, wakers.clone(), completed.clone()),
CountdownFuture::new(7, len, wakers.clone(), completed.clone()),
CountdownFuture::new(8, len, wakers.clone(), completed.clone()),
CountdownFuture::new(9, len, wakers, completed),
)
let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = futures_array::<10>();
(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9)
}

pub fn streams_vec(len: usize) -> Vec<CountdownStream> {
let wakers = Rc::new(RefCell::new(VecDeque::new()));
let completed = Rc::new(RefCell::new(0));
let streams: Vec<_> = (0..len)
let wakers = Rc::new(RefCell::new(BinaryHeap::new()));
let completed = Rc::new(Cell::new(0));
let mut streams: Vec<_> = (0..len)
.map(|n| CountdownStream::new(n, len, wakers.clone(), completed.clone()))
.collect();
shuffle(&mut streams);
streams
}

pub fn streams_array<const N: usize>() -> [CountdownStream; N] {
let wakers = Rc::new(RefCell::new(VecDeque::new()));
let completed = Rc::new(RefCell::new(0));
std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone()))
let wakers = Rc::new(RefCell::new(BinaryHeap::new()));
let completed = Rc::new(Cell::new(0));
let mut streams =
std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone()));
shuffle(&mut streams);
streams
}

pub fn streams_tuple() -> (
Expand All @@ -81,21 +83,28 @@ pub fn streams_tuple() -> (
CountdownStream,
CountdownStream,
) {
let len = 10;
let wakers = Rc::new(RefCell::new(VecDeque::new()));
let completed = Rc::new(RefCell::new(0));
(
CountdownStream::new(0, len, wakers.clone(), completed.clone()),
CountdownStream::new(1, len, wakers.clone(), completed.clone()),
CountdownStream::new(2, len, wakers.clone(), completed.clone()),
CountdownStream::new(3, len, wakers.clone(), completed.clone()),
CountdownStream::new(4, len, wakers.clone(), completed.clone()),
CountdownStream::new(5, len, wakers.clone(), completed.clone()),
CountdownStream::new(6, len, wakers.clone(), completed.clone()),
CountdownStream::new(7, len, wakers.clone(), completed.clone()),
CountdownStream::new(8, len, wakers.clone(), completed.clone()),
CountdownStream::new(9, len, wakers, completed),
)
let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = streams_array::<10>();
(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9)
}

pub struct PrioritizedWaker(usize, Waker);
impl PartialEq for PrioritizedWaker {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}
impl Eq for PrioritizedWaker {
fn assert_receiver_is_total_eq(&self) {}
}
impl PartialOrd for PrioritizedWaker {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PrioritizedWaker {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.cmp(&other.0).reverse()
}
}

#[derive(Clone, Copy)]
Expand All @@ -109,18 +118,18 @@ enum State {
#[pin_project]
pub struct CountdownStream {
state: State,
wakers: Rc<RefCell<VecDeque<Waker>>>,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
index: usize,
max_count: usize,
completed_count: Rc<RefCell<usize>>,
completed_count: Rc<Cell<usize>>,
}

impl CountdownStream {
pub fn new(
index: usize,
max_count: usize,
wakers: Rc<RefCell<VecDeque<Waker>>>,
completed_count: Rc<RefCell<usize>>,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
completed_count: Rc<Cell<usize>>,
) -> Self {
Self {
state: State::Init,
Expand All @@ -145,21 +154,29 @@ impl Stream for CountdownStream {
match this.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
this.wakers.borrow_mut().push_back(cx.waker().clone());
this.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
*this.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = this.wakers.borrow_mut().pop_front().map(Waker::wake);
let _ = this
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if *this.completed_count.borrow() == *this.index {
if this.completed_count.get() == *this.index {
*this.state = State::Done;
*this.completed_count.borrow_mut() += 1;
this.completed_count.set(this.completed_count.get() + 1);
Poll::Ready(Some(()))
} else {
// We're not done yet, so schedule another wakeup
this.wakers.borrow_mut().push_back(cx.waker().clone());
this.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
Poll::Pending
}
}
Expand All @@ -172,18 +189,18 @@ impl Stream for CountdownStream {
#[pin_project]
pub struct CountdownFuture {
state: State,
wakers: Rc<RefCell<VecDeque<Waker>>>,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
index: usize,
max_count: usize,
completed_count: Rc<RefCell<usize>>,
completed_count: Rc<Cell<usize>>,
}

impl CountdownFuture {
pub fn new(
index: usize,
max_count: usize,
wakers: Rc<RefCell<VecDeque<Waker>>>,
completed_count: Rc<RefCell<usize>>,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
completed_count: Rc<Cell<usize>>,
) -> Self {
Self {
state: State::Init,
Expand All @@ -208,21 +225,29 @@ impl Future for CountdownFuture {
match this.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
this.wakers.borrow_mut().push_back(cx.waker().clone());
this.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
*this.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = this.wakers.borrow_mut().pop_front().map(Waker::wake);
let _ = this
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if *this.completed_count.borrow() == *this.index {
if this.completed_count.get() == *this.index {
*this.state = State::Done;
*this.completed_count.borrow_mut() += 1;
this.completed_count.set(this.completed_count.get() + 1);
Poll::Ready(())
} else {
// We're not done yet, so schedule another wakeup
this.wakers.borrow_mut().push_back(cx.waker().clone());
this.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
Poll::Pending
}
}
Expand Down
11 changes: 6 additions & 5 deletions examples/happy_eyeballs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use futures_time::prelude::*;
use async_std::io;
use async_std::net::TcpStream;
use futures::channel::oneshot;
use futures_concurrency::vec::AggregateError;
use futures_time::time::Duration;
use std::error;
use std::error::Error;

#[async_std::main]
async fn main() -> Result<(), Box<dyn error::Error + Send + Sync + 'static>> {
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Connect to a socket
let mut socket = open_tcp_socket("rust-lang.org", 80, 3).await?;
let mut socket = open_tcp_socket("rust-lang.org", 80, 3)
.await
.map_err(|e| format!("{e:?}"))?;

// Make an HTTP GET request.
socket.write_all(b"GET / \r\n").await?;
Expand All @@ -27,7 +28,7 @@ async fn open_tcp_socket(
addr: &str,
port: u16,
attempts: u64,
) -> Result<TcpStream, AggregateError<io::Error>> {
) -> Result<TcpStream, Vec<io::Error>> {
let (mut sender, mut receiver) = oneshot::channel();
let mut futures = Vec::with_capacity(attempts as usize);

Expand Down
Loading