Skip to content

Commit

Permalink
clean up StepRunProcess and begin cleaning openstudio StepRun
Browse files Browse the repository at this point in the history
  • Loading branch information
TShapinsky committed Feb 3, 2025
1 parent 415aebc commit 9fad399
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 24 deletions.
28 changes: 22 additions & 6 deletions alfalfa_worker/jobs/openstudio/step_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def wrapped_func(self: "StepRun", state):
try:
return func(self, state)
except Exception:
self.catch_exception()
self.report_exception()
return wrapped_func


Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(self, run_id, realtime, timescale, external_clock, start_datetime,

self.additional_points: list[AlfalfaPoint] = []

def start_simulation_process(self):
def simulation_process_entrypoint(self):
"""
Initialize and start EnergyPlus co-simulation.
Expand Down Expand Up @@ -85,19 +85,24 @@ def start_simulation_process(self):
raise JobExceptionExternalProcess(f"EnergyPlus Exited with a non-zero exit code: {return_code}")

def callback_message(self, message: bytes) -> None:
"""Callback for when energyplus records a messaage to the log"""
try:
self.logger.info(message.decode())
except Exception:
self.catch_exception()
self.report_exception()

def callback_error(self, state, message: bytes) -> None:
"""Callback for when energyplus records an error to the log.
These 'Errors' include warnings and non-critical errors."""
try:
self.logger.error(message.decode())
except Exception:
self.catch_exception()
self.report_exception()

@callback_wrapper
def initialize_handles(self, state):
"""Callback called at begin_new_environment. Enumerates Alfalfa points to connect
them with handles that can be used to transact data with energyplus."""
exceptions = []

def get_handle(type, parameters):
Expand Down Expand Up @@ -154,24 +159,35 @@ def add_additional_meter(fuel: str, units: str, converter: Callable[[float], flo

@callback_wrapper
def ep_begin_timestep(self, state):
"""Callback called at end_zone_timestep_after_zone_reporting. This is responsible for
controlling simulation advancement, as well as reading outputs from and writing inputs to energyplus."""

# If simulation is not 'Running' yet and energyplus is still warming up, short circuit method and return
if not self.running_event.is_set():
warmup = self.ep_api.exchange.warmup_flag(state)
kind_of_sim = self.ep_api.exchange.kind_of_sim(state)
if warmup or kind_of_sim == 1:
return

# If execution makes it here it means the simulation has left warmup and needs to be readied for regular running
self.update_run_time()
self.running_event.set()

# Update outputs from simulation
self.ep_read_outputs()
self.update_run_time()

# Wait for event from main process
self.advance_event.clear()
while not self.advance_event.is_set() and not self.stop_event.is_set():
self.advance_event.wait(1)

# Handle stop event
if self.stop_event.is_set():
self.logger.info("Stop Event Set, stopping simulation")
self.ep_api.runtime.stop_simulation(state)

# Write inputs to energyplus
self.ep_write_inputs()

def get_sim_time(self) -> datetime:
Expand Down Expand Up @@ -322,8 +338,8 @@ def prepare_idf(self):

workspace.save(idf_path, True)

def catch_exception(self) -> None:
super().catch_exception(self.read_error_logs())
def report_exception(self) -> None:
super().report_exception(self.read_error_logs())
if self.ep_state is not None:
self.ep_api.runtime.stop_simulation(self.ep_state)

Expand Down
68 changes: 50 additions & 18 deletions alfalfa_worker/jobs/step_run_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,85 @@


class StepRunProcess(StepRunBase):
"""Extension of StepRunBase with added functionality to handle cases where the simulation being run wants to be in control of the
process instead of the other way round."""

def __init__(self, run_id: str, realtime: bool, timescale: int, external_clock: bool, start_datetime: str, end_datetime: str, **kwargs) -> None:
super().__init__(run_id, realtime, timescale, external_clock, start_datetime, end_datetime)

# Create manager to handle communication between main process and simulation process
self.manager = Manager()
# advance_event: set by main process to signal simulation process to advance.
# Cleared by simulation process immediately before waiting for a new advance event.
self.advance_event = self.manager.Event()
self.running_event = self.manager.Event()

# stop_event: set by main process to signal simulation process to stop
self.stop_event = self.manager.Event()

# running_event: is set by simulation process when the simulation moves out of the warmup stage
self.running_event = self.manager.Event()

# error_event: set by simulation process to signal that an error has occurred
self.error_event = self.manager.Event()
# error_log: contains string serialized errors
self.error_log = self.manager.Value(c_wchar_p, '')
self.simulation_process: Process
self.subprocess: bool = False

# timestamp: string serialized sim_time, set by simulation process after advancing
self.timestamp = self.manager.Value(c_wchar_p, '')

# simulation_process: object to contain simulation process
self.simulation_process: Process

# in_subprocess: whether the current context is within the simulation process or not
self.in_subprocess = False

def initialize_simulation(self) -> None:
"""Starts simulation process. Waits for running event to be set. And then records updated time."""
self.simulation_process = Process(target=StepRunProcess._start_simulation_process, args=(self,))
self.simulation_process.start()

self._wait_for_event(self.running_event, self.options.start_timeout, desired_event_set=True)
self.update_run_time()

def set_run_time(self, sim_time: datetime):
if self.subprocess:
def set_run_time(self, sim_time: datetime) -> None:
"""In simulation process the sim_time is saved to the shared timestamp.
In main process this calls the default implementation."""
if self.in_subprocess:
self.timestamp.value = sim_time.strftime(DATETIME_FORMAT)
else:
return super().set_run_time(sim_time)

def update_run_time(self) -> None:
if self.subprocess:
"""In simulation process calls default implementation.
In main process sets run_time to the serialized timestamp from the simulation process."""
if self.in_subprocess:
super().update_run_time()
else:
self.set_run_time(datetime.strptime(self.timestamp.value, DATETIME_FORMAT))

def _start_simulation_process(self) -> None:
self.subprocess = True
try:
return self.start_simulation_process()
self.in_subprocess = True
return self.simulation_process_entrypoint()
except Exception:
self.catch_exception()
self.report_exception()

def start_simulation_process(self) -> None:
def simulation_process_entrypoint(self) -> None:
"""Placeholder for spinning up the simulation"""
raise NotImplementedError

def handle_process_error(self) -> None:
"""This method is called in the main process when an error has been detected in the simulation process.
It kills the simulation process if it is still alive and raises an exception with the contents of the process
error log."""
if self.simulation_process.is_alive():
self.simulation_process.kill()
raise JobExceptionExternalProcess(self.error_log.value)

def catch_exception(self, notes: list[str]) -> None:
if self.subprocess:
def report_exception(self, notes: list[str]) -> None:
"""This method should be called by the subclass, when an exception has occurred
within the simulation process, to properly record the error log and signal that an error has occurred."""
if self.in_subprocess:
exception_log = exc_to_str()
self.error_log.value = exception_log
if len(notes) > 0:
Expand All @@ -73,12 +103,15 @@ def catch_exception(self, notes: list[str]) -> None:
def check_simulation_stop_conditions(self) -> bool:
return not self.simulation_process.is_alive()

def check_for_errors(self):
def check_for_errors(self) -> None:
"""Checks for errors with the simulation_process and raises an exception if any are detected.
This method should be overridden to add additional checks specific to a given process."""
exit_code = self.simulation_process.exitcode
if exit_code:
raise JobExceptionExternalProcess(f"Simulation process exited with non-zero exit code: {exit_code}")

def _wait_for_event(self, event: threading.Event, timeout: float, desired_event_set: bool = False):
def _wait_for_event(self, event: threading.Event, timeout: float, desired_event_set: bool = False) -> None:
"""Wait for a given event to go be set or cleared within a given amount of time"""
wait_until = time() + timeout
while (event.is_set() != desired_event_set
and time() < wait_until
Expand Down Expand Up @@ -107,15 +140,14 @@ def advance(self) -> None:

@message
def stop(self):
self.logger.info("Stop called, stopping")
if not self.stop_event.is_set():
stop_start = time()
stop_start_time = time()
self.stop_event.set()
while (self.simulation_process.is_alive()
and time() - stop_start < self.options.stop_timeout
and time() - stop_start_time < self.options.stop_timeout
and not self.error_event.is_set()):
pass
if time() - stop_start > self.options.stop_timeout and self.simulation_process.is_alive():
if time() - stop_start_time > self.options.stop_timeout and self.simulation_process.is_alive():
self.simulation_process.kill()
raise JobExceptionExternalProcess("Simulation process stopped responding and was killed.")
if self.error_event.is_set():
Expand Down

0 comments on commit 9fad399

Please sign in to comment.