Skip to content

Commit

Permalink
Merge branch 'async' of github.com:C2SM/processing-chain into async
Browse files Browse the repository at this point in the history
  • Loading branch information
leclairm committed Jan 9, 2024
2 parents d7033a5 + 3ce57d6 commit 8df44e8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 81 deletions.
15 changes: 7 additions & 8 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import subprocess
import os
import yaml
import logging

from jobs import tools
from pathlib import Path
Expand Down Expand Up @@ -408,13 +407,13 @@ def get_dep_ids(self, job_name, add_dep=None):
"""Get dependency job ids for `job_name`"""
# Initial list of dependencies
if add_dep is not None:
if type(add_dep) is int:
if isinstance(add_dep, int):
dep_id_list = [add_dep]
else:
try:
dep_id_list = list(add_dep)
except TypeError:
print(f'add_dep must be an iterable')
print("add_dep must be an iterable")
else:
dep_id_list = []

Expand Down Expand Up @@ -461,7 +460,7 @@ def submit(self, job_name, script, add_dep=None, logfile=None):
job_id = int(result.stdout)
print(f' └── Submitted batch job {job_id}')

if not job_name in self.job_ids['current']:
if job_name not in self.job_ids['current']:
self.job_ids['current'][job_name] = [job_id]
else:
self.job_ids['current'][job_name].append(job_id)
Expand Down Expand Up @@ -490,9 +489,9 @@ def create_sbatch_script(self, job_name, log_file):
script_lines = [
'#!/usr/bin/env bash',
f'#SBATCH --job-name="{job_name}_{self.job_id}"',
f'#SBATCH --nodes=1',
'#SBATCH --nodes=1',
f'#SBATCH --output={log_file}',
f'#SBATCH --open-mode=append',
'#SBATCH --open-mode=append',
f'#SBATCH --account={self.compute_account}',
f'#SBATCH --partition={self.compute_queue}',
f'#SBATCH --constraint={self.constraint}',
Expand Down Expand Up @@ -524,8 +523,8 @@ def wait_for_previous(self):
log_file = self.case_root / 'wait.log'
dep_str = ':'.join(map(str, dep_ids))
script_lines = [
'#!/usr/bin/env bash', f'#SBATCH --job-name="wait"',
f'#SBATCH --nodes=1', f'#SBATCH --output={log_file}',
'#!/usr/bin/env bash', '#SBATCH --job-name="wait"',
'#SBATCH --nodes=1', f'#SBATCH --output={log_file}',
f'#SBATCH --account={self.compute_account}',
f'#SBATCH --partition={self.compute_queue}',
f'#SBATCH --constraint={self.constraint}',
Expand Down
78 changes: 7 additions & 71 deletions jobs/prepare_art_global.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,88 +6,24 @@
import xarray as xr
import shutil
import subprocess
from . import tools
from . import tools, prepare_data
from pathlib import Path
from .tools.interpolate_data import create_oh_for_restart, create_oh_for_inicond
from .tools.interpolate_data import create_oh_for_restart, create_oh_for_inicond # noqa: F401
from .tools.fetch_external_data import fetch_era5, fetch_era5_nudging


def set_cfg_variables(cfg):
    """Derive and attach workflow-specific path/config attributes onto *cfg*.

    Mutates the passed-in config object in place (no return value):
    sets working/input/output directory paths depending on whether the
    workflow is COSMO- or ICON-based, mirrors the declared input files
    into scratch paths, formats start/end datetime strings, and fills in
    ICON-ART-specific nudging/restart settings.

    NOTE(review): this page scrape stripped leading whitespace; the
    nesting below is reconstructed from Python semantics and should be
    confirmed against the repository file.

    Parameters
    ----------
    cfg : Config
        Object holding all user-configuration parameters as attributes.
        Must already provide ``workflow_name``, ``chain_root``,
        ``chain_root_prev``, ``input_files``, ``startdate``/``enddate``
        and the ICON-ART options referenced below.
    """
    if cfg.workflow_name.startswith('cosmo'):
        # COSMO workflows only need the int2lm preprocessing tree.
        cfg.int2lm_root = cfg.chain_root / 'int2lm'
        cfg.int2lm_input = cfg.int2lm_root / 'input'
    elif cfg.workflow_name.startswith('icon'):
        # Directory layout for the current chunk's ICON run.
        cfg.icon_base = cfg.chain_root / 'icon'
        cfg.icon_input = cfg.icon_base / 'input'
        cfg.icon_input_icbc = cfg.icon_input / 'icbc'
        cfg.icon_work = cfg.icon_base / 'run'
        cfg.icon_output = cfg.icon_base / 'output'
        cfg.icon_output_reduced = cfg.icon_base / 'output_reduced'
        cfg.icon_restart_out = cfg.icon_base / 'restart'
        # Restart/ICBC data are read from the PREVIOUS chunk's tree.
        cfg.icon_restart_in = cfg.chain_root_prev / 'icon' / 'run'
        cfg.icon_input_icbc_prev = cfg.chain_root_prev / 'icon' / 'input' / 'icbc'

        # Normalize declared input files to Path objects and compute the
        # matching per-file destination under the scratch input dir.
        cfg.input_files_scratch = {}
        for dsc, file in cfg.input_files.items():
            cfg.input_files[dsc] = (p := Path(file))
            cfg.input_files_scratch[dsc] = cfg.icon_input / p.name

        cfg.create_vars_from_dicts()

        # ISO-8601 (UTC, hour precision) strings used in ICON namelists.
        cfg.ini_datetime_string = cfg.startdate.strftime('%Y-%m-%dT%H:00:00Z')
        cfg.end_datetime_string = cfg.enddate.strftime('%Y-%m-%dT%H:00:00Z')

        if cfg.workflow_name == 'icon-art-oem':
            cfg.startdate_sim_yyyymmdd_hh = cfg.startdate_sim.strftime(
                '%Y%m%d_%H')

        if cfg.workflow_name == 'icon-art-global':
            # Nudge type (global or nothing)
            cfg.nudge_type = 2 if cfg.era5_global_nudging else 0
            # Time step for global nudging in seconds
            cfg.nudging_step_seconds = cfg.nudging_step * 3600
            # Prescribed initial conditions for CH4, CO and/or OH
            cfg.iart_init_gas = 4 if cfg.species_inicond else 0

            # lrestart is a Fortran-namelist string flag ('.TRUE.'/'.FALSE.'),
            # hence the string comparison rather than a Python bool.
            if cfg.lrestart == '.TRUE.':
                cfg.restart_filename = 'restart_atm_DOM01.nc'
                cfg.restart_file = cfg.icon_restart_in / cfg.restart_filename
                cfg.restart_file_scratch = cfg.icon_work / cfg.restart_filename

    # Reset the tracked job ids for the prepare_data step of this chunk.
    cfg.job_ids['current']['prepare_data'] = []


def main(cfg):
"""
This function prepares the data for global ICON-ART simulations
by downloading necessary meteorological and chemical data,
creating the inicond file, and handling global nudging
conditions. The workflow includes fetching ERA5 data, processing it,
and creating initial conditions for tracers (such as CH4, CO, and OH).
The main steps performed by this function include:
1. Downloading ERA5 data and creating the inicond file if specified in the
configuration.
2. Creating initial conditions for tracers (CH4, CO, OH) if needed,
including handling restart scenarios.
3. Handling global nudging conditions by downloading and processing ERA5 and
CAMS data if configured.
Note
----
- The function utilizes external scripts for processing ERA5 data
(`icon_era5_inicond.sh`, `icon_era5_nudging.sh`) and CAMS data
(`icon_cams_nudging.sh`).
- The `tools` module provides various utility functions used in the workflow.
Prepare global ICON-ART simulations.
Parameters
----------
cfg : Config
Object holding all user-configuration parameters as attributes.
"""
set_cfg_variables(cfg)
prepare_data.set_cfg_variables(cfg)
launch_time = cfg.init_time_logging("prepare_art_global")
logging.info("Preprae ICON-ART for global simulations")
logging.info("Prepare ICON-ART for global simulations")

# -- Download ERA5 data and create the inicond file
if cfg.era5_inicond and cfg.lrestart == '.FALSE.':
Expand Down Expand Up @@ -153,8 +89,8 @@ def main(cfg):
else:

# -- Check the extension of tracer variables in the restart file
ds_restart = xr.open_dataset(cfg.restart_file)
tracer_name = cfg.species2restart[0]
ds_restart = xr.open_dataset(cfg.restart_file) # noqa: F841
tracer_name = cfg.species2restart[0] # noqa: F841
# FIXME:
# var_restart = [
# IndexError: list index out of range
Expand Down
4 changes: 2 additions & 2 deletions run_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def run_chunk(cfg, force, resume):

# Submit the job
script = cfg.create_sbatch_script(job, logfile)
job_id = cfg.submit(job, script)
cfg.submit(job, script)

# wait for previous chunk to be done
cfg.wait_for_previous()
Expand Down Expand Up @@ -295,7 +295,7 @@ def run_chunk(cfg, force, resume):

exitcode = 0
try_count = 0
except:
except Exception:
subject = "ERROR or TIMEOUT in job '%s' for chain '%s'" % (
job, cfg.job_id)
logging.exception(subject)
Expand Down
2 changes: 2 additions & 0 deletions workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ icon-art-global:
prepare_art_global:
current:
- prepare_data
previous:
- icon
icon:
current:
- prepare_data
Expand Down

0 comments on commit 8df44e8

Please sign in to comment.