Skip to content

Commit

Permalink
Cleanup and document StepRunBase
Browse files Browse the repository at this point in the history
  • Loading branch information
TShapinsky committed Feb 3, 2025
1 parent 8637193 commit 415aebc
Show file tree
Hide file tree
Showing 5 changed files with 542 additions and 249 deletions.
124 changes: 69 additions & 55 deletions alfalfa_worker/jobs/step_run_base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import auto

from influxdb import InfluxDBClient

from alfalfa_worker.lib.alfalfa_connections_manager import (
AlafalfaConnectionsManager
)
from alfalfa_worker.lib.constants import DATETIME_FORMAT
from alfalfa_worker.lib.enums import AutoName, RunStatus
from alfalfa_worker.lib.job import Job, message
Expand Down Expand Up @@ -39,48 +41,66 @@ class Options:
# Should points be logged to InfluxDB
historian_enabled: bool = os.environ.get('HISTORIAN_ENABLE', False) == 'true'

# Timeouts
# These are mostly used for the StepRunProcess class to clean up the subprocess
# when it hangs.

# How long can an advance call run before something bad is assumed to have happened.
advance_timeout: int = 45
# How long can it take a simulation to start before something bad is assumed to have happened.
start_timeout: int = 300
# Same as previous timeouts, but related to stopping
stop_timeout: int = 45

# How many timesteps can a timescale run lag behind before being stopped
timescale_lag_limit: int = 2

def __init__(self, realtime: bool, timescale: int, external_clock: bool, start_datetime: str, end_datetime: str):
print(f"external_clock: {external_clock}")
self.logger = logging.getLogger(self.__class__.__name__)

if external_clock:
self.clock_source = ClockSource.EXTERNAL
else:
print(f"setting source to: {ClockSource.INTERNAL}")
self.clock_source = ClockSource.INTERNAL

print(f"clock_source: {self.clock_source}")
print(f"internal is: {ClockSource.INTERNAL}")
self.logger.debug(f"Clock Source: {self.clock_source}")

self.start_datetime = datetime.strptime(start_datetime, DATETIME_FORMAT)
self.end_datetime = datetime.strptime(end_datetime, DATETIME_FORMAT)
self.logger.debug(f"Start Datetime: {self.start_datetime}")
self.logger.debug(f"End Datetime: {self.end_datetime}")

if realtime:
self.timescale = 1
else:
self.timescale = timescale

if self.clock_source == ClockSource.INTERNAL:
self.logger.debug(f"Timescale: {self.timescale}")

# Check for at least one of the required parameters
if not realtime and not timescale and not external_clock:
raise JobException("At least one of 'external_clock', 'timescale', or 'realtime' must be specified")

@property
def advance_interval(self) -> timedelta:
"""Get the timedelta for how often a simulation should be advanced
based on the timescale and timestep_duration
"""
if self.timestep_duration and self.timescale:
return self.timestep_duration / self.timescale
return None

@property
def timesteps_per_hour(self) -> int:
"""Get advance step time in terms of how many steps are requried
to advance the model one hour
"""
return int(timedelta(hours=1) / self.timestep_duration)


class StepRunBase(Job):
def __init__(self, run_id: str, realtime: bool, timescale: int, external_clock: bool, start_datetime: str, end_datetime: str, **kwargs) -> None:
def __init__(self, run_id: str, realtime: bool, timescale: int, external_clock: bool, start_datetime: str, end_datetime: str) -> None:
"""Base class for all jobs to step a run. The init handles the basic configuration needed
for the derived classes.
Expand All @@ -91,96 +111,89 @@ def __init__(self, run_id: str, realtime: bool, timescale: int, external_clock:
external_clock (bool): Use an external clock to step the simulation.
start_datetime (str): Start datetime. #TODO: this should be typed datetime
end_datetime (str): End datetime. #TODO: this should be typed datetime
**skip_site_init (bool): Skip the initialization of the site database object. This is mainly used in testing.
"""
super().__init__()
self.set_run_status(RunStatus.STARTING)
self.options: Options = Options(to_bool(realtime), int(timescale), to_bool(external_clock), start_datetime, end_datetime)

self.first_step_time: datetime = None
self.next_step_time: datetime = None
self.setup_connections()
self.warmup_cleared = not self.options.warmup_is_first_step

connections_manager = AlafalfaConnectionsManager()
self.influx_db_name = connections_manager.influx_db_name
self.influx_client = connections_manager.influx_client
self.historian_enabled = connections_manager.historian_enabled

def exec(self) -> None:
self.logger.info("Initializing simulation...")
self.initialize_simulation()
self.logger.info("Done initializing simulation")
self.logger.info("Simulation initialized.")
self.set_run_status(RunStatus.STARTED)

self.logger.info("Advancing to start time...")
self.advance_to_start_time()
if self.options.warmup_is_first_step and self.options.clock_source == ClockSource.INTERNAL:
if not self.warmup_cleared and self.options.clock_source == ClockSource.INTERNAL:
self.advance()
self.logger.info("Advanced to start time.")

self.set_run_status(RunStatus.RUNNING)
self.logger.info(f"Clock source: {self.options.clock_source}")
if self.options.clock_source == ClockSource.INTERNAL:
self.logger.info("Running Simulation with Internal Clock")
self.logger.info("Running Simulation with Internal Clock.")
self.run_timescale()
elif self.options.clock_source == ClockSource.EXTERNAL:
self.logger.info("Running Simulations with External Clock")
self.logger.info("Running Simulations with External Clock.")
self.start_message_loop()

def initialize_simulation(self) -> None:
"""Placeholder for all things necessary to initialize simulation"""
raise NotImplementedError

def advance_to_start_time(self):
self.logger.info("Advancing to start time")
while self.run.sim_time < self.options.start_datetime:
self.logger.info("Calling advance")
self.advance()

def create_tag_dictionaries(self):
"""Placeholder for method necessary to create Haystack entities and records"""
self.warmup_cleared = True

def run_timescale(self):
self.first_timestep_time = datetime.now()
self.next_step_time = datetime.now() + self.options.advance_interval
self.logger.info(f"Next step time: {self.next_step_time}")
self.logger.info(f"Advance interval is: {self.options.advance_interval}")
"""Run simulation at timescale"""
next_advance_time = datetime.now() + self.options.advance_interval
self.logger.debug(f"Advance interval is: {self.options.advance_interval}")
self.logger.debug(f"Next step time: {next_advance_time}")

while self.is_running:
if self.options.clock_source == ClockSource.INTERNAL:
if datetime.now() >= self.next_step_time:
steps_behind = (datetime.now() - self.next_step_time) / self.options.advance_interval
if steps_behind > self.options.timescale_lag_limit:
raise JobExceptionSimulation(f"Timescale too high. Simulation more than {self.options.timescale_lag_limit} timesteps behind")
self.next_step_time = self.next_step_time + self.options.advance_interval
self.logger.info(f"Internal clock called advance at {self.run.sim_time}")
self.logger.info(f"Next step time: {self.next_step_time}")
self.advance()

if self.check_simulation_stop_conditions() or self.run.sim_time >= self.options.end_datetime:
self.logger.info(f"Stopping at time: {self.run.sim_time}")
self.stop()
break
if datetime.now() >= next_advance_time:
steps_behind = (datetime.now() - next_advance_time) / self.options.advance_interval
if steps_behind > self.options.timescale_lag_limit:
raise JobExceptionSimulation(f"Timescale too high. Simulation more than {self.options.timescale_lag_limit} timesteps behind")
next_advance_time = next_advance_time + self.options.advance_interval

self.logger.debug(f"Internal clock called advance at {self.run.sim_time}")
self.logger.debug(f"Next advance time: {next_advance_time}")
self.advance()

if self.check_simulation_stop_conditions() or self.run.sim_time >= self.options.end_datetime:
self.logger.debug(f"Stopping at time: {self.run.sim_time}")
self.stop()
break

self._check_messages()
self.logger.info("Exitting timescale run")
self.logger.info("Internal clock simulation has exited.")

def get_sim_time(self) -> datetime:
"""Placeholder for method which retrieves time in the simulation"""
raise NotImplementedError

def update_run_time(self) -> None:
"""Update the sim_time in the Run object to match the most up to date sim_time"""
self.set_run_time(self.get_sim_time())

def check_simulation_stop_conditions(self) -> bool:
"""Placeholder to determine whether a simulation should stop"""
"""Placeholder to determine whether a simulation should stop.
This can be used to signal that the simulation has exited and the job can now continue
to clean up."""
return False

def setup_connections(self):
"""Placeholder until all db/connections operations can be completely moved out of the job"""
# InfluxDB
self.historian_enabled = os.environ.get('HISTORIAN_ENABLE', False) == 'true'
if self.historian_enabled:
self.influx_db_name = os.environ['INFLUXDB_DB']
self.influx_client = InfluxDBClient(host=os.environ['INFLUXDB_HOST'],
username=os.environ['INFLUXDB_ADMIN_USER'],
password=os.environ['INFLUXDB_ADMIN_PASSWORD'])
else:
self.influx_db_name = None
self.influx_client = None

@message
def stop(self) -> None:
self.logger.info("Received stop command")
self.logger.info("Stopping simulation.")
super().stop()
self.set_run_status(RunStatus.STOPPING)

Expand All @@ -190,4 +203,5 @@ def cleanup(self) -> None:

@message
def advance(self) -> None:
"""Placeholder for method which advances the simulation one timestep"""
raise NotImplementedError
12 changes: 12 additions & 0 deletions alfalfa_worker/lib/alfalfa_connections_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os

import boto3
from influxdb import InfluxDBClient
from mongoengine import connect
from redis import Redis

Expand All @@ -19,3 +20,14 @@ def __init__(self) -> None:

# Redis
self.redis = Redis(host=os.environ['REDIS_HOST'])

# InfluxDB
self.historian_enabled = os.environ.get('HISTORIAN_ENABLE', False) == 'true'
if self.historian_enabled:
self.influx_db_name = os.environ['INFLUXDB_DB']
self.influx_client = InfluxDBClient(host=os.environ['INFLUXDB_HOST'],
username=os.environ['INFLUXDB_ADMIN_USER'],
password=os.environ['INFLUXDB_ADMIN_PASSWORD'])
else:
self.influx_db_name = None
self.influx_client = None
3 changes: 1 addition & 2 deletions alfalfa_worker/lib/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import logging
import os
from datetime import datetime
from enum import Enum
from json.decoder import JSONDecodeError
Expand Down Expand Up @@ -76,7 +75,6 @@ def __new_init__(self: "Job", *args, **kwargs):
# Redis
self.redis = connections_manager.redis
self.redis_pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
self.logger = logging.getLogger(self.__class__.__name__)

# Message Handlers
Expand Down Expand Up @@ -169,6 +167,7 @@ def status(self) -> "JobStatus":

@property
def is_running(self) -> bool:
"""Easily check if the state of the job is running or not"""
return self._status.value < JobStatus.STOPPING.value

@property
Expand Down
Loading

0 comments on commit 415aebc

Please sign in to comment.