Aggregator and launcher for Flexpart #1

Open · wants to merge 37 commits into main from trigger_aggregation_flexpartjob

Conversation

Collaborator

@ninaburg commented on Oct 3, 2024

Purpose:

Trigger both preprocessing and Flexpart from Aviso

Changes:

  • Extracted Aviso Files: Moved Aviso and service settings from FlexPrep (see PR #18) into this new orchestrator repo.
  • Flexpart Aggregator: Added an aggregator for Flexpart that triggers after the Flexprep preprocessing step.
  • Flexpart Launch: Implemented the launch of Flexpart by calling the containerize branch from the Flexpart repo (currently in development).
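
For orientation, here is a minimal sketch of the flow this PR wires up, assuming placeholder names for the container runtime call and the aggregation check (run_container, handle_notification and all_required_steps_processed are illustrative only, not the actual implementation in this repo):

    # Hypothetical outline of the Aviso-triggered flow: pre-process the notified
    # lead time, aggregate, and only launch Flexpart once everything is ready.
    import subprocess


    def run_container(image: str, *args: str) -> None:
        # Placeholder for however the containers are actually launched.
        subprocess.run(["docker", "run", image, *args], check=True)


    def handle_notification(date: str, time: str, step: str, location: str) -> None:
        # 1. Pre-process the notified lead time with flexprep.
        run_container("flexprep", "--date", date, "--time", time,
                      "--step", step, "--location", location)

        # 2. Aggregation: is every lead time needed by Flexpart processed yet?
        if not all_required_steps_processed(date, time):
            return  # wait for further notifications

        # 3. Launch flexpart-ifs (containerize branch of the Flexpart repo).
        run_container("flexpart-ifs")


    def all_required_steps_processed(date: str, time: str) -> bool:
        # Placeholder for the aggregation check discussed below.
        return False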

@ninaburg assigned ninaburg and unassigned ninaburg on Oct 17, 2024
@ninaburg force-pushed the trigger_aggregation_flexpartjob branch 3 times, most recently from 7c95f62 to 192e0c5 on October 29, 2024 at 14:41
@ninaburg force-pushed the trigger_aggregation_flexpartjob branch from 192e0c5 to d4a13d0 on October 29, 2024 at 14:53
Collaborator

@victoria-cherkas left a comment

I think it would be nicer if all reading and writing to the DB were handled in this repo, so flexprep does not need to know about the DB at all. Then the workflow logic is separated from the processing.
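
As an illustration of that separation, a minimal sketch of a DB helper that lives entirely in the orchestrator, assuming an SQLite file and a table keyed on (forecast_ref_time, step); file, table and function names are placeholders, not the schema used in this PR:

    # Hypothetical DB helper kept inside the orchestrator so flexprep never
    # touches the database directly; schema and names are assumptions.
    import sqlite3
    from contextlib import closing

    DB_PATH = "flexpart_jobs.sqlite"  # placeholder path

    SCHEMA = """
    CREATE TABLE IF NOT EXISTS uploaded (
        forecast_ref_time TEXT NOT NULL,
        step INTEGER NOT NULL,
        processed INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (forecast_ref_time, step)
    )"""


    def _connect() -> sqlite3.Connection:
        conn = sqlite3.connect(DB_PATH)
        conn.execute(SCHEMA)  # DDL runs in autocommit mode, so the table persists
        return conn


    def mark_step_processed(forecast_ref_time: str, step: int) -> None:
        """Workflow-side write: record that flexprep finished one lead time."""
        with closing(_connect()) as conn, conn:
            conn.execute(
                "INSERT OR REPLACE INTO uploaded (forecast_ref_time, step, processed) "
                "VALUES (?, ?, 1)",
                (forecast_ref_time, step),
            )


    def is_step_processed(forecast_ref_time: str, step: int) -> bool:
        """Workflow-side read: used by the aggregator before launching Flexpart."""
        with closing(_connect()) as conn:
            row = conn.execute(
                "SELECT processed FROM uploaded WHERE forecast_ref_time = ? AND step = ?",
                (forecast_ref_time, step),
            ).fetchone()
        return bool(row and row[0])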

Comment on lines 61 to 77
# SR It would be handy to abort if one linter fails b/c the same error often
# SR triggers multiple linters. That way, if flake8 fails, pylint and mypy
# SR (which in large code bases can take some time to run) could be skipped
# SR before fixing the error. Unfortunately, pre-commit only provides the global
# SR option fail_fast, which would abort even after the small fixers and
# SR formatters changed something. A potential solution could be to write a
# SR small bash script run-linters.sh that runs flake8, pylint and run-mypy.sh
# SR in that order and aborts on error.
# TODO: There is significant work involved in getting pylint going. This can be its separate task
# - repo: local
#   hooks:
#     - id: pylint
#       name: pylint
#       description: Check Python code for correctness, consistency and adherence to best practices
#       language: system
#       entry: pylint
#       types: [python]
Collaborator

Can we remove the comment here? It would also be fine to remove the whole of pre-commit, since linting/mypy is checked in CI/CD.

Collaborator Author

Good idea to align how the linting is done with how it's done in CI/CD. I removed the dependencies for pre-commit.
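
For reference, the fail-fast idea from the in-file comment above could look roughly like the sketch below, written in Python rather than bash for illustration; the exact tool invocations and target package are assumptions:

    # run_linters.py: run the cheap linter first and stop at the first failure
    # so the slower tools are skipped. Command lines are assumptions.
    import subprocess
    import sys

    LINTERS = [
        ["flake8", "flex_container_orchestrator"],
        ["pylint", "flex_container_orchestrator"],
        ["mypy", "flex_container_orchestrator"],
    ]

    for cmd in LINTERS:
        print("Running:", " ".join(cmd))
        if subprocess.run(cmd).returncode != 0:
            # Abort on the first failure; later (slower) linters are skipped.
            sys.exit(1)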

README.rst Outdated
Comment on lines 4 to 5
The flex-container-orchestrator manages Aviso notifications and triggers the flexprep and flexpart-ifs containers,
as well as the file aggregation script for Flexpart.
Collaborator

Suggested change
The flex-container-orchestrator manages Aviso notifications and triggers the flexprep and flexpart-ifs containers,
as well as the file aggregation script for Flexpart.
The flex-container-orchestrator manages the event-driven Flexpart IFS workflow based on events from Aviso. The repo coordinates both the flexprep (pre-processing of raw IFS data) and flexpart-ifs containers, ensuring all required lead-time data is processed before launching Flexpart.

README.rst Outdated
Comment on lines 20 to 23
$ pipx install mchbuild
$ cd flex-container-orchestrator
$ mchbuild local.build local.test
$ mchbuild local.run
Collaborator

Cool, didn't know about this!

README.rst Outdated

.. code-block:: console

$ poetry run uvicorn --port 8080 --reload flex_container_orchestrator.main:app
Collaborator

how come we need uvicorn?

Collaborator Author

No, we don't. This came with the mch python template and I updated it now with:
$ poetry run python3 flex_container_orchestrator/main.py --date {date} --time {time} --step {step} --location {location}
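
For context, a minimal sketch of how such a CLI could be parsed (argument handling only; the real flex_container_orchestrator/main.py in this PR does the orchestration work and may define its arguments differently):

    # Minimal argparse sketch matching the command above; help texts and value
    # formats are assumptions.
    import argparse


    def parse_args() -> argparse.Namespace:
        parser = argparse.ArgumentParser(
            description="Trigger aggregation and Flexpart for one Aviso notification."
        )
        parser.add_argument("--date", required=True, help="forecast date, e.g. 20241029")
        parser.add_argument("--time", required=True, help="forecast reference time, e.g. 00")
        parser.add_argument("--step", required=True, help="lead-time step in hours")
        parser.add_argument("--location", required=True, help="location of the notified file")
        return parser.parse_args()


    if __name__ == "__main__":
        args = parse_args()
        print(args.date, args.time, args.step, args.location)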

pyproject.toml Outdated
pydantic-settings = "^2.6.0"
python-json-logger = "^2.0.7"
boto3 = "^1.35.48"
# mchbuild = "^0.4.8"
Collaborator

Remove the comment? Or add it to the dev dependencies?

Collaborator Author

The comment is removed. mchbuild doesn't work outside of the MeteoSwiss network, so I don't see the value of adding it here. For testing, one can install it with pip.

    time_settings (dict): Time settings dictionary.

Returns:
    tuple: Tuple containing lists of forecast times, lead times, and all steps.
Collaborator

We should describe here what the difference is between lead times and all steps; to me it is not immediately clear.

Collaborator Author

Refactored the description of the returns:

    Returns:
        tuple[list[list[str]], list[list[datetime]], set[str]]:
            - all_input_forecasts: A nested list where each sublist contains forecast labels
              for each Flexpart run in the format "{reference_time}_{step}".
            - all_flexpart_leadtimes: A nested list where each sublist contains datetime objects
              representing the leadtimes (reference_time + step) for each Flexpart run.
            - all_input_forecasts_set: A set of unique forecasts in the format "{reference_time}_{step}"
              required for Flexpart simulations.
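
A small illustration of those three structures for one hypothetical run (reference time 2024-10-29 00 UTC with steps 0, 3 and 6 h; the actual step list depends on the time settings):

    # Illustration only: what the three return values look like for one run.
    from datetime import datetime, timedelta

    reference_time = datetime(2024, 10, 29, 0, 0)
    steps = [0, 3, 6]  # hypothetical lead-time steps for this run

    # Forecast labels in the "{reference_time}_{step}" format.
    run_forecasts = [f"{reference_time:%Y%m%d%H%M}_{step:02}" for step in steps]
    # Lead times as absolute datetimes: reference_time + step.
    run_leadtimes = [reference_time + timedelta(hours=step) for step in steps]

    all_input_forecasts = [run_forecasts]         # one sublist per Flexpart run
    all_flexpart_leadtimes = [run_leadtimes]      # one sublist per Flexpart run
    all_input_forecasts_set = set(run_forecasts)  # unique forecasts across all runs

    print(all_input_forecasts)
    # [['202410290000_00', '202410290000_03', '202410290000_06']]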

time_settings["tdelta"],
time_settings["tfreq_f"],
)
all_list_ltf, all_list_lt, all_steps = generate_forecast_times(
Collaborator

all_list_ltf, all_list_lt <- I think the naming of these could be improved; the similarity makes them hard to read.

Collaborator Author

Changed it to: input_forecasts, flexpart_leadtimes

Comment on lines 166 to 173
for item in items_f:
    if item[0]:  # processed == True
        frt_str = (
            frt.strftime("%Y%m%d%H%M")
            if isinstance(frt, datetime.datetime)
            else str(frt)
        )
        processed_items.add(frt_str + f"{int(item[1]):02}")
Collaborator

Suggested change
for item in items_f:
    if item[0]:  # processed == True
        frt_str = (
            frt.strftime("%Y%m%d%H%M")
            if isinstance(frt, datetime.datetime)
            else str(frt)
        )
        processed_items.add(frt_str + f"{int(item[1]):02}")
for processed, step in items_f:
    if processed:
        frt_str = (
            frt.strftime("%Y%m%d%H%M")
            if isinstance(frt, datetime.datetime)
            else str(frt)
        )
        processed_items.add(frt_str + f"{int(step):02}")

Comment on lines 251 to 253
all_steps = set()
all_list_ltf = []
all_list_lt = []
Collaborator

The naming of these variables is a bit confusing

Collaborator Author

Changed it to:
all_input_forecasts_set
all_input_forecasts
all_flexpart_leadtimes

Comment on lines 315 to 318
if not is_row_processed(conn, frt_dt, args.step):
    logging.info("File processing incomplete. Exiting before launching Flexpart.")
    conn.close()
    sys.exit(0)
Collaborator

I like how here (within main()) the flow is clear and it exits if the lead time is not processed. But in the rest of this function, it is not clear where in the logic we discover that the forecast is not fully processed / ready to run Flexpart.

Collaborator Author

I agree and have refactored the module. The main() function is now run_aggregator, and I've included comments for better clarity.
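
For illustration, the kind of guard-clause structure that keeps the flow readable; apart from is_row_processed (quoted above), the helper names are placeholders, not necessarily what the refactored run_aggregator contains:

    # Sketch of an early-exit structure: every "not ready yet" case returns
    # before Flexpart is launched. Helper bodies are placeholders.
    import logging


    def run_aggregator(conn, frt_dt, step) -> None:
        # The notified lead time itself must be pre-processed.
        if not is_row_processed(conn, frt_dt, step):
            logging.info("File processing incomplete. Exiting before launching Flexpart.")
            conn.close()
            return

        # Every other lead time required by this Flexpart run must be processed too.
        if not all_required_forecasts_processed(conn, frt_dt):
            logging.info("Not all required forecasts are processed yet. Exiting.")
            conn.close()
            return

        # Only now is it safe to launch Flexpart.
        launch_flexpart(frt_dt)


    def is_row_processed(conn, frt_dt, step) -> bool:
        ...  # as in the snippet quoted above


    def all_required_forecasts_processed(conn, frt_dt) -> bool:
        ...  # placeholder for the aggregation check


    def launch_flexpart(frt_dt) -> None:
        ...  # placeholder: start the flexpart-ifs container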

@ninaburg force-pushed the trigger_aggregation_flexpartjob branch from 252bbba to d84223a on November 8, 2024 at 14:07