From 9fad3995e9039dc9c619513a81dd7fe760e14882 Mon Sep 17 00:00:00 2001 From: Tobias Shapinsky Date: Mon, 3 Feb 2025 13:36:59 -0700 Subject: [PATCH] clean up StepRunProcess and begin cleaning openstudio StepRun --- alfalfa_worker/jobs/openstudio/step_run.py | 28 +++++++-- alfalfa_worker/jobs/step_run_process.py | 68 ++++++++++++++++------ 2 files changed, 72 insertions(+), 24 deletions(-) diff --git a/alfalfa_worker/jobs/openstudio/step_run.py b/alfalfa_worker/jobs/openstudio/step_run.py index eef18310..ef4aa9ef 100644 --- a/alfalfa_worker/jobs/openstudio/step_run.py +++ b/alfalfa_worker/jobs/openstudio/step_run.py @@ -22,7 +22,7 @@ def wrapped_func(self: "StepRun", state): try: return func(self, state) except Exception: - self.catch_exception() + self.report_exception() return wrapped_func @@ -55,7 +55,7 @@ def __init__(self, run_id, realtime, timescale, external_clock, start_datetime, self.additional_points: list[AlfalfaPoint] = [] - def start_simulation_process(self): + def simulation_process_entrypoint(self): """ Initialize and start EnergyPlus co-simulation. @@ -85,19 +85,24 @@ def start_simulation_process(self): raise JobExceptionExternalProcess(f"EnergyPlus Exited with a non-zero exit code: {return_code}") def callback_message(self, message: bytes) -> None: + """Callback for when energyplus records a message to the log""" try: self.logger.info(message.decode()) except Exception: - self.catch_exception() + self.report_exception() def callback_error(self, state, message: bytes) -> None: + """Callback for when energyplus records an error to the log. + These 'Errors' include warnings and non-critical errors.""" try: self.logger.error(message.decode()) except Exception: - self.catch_exception() + self.report_exception() @callback_wrapper def initialize_handles(self, state): + """Callback called at begin_new_environment. 
Enumerates Alfalfa points to connect + them with handles that can be used to transact data with energyplus.""" exceptions = [] def get_handle(type, parameters): @@ -154,24 +159,35 @@ def add_additional_meter(fuel: str, units: str, converter: Callable[[float], flo @callback_wrapper def ep_begin_timestep(self, state): + """Callback called at end_zone_timestep_after_zone_reporting. This is responsible for + controlling simulation advancement, as well as reading simulation outputs and writing inputs to energyplus.""" + + # If simulation is not 'Running' yet and energyplus is still warming up, short circuit method and return if not self.running_event.is_set(): warmup = self.ep_api.exchange.warmup_flag(state) kind_of_sim = self.ep_api.exchange.kind_of_sim(state) if warmup or kind_of_sim == 1: return + + # If execution makes it here it means the simulation has left warmup and needs to be readied for regular running self.update_run_time() self.running_event.set() + # Update outputs from simulation self.ep_read_outputs() self.update_run_time() + # Wait for event from main process self.advance_event.clear() while not self.advance_event.is_set() and not self.stop_event.is_set(): self.advance_event.wait(1) + # Handle stop event if self.stop_event.is_set(): self.logger.info("Stop Event Set, stopping simulation") self.ep_api.runtime.stop_simulation(state) + + # Write inputs to energyplus self.ep_write_inputs() def get_sim_time(self) -> datetime: @@ -322,8 +338,8 @@ def prepare_idf(self): workspace.save(idf_path, True) - def catch_exception(self) -> None: - super().catch_exception(self.read_error_logs()) + def report_exception(self) -> None: + super().report_exception(self.read_error_logs()) if self.ep_state is not None: self.ep_api.runtime.stop_simulation(self.ep_state) diff --git a/alfalfa_worker/jobs/step_run_process.py b/alfalfa_worker/jobs/step_run_process.py index 92fbcfb9..22ed219b 100644 --- a/alfalfa_worker/jobs/step_run_process.py +++ b/alfalfa_worker/jobs/step_run_process.py @@ -15,55 +15,85 @@ class 
StepRunProcess(StepRunBase): + """Extension of StepRunBase with added functionality to handle cases where the simulation being run wants to be in control of the + process instead of the other way round.""" def __init__(self, run_id: str, realtime: bool, timescale: int, external_clock: bool, start_datetime: str, end_datetime: str, **kwargs) -> None: super().__init__(run_id, realtime, timescale, external_clock, start_datetime, end_datetime) + + # Create manager to handle communication between main process and simulation process self.manager = Manager() + # advance_event: set by main process to signal simulation process to advance. + # Cleared by simulation process immediately before waiting for a new advance event. self.advance_event = self.manager.Event() - self.running_event = self.manager.Event() + + # stop_event: set by main process to signal simulation process to stop self.stop_event = self.manager.Event() + + # running_event: set by simulation process when the simulation moves out of the warmup stage + self.running_event = self.manager.Event() + + # error_event: set by simulation process to signal that an error has occurred self.error_event = self.manager.Event() + # error_log: contains string serialized errors self.error_log = self.manager.Value(c_wchar_p, '') - self.simulation_process: Process - self.subprocess: bool = False + + # timestamp: string serialized sim_time, set by simulation process after advancing self.timestamp = self.manager.Value(c_wchar_p, '') + # simulation_process: object to contain simulation process + self.simulation_process: Process + + # in_subprocess: whether the current context is within simulation process or not + self.in_subprocess = False + def initialize_simulation(self) -> None: + """Starts simulation process. Waits for running event to be set. 
And then records updated time.""" self.simulation_process = Process(target=StepRunProcess._start_simulation_process, args=(self,)) self.simulation_process.start() self._wait_for_event(self.running_event, self.options.start_timeout, desired_event_set=True) self.update_run_time() - def set_run_time(self, sim_time: datetime): - if self.subprocess: + def set_run_time(self, sim_time: datetime) -> None: + """In simulation process the sim_time is saved to the shared timestamp. + In main process this calls the default implementation.""" + if self.in_subprocess: self.timestamp.value = sim_time.strftime(DATETIME_FORMAT) else: return super().set_run_time(sim_time) def update_run_time(self) -> None: - if self.subprocess: + """In simulation process calls default implementation. + In main process sets run_time to the serialized timestamp from the simulation process.""" + if self.in_subprocess: super().update_run_time() else: self.set_run_time(datetime.strptime(self.timestamp.value, DATETIME_FORMAT)) def _start_simulation_process(self) -> None: - self.subprocess = True try: - return self.start_simulation_process() + self.in_subprocess = True + return self.simulation_process_entrypoint() except Exception: - self.catch_exception() + self.report_exception() - def start_simulation_process(self) -> None: + def simulation_process_entrypoint(self) -> None: + """Placeholder for spinning up the simulation""" raise NotImplementedError def handle_process_error(self) -> None: + """This method is called in the main process when an error has been detected in the simulation process. 
+ It kills the simulation process if it is still alive and raises an exception with the contents of the process + error log.""" if self.simulation_process.is_alive(): self.simulation_process.kill() raise JobExceptionExternalProcess(self.error_log.value) - def catch_exception(self, notes: list[str]) -> None: - if self.subprocess: + def report_exception(self, notes: list[str]) -> None: + """This method should be called by the subclass, when an exception has occurred + within the simulation process, to properly record the error log and signal that an error has occurred.""" + if self.in_subprocess: exception_log = exc_to_str() self.error_log.value = exception_log if len(notes) > 0: @@ -73,12 +103,15 @@ def catch_exception(self, notes: list[str]) -> None: def check_simulation_stop_conditions(self) -> bool: return not self.simulation_process.is_alive() - def check_for_errors(self): + def check_for_errors(self) -> None: + """Checks for errors with the simulation_process and raises an exception if any are detected. 
+ This method should be overridden to add additional checks specific to a given process.""" exit_code = self.simulation_process.exitcode if exit_code: raise JobExceptionExternalProcess(f"Simulation process exited with non-zero exit code: {exit_code}") - def _wait_for_event(self, event: threading.Event, timeout: float, desired_event_set: bool = False): + def _wait_for_event(self, event: threading.Event, timeout: float, desired_event_set: bool = False) -> None: + """Wait for a given event to be set or cleared within a given amount of time""" wait_until = time() + timeout while (event.is_set() != desired_event_set and time() < wait_until @@ -107,15 +140,14 @@ def advance(self) -> None: @message def stop(self): - self.logger.info("Stop called, stopping") if not self.stop_event.is_set(): - stop_start = time() + stop_start_time = time() self.stop_event.set() while (self.simulation_process.is_alive() - and time() - stop_start < self.options.stop_timeout + and time() - stop_start_time < self.options.stop_timeout and not self.error_event.is_set()): pass - if time() - stop_start > self.options.stop_timeout and self.simulation_process.is_alive(): + if time() - stop_start_time > self.options.stop_timeout and self.simulation_process.is_alive(): self.simulation_process.kill() raise JobExceptionExternalProcess("Simulation process stopped responding and was killed.") if self.error_event.is_set():