Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(runtime): register timer on first poll #354

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,8 @@ impl Runtime {
}

#[cfg(feature = "time")]
pub(crate) fn create_timer(&self, delay: std::time::Duration) -> impl Future<Output = ()> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
if let Some(key) = timer_runtime.insert(delay) {
Either::Left(TimerFuture::new(key))
} else {
Either::Right(std::future::ready(()))
}
pub(crate) fn create_timer(&self, instant: std::time::Instant) -> impl Future<Output = ()> {
TimerFuture::new(instant)
}

pub(crate) fn cancel_op<T: OpCode>(&self, op: Key<T>) {
Expand All @@ -318,6 +313,21 @@ impl Runtime {
})
}

#[cfg(feature = "time")]
pub(crate) fn register_timer(
&self,
cx: &mut Context,
instant: std::time::Instant,
) -> Option<usize> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
if let Some(key) = timer_runtime.insert(instant) {
timer_runtime.update_waker(key, cx.waker().clone());
Some(key)
} else {
None
}
}

#[cfg(feature = "time")]
pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> {
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
Expand Down Expand Up @@ -496,6 +506,9 @@ pub fn spawn_blocking<T: Send + 'static>(

/// Submit an operation to the current runtime, and return a future for it.
///
/// It is safe but unspecified behavior to send the returned future to another
/// runtime and poll it.
///
/// ## Panics
///
/// This method doesn't create runtime. It tries to obtain the current runtime
Expand All @@ -507,6 +520,9 @@ pub fn submit<T: OpCode + 'static>(op: T) -> impl Future<Output = BufResult<usiz
/// Submit an operation to the current runtime, and return a future for it with
/// flags.
///
/// It is safe but unspecified behavior to send the returned future to another
/// runtime and poll it.
///
/// ## Panics
///
/// This method doesn't create runtime. It tries to obtain the current runtime
Expand Down
38 changes: 26 additions & 12 deletions compio-runtime/src/runtime/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::{Duration, Instant},
};

use futures_util::future::Either;
use slab::Slab;

use crate::runtime::Runtime;
Expand Down Expand Up @@ -70,13 +71,12 @@ impl TimerRuntime {
.unwrap_or_default()
}

pub fn insert(&mut self, mut delay: Duration) -> Option<usize> {
if delay.is_zero() {
pub fn insert(&mut self, instant: Instant) -> Option<usize> {
let delay = instant - self.time;
if delay <= self.time.elapsed() {
return None;
}
let elapsed = self.time.elapsed();
let key = self.tasks.insert(FutureState::Active(None));
delay += elapsed;
let entry = TimerEntry { key, delay };
self.wheel.push(Reverse(entry));
Some(key)
Expand Down Expand Up @@ -125,26 +125,39 @@ impl TimerRuntime {
}

pub struct TimerFuture {
key: usize,
key: Either<Instant, usize>,
}

impl TimerFuture {
pub fn new(key: usize) -> Self {
Self { key }
pub fn new(instant: Instant) -> Self {
Self {
key: Either::Left(instant),
}
}
}

impl Future for TimerFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Runtime::with_current(|r| r.poll_timer(cx, self.key))
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Runtime::with_current(|r| match self.key {
Either::Left(instant) => match r.register_timer(cx, instant) {
Some(key) => {
self.key = Either::Right(key);
Poll::Pending
}
None => Poll::Ready(()),
},
Either::Right(key) => r.poll_timer(cx, key),
})
}
}

impl Drop for TimerFuture {
fn drop(&mut self) {
Runtime::with_current(|r| r.cancel_timer(self.key));
if let Either::Right(key) = self.key {
Runtime::with_current(|r| r.cancel_timer(key));
}
}
}

Expand All @@ -153,8 +166,9 @@ fn timer_min_timeout() {
let mut runtime = TimerRuntime::new();
assert_eq!(runtime.min_timeout(), None);

runtime.insert(Duration::from_secs(1));
runtime.insert(Duration::from_secs(10));
let now = Instant::now();
runtime.insert(now + Duration::from_secs(1));
runtime.insert(now + Duration::from_secs(10));
let min_timeout = runtime.min_timeout().unwrap().as_secs_f32();

assert!(min_timeout < 1.);
Expand Down
4 changes: 2 additions & 2 deletions compio-runtime/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::Runtime;
/// # })
/// ```
pub async fn sleep(duration: Duration) {
Runtime::with_current(|r| r.create_timer(duration)).await
sleep_until(Instant::now() + duration).await
}

/// Waits until `deadline` is reached.
Expand All @@ -55,7 +55,7 @@ pub async fn sleep(duration: Duration) {
/// # })
/// ```
pub async fn sleep_until(deadline: Instant) {
sleep(deadline - Instant::now()).await
Runtime::with_current(|r| r.create_timer(deadline)).await
}

/// Error returned by [`timeout`] or [`timeout_at`].
Expand Down
Loading