From 9145a6a2c1de66f56b07d8ac952301eaea59b929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Tue, 14 May 2024 16:11:17 +0200 Subject: [PATCH] Add conversion to SystemTime within ReactionCtx --- src/scheduler/context.rs | 27 +++++++++++++++++++++------ src/scheduler/scheduler_impl.rs | 27 +++++++++++++++++++-------- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/scheduler/context.rs b/src/scheduler/context.rs index 9851cc0d..3c23f14d 100644 --- a/src/scheduler/context.rs +++ b/src/scheduler/context.rs @@ -3,6 +3,7 @@ use std::hash::{Hash, Hasher}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::JoinHandle; +use std::time::SystemTime; use crossbeam_channel::reconnectable::{Receiver, SendError, Sender}; use smallvec::SmallVec; @@ -22,8 +23,6 @@ use crate::*; // ReactionCtx may be used for multiple ReactionWaves, but // obviously at disjoint times (&mut). pub struct ReactionCtx<'a, 'x> { - pub(super) insides: RContextForwardableStuff<'x>, - /// Logical time of the execution of this wave, constant /// during the existence of the object tag: EventTag, @@ -39,16 +38,20 @@ pub struct ReactionCtx<'a, 'x> { /// Start time of the program. initial_time: Instant, + /// Start time of the program (absolute). + initial_time_system: SystemTime, // globals, also they might be copied and passed to AsyncCtx dataflow: &'x DataflowInfo, - debug_info: DebugInfoProvider<'a>, /// Whether the scheduler has been shut down. was_terminated_atomic: &'a Arc, /// In ReactionCtx, this will only be true if this is the shutdown tag. /// It duplicates [Self::was_terminated_atomic], to avoid an atomic /// operation within [Self::is_shutdown]. was_terminated: bool, + + debug_info: DebugInfoProvider<'a>, + pub(super) insides: RContextForwardableStuff<'x>, } impl<'a, 'x> ReactionCtx<'a, 'x> { @@ -80,6 +83,14 @@ impl<'a, 'x> ReactionCtx<'a, 'x> { self.tag.to_logical_time(self.get_start_time()) } + /// Convert an opaque [Instant] to a [SystemTime] instance for inspection. + /// Internally the runtime uses [Instant] as a monotonously non-decreasing clock. + /// For inspection purposes, and comparison between systems, it may be useful to + /// convert to [SystemTime]. + pub fn to_system_time(&self, instant: Instant) -> SystemTime { + self.initial_time_system + (instant - self.initial_time) + } + /// Returns the tag at which the reaction executes. /// /// Repeated invocation of this method will always produce @@ -516,6 +527,7 @@ impl<'a, 'x> ReactionCtx<'a, 'x> { rx: &'a Receiver, tag: EventTag, initial_time: Instant, + initial_time_system: SystemTime, todo: ReactionPlan<'x>, dataflow: &'x DataflowInfo, debug_info: DebugInfoProvider<'a>, @@ -529,6 +541,7 @@ impl<'a, 'x> ReactionCtx<'a, 'x> { current_reaction: None, rx, initial_time, + initial_time_system, dataflow, was_terminated_atomic, debug_info, @@ -541,18 +554,20 @@ impl<'a, 'x> ReactionCtx<'a, 'x> { #[cfg(feature = "parallel-runtime")] pub(super) fn fork(&self) -> Self { Self { - insides: Default::default(), - // all of that is common to all contexts tag: self.tag, rx: self.rx, cur_level: self.cur_level, initial_time: self.initial_time, + initial_time_system: self.initial_time_system, dataflow: self.dataflow, was_terminated: self.was_terminated, was_terminated_atomic: self.was_terminated_atomic, - debug_info: self.debug_info.clone(), current_reaction: self.current_reaction, + + // this is the context-specific part + debug_info: self.debug_info.clone(), + insides: Default::default(), } } } diff --git a/src/scheduler/scheduler_impl.rs b/src/scheduler/scheduler_impl.rs index f667ac54..c30faf0e 100644 --- a/src/scheduler/scheduler_impl.rs +++ b/src/scheduler/scheduler_impl.rs @@ -26,6 +26,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::SystemTime; use crossbeam_channel::reconnectable::*; @@ -107,8 +108,12 @@ pub struct SyncScheduler<'x> { rx: Receiver, /// Initial time of the logical system. - #[allow(unused)] // might be useful someday initial_time: Instant, + /// Initial time of the logical system, measured as [SystemTime]. + /// This is used as a reference to convert Instants to SystemTime, + /// with the assumption that [initial_time] and [initial_time_system] + /// refer to the same instant. + initial_time_system: SystemTime, /// Scheduled shutdown time. If Some, shutdown will be /// initiated at that logical time. @@ -150,17 +155,20 @@ impl<'x> SyncScheduler<'x> { // collect dependency information let dataflow_info = DataflowInfo::new(graph).map_err(|e| e.lift(&id_registry)).unwrap(); - // Using thread::scope here introduces an unnamed lifetime for - // the scope, which is captured as 't by the SyncScheduler. - // This is useful because it captures the constraint that the - // dataflow_info outlives 't, so that physical contexts - // can be spawned in threads that capture references - // to 'x. let initial_time = Instant::now(); + let initial_time_system = SystemTime::now(); + #[cfg(feature = "parallel-runtime")] let rayon_thread_pool = rayon::ThreadPoolBuilder::new().num_threads(options.threads).build().unwrap(); - let scheduler = SyncScheduler::new(options, id_registry, &dataflow_info, reactors, initial_time); + let scheduler = SyncScheduler::new( + options, + id_registry, + &dataflow_info, + reactors, + initial_time, + initial_time_system, + ); cfg_if::cfg_if! { if #[cfg(feature = "parallel-runtime")] { @@ -250,6 +258,7 @@ impl<'x> SyncScheduler<'x> { dependency_info: &'x DataflowInfo, reactors: ReactorVec<'x>, initial_time: Instant, + initial_time_system: SystemTime, ) -> Self { if !cfg!(feature = "parallel-runtime") && options.threads != 0 { warn!("'workers' runtime parameter has no effect unless feature 'parallel-runtime' is enabled") @@ -267,6 +276,7 @@ impl<'x> SyncScheduler<'x> { reactors, initial_time, + initial_time_system, latest_processed_tag: None, shutdown_time: options.timeout.map(|timeout| { let shutdown_tag = EventTag::ORIGIN.successor(timeout); @@ -387,6 +397,7 @@ impl<'x> SyncScheduler<'x> { rx, tag, self.initial_time, + self.initial_time_system, todo, self.dataflow, debug_info,