From bebc0d36d44fb955ac308a383b16cd3ef9b7afab Mon Sep 17 00:00:00 2001 From: Leonora Tindall Date: Fri, 6 Mar 2020 21:54:37 -0500 Subject: [PATCH 1/3] meta: add rustfmt.toml --- rustfmt.toml | 1 + 1 file changed, 1 insertion(+) create mode 100644 rustfmt.toml diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..d4d3d50 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +version = "Two" \ No newline at end of file From 153f80995d1aa3dc29761fad6c4cd4910d767327 Mon Sep 17 00:00:00 2001 From: Leonora Tindall Date: Sun, 8 Mar 2020 20:50:55 -0500 Subject: [PATCH 2/3] wake_on: Basic WakeOnWrite with simple test --- Cargo.toml | 3 ++ src/lib.rs | 2 + src/wake_on.rs | 117 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 src/wake_on.rs diff --git a/Cargo.toml b/Cargo.toml index 9bbc5eb..18fe0ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,5 +16,8 @@ authors = [ [features] [dependencies] +async-std = "1" [dev-dependencies] +async-std = { version = "1", features = ["attributes", "unstable"] } +pin-utils = "0.1.0-alpha.4" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 552b339..fdaf4d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,3 +9,5 @@ #![forbid(unsafe_code, future_incompatible, rust_2018_idioms)] #![deny(missing_debug_implementations, nonstandard_style)] #![warn(missing_docs, missing_doc_code_examples, unreachable_pub)] + +pub mod wake_on; diff --git a/src/wake_on.rs b/src/wake_on.rs new file mode 100644 index 0000000..38cbb98 --- /dev/null +++ b/src/wake_on.rs @@ -0,0 +1,117 @@ +//! Smart pointers to wake tasks on access +use async_std::task::Waker; +use std::ops::{Deref, DerefMut}; + +/// A wrapper type which wakes tasks whenever the wrapped value is accessed +/// through an `&mut` reference. +/// +/// `T` is the type of the value being wrapped. This struct is `Deref` and +/// `DerefMut` for that type, giving `&T` and `&mut T` respectively. +/// When a `Waker` is registered with `set_waker`, that `Waker` is woken +/// whenever the wrapped value is accessed through an `&mut` reference +/// and therefore potentially mutated. +/// +/// This is useful when there is a future polling the state of the wrapped +/// value. It needs to be awoken whenever that value changes so that they +/// can check whether or not its value is in a state that will let the +/// future make progress. That future can register the `Waker` from the +/// `Context` it is passed with the `WakeOnWrite` wrapping the value it is +/// interested in so that all mutations cause it to be woken going forward. +/// +/// This type isn't effective for observing changes on values with interior +/// mutablity, because it only wakes on `&mut` access. +#[derive(Default, Debug, Clone)] +pub struct WakeOnWrite { + inner: T, + waker: Option, +} + +impl WakeOnWrite { + /// Create a new `WakeOnWrite` with the given value. + pub fn new(value: T) -> Self { + Self { + inner: value, + waker: None, + } + } + + /// Set the `Waker` to be awoken when this value is mutated. + /// + /// Returns the currently registered `Waker`, if there is one. + pub fn set_waker(wow: &mut Self, waker: Waker) -> Option { + wow.waker.replace(waker) + } + + /// Removes and returns the currently registered `Waker`, if there is one. + pub fn take_waker(wow: &mut Self) -> Option { + wow.waker.take() + } + + /// Returns the currently registered `Waker`, leaving it registered, if + /// there is one. + pub fn waker(wow: &Self) -> Option<&Waker> { + wow.waker.as_ref() + } +} + +impl Deref for WakeOnWrite { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for WakeOnWrite { + fn deref_mut(&mut self) -> &mut Self::Target { + self.waker.as_ref().map(|w| w.wake_by_ref()); + &mut self.inner + } +} + +#[async_std::test] +async fn wow_wakes_target_on_mut_access() { + use async_std::future::poll_fn; + use async_std::prelude::*; + use async_std::sync::Arc; + use async_std::sync::Mutex; + use async_std::task::Poll; + use pin_utils::pin_mut; + use std::future::Future; + + let data: Arc>> = Default::default(); + let data_checker = { + let data_ref = data.clone(); + poll_fn(move |ctx| { + // This is an inefficient use of futures, but it does work in this + // case. + let data_lock_future = data_ref.lock(); + pin_mut!(data_lock_future); + match data_lock_future.poll(ctx) { + Poll::Ready(mut lock) => match **lock { + 10 => Poll::Ready(()), + _ => { + WakeOnWrite::set_waker(&mut lock, ctx.waker().clone()); + Poll::Pending + } + }, + Poll::Pending => Poll::Pending, + } + }) + }; + + let data_incrementor = { + let data_ref = data.clone(); + async move { + for _ in 0..10u8 { + let mut lock = data_ref.lock().await; + **lock += 1; + } + } + }; + + data_checker + .join(data_incrementor) + .timeout(core::time::Duration::new(1, 0)) + .await + .unwrap(); +} From bcb6eeb1166fb9f0ea791ed6ed4ec570cf882a36 Mon Sep 17 00:00:00 2001 From: Leonora Tindall Date: Mon, 16 Mar 2020 08:32:44 -0500 Subject: [PATCH 3/3] wake_on: add wake() assoc. func. to manually wake by suggestion of @spacekookie --- src/wake_on.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/wake_on.rs b/src/wake_on.rs index 38cbb98..1fc41e7 100644 --- a/src/wake_on.rs +++ b/src/wake_on.rs @@ -52,6 +52,12 @@ impl WakeOnWrite { pub fn waker(wow: &Self) -> Option<&Waker> { wow.waker.as_ref() } + + /// Wakes the currently registered `Waker`, if there is one, returning + /// `Some(())` if it was woken and `None` otherwise. + pub fn wake(wow: &Self) -> Option<()> { + wow.waker.as_ref().map(|w| w.wake_by_ref()) + } } impl Deref for WakeOnWrite { @@ -115,3 +121,60 @@ async fn wow_wakes_target_on_mut_access() { .await .unwrap(); } + +#[async_std::test] +async fn wow_wakes_target_on_explicit_wake() { + use async_std::future::poll_fn; + use async_std::prelude::*; + use async_std::sync::Arc; + use async_std::sync::Mutex; + use async_std::task::Poll; + use pin_utils::pin_mut; + use std::future::Future; + + let data: Arc>> = Default::default(); + let data_checker = { + let data_ref = data.clone(); + poll_fn(move |ctx| { + // This is an inefficient use of futures, but it does work in this + // case. + let data_lock_future = data_ref.lock(); + pin_mut!(data_lock_future); + match data_lock_future.poll(ctx) { + Poll::Ready(mut lock) => match **lock { + 10 => Poll::Ready(()), + _ => { + WakeOnWrite::set_waker(&mut lock, ctx.waker().clone()); + Poll::Pending + } + }, + Poll::Pending => Poll::Pending, + } + }) + }; + + let data_incrementor = { + let data_ref = data.clone(); + async move { + for _ in 0..10u8 { + let mut lock = data_ref.lock().await; + // Remove the Waker before mutable access, then put it back, so + // we have to wake the task manually. + let waker = WakeOnWrite::take_waker(&mut *lock); + **lock += 1; + // If there was a Waker, put it back. + if let Some(w) = waker { + WakeOnWrite::set_waker(&mut *lock, w); + } + // Now, manually wake the Waker. + WakeOnWrite::wake(&*lock); + } + } + }; + + data_checker + .join(data_incrementor) + .timeout(core::time::Duration::new(1, 0)) + .await + .unwrap(); +}