From 4f164de7c749a5180af50b6b347e6bb3c24ddc3f Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Sun, 12 Feb 2023 07:13:03 +0000 Subject: [PATCH] improve readability, provenance compliance, and documentation for shared arc waker --- Cargo.toml | 1 + src/utils/wakers/array/waker_array.rs | 22 ++-- src/utils/wakers/mod.rs | 8 ++ src/utils/wakers/shared_arc.rs | 176 ++++++++++++++++---------- src/utils/wakers/vec/waker_vec.rs | 22 ++-- 5 files changed, 141 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25f411f..7512913 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ harness = false bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] } futures-core = "0.3" pin-project = "1.0.8" +sptr = "0.3.2" [dev-dependencies] futures = "0.3.25" diff --git a/src/utils/wakers/array/waker_array.rs b/src/utils/wakers/array/waker_array.rs index 7797672..3ad3291 100644 --- a/src/utils/wakers/array/waker_array.rs +++ b/src/utils/wakers/array/waker_array.rs @@ -3,7 +3,7 @@ use core::task::Waker; use std::sync::{Arc, Mutex}; use super::{ - super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer}, + super::shared_arc::{waker_from_redirect_position, SharedArcContent}, ReadinessArray, }; @@ -15,7 +15,7 @@ pub(crate) struct WakerArray { /// See [super::super::shared_arc] for how this works. struct WakerArrayInner { - wake_data: [*const Self; N], + redirect: [*const Self; N], readiness: Mutex>, } @@ -24,7 +24,7 @@ impl WakerArray { pub(crate) fn new() -> Self { let mut inner = Arc::new(WakerArrayInner { readiness: Mutex::new(ReadinessArray::new()), - wake_data: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now. + redirect: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now. }); let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. @@ -33,14 +33,14 @@ impl WakerArray { // So N Wakers -> count = N+1. 
unsafe { Arc::decrement_strong_count(raw) } - // Make wake_data all point to the Arc itself. - Arc::get_mut(&mut inner).unwrap().wake_data = [raw; N]; + // Make redirect all point to the Arc itself. + Arc::get_mut(&mut inner).unwrap().redirect = [raw; N]; - // Now the wake_data array is complete. Time to create the actual Wakers. + // Now the redirect array is complete. Time to create the actual Wakers. let wakers = array::from_fn(|i| { - let data = inner.wake_data.get(i).unwrap(); + let data = inner.redirect.get(i).unwrap(); unsafe { - waker_for_wake_data_slot::>( + waker_from_redirect_position::>( data as *const *const WakerArrayInner, ) } @@ -59,9 +59,9 @@ impl WakerArray { } } -impl WakeDataContainer for WakerArrayInner { - fn get_wake_data_slice(&self) -> &[*const Self] { - &self.wake_data +impl SharedArcContent for WakerArrayInner { + fn get_redirect_slice(&self) -> &[*const Self] { + &self.redirect } fn wake_index(&self, index: usize) { diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs index e4fe1e7..72e0815 100644 --- a/src/utils/wakers/mod.rs +++ b/src/utils/wakers/mod.rs @@ -1,3 +1,11 @@ +//! Wakers that track when they are woken. +//! +//! By tracking which subfutures have woken, we can avoid having to re-poll N subfutures every time. +//! This tracking is done by a [ReadinessArray]/[ReadinessVec]. These store the indexes of the subfutures that have woken. +//! Each subfuture is given a Waker when polled. +//! This waker must know the index of its corresponding subfuture so that it can update Readiness correctly. +//! + mod array; mod shared_arc; mod vec; diff --git a/src/utils/wakers/shared_arc.rs b/src/utils/wakers/shared_arc.rs index cad8431..313fb41 100644 --- a/src/utils/wakers/shared_arc.rs +++ b/src/utils/wakers/shared_arc.rs @@ -1,92 +1,136 @@ -use core::task::{RawWaker, RawWakerVTable, Waker}; -use std::sync::Arc; +//! To save on allocations, we avoid making a separate Arc Waker for every subfuture. +//!
Rather, we have all N Wakers share a single Arc, and use a "redirect" mechanism to allow different wakers to be distinguished. +//! The mechanism works as follows. +//! The Arc contains 2 things: +//! - the Readiness structure ([ReadinessArray][super::array::ReadinessArray] / [ReadinessVec][super::vec::ReadinessVec]) +//! - the redirect array. +//! The redirect array contains N repeated copies of the pointer to the Arc itself (obtained by `Arc::into_raw`). +//! The Waker for the `i`th subfuture points to the `i`th item in the redirect array. +//! (i.e. the Waker pointer is `*const *const A` where `A` is the type of the item in the Arc) +//! When the Waker is woken, we deref it twice (giving reference to the content of the Arc), +//! and compare it to the address of the redirect slice. +//! The difference tells us the index of the waker. We can then record this woken index in the Readiness. +//! +//! ```text +//! ┌───────────────────────────┬──────────────┬──────────────┐ +//! │ │ │ │ +//! │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \ +//! ▼ / │ │ │ │ │ │ │ │ │ \ +//! Arc < │ Readiness │ redirect[0] │ redirect[1] │ redirect[2] │ ... │ > +//! ▲ \ │ │ │ │ │ │ / +//! │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ / +//! │ │ │ │ +//! └─┐ ┌───────────────┘ │ │ +//! │ │ │ │ +//! │ │ ┌──────────────────┘ │ +//! │ │ │ │ +//! │ │ │ ┌─────────────────────┘ +//! │ │ │ │ +//! │ │ │ │ +//! ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐ +//! │ │ │ │ │ │ │ │ │ │ +//! │ │ wakers[0] │ wakers[1] │ wakers[2] │ ... │ +//! │ │ │ │ │ │ +//! └─────────┴───────────┴───────────┴───────────┴─────┘ +//! ``` -// In the diagram below, `A` is the upper block. -// It is a struct that implements WakeDataContainer (so either WakerVecInner or WakerArrayInner). -// The lower block is either WakerVec or WakerArray. Each waker there points to a slot of wake_data in `A`. 
-// Every one of these slots contain a pointer to the Arc wrapping `A` itself. -// Wakers figure out their indices by comparing the address they are pointing to to `wake_data`'s start address. -// -// ┌───────────────────────────┬──────────────┬──────────────┐ -// │ │ │ │ -// │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \ -// ▼ / │ │ │ │ │ │ │ │ │ \ -// Arc < │ Readiness │ wake_data[0] │ wake_data[1] │ wake_data[2] │ ... │ > -// ▲ \ │ │ │ │ │ │ / -// │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ / -// │ │ │ │ -// └─┐ ┌───────────────┘ │ │ -// │ │ │ │ -// │ │ ┌──────────────────┘ │ -// │ │ │ │ -// │ │ │ ┌─────────────────────┘ -// │ │ │ │ -// │ │ │ │ -// ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐ -// │ │ │ │ │ │ │ │ │ │ -// │ Inner │ wakers[0] │ wakers[1] │ wakers[2] │ ... │ -// │ │ │ │ │ │ -// └─────────┴───────────┴───────────┴───────────┴─────┘ +// TODO: Right now each waker gets its own redirect slot. +// We can save space by making size_of::<*const _>() wakers share the same slot. +// With such a change, on a 64-bit system, the redirect array/vec would only need ⌈N/8⌉ slots instead of N. -// TODO: Right now each waker gets its own wake_data slot. -// We can save space by making size_of::() wakers share the same slot. -// With such change, in 64-bit system, the wake_data array/vec would only need ⌈N/8⌉ slots instead of N. +use core::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::Arc; -pub(super) trait WakeDataContainer { - /// Get the reference of the wake_data slice. This is used to compute the index. - fn get_wake_data_slice(&self) -> &[*const Self]; +/// A trait implemented by `WakerArrayInner` and `WakerVecInner` for polymorphism. +/// These are the types that go in the Arc. They both contain the Readiness and the redirect array/vec. +pub(super) trait SharedArcContent { + /// Get a reference to the redirect slice. This is used to compute the index.
+ fn get_redirect_slice(&self) -> &[*const Self]; /// Called when the subfuture at the specified index should be polled. + /// Should call `Readiness::set_ready`. fn wake_index(&self, index: usize); } -pub(super) unsafe fn waker_for_wake_data_slot( + +/// Create one waker following the mechanism described in the [module][self] doc. +/// The following must be upheld for safety: +/// - `pointer` must point to a slot in the redirect array. +/// - that slot must contain a pointer obtained by `Arc::<A>::into_raw`. +/// - the Arc must still be alive at the time this function is called. +/// The following should be upheld for correct behavior: +/// - calling `SharedArcContent::get_redirect_slice` on the content of the Arc should give the redirect array that `pointer` points into. +#[deny(unsafe_op_in_unsafe_fn)] +pub(super) unsafe fn waker_from_redirect_position( pointer: *const *const A, ) -> Waker { - unsafe fn clone_waker(pointer: *const ()) -> RawWaker { + /// Create a RawWaker from a type-erased pointer. + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn create_waker(pointer: *const ()) -> RawWaker { + // Retype the type-erased pointer. let pointer = pointer as *const *const A; - let raw = *pointer; // This is the raw pointer of Arc. // We're creating a new Waker, so we need to increment the count. - Arc::increment_strong_count(raw); + // SAFETY: The constraints listed in the wrapping function's documentation mean + // - `*pointer` is a `*const A` obtained from `Arc::<A>::into_raw`. + // - the Arc is alive. + // So this operation is safe. + unsafe { Arc::increment_strong_count(*pointer) }; RawWaker::new(pointer as *const (), create_vtable::()) } - // Convert a pointer to a wake_data slot to the Arc.
- unsafe fn to_arc(pointer: *const *const A) -> Arc { - let raw = *pointer; - Arc::from_raw(raw) - } - unsafe fn wake(pointer: *const ()) { + /// Invoke `SharedArcContent::wake_index` with the index of the redirect slice slot that this pointer points to. + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn wake_by_ref(pointer: *const ()) { + // Retype the type-erased pointer. let pointer = pointer as *const *const A; - let arc = to_arc::(pointer); - // Calculate the index - let index = ((pointer as usize) // This is the slot our pointer points to. - - (arc.get_wake_data_slice() as *const [*const A] as *const () as usize)) // This is the starting address of wake_data. - / std::mem::size_of::<*const A>(); - arc.wake_index(index); + // SAFETY: we are already requiring `pointer` to point to a slot in the redirect array. + let raw: *const A = unsafe { *pointer }; + // SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw` and the Arc to still be alive. + let arc_content: &A = unsafe { &*raw }; - // Dropping the Arc would decrement the strong count. - // We only want to do that when we're not waking by ref. - if BY_REF { - std::mem::forget(arc); - } else { - std::mem::drop(arc); - } + // Calculate the index. + // This is the familiar pointer arithmetic + // `item_address = array_address + (index * item_size)` + // rearranged to + // `index = (item_address - array_address) / item_size`. + let item_address = sptr::Strict::addr(pointer); + let redirect_slice_address = sptr::Strict::addr(arc_content.get_redirect_slice().as_ptr()); + let redirect_item_size = core::mem::size_of::<*const A>(); // the size of each item in the redirect slice + let index = (item_address - redirect_slice_address) / redirect_item_size; + + arc_content.wake_index(index); } - unsafe fn drop_waker(pointer: *const ()) { + + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation.
+ unsafe fn drop_waker(pointer: *const ()) { + // Retype the type-erased pointer. let pointer = pointer as *const *const A; - let arc = to_arc::(pointer); - // Decrement the strong count by dropping the Arc. - std::mem::drop(arc); + + // SAFETY: we are already requiring `pointer` to point to a slot in the redirect array. + let raw = unsafe { *pointer }; + // SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`. + unsafe { Arc::decrement_strong_count(raw) }; } - fn create_vtable() -> &'static RawWakerVTable { + + /// The pointer must satisfy the safety constraints listed in the wrapping function's documentation. + unsafe fn wake(pointer: *const ()) { + // SAFETY: we are already requiring the constraints of `wake_by_ref` and `drop_waker`. + unsafe { + wake_by_ref::(pointer); + drop_waker::(pointer); + } + } + + fn create_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( - clone_waker::, - wake::, - wake::, + create_waker::, + wake::, + wake_by_ref::, drop_waker::, ) } - Waker::from_raw(clone_waker::(pointer as *const ())) + // SAFETY: All our vtable functions adhere to the RawWakerVTable contract, + // and we are already requiring that `pointer` is what our functions expect. + unsafe { Waker::from_raw(create_waker::(pointer as *const ())) } } diff --git a/src/utils/wakers/vec/waker_vec.rs b/src/utils/wakers/vec/waker_vec.rs index 2e21f3f..e4f993e 100644 --- a/src/utils/wakers/vec/waker_vec.rs +++ b/src/utils/wakers/vec/waker_vec.rs @@ -2,7 +2,7 @@ use core::task::Waker; use std::sync::{Arc, Mutex}; use super::{ - super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer}, + super::shared_arc::{waker_from_redirect_position, SharedArcContent}, ReadinessVec, }; @@ -14,7 +14,7 @@ pub(crate) struct WakerVec { /// See [super::super::shared_arc] for how this works. 
struct WakerVecInner { - wake_data: Vec<*const Self>, + redirect: Vec<*const Self>, readiness: Mutex, } @@ -23,7 +23,7 @@ impl WakerVec { pub(crate) fn new(len: usize) -> Self { let mut inner = Arc::new(WakerVecInner { readiness: Mutex::new(ReadinessVec::new(len)), - wake_data: Vec::new(), + redirect: Vec::new(), }); let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address. @@ -32,15 +32,15 @@ impl WakerVec { // So N Wakers -> count = N+1. unsafe { Arc::decrement_strong_count(raw) } - // Make wake_data all point to the Arc itself. - Arc::get_mut(&mut inner).unwrap().wake_data = vec![raw; len]; + // Make redirect all point to the Arc itself. + Arc::get_mut(&mut inner).unwrap().redirect = vec![raw; len]; - // Now the wake_data vec is complete. Time to create the actual Wakers. + // Now the redirect vec is complete. Time to create the actual Wakers. let wakers = inner - .wake_data + .redirect .iter() .map(|data| unsafe { - waker_for_wake_data_slot::(data as *const *const WakerVecInner) + waker_from_redirect_position::(data as *const *const WakerVecInner) }) .collect(); @@ -56,9 +56,9 @@ impl WakerVec { } } -impl WakeDataContainer for WakerVecInner { - fn get_wake_data_slice(&self) -> &[*const Self] { - &self.wake_data +impl SharedArcContent for WakerVecInner { + fn get_redirect_slice(&self) -> &[*const Self] { + &self.redirect } fn wake_index(&self, index: usize) {