Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a single Arc shared between Wakers in WakerArray and WakerVec #118

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/future/join/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,10 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::utils::DummyWaker;
use crate::utils::dummy_waker;

use std::future;
use std::future::Future;
use std::sync::Arc;
use std::task::Context;

#[test]
Expand All @@ -189,7 +188,7 @@ mod test {
assert_eq!(format!("{:?}", fut), "[Pending, Pending]");
let mut fut = Pin::new(&mut fut);

let waker = Arc::new(DummyWaker()).into();
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
let _ = fut.as_mut().poll(&mut cx);
assert_eq!(format!("{:?}", fut), "[Consumed, Consumed]");
Expand Down
5 changes: 2 additions & 3 deletions src/future/join/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,10 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::utils::DummyWaker;
use crate::utils::dummy_waker;

use std::future;
use std::future::Future;
use std::sync::Arc;
use std::task::Context;

#[test]
Expand All @@ -191,7 +190,7 @@ mod test {
assert_eq!(format!("{:?}", fut), "[Pending, Pending]");
let mut fut = Pin::new(&mut fut);

let waker = Arc::new(DummyWaker()).into();
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
let _ = fut.as_mut().poll(&mut cx);
assert_eq!(format!("{:?}", fut), "[Consumed, Consumed]");
Expand Down
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) use tuple::{gen_conditions, tuple_len};
pub(crate) use wakers::{WakerArray, WakerVec};

#[cfg(test)]
pub(crate) use wakers::DummyWaker;
pub(crate) use wakers::dummy_waker;

#[cfg(test)]
pub(crate) mod channel;
2 changes: 0 additions & 2 deletions src/utils/wakers/array/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod readiness;
mod waker;
mod waker_array;

pub(crate) use readiness::ReadinessArray;
pub(crate) use waker::InlineWakerArray;
pub(crate) use waker_array::WakerArray;
31 changes: 0 additions & 31 deletions src/utils/wakers/array/waker.rs

This file was deleted.

105 changes: 92 additions & 13 deletions src/utils/wakers/array/waker_array.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
use core::array;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Waker;
use core::task::Waker;
use std::sync::{Arc, Mutex, Weak};

use super::{InlineWakerArray, ReadinessArray};
use super::{
super::shared_arc::{waker_from_redirect_position, SharedArcContent},
ReadinessArray,
};

/// A collection of wakers which delegate to an in-line waker.
pub(crate) struct WakerArray<const N: usize> {
wakers: [Waker; N],
readiness: Arc<Mutex<ReadinessArray<N>>>,
inner: Arc<WakerArrayInner<N>>,
}

/// See [super::super::shared_arc] for how this works.
struct WakerArrayInner<const N: usize> {
redirect: [*const Self; N],
readiness: Mutex<ReadinessArray<N>>,
}

impl<const N: usize> WakerArray<N> {
/// Create a new instance of `WakerArray`.
pub(crate) fn new() -> Self {
let readiness = Arc::new(Mutex::new(ReadinessArray::new()));
Self {
wakers: array::from_fn(|i| {
Arc::new(InlineWakerArray::new(i, readiness.clone())).into()
}),
readiness,
}
let inner = Arc::new_cyclic(|w| {
// `Weak::as_ptr` on a live Weak gives the same thing as `Arc::into_raw`.
let raw = Weak::as_ptr(w);
WakerArrayInner {
readiness: Mutex::new(ReadinessArray::new()),
redirect: [raw; N],
}
});

let wakers =
array::from_fn(|i| unsafe { waker_from_redirect_position(Arc::clone(&inner), i) });

Self { inner, wakers }
}

pub(crate) fn get(&self, index: usize) -> Option<&Waker> {
Expand All @@ -29,6 +43,71 @@ impl<const N: usize> WakerArray<N> {

/// Access the `Readiness`.
pub(crate) fn readiness(&self) -> &Mutex<ReadinessArray<N>> {
self.readiness.as_ref()
&self.inner.readiness
}
}

#[deny(unsafe_op_in_unsafe_fn)]
unsafe impl<const N: usize> SharedArcContent for WakerArrayInner<N> {
fn get_redirect_slice(&self) -> &[*const Self] {
&self.redirect
}

fn wake_index(&self, index: usize) {
let mut readiness = self.readiness.lock().unwrap();
if !readiness.set_ready(index) {
readiness
.parent_waker()
.as_ref()
.expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?")
.wake_by_ref();
}
}
}

#[cfg(test)]
mod tests {
use crate::utils::wakers::dummy_waker;

use super::*;
#[test]
fn check_refcount() {
let mut wa = WakerArray::<5>::new();

// Each waker holds 1 ref, and the combinator itself holds 1.
assert_eq!(Arc::strong_count(&wa.inner), 6);

wa.wakers[4] = dummy_waker();
assert_eq!(Arc::strong_count(&wa.inner), 5);
let cloned = wa.wakers[3].clone();
assert_eq!(Arc::strong_count(&wa.inner), 6);
wa.wakers[3] = wa.wakers[4].clone();
assert_eq!(Arc::strong_count(&wa.inner), 5);
drop(cloned);
assert_eq!(Arc::strong_count(&wa.inner), 4);

wa.wakers[0].wake_by_ref();
wa.wakers[0].wake_by_ref();
wa.wakers[0].wake_by_ref();
assert_eq!(Arc::strong_count(&wa.inner), 4);

wa.wakers[0] = wa.wakers[1].clone();
assert_eq!(Arc::strong_count(&wa.inner), 4);

let taken = std::mem::replace(&mut wa.wakers[2], dummy_waker());
assert_eq!(Arc::strong_count(&wa.inner), 4);
taken.wake_by_ref();
assert_eq!(Arc::strong_count(&wa.inner), 4);
taken.clone().wake();
assert_eq!(Arc::strong_count(&wa.inner), 4);
taken.wake();
assert_eq!(Arc::strong_count(&wa.inner), 3);

wa.wakers = array::from_fn(|_| dummy_waker());
assert_eq!(Arc::strong_count(&wa.inner), 1);

let weak = Arc::downgrade(&wa.inner);
drop(wa);
assert_eq!(weak.strong_count(), 0);
}
}
18 changes: 14 additions & 4 deletions src/utils/wakers/dummy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
use std::{sync::Arc, task::Wake};
use core::task::{RawWaker, RawWakerVTable, Waker};

pub(crate) struct DummyWaker();
impl Wake for DummyWaker {
fn wake(self: Arc<Self>) {}
/// A Waker that doesn't do anything.
pub(crate) fn dummy_waker() -> Waker {
fn new_raw_waker() -> RawWaker {
unsafe fn no_op(_data: *const ()) {}
unsafe fn clone(_data: *const ()) -> RawWaker {
new_raw_waker()
}
RawWaker::new(
core::ptr::null() as *const usize as *const (),
&RawWakerVTable::new(clone, no_op, no_op, no_op),
)
}
unsafe { Waker::from_raw(new_raw_waker()) }
}
14 changes: 12 additions & 2 deletions src/utils/wakers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
//! Wakers that track when they are woken.
//!
//! By tracking which subfutures have woken, we can avoid having to re-poll N subfutures every time.
//! This tracking is done by a [ReadinessArray]/[ReadinessVec]. These store the indexes of the subfutures that have woken.
//! Each subfuture are given a Waker when polled.
//! This waker must know the index of its corresponding subfuture so that it can update Readiness correctly.
//!

mod array;
mod shared_arc;
mod vec;

#[cfg(test)]
mod dummy;
mod vec;

#[cfg(test)]
pub(crate) use dummy::DummyWaker;
pub(crate) use dummy::dummy_waker;

pub(crate) use array::*;
pub(crate) use vec::*;
Loading