Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share Arc between wakers #171

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions src/utils/wakers/array/waker_array.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use alloc::sync::Arc;
use alloc::sync::{Arc, Weak};
use core::array;
use core::task::Waker;
use std::sync::{Mutex, MutexGuard};

use super::{InlineWakerArray, ReadinessArray};
use crate::utils::wakers::shared_arc::{waker_from_redirec_position, SharedArcContent};

use super::ReadinessArray;

/// A collection of wakers which delegate to an in-line waker.
pub(crate) struct WakerArray<const N: usize> {
wakers: [Waker; N],
readiness: Arc<Mutex<ReadinessArray<N>>>,
inner: Arc<WakerArrayInner<N>>,
}

impl<const N: usize> WakerArray<N> {
/// Create a new instance of `WakerArray`.
pub(crate) fn new() -> Self {
let readiness = Arc::new(Mutex::new(ReadinessArray::new()));
Self {
wakers: array::from_fn(|i| {
Arc::new(InlineWakerArray::new(i, readiness.clone())).into()
}),
readiness,
}
let inner = Arc::new_cyclic(|w| {
let raw = Weak::as_ptr(w);
WakerArrayInner {
redirect: [raw; N],
readiness: Mutex::new(ReadinessArray::new()),
}
});
let wakers =
array::from_fn(|i| unsafe { waker_from_redirec_position(Arc::clone(&inner), i) });
Self { wakers, inner }
}

pub(crate) fn get(&self, index: usize) -> Option<&Waker> {
Expand All @@ -29,6 +34,28 @@ impl<const N: usize> WakerArray<N> {

/// Access the `Readiness`.
pub(crate) fn readiness(&mut self) -> MutexGuard<'_, ReadinessArray<N>> {
self.readiness.as_ref().lock().unwrap()
self.inner.readiness.lock().unwrap() // TODO: unwrap
}
}

struct WakerArrayInner<const N: usize> {
redirect: [*const Self; N],
readiness: Mutex<ReadinessArray<N>>,
}

unsafe impl<const N: usize> SharedArcContent for WakerArrayInner<N> {
fn get_redirect_slice(&self) -> &[*const Self] {
&self.redirect
}

fn wake_index(&self, index: usize) {
let mut readiness = self.readiness.lock().unwrap(); // TODO: unwrap
if !readiness.set_ready(index) {
readiness
.parent_waker()
.as_ref()
.expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?")
.wake_by_ref()
}
}
}
2 changes: 2 additions & 0 deletions src/utils/wakers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod array;
mod dummy;
#[cfg(feature = "alloc")]
mod vec;
// #[cfg(feature = "alloc")]
mod shared_arc;

#[cfg(all(test, feature = "alloc"))]
pub(crate) use dummy::DummyWaker;
Expand Down
58 changes: 58 additions & 0 deletions src/utils/wakers/shared_arc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use core::task::{RawWaker, RawWakerVTable, Waker};

use alloc::sync::Arc;

pub(super) unsafe trait SharedArcContent {
/// Get the reference of the redirect slice
fn get_redirect_slice(&self) -> &[*const Self];
/// Called when the subfuture at the specified index should be polled
/// Should call `Readiness::set_ready`
fn wake_index(&self, index: usize);
}

pub(super) unsafe fn waker_from_redirec_position<A: SharedArcContent>(
arc: Arc<A>,
index: usize,
) -> Waker {
unsafe fn clone_waker<A: SharedArcContent>(pointer: *const ()) -> RawWaker {
let pointer = pointer as *const *const A;
unsafe { Arc::increment_strong_count(*pointer) };
RawWaker::new(pointer as *const (), create_vtable::<A>())
}

unsafe fn wake_by_ref<A: SharedArcContent>(pointer: *const ()) {
let pointer = pointer as *const *const A;
let raw: *const A = unsafe { *pointer };
let arc_content: &A = unsafe { &*raw };
let slice = arc_content.get_redirect_slice().as_ptr();
let index = unsafe { pointer.offset_from(slice) } as usize;
arc_content.wake_index(index);
}

unsafe fn drop_waker<A: SharedArcContent>(pointer: *const ()) {
let pointer = pointer as *const *const A;
unsafe { Arc::decrement_strong_count(*pointer) };
}

unsafe fn wake<A: SharedArcContent>(pointer: *const ()) {
unsafe {
wake_by_ref::<A>(pointer);
drop_waker::<A>(pointer);
}
}

fn create_vtable<A: SharedArcContent>() -> &'static RawWakerVTable {
&RawWakerVTable::new(
clone_waker::<A>,
wake::<A>,
wake_by_ref::<A>,
drop_waker::<A>,
)
}

let redirect = arc.get_redirect_slice();
let pointer = unsafe { redirect.as_ptr().add(index) } as *const ();
core::mem::forget(arc);

unsafe { Waker::from_raw(RawWaker::new(pointer, create_vtable::<A>())) }
}
63 changes: 54 additions & 9 deletions src/utils/wakers/vec/waker_vec.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
#[cfg(all(feature = "alloc", not(feature = "std")))]
use alloc::vec::Vec;

use alloc::sync::Arc;
use alloc::sync::{Arc, Weak};
use core::task::Waker;
use std::sync::{Mutex, MutexGuard};

use super::{InlineWakerVec, ReadinessVec};
use crate::utils::wakers::shared_arc::{waker_from_redirec_position, SharedArcContent};

use super::ReadinessVec;

/// A collection of wakers which delegate to an in-line waker.
pub(crate) struct WakerVec {
wakers: Vec<Waker>,
readiness: Arc<Mutex<ReadinessVec>>,
inner: Arc<WakerVecInner>,
}

struct WakerVecInner {
redirect: Mutex<Vec<WakerVecInnerPtr>>,
readiness: Mutex<ReadinessVec>,
}

#[derive(Clone, Copy)]
#[repr(transparent)]
struct WakerVecInnerPtr(*const WakerVecInner);

unsafe impl Send for WakerVecInnerPtr {}
unsafe impl Sync for WakerVecInnerPtr {}

impl Default for WakerVec {
fn default() -> Self {
Self::new(0)
Expand All @@ -22,11 +36,17 @@ impl Default for WakerVec {
impl WakerVec {
/// Create a new instance of `WakerVec`.
pub(crate) fn new(len: usize) -> Self {
let readiness = Arc::new(Mutex::new(ReadinessVec::new(len)));
let inner = Arc::new_cyclic(|weak| {
let raw = Weak::as_ptr(weak);
WakerVecInner {
redirect: Mutex::new(vec![WakerVecInnerPtr(raw); len]),
readiness: Mutex::new(ReadinessVec::new(len)),
}
});
let wakers = (0..len)
.map(|i| Arc::new(InlineWakerVec::new(i, readiness.clone())).into())
.map(|i| unsafe { waker_from_redirec_position(Arc::clone(&inner), i) })
.collect();
Self { wakers, readiness }
Self { wakers, inner }
}

pub(crate) fn get(&self, index: usize) -> Option<&Waker> {
Expand All @@ -35,7 +55,7 @@ impl WakerVec {

/// Access the `Readiness`.
pub(crate) fn readiness(&self) -> MutexGuard<'_, ReadinessVec> {
self.readiness.lock().unwrap()
self.inner.readiness.lock().unwrap()
}

/// Resize the `WakerVec` to the new size.
Expand All @@ -44,13 +64,38 @@ impl WakerVec {
// Which means the first position is the current length, and every position
// beyond that is incremented by 1.
let mut index = self.wakers.len();

let ptr = WakerVecInnerPtr(Arc::as_ptr(&self.inner));
let mut lock = self.inner.redirect.lock().unwrap();
lock.resize_with(len, || ptr);
drop(lock);

self.wakers.resize_with(len, || {
let ret = Arc::new(InlineWakerVec::new(index, self.readiness.clone())).into();
let ret = unsafe { waker_from_redirec_position(Arc::clone(&self.inner), index) };
index += 1;
ret
});

let mut readiness = self.readiness.lock().unwrap();
let mut readiness = self.inner.readiness.lock().unwrap();
readiness.resize(len);
}
}

unsafe impl SharedArcContent for WakerVecInner {
fn get_redirect_slice(&self) -> &[*const Self] {
let slice = self.redirect.lock().unwrap();
let slice = slice.as_slice();
unsafe { core::mem::transmute(slice) }
}

fn wake_index(&self, index: usize) {
let mut readiness = self.readiness.lock().unwrap();
if !readiness.set_ready(index) {
readiness
.parent_waker()
.as_ref()
.expect("msg") // todo message
.wake_by_ref()
}
}
}
Loading