Skip to content

Commit

Permalink
Add conversion to SystemTime within ReactionCtx
Browse files Browse the repository at this point in the history
  • Loading branch information
oowekyala committed May 14, 2024
1 parent 10fee74 commit 9145a6a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
27 changes: 21 additions & 6 deletions src/scheduler/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::SystemTime;

use crossbeam_channel::reconnectable::{Receiver, SendError, Sender};
use smallvec::SmallVec;
Expand All @@ -22,8 +23,6 @@ use crate::*;
// ReactionCtx may be used for multiple ReactionWaves, but
// obviously at disjoint times (&mut).
pub struct ReactionCtx<'a, 'x> {
pub(super) insides: RContextForwardableStuff<'x>,

/// Logical time of the execution of this wave, constant
/// during the existence of the object
tag: EventTag,
Expand All @@ -39,16 +38,20 @@ pub struct ReactionCtx<'a, 'x> {

/// Start time of the program.
initial_time: Instant,
/// Start time of the program (absolute).
initial_time_system: SystemTime,

// globals, also they might be copied and passed to AsyncCtx
dataflow: &'x DataflowInfo,
debug_info: DebugInfoProvider<'a>,
/// Whether the scheduler has been shut down.
was_terminated_atomic: &'a Arc<AtomicBool>,
/// In ReactionCtx, this will only be true if this is the shutdown tag.
/// It duplicates [Self::was_terminated_atomic], to avoid an atomic
/// operation within [Self::is_shutdown].
was_terminated: bool,

debug_info: DebugInfoProvider<'a>,
pub(super) insides: RContextForwardableStuff<'x>,
}

impl<'a, 'x> ReactionCtx<'a, 'x> {
Expand Down Expand Up @@ -80,6 +83,14 @@ impl<'a, 'x> ReactionCtx<'a, 'x> {
self.tag.to_logical_time(self.get_start_time())
}

/// Convert an opaque [Instant] to a [SystemTime] instance for inspection.
/// Internally the runtime uses [Instant] as a monotonously non-decreasing clock.
/// For inspection purposes, and comparison between systems, it may be useful to
/// convert to [SystemTime].
pub fn to_system_time(&self, instant: Instant) -> SystemTime {
self.initial_time_system + (instant - self.initial_time)
}

/// Returns the tag at which the reaction executes.
///
/// Repeated invocation of this method will always produce
Expand Down Expand Up @@ -516,6 +527,7 @@ impl<'a, 'x> ReactionCtx<'a, 'x> {
rx: &'a Receiver<PhysicalEvent>,
tag: EventTag,
initial_time: Instant,
initial_time_system: SystemTime,
todo: ReactionPlan<'x>,
dataflow: &'x DataflowInfo,
debug_info: DebugInfoProvider<'a>,
Expand All @@ -529,6 +541,7 @@ impl<'a, 'x> ReactionCtx<'a, 'x> {
current_reaction: None,
rx,
initial_time,
initial_time_system,
dataflow,
was_terminated_atomic,
debug_info,
Expand All @@ -541,18 +554,20 @@ impl<'a, 'x> ReactionCtx<'a, 'x> {
#[cfg(feature = "parallel-runtime")]
pub(super) fn fork(&self) -> Self {
Self {
insides: Default::default(),

// all of that is common to all contexts
tag: self.tag,
rx: self.rx,
cur_level: self.cur_level,
initial_time: self.initial_time,
initial_time_system: self.initial_time_system,
dataflow: self.dataflow,
was_terminated: self.was_terminated,
was_terminated_atomic: self.was_terminated_atomic,
debug_info: self.debug_info.clone(),
current_reaction: self.current_reaction,

// this is the context-specific part
debug_info: self.debug_info.clone(),
insides: Default::default(),
}
}
}
Expand Down
27 changes: 19 additions & 8 deletions src/scheduler/scheduler_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::SystemTime;

use crossbeam_channel::reconnectable::*;

Expand Down Expand Up @@ -107,8 +108,12 @@ pub struct SyncScheduler<'x> {
rx: Receiver<PhysicalEvent>,

/// Initial time of the logical system.
#[allow(unused)] // might be useful someday
initial_time: Instant,
/// Initial time of the logical system, measured as [SystemTime].
/// This is used as a reference to convert Instants to SystemTime,
/// with the assumption that [initial_time] and [initial_time_system]
/// refer to the same instant.
initial_time_system: SystemTime,

/// Scheduled shutdown time. If Some, shutdown will be
/// initiated at that logical time.
Expand Down Expand Up @@ -150,17 +155,20 @@ impl<'x> SyncScheduler<'x> {
// collect dependency information
let dataflow_info = DataflowInfo::new(graph).map_err(|e| e.lift(&id_registry)).unwrap();

// Using thread::scope here introduces an unnamed lifetime for
// the scope, which is captured as 't by the SyncScheduler.
// This is useful because it captures the constraint that the
// dataflow_info outlives 't, so that physical contexts
// can be spawned in threads that capture references
// to 'x.
let initial_time = Instant::now();
let initial_time_system = SystemTime::now();

#[cfg(feature = "parallel-runtime")]
let rayon_thread_pool = rayon::ThreadPoolBuilder::new().num_threads(options.threads).build().unwrap();

let scheduler = SyncScheduler::new(options, id_registry, &dataflow_info, reactors, initial_time);
let scheduler = SyncScheduler::new(
options,
id_registry,
&dataflow_info,
reactors,
initial_time,
initial_time_system,
);

cfg_if::cfg_if! {
if #[cfg(feature = "parallel-runtime")] {
Expand Down Expand Up @@ -250,6 +258,7 @@ impl<'x> SyncScheduler<'x> {
dependency_info: &'x DataflowInfo,
reactors: ReactorVec<'x>,
initial_time: Instant,
initial_time_system: SystemTime,
) -> Self {
if !cfg!(feature = "parallel-runtime") && options.threads != 0 {
warn!("'workers' runtime parameter has no effect unless feature 'parallel-runtime' is enabled")
Expand All @@ -267,6 +276,7 @@ impl<'x> SyncScheduler<'x> {
reactors,

initial_time,
initial_time_system,
latest_processed_tag: None,
shutdown_time: options.timeout.map(|timeout| {
let shutdown_tag = EventTag::ORIGIN.successor(timeout);
Expand Down Expand Up @@ -387,6 +397,7 @@ impl<'x> SyncScheduler<'x> {
rx,
tag,
self.initial_time,
self.initial_time_system,
todo,
self.dataflow,
debug_info,
Expand Down

0 comments on commit 9145a6a

Please sign in to comment.