From ca246193abec454ab5c0d1120b5c6da709a2709f Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Mon, 9 Jan 2023 20:43:53 +0000 Subject: [PATCH 1/5] used a single shared Arc between wakers in WakerArray and WakerVec --- src/utils/wakers/array/mod.rs | 2 - src/utils/wakers/array/waker.rs | 31 --------- src/utils/wakers/array/waker_array.rs | 69 ++++++++++++++++---- src/utils/wakers/mod.rs | 4 +- src/utils/wakers/shared_arc.rs | 92 +++++++++++++++++++++++++++ src/utils/wakers/vec/mod.rs | 2 - src/utils/wakers/vec/waker.rs | 31 --------- src/utils/wakers/vec/waker_vec.rs | 67 +++++++++++++++---- 8 files changed, 206 insertions(+), 92 deletions(-) delete mode 100644 src/utils/wakers/array/waker.rs create mode 100644 src/utils/wakers/shared_arc.rs delete mode 100644 src/utils/wakers/vec/waker.rs diff --git a/src/utils/wakers/array/mod.rs b/src/utils/wakers/array/mod.rs index 7303d9c..20a9392 100644 --- a/src/utils/wakers/array/mod.rs +++ b/src/utils/wakers/array/mod.rs @@ -1,7 +1,5 @@ mod readiness; -mod waker; mod waker_array; pub(crate) use readiness::ReadinessArray; -pub(crate) use waker::InlineWakerArray; pub(crate) use waker_array::WakerArray; diff --git a/src/utils/wakers/array/waker.rs b/src/utils/wakers/array/waker.rs deleted file mode 100644 index 960dff0..0000000 --- a/src/utils/wakers/array/waker.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::sync::{Arc, Mutex}; -use std::task::Wake; - -use super::ReadinessArray; - -/// An efficient waker which delegates wake events. -#[derive(Debug, Clone)] -pub(crate) struct InlineWakerArray { - pub(crate) id: usize, - pub(crate) readiness: Arc>>, -} - -impl InlineWakerArray { - /// Create a new instance of `InlineWaker`. - pub(crate) fn new(id: usize, readiness: Arc>>) -> Self { - Self { id, readiness } - } -} - -impl Wake for InlineWakerArray { - fn wake(self: std::sync::Arc) { - let mut readiness = self.readiness.lock().unwrap(); - if !readiness.set_ready(self.id) { - readiness - .parent_waker() - .as_mut() - .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") - .wake_by_ref() - } - } -} diff --git a/src/utils/wakers/array/waker_array.rs b/src/utils/wakers/array/waker_array.rs index 293155b..345c46b 100644 --- a/src/utils/wakers/array/waker_array.rs +++ b/src/utils/wakers/array/waker_array.rs @@ -1,26 +1,52 @@ use core::array; -use std::sync::Arc; -use std::sync::Mutex; -use std::task::Waker; +use core::task::Waker; +use std::sync::{Arc, Mutex}; -use super::{InlineWakerArray, ReadinessArray}; +use super::{ + super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer}, + ReadinessArray, +}; /// A collection of wakers which delegate to an in-line waker. pub(crate) struct WakerArray { wakers: [Waker; N], - readiness: Arc>>, + inner: Arc>, +} + +/// See [super::super::shared_arc] for how this works. +struct WakerArrayInner { + wake_data: [*const Self; N], + readiness: Mutex>, } impl WakerArray { /// Create a new instance of `WakerArray`. pub(crate) fn new() -> Self { - let readiness = Arc::new(Mutex::new(ReadinessArray::new())); - Self { - wakers: array::from_fn(|i| { - Arc::new(InlineWakerArray::new(i, readiness.clone())).into() - }), - readiness, - } + let mut inner = Arc::new(WakerArrayInner { + readiness: Mutex::new(ReadinessArray::new()), + wake_data: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now. + }); + let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. + + // At this point the strong count is 2. Decrement it to 1. + // Each time we create/clone a Waker the count will be incremented by 1. + // So N Wakers -> count = N+1. + unsafe { Arc::decrement_strong_count(raw) } + + // Make wake_data all point to the Arc itself. + Arc::get_mut(&mut inner).unwrap().wake_data = [raw; N]; + + // Now the wake_data array is complete. Time to create the actual Wakers. + let wakers = array::from_fn(|i| { + let data = inner.wake_data.get(i).unwrap(); + unsafe { + waker_for_wake_data_slot::>( + data as *const *const WakerArrayInner, + ) + } + }); + + Self { inner, wakers } } pub(crate) fn get(&self, index: usize) -> Option<&Waker> { @@ -29,6 +55,23 @@ impl WakerArray { /// Access the `Readiness`. pub(crate) fn readiness(&self) -> &Mutex> { - self.readiness.as_ref() + &self.inner.readiness + } +} + +impl WakeDataContainer for WakerArrayInner { + fn get_wake_data_slice(&self) -> &[*const Self] { + &self.wake_data + } + + fn wake_index(&self, index: usize) { + let mut readiness = self.readiness.lock().unwrap(); + if !readiness.set_ready(index) { + readiness + .parent_waker() + .as_ref() + .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") + .wake_by_ref(); + } } } diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs index d5c7f1d..3eba976 100644 --- a/src/utils/wakers/mod.rs +++ b/src/utils/wakers/mod.rs @@ -1,7 +1,9 @@ mod array; +mod shared_arc; +mod vec; + #[cfg(test)] mod dummy; -mod vec; #[cfg(test)] pub(crate) use dummy::DummyWaker; diff --git a/src/utils/wakers/shared_arc.rs b/src/utils/wakers/shared_arc.rs new file mode 100644 index 0000000..cad8431 --- /dev/null +++ b/src/utils/wakers/shared_arc.rs @@ -0,0 +1,92 @@ +use core::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::Arc; + +// In the diagram below, `A` is the upper block. +// It is a struct that implements WakeDataContainer (so either WakerVecInner or WakerArrayInner). +// The lower block is either WakerVec or WakerArray. Each waker there points to a slot of wake_data in `A`. +// Every one of these slots contain a pointer to the Arc wrapping `A` itself. +// Wakers figure out their indices by comparing the address they are pointing to to `wake_data`'s start address. +// +// ┌───────────────────────────┬──────────────┬──────────────┐ +// │ │ │ │ +// │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \ +// ▼ / │ │ │ │ │ │ │ │ │ \ +// Arc < │ Readiness │ wake_data[0] │ wake_data[1] │ wake_data[2] │ ... │ > +// ▲ \ │ │ │ │ │ │ / +// │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ / +// │ │ │ │ +// └─┐ ┌───────────────┘ │ │ +// │ │ │ │ +// │ │ ┌──────────────────┘ │ +// │ │ │ │ +// │ │ │ ┌─────────────────────┘ +// │ │ │ │ +// │ │ │ │ +// ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐ +// │ │ │ │ │ │ │ │ │ │ +// │ Inner │ wakers[0] │ wakers[1] │ wakers[2] │ ... │ +// │ │ │ │ │ │ +// └─────────┴───────────┴───────────┴───────────┴─────┘ + +// TODO: Right now each waker gets its own wake_data slot. +// We can save space by making size_of::() wakers share the same slot. +// With such change, in 64-bit system, the wake_data array/vec would only need ⌈N/8⌉ slots instead of N. + +pub(super) trait WakeDataContainer { + /// Get the reference of the wake_data slice. This is used to compute the index. + fn get_wake_data_slice(&self) -> &[*const Self]; + /// Called when the subfuture at the specified index should be polled. + fn wake_index(&self, index: usize); +} +pub(super) unsafe fn waker_for_wake_data_slot( + pointer: *const *const A, +) -> Waker { + unsafe fn clone_waker(pointer: *const ()) -> RawWaker { + let pointer = pointer as *const *const A; + let raw = *pointer; // This is the raw pointer of Arc. + + // We're creating a new Waker, so we need to increment the count. + Arc::increment_strong_count(raw); + + RawWaker::new(pointer as *const (), create_vtable::()) + } + + // Convert a pointer to a wake_data slot to the Arc. + unsafe fn to_arc(pointer: *const *const A) -> Arc { + let raw = *pointer; + Arc::from_raw(raw) + } + unsafe fn wake(pointer: *const ()) { + let pointer = pointer as *const *const A; + let arc = to_arc::(pointer); + // Calculate the index + let index = ((pointer as usize) // This is the slot our pointer points to. + - (arc.get_wake_data_slice() as *const [*const A] as *const () as usize)) // This is the starting address of wake_data. + / std::mem::size_of::<*const A>(); + + arc.wake_index(index); + + // Dropping the Arc would decrement the strong count. + // We only want to do that when we're not waking by ref. + if BY_REF { + std::mem::forget(arc); + } else { + std::mem::drop(arc); + } + } + unsafe fn drop_waker(pointer: *const ()) { + let pointer = pointer as *const *const A; + let arc = to_arc::(pointer); + // Decrement the strong count by dropping the Arc. + std::mem::drop(arc); + } + fn create_vtable() -> &'static RawWakerVTable { + &RawWakerVTable::new( + clone_waker::, + wake::, + wake::, + drop_waker::, + ) + } + Waker::from_raw(clone_waker::(pointer as *const ())) +} diff --git a/src/utils/wakers/vec/mod.rs b/src/utils/wakers/vec/mod.rs index e002fd3..7f064fb 100644 --- a/src/utils/wakers/vec/mod.rs +++ b/src/utils/wakers/vec/mod.rs @@ -1,7 +1,5 @@ mod readiness; -mod waker; mod waker_vec; pub(crate) use readiness::ReadinessVec; -pub(crate) use waker::InlineWakerVec; pub(crate) use waker_vec::WakerVec; diff --git a/src/utils/wakers/vec/waker.rs b/src/utils/wakers/vec/waker.rs deleted file mode 100644 index cfd12ad..0000000 --- a/src/utils/wakers/vec/waker.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::sync::{Arc, Mutex}; -use std::task::Wake; - -use super::ReadinessVec; - -/// An efficient waker which delegates wake events. -#[derive(Debug, Clone)] -pub(crate) struct InlineWakerVec { - pub(crate) id: usize, - pub(crate) readiness: Arc>, -} - -impl InlineWakerVec { - /// Create a new instance of `InlineWaker`. - pub(crate) fn new(id: usize, readiness: Arc>) -> Self { - Self { id, readiness } - } -} - -impl Wake for InlineWakerVec { - fn wake(self: std::sync::Arc) { - let mut readiness = self.readiness.lock().unwrap(); - if !readiness.set_ready(self.id) { - readiness - .parent_waker() - .as_mut() - .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") - .wake_by_ref() - } - } -} diff --git a/src/utils/wakers/vec/waker_vec.rs b/src/utils/wakers/vec/waker_vec.rs index e317f0b..2e21f3f 100644 --- a/src/utils/wakers/vec/waker_vec.rs +++ b/src/utils/wakers/vec/waker_vec.rs @@ -1,31 +1,74 @@ -use std::sync::Arc; -use std::sync::Mutex; -use std::task::Waker; +use core::task::Waker; +use std::sync::{Arc, Mutex}; -use super::{InlineWakerVec, ReadinessVec}; +use super::{ + super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer}, + ReadinessVec, +}; -/// A collection of wakers which delegate to an in-line waker. +/// A collection of wakers sharing the same allocation. pub(crate) struct WakerVec { wakers: Vec, - readiness: Arc>, + inner: Arc, +} + +/// See [super::super::shared_arc] for how this works. +struct WakerVecInner { + wake_data: Vec<*const Self>, + readiness: Mutex, } impl WakerVec { /// Create a new instance of `WakerVec`. pub(crate) fn new(len: usize) -> Self { - let readiness = Arc::new(Mutex::new(ReadinessVec::new(len))); - let wakers = (0..len) - .map(|i| Arc::new(InlineWakerVec::new(i, readiness.clone())).into()) + let mut inner = Arc::new(WakerVecInner { + readiness: Mutex::new(ReadinessVec::new(len)), + wake_data: Vec::new(), + }); + let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. + + // At this point the strong count is 2. Decrement it to 1. + // Each time we create/clone a Waker the count will be incremented by 1. + // So N Wakers -> count = N+1. + unsafe { Arc::decrement_strong_count(raw) } + + // Make wake_data all point to the Arc itself. + Arc::get_mut(&mut inner).unwrap().wake_data = vec![raw; len]; + + // Now the wake_data vec is complete. Time to create the actual Wakers. + let wakers = inner + .wake_data + .iter() + .map(|data| unsafe { + waker_for_wake_data_slot::(data as *const *const WakerVecInner) + }) .collect(); - Self { wakers, readiness } + + Self { inner, wakers } } pub(crate) fn get(&self, index: usize) -> Option<&Waker> { self.wakers.get(index) } - /// Access the `Readiness`. pub(crate) fn readiness(&self) -> &Mutex { - self.readiness.as_ref() + &self.inner.readiness + } +} + +impl WakeDataContainer for WakerVecInner { + fn get_wake_data_slice(&self) -> &[*const Self] { + &self.wake_data + } + + fn wake_index(&self, index: usize) { + let mut readiness = self.readiness.lock().unwrap(); + if !readiness.set_ready(index) { + readiness + .parent_waker() + .as_ref() + .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") + .wake_by_ref(); + } } } From b396a85536c9e19d71531b4ac5ebf2b39ea73566 Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Mon, 9 Jan 2023 20:47:56 +0000 Subject: [PATCH 2/5] Use RawWaker to create allocation-free dummy waker --- src/future/join/array.rs | 5 ++--- src/future/join/vec.rs | 5 ++--- src/utils/mod.rs | 2 +- src/utils/wakers/dummy.rs | 18 ++++++++++++++---- src/utils/wakers/mod.rs | 2 +- 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index 9a6ec0c..cdedcc4 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -168,11 +168,10 @@ where #[cfg(test)] mod test { use super::*; - use crate::utils::DummyWaker; + use crate::utils::dummy_waker; use std::future; use std::future::Future; - use std::sync::Arc; use std::task::Context; #[test] @@ -189,7 +188,7 @@ mod test { assert_eq!(format!("{:?}", fut), "[Pending, Pending]"); let mut fut = Pin::new(&mut fut); - let waker = Arc::new(DummyWaker()).into(); + let waker = dummy_waker(); let mut cx = Context::from_waker(&waker); let _ = fut.as_mut().poll(&mut cx); assert_eq!(format!("{:?}", fut), "[Consumed, Consumed]"); diff --git a/src/future/join/vec.rs b/src/future/join/vec.rs index 1817de6..31d643b 100644 --- a/src/future/join/vec.rs +++ b/src/future/join/vec.rs @@ -170,11 +170,10 @@ where #[cfg(test)] mod test { use super::*; - use crate::utils::DummyWaker; + use crate::utils::dummy_waker; use std::future; use std::future::Future; - use std::sync::Arc; use std::task::Context; #[test] @@ -191,7 +190,7 @@ mod test { assert_eq!(format!("{:?}", fut), "[Pending, Pending]"); let mut fut = Pin::new(&mut fut); - let waker = Arc::new(DummyWaker()).into(); + let waker = dummy_waker(); let mut cx = Context::from_waker(&waker); let _ = fut.as_mut().poll(&mut cx); assert_eq!(format!("{:?}", fut), "[Consumed, Consumed]"); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ac5d38e..ed9932a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -16,7 +16,7 @@ pub(crate) use tuple::{gen_conditions, tuple_len}; pub(crate) use wakers::{WakerArray, WakerVec}; #[cfg(test)] -pub(crate) use wakers::DummyWaker; +pub(crate) use wakers::dummy_waker; #[cfg(test)] pub(crate) mod channel; diff --git a/src/utils/wakers/dummy.rs b/src/utils/wakers/dummy.rs index 0f454b0..b60c996 100644 --- a/src/utils/wakers/dummy.rs +++ b/src/utils/wakers/dummy.rs @@ -1,6 +1,16 @@ -use std::{sync::Arc, task::Wake}; +use core::task::{RawWaker, RawWakerVTable, Waker}; -pub(crate) struct DummyWaker(); -impl Wake for DummyWaker { - fn wake(self: Arc) {} +/// A Waker that doesn't do anything. +pub(crate) fn dummy_waker() -> Waker { + fn new_raw_waker() -> RawWaker { + unsafe fn no_op(_data: *const ()) {} + unsafe fn clone(_data: *const ()) -> RawWaker { + new_raw_waker() + } + RawWaker::new( + core::ptr::null() as *const usize as *const (), + &RawWakerVTable::new(clone, no_op, no_op, no_op), + ) + } + unsafe { Waker::from_raw(new_raw_waker()) } } diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs index 3eba976..e4fe1e7 100644 --- a/src/utils/wakers/mod.rs +++ b/src/utils/wakers/mod.rs @@ -6,7 +6,7 @@ mod vec; mod dummy; #[cfg(test)] -pub(crate) use dummy::DummyWaker; +pub(crate) use dummy::dummy_waker; pub(crate) use array::*; pub(crate) use vec::*; From 1d0be87e06f0ef758dc8ea2807519b5c50a583c2 Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Mon, 9 Jan 2023 20:48:46 +0000 Subject: [PATCH 3/5] add a test for the new Arc-sharing WakerArray --- src/utils/wakers/array/waker_array.rs | 40 +++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/utils/wakers/array/waker_array.rs b/src/utils/wakers/array/waker_array.rs index 345c46b..7797672 100644 --- a/src/utils/wakers/array/waker_array.rs +++ b/src/utils/wakers/array/waker_array.rs @@ -75,3 +75,43 @@ impl WakeDataContainer for WakerArrayInner { } } } + +#[cfg(test)] +mod tests { + use crate::utils::wakers::dummy_waker; + + use super::*; + #[test] + fn check_refcount() { + let mut wa = WakerArray::<5>::new(); + assert_eq!(Arc::strong_count(&wa.inner), 6); + wa.wakers[4] = dummy_waker(); + assert_eq!(Arc::strong_count(&wa.inner), 5); + let cloned = wa.wakers[3].clone(); + assert_eq!(Arc::strong_count(&wa.inner), 6); + wa.wakers[3] = wa.wakers[4].clone(); + assert_eq!(Arc::strong_count(&wa.inner), 5); + drop(cloned); + assert_eq!(Arc::strong_count(&wa.inner), 4); + + wa.wakers[0].wake_by_ref(); + wa.wakers[0].wake_by_ref(); + wa.wakers[0].wake_by_ref(); + assert_eq!(Arc::strong_count(&wa.inner), 4); + + wa.wakers[0] = wa.wakers[1].clone(); + assert_eq!(Arc::strong_count(&wa.inner), 4); + + let taken = std::mem::replace(&mut wa.wakers[2], dummy_waker()); + assert_eq!(Arc::strong_count(&wa.inner), 4); + taken.wake_by_ref(); + taken.wake_by_ref(); + taken.wake_by_ref(); + assert_eq!(Arc::strong_count(&wa.inner), 4); + taken.wake(); + assert_eq!(Arc::strong_count(&wa.inner), 3); + + wa.wakers = array::from_fn(|_| dummy_waker()); + assert_eq!(Arc::strong_count(&wa.inner), 1); + } +} From 4f164de7c749a5180af50b6b347e6bb3c24ddc3f Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Sun, 12 Feb 2023 07:13:03 +0000 Subject: [PATCH 4/5] improve readability, provenance compliance, and documentation for shared arc waker --- Cargo.toml | 1 + src/utils/wakers/array/waker_array.rs | 22 ++-- src/utils/wakers/mod.rs | 8 ++ src/utils/wakers/shared_arc.rs | 176 ++++++++++++++++---------- src/utils/wakers/vec/waker_vec.rs | 22 ++-- 5 files changed, 141 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25f411f..7512913 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ harness = false bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] } futures-core = "0.3" pin-project = "1.0.8" +sptr = "0.3.2" [dev-dependencies] futures = "0.3.25" diff --git a/src/utils/wakers/array/waker_array.rs b/src/utils/wakers/array/waker_array.rs index 7797672..3ad3291 100644 --- a/src/utils/wakers/array/waker_array.rs +++ b/src/utils/wakers/array/waker_array.rs @@ -3,7 +3,7 @@ use core::task::Waker; use std::sync::{Arc, Mutex}; use super::{ - super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer}, + super::shared_arc::{waker_from_redirect_position, SharedArcContent}, ReadinessArray, }; @@ -15,7 +15,7 @@ pub(crate) struct WakerArray { /// See [super::super::shared_arc] for how this works. struct WakerArrayInner { - wake_data: [*const Self; N], + redirect: [*const Self; N], readiness: Mutex>, } @@ -24,7 +24,7 @@ impl WakerArray { pub(crate) fn new() -> Self { let mut inner = Arc::new(WakerArrayInner { readiness: Mutex::new(ReadinessArray::new()), - wake_data: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now. + redirect: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now. }); let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. @@ -33,14 +33,14 @@ impl WakerArray { // So N Wakers -> count = N+1. unsafe { Arc::decrement_strong_count(raw) } - // Make wake_data all point to the Arc itself. - Arc::get_mut(&mut inner).unwrap().wake_data = [raw; N]; + // Make redirect all point to the Arc itself. + Arc::get_mut(&mut inner).unwrap().redirect = [raw; N]; - // Now the wake_data array is complete. Time to create the actual Wakers. + // Now the redirect array is complete. Time to create the actual Wakers. let wakers = array::from_fn(|i| { - let data = inner.wake_data.get(i).unwrap(); + let data = inner.redirect.get(i).unwrap(); unsafe { - waker_for_wake_data_slot::>( + waker_from_redirect_position::>( data as *const *const WakerArrayInner, ) } @@ -59,9 +59,9 @@ impl WakerArray { } } -impl WakeDataContainer for WakerArrayInner { - fn get_wake_data_slice(&self) -> &[*const Self] { - &self.wake_data +impl SharedArcContent for WakerArrayInner { + fn get_redirect_slice(&self) -> &[*const Self] { + &self.redirect } fn wake_index(&self, index: usize) { diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs index e4fe1e7..72e0815 100644 --- a/src/utils/wakers/mod.rs +++ b/src/utils/wakers/mod.rs @@ -1,3 +1,11 @@ +//! Wakers that track when they are woken. +//! +//! By tracking which subfutures have woken, we can avoid having to re-poll N subfutures every time. +//! This tracking is done by a [ReadinessArray]/[ReadinessVec]. These store the indexes of the subfutures that have woken. +//! Each subfuture are given a Waker when polled. +//! This waker must know the index of its corresponding subfuture so that it can update Readiness correctly. +//! + mod array; mod shared_arc; mod vec; diff --git a/src/utils/wakers/shared_arc.rs b/src/utils/wakers/shared_arc.rs index cad8431..313fb41 100644 --- a/src/utils/wakers/shared_arc.rs +++ b/src/utils/wakers/shared_arc.rs @@ -1,92 +1,136 @@ -use core::task::{RawWaker, RawWakerVTable, Waker}; -use std::sync::Arc; +//! To save on allocations, we avoid making a separate Arc Waker for every subfuture. +//! Rather, we have all N Wakers share a single Arc, and use a "redirect" mechanism to allow different wakers to be distinguished. +//! The mechanism works as follows. +//! The Arc contains 2 things: +//! - the Readiness structure ([ReadinessArray][super::array::ReadinessArray] / [ReadinessVec][super::vec::ReadinessVec]) +//! - the redirect array. +//! The redirect array contains N repeated copies of the pointer to the Arc itself (obtained by `Arc::into_raw`). +//! The Waker for the `i`th subfuture points to the `i`th item in the redirect array. +//! (i.e. the Waker pointer is `*const *const A` where `A` is the type of the item in the Arc) +//! When the Waker is woken, we deref it twice (giving reference to the content of the Arc), +//! and compare it to the address of the redirect slice. +//! The difference tells us the index of the waker. We can then record this woken index in the Readiness. +//! +//! ```text +//! ┌───────────────────────────┬──────────────┬──────────────┐ +//! │ │ │ │ +//! │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \ +//! ▼ / │ │ │ │ │ │ │ │ │ \ +//! Arc < │ Readiness │ redirect[0] │ redirect[1] │ redirect[2] │ ... │ > +//! ▲ \ │ │ │ │ │ │ / +//! │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ / +//! │ │ │ │ +//! └─┐ ┌───────────────┘ │ │ +//! │ │ │ │ +//! │ │ ┌──────────────────┘ │ +//! │ │ │ │ +//! │ │ │ ┌─────────────────────┘ +//! │ │ │ │ +//! │ │ │ │ +//! ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐ +//! │ │ │ │ │ │ │ │ │ │ +//! │ │ wakers[0] │ wakers[1] │ wakers[2] │ ... │ +//! │ │ │ │ │ │ +//! └─────────┴───────────┴───────────┴───────────┴─────┘ +//! ``` -// In the diagram below, `A` is the upper block. -// It is a struct that implements WakeDataContainer (so either WakerVecInner or WakerArrayInner). -// The lower block is either WakerVec or WakerArray. Each waker there points to a slot of wake_data in `A`. -// Every one of these slots contain a pointer to the Arc wrapping `A` itself. -// Wakers figure out their indices by comparing the address they are pointing to to `wake_data`'s start address. -// -// ┌───────────────────────────┬──────────────┬──────────────┐ -// │ │ │ │ -// │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \ -// ▼ / │ │ │ │ │ │ │ │ │ \ -// Arc < │ Readiness │ wake_data[0] │ wake_data[1] │ wake_data[2] │ ... │ > -// ▲ \ │ │ │ │ │ │ / -// │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ / -// │ │ │ │ -// └─┐ ┌───────────────┘ │ │ -// │ │ │ │ -// │ │ ┌──────────────────┘ │ -// │ │ │ │ -// │ │ │ ┌─────────────────────┘ -// │ │ │ │ -// │ │ │ │ -// ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐ -// │ │ │ │ │ │ │ │ │ │ -// │ Inner │ wakers[0] │ wakers[1] │ wakers[2] │ ... │ -// │ │ │ │ │ │ -// └─────────┴───────────┴───────────┴───────────┴─────┘ +// TODO: Right now each waker gets its own redirect slot. +// We can save space by making size_of::<*const _>() wakers share the same slot. +// With such change, in 64-bit system, the redirect array/vec would only need ⌈N/8⌉ slots instead of N. -// TODO: Right now each waker gets its own wake_data slot. -// We can save space by making size_of::() wakers share the same slot. -// With such change, in 64-bit system, the wake_data array/vec would only need ⌈N/8⌉ slots instead of N. +use core::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::Arc; -pub(super) trait WakeDataContainer { - /// Get the reference of the wake_data slice. This is used to compute the index. - fn get_wake_data_slice(&self) -> &[*const Self]; +/// A trait to be implemented on [super::WakerArray] and [super::WakerVec] for polymorphism. +/// These are the type that goes in the Arc. They both contain the Readiness and the redirect array/vec. +pub(super) trait SharedArcContent { + /// Get the reference of the redirect slice. This is used to compute the index. + fn get_redirect_slice(&self) -> &[*const Self]; /// Called when the subfuture at the specified index should be polled. + /// Should call `Readiness::set_ready`. fn wake_index(&self, index: usize); } -pub(super) unsafe fn waker_for_wake_data_slot( + +/// Create one waker following the mechanism described in the [module][self] doc. +/// The following must be upheld for safety: +/// - `pointer` must points to a slot in the redirect array. +/// - that slot must contain a pointer obtained by `Arc::::into_raw`. +/// - the Arc must still be alive at the time this function is called. +/// The following should be upheld for correct behavior: +/// - calling `SharedArcContent::get_redirect_slice` on the content of the Arc should give the redirect array within which `pointer` points to. +#[deny(unsafe_op_in_unsafe_fn)] +pub(super) unsafe fn waker_from_redirect_position( pointer: *const *const A, ) -> Waker { - unsafe fn clone_waker(pointer: *const ()) -> RawWaker { + /// Create a Waker from a type-erased pointer. + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn create_waker(pointer: *const ()) -> RawWaker { + // Retype the type-erased pointer. let pointer = pointer as *const *const A; - let raw = *pointer; // This is the raw pointer of Arc. // We're creating a new Waker, so we need to increment the count. - Arc::increment_strong_count(raw); + // SAFETY: The constraints listed for the wrapping function documentation means + // - `*pointer` is an `*const A` obtained from `Arc::::into_raw`. + // - the Arc is alive. + // So this operation is safe. + unsafe { Arc::increment_strong_count(*pointer) }; RawWaker::new(pointer as *const (), create_vtable::()) } - // Convert a pointer to a wake_data slot to the Arc. - unsafe fn to_arc(pointer: *const *const A) -> Arc { - let raw = *pointer; - Arc::from_raw(raw) - } - unsafe fn wake(pointer: *const ()) { + /// Invoke `SharedArcContent::wake_index` with the index in the redirect slice where this pointer points to. + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn wake_by_ref(pointer: *const ()) { + // Retype the type-erased pointer. let pointer = pointer as *const *const A; - let arc = to_arc::(pointer); - // Calculate the index - let index = ((pointer as usize) // This is the slot our pointer points to. - - (arc.get_wake_data_slice() as *const [*const A] as *const () as usize)) // This is the starting address of wake_data. - / std::mem::size_of::<*const A>(); - arc.wake_index(index); + // SAFETY: we are already requiring `pointer` to point to a slot in the redirect array. + let raw: *const A = unsafe { *pointer }; + // SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`. + let arc_content: &A = unsafe { &*raw }; - // Dropping the Arc would decrement the strong count. - // We only want to do that when we're not waking by ref. - if BY_REF { - std::mem::forget(arc); - } else { - std::mem::drop(arc); - } + // Calculate the index. + // This is your familiar pointer math + // `item_address = array_address + (index * item_size)` + // rearranged to + // `index = (item_address - array_address) / item_size`. + let item_address = sptr::Strict::addr(pointer); + let redirect_slice_address = sptr::Strict::addr(arc_content.get_redirect_slice().as_ptr()); + let redirect_item_size = core::mem::size_of::<*const A>(); // the size of each item in the redirect slice + let index = (item_address - redirect_slice_address) / redirect_item_size; + + arc_content.wake_index(index); } - unsafe fn drop_waker(pointer: *const ()) { + + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn drop_waker(pointer: *const ()) { + // Retype the type-erased pointer. let pointer = pointer as *const *const A; - let arc = to_arc::(pointer); - // Decrement the strong count by dropping the Arc. - std::mem::drop(arc); + + // SAFETY: we are already requiring `pointer` to point to a slot in the redirect array. + let raw = unsafe { *pointer }; + // SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`. + unsafe { Arc::decrement_strong_count(raw) }; } - fn create_vtable() -> &'static RawWakerVTable { + + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn wake(pointer: *const ()) { + // SAFETY: we are already requiring the constraints of `wake_by_ref` and `drop_waker`. + unsafe { + wake_by_ref::(pointer); + drop_waker::(pointer); + } + } + + fn create_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( - clone_waker::, - wake::, - wake::, + create_waker::, + wake::, + wake_by_ref::, drop_waker::, ) } - Waker::from_raw(clone_waker::(pointer as *const ())) + // SAFETY: All our vtable functions adhere to the RawWakerVTable contract, + // and we are already requiring that `pointer` is what our functions expect. + unsafe { Waker::from_raw(create_waker::(pointer as *const ())) } } diff --git a/src/utils/wakers/vec/waker_vec.rs b/src/utils/wakers/vec/waker_vec.rs index 2e21f3f..e4f993e 100644 --- a/src/utils/wakers/vec/waker_vec.rs +++ b/src/utils/wakers/vec/waker_vec.rs @@ -2,7 +2,7 @@ use core::task::Waker; use std::sync::{Arc, Mutex}; use super::{ - super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer}, + super::shared_arc::{waker_from_redirect_position, SharedArcContent}, ReadinessVec, }; @@ -14,7 +14,7 @@ pub(crate) struct WakerVec { /// See [super::super::shared_arc] for how this works. struct WakerVecInner { - wake_data: Vec<*const Self>, + redirect: Vec<*const Self>, readiness: Mutex, } @@ -23,7 +23,7 @@ impl WakerVec { pub(crate) fn new(len: usize) -> Self { let mut inner = Arc::new(WakerVecInner { readiness: Mutex::new(ReadinessVec::new(len)), - wake_data: Vec::new(), + redirect: Vec::new(), }); let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. @@ -32,15 +32,15 @@ impl WakerVec { // So N Wakers -> count = N+1. unsafe { Arc::decrement_strong_count(raw) } - // Make wake_data all point to the Arc itself. - Arc::get_mut(&mut inner).unwrap().wake_data = vec![raw; len]; + // Make redirect all point to the Arc itself. + Arc::get_mut(&mut inner).unwrap().redirect = vec![raw; len]; - // Now the wake_data vec is complete. Time to create the actual Wakers. + // Now the redirect vec is complete. Time to create the actual Wakers. let wakers = inner - .wake_data + .redirect .iter() .map(|data| unsafe { - waker_for_wake_data_slot::(data as *const *const WakerVecInner) + waker_from_redirect_position::(data as *const *const WakerVecInner) }) .collect(); @@ -56,9 +56,9 @@ impl WakerVec { } } -impl WakeDataContainer for WakerVecInner { - fn get_wake_data_slice(&self) -> &[*const Self] { - &self.wake_data +impl SharedArcContent for WakerVecInner { + fn get_redirect_slice(&self) -> &[*const Self] { + &self.redirect } fn wake_index(&self, index: usize) { From 6466c29dec2b918afe4099dea835e853b0778f42 Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Mon, 13 Feb 2023 04:25:16 +0000 Subject: [PATCH 5/5] simplify creation of WakerArray and WakerVec and make waker_from_redirect_position safer --- Cargo.toml | 1 - src/utils/wakers/array/waker_array.rs | 46 ++++++++-------- src/utils/wakers/shared_arc.rs | 76 ++++++++++++++++----------- src/utils/wakers/vec/waker_vec.rs | 32 +++++------ 4 files changed, 78 insertions(+), 77 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7512913..25f411f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ harness = false bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] } futures-core = "0.3" pin-project = "1.0.8" -sptr = "0.3.2" [dev-dependencies] futures = "0.3.25" diff --git a/src/utils/wakers/array/waker_array.rs b/src/utils/wakers/array/waker_array.rs index 3ad3291..ac6def9 100644 --- a/src/utils/wakers/array/waker_array.rs +++ b/src/utils/wakers/array/waker_array.rs @@ -1,6 +1,6 @@ use core::array; use core::task::Waker; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; use super::{ super::shared_arc::{waker_from_redirect_position, SharedArcContent}, @@ -22,30 +22,18 @@ struct WakerArrayInner { impl WakerArray { /// Create a new instance of `WakerArray`. pub(crate) fn new() -> Self { - let mut inner = Arc::new(WakerArrayInner { - readiness: Mutex::new(ReadinessArray::new()), - redirect: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now. - }); - let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. - - // At this point the strong count is 2. Decrement it to 1. - // Each time we create/clone a Waker the count will be incremented by 1. - // So N Wakers -> count = N+1. - unsafe { Arc::decrement_strong_count(raw) } - - // Make redirect all point to the Arc itself. - Arc::get_mut(&mut inner).unwrap().redirect = [raw; N]; - - // Now the redirect array is complete. Time to create the actual Wakers. - let wakers = array::from_fn(|i| { - let data = inner.redirect.get(i).unwrap(); - unsafe { - waker_from_redirect_position::>( - data as *const *const WakerArrayInner, - ) + let inner = Arc::new_cyclic(|w| { + // `Weak::as_ptr` on a live Weak gives the same thing as `Arc::into_raw`. + let raw = Weak::as_ptr(w); + WakerArrayInner { + readiness: Mutex::new(ReadinessArray::new()), + redirect: [raw; N], } }); + let wakers = + array::from_fn(|i| unsafe { waker_from_redirect_position(Arc::clone(&inner), i) }); + Self { inner, wakers } } @@ -59,7 +47,8 @@ impl WakerArray { } } -impl SharedArcContent for WakerArrayInner { +#[deny(unsafe_op_in_unsafe_fn)] +unsafe impl SharedArcContent for WakerArrayInner { fn get_redirect_slice(&self) -> &[*const Self] { &self.redirect } @@ -84,7 +73,10 @@ mod tests { #[test] fn check_refcount() { let mut wa = WakerArray::<5>::new(); + + // Each waker holds 1 ref, and the combinator itself holds 1. assert_eq!(Arc::strong_count(&wa.inner), 6); + wa.wakers[4] = dummy_waker(); assert_eq!(Arc::strong_count(&wa.inner), 5); let cloned = wa.wakers[3].clone(); @@ -105,13 +97,17 @@ mod tests { let taken = std::mem::replace(&mut wa.wakers[2], dummy_waker()); assert_eq!(Arc::strong_count(&wa.inner), 4); taken.wake_by_ref(); - taken.wake_by_ref(); - taken.wake_by_ref(); + assert_eq!(Arc::strong_count(&wa.inner), 4); + taken.clone().wake(); assert_eq!(Arc::strong_count(&wa.inner), 4); taken.wake(); assert_eq!(Arc::strong_count(&wa.inner), 3); wa.wakers = array::from_fn(|_| dummy_waker()); assert_eq!(Arc::strong_count(&wa.inner), 1); + + let weak = Arc::downgrade(&wa.inner); + drop(wa); + assert_eq!(weak.strong_count(), 0); } } diff --git a/src/utils/wakers/shared_arc.rs b/src/utils/wakers/shared_arc.rs index 313fb41..1e9a555 100644 --- a/src/utils/wakers/shared_arc.rs +++ b/src/utils/wakers/shared_arc.rs @@ -43,43 +43,46 @@ use std::sync::Arc; /// A trait to be implemented on [super::WakerArray] and [super::WakerVec] for polymorphism. /// These are the type that goes in the Arc. They both contain the Readiness and the redirect array/vec. -pub(super) trait SharedArcContent { - /// Get the reference of the redirect slice. This is used to compute the index. +/// # Safety +/// The `get_redirect_slice` method MUST always return the same slice for the same self. +pub(super) unsafe trait SharedArcContent { + /// Get the reference of the redirect slice. fn get_redirect_slice(&self) -> &[*const Self]; + /// Called when the subfuture at the specified index should be polled. /// Should call `Readiness::set_ready`. fn wake_index(&self, index: usize); } /// Create one waker following the mechanism described in the [module][self] doc. -/// The following must be upheld for safety: -/// - `pointer` must points to a slot in the redirect array. -/// - that slot must contain a pointer obtained by `Arc::::into_raw`. -/// - the Arc must still be alive at the time this function is called. -/// The following should be upheld for correct behavior: -/// - calling `SharedArcContent::get_redirect_slice` on the content of the Arc should give the redirect array within which `pointer` points to. +/// For safety, the index MUST be within bounds of the slice returned by `A::get_redirect_slice()`. #[deny(unsafe_op_in_unsafe_fn)] pub(super) unsafe fn waker_from_redirect_position( - pointer: *const *const A, + arc: Arc, + index: usize, ) -> Waker { - /// Create a Waker from a type-erased pointer. - /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. - unsafe fn create_waker(pointer: *const ()) -> RawWaker { + // For `create_waker`, `wake_by_ref`, `wake`, and `drop_waker`, the following MUST be upheld for safety: + // - `pointer` must points to a slot in the redirect array. + // - that slot must contain a pointer of an Arc obtained from `Arc::::into_raw`. + // - that Arc must still be alive (strong count > 0) at the time the function is called. + + /// Clone a Waker from a type-erased pointer. + /// The pointer must satisfy the safety constraints listed in the code comments above. + unsafe fn clone_waker(pointer: *const ()) -> RawWaker { // Retype the type-erased pointer. let pointer = pointer as *const *const A; - // We're creating a new Waker, so we need to increment the count. - // SAFETY: The constraints listed for the wrapping function documentation means + // Increment the count so that the Arc won't die before this new Waker we're creating. + // SAFETY: The required constraints means // - `*pointer` is an `*const A` obtained from `Arc::::into_raw`. - // - the Arc is alive. - // So this operation is safe. + // - the Arc is alive right now. unsafe { Arc::increment_strong_count(*pointer) }; RawWaker::new(pointer as *const (), create_vtable::()) } /// Invoke `SharedArcContent::wake_index` with the index in the redirect slice where this pointer points to. - /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + /// The pointer must satisfy the safety constraints listed in the code comments above. unsafe fn wake_by_ref(pointer: *const ()) { // Retype the type-erased pointer. let pointer = pointer as *const *const A; @@ -89,31 +92,28 @@ pub(super) unsafe fn waker_from_redirect_position( // SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`. let arc_content: &A = unsafe { &*raw }; - // Calculate the index. - // This is your familiar pointer math - // `item_address = array_address + (index * item_size)` - // rearranged to - // `index = (item_address - array_address) / item_size`. - let item_address = sptr::Strict::addr(pointer); - let redirect_slice_address = sptr::Strict::addr(arc_content.get_redirect_slice().as_ptr()); - let redirect_item_size = core::mem::size_of::<*const A>(); // the size of each item in the redirect slice - let index = (item_address - redirect_slice_address) / redirect_item_size; + let slice_start = arc_content.get_redirect_slice().as_ptr(); + + // We'll switch to [`sub_ptr`](https://github.com/rust-lang/rust/issues/95892) once that's stable. + let index = unsafe { pointer.offset_from(slice_start) } as usize; arc_content.wake_index(index); } - /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + /// Drop the waker (and drop the shared Arc if other Wakers and the combinator have all been dropped). + /// The pointer must satisfy the safety constraints listed in the code comments above. unsafe fn drop_waker(pointer: *const ()) { // Retype the type-erased pointer. let pointer = pointer as *const *const A; // SAFETY: we are already requiring `pointer` to point to a slot in the redirect array. - let raw = unsafe { *pointer }; + let raw: *const A = unsafe { *pointer }; // SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`. unsafe { Arc::decrement_strong_count(raw) }; } - /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + /// Like `wake_by_ref` but consumes the Waker. + /// The pointer must satisfy the safety constraints listed in the code comments above. unsafe fn wake(pointer: *const ()) { // SAFETY: we are already requiring the constraints of `wake_by_ref` and `drop_waker`. unsafe { @@ -124,13 +124,27 @@ pub(super) unsafe fn waker_from_redirect_position( fn create_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( - create_waker::, + clone_waker::, wake::, wake_by_ref::, drop_waker::, ) } + + let redirect_slice: &[*const A] = arc.get_redirect_slice(); + + debug_assert!(redirect_slice.len() > index); + + // SAFETY: we are already requiring that index be in bound of the slice. + let pointer: *const *const A = unsafe { redirect_slice.as_ptr().add(index) }; + // Type-erase the pointer because the Waker methods expect so. + let pointer = pointer as *const (); + + // We want to transfer management of the one strong count associated with `arc` to the Waker we're creating. + // That count should only be decremented when the Waker is dropped (by `drop_waker`). + core::mem::forget(arc); + // SAFETY: All our vtable functions adhere to the RawWakerVTable contract, // and we are already requiring that `pointer` is what our functions expect. - unsafe { Waker::from_raw(create_waker::(pointer as *const ())) } + unsafe { Waker::from_raw(RawWaker::new(pointer, create_vtable::())) } } diff --git a/src/utils/wakers/vec/waker_vec.rs b/src/utils/wakers/vec/waker_vec.rs index e4f993e..a84d9e7 100644 --- a/src/utils/wakers/vec/waker_vec.rs +++ b/src/utils/wakers/vec/waker_vec.rs @@ -1,5 +1,5 @@ use core::task::Waker; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; use super::{ super::shared_arc::{waker_from_redirect_position, SharedArcContent}, @@ -21,27 +21,18 @@ struct WakerVecInner { impl WakerVec { /// Create a new instance of `WakerVec`. pub(crate) fn new(len: usize) -> Self { - let mut inner = Arc::new(WakerVecInner { - readiness: Mutex::new(ReadinessVec::new(len)), - redirect: Vec::new(), + let inner = Arc::new_cyclic(|w| { + // `Weak::as_ptr` on a live Weak gives the same thing as `Arc::into_raw`. + let raw = Weak::as_ptr(w); + WakerVecInner { + readiness: Mutex::new(ReadinessVec::new(len)), + redirect: vec![raw; len], + } }); - let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. - - // At this point the strong count is 2. Decrement it to 1. - // Each time we create/clone a Waker the count will be incremented by 1. - // So N Wakers -> count = N+1. - unsafe { Arc::decrement_strong_count(raw) } - - // Make redirect all point to the Arc itself. - Arc::get_mut(&mut inner).unwrap().redirect = vec![raw; len]; // Now the redirect vec is complete. Time to create the actual Wakers. - let wakers = inner - .redirect - .iter() - .map(|data| unsafe { - waker_from_redirect_position::(data as *const *const WakerVecInner) - }) + let wakers = (0..len) + .map(|i| unsafe { waker_from_redirect_position(Arc::clone(&inner), i) }) .collect(); Self { inner, wakers } @@ -56,7 +47,8 @@ impl WakerVec { } } -impl SharedArcContent for WakerVecInner { +#[deny(unsafe_op_in_unsafe_fn)] +unsafe impl SharedArcContent for WakerVecInner { fn get_redirect_slice(&self) -> &[*const Self] { &self.redirect }