Skip to content

Commit

Permalink
Merge branch 'async' of github.com:C2SM/processing-chain into async
Browse files Browse the repository at this point in the history
  • Loading branch information
leclairm committed Jan 8, 2024
2 parents 9956a7d + 837334d commit d1fb2f7
Show file tree
Hide file tree
Showing 18 changed files with 678 additions and 645 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Configuration file for the 'icon-async-test' case with ICON
# Configuration file for the 'icon-test' case with ICON

workflow: icon-async
workflow: icon-seq
constraint: gpu
run_on: cpu
compute_queue: normal
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion cases/icon-test/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Configuration file for the 'icon-test' case with ICON
# Configuration file for the 'icon-async-test' case with ICON

workflow: icon
constraint: gpu
Expand Down
68 changes: 53 additions & 15 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import os
import yaml
import logging

from jobs import tools
from pathlib import Path
from datetime import datetime


class Config():
Expand Down Expand Up @@ -347,7 +349,7 @@ def log_job_status(self, job, status, launch_time, duration=None):
"""
log_file = self.case_root / "chain_status.log"

# Check if the header exists, if not, create it
# Check if the file exists, if not, create it and write header
if not log_file.is_file():
header = "Name ID Status Time Duration\n"
with open(log_file, 'w') as f:
Expand All @@ -357,7 +359,7 @@ def log_job_status(self, job, status, launch_time, duration=None):
if job == 'chain':
if duration is not None:
duration = self.format_duration(duration)
job_id = ''
job_id = self.casename
else:
if duration is not None:
duration = f"{str(int(duration.total_seconds()))} s"
Expand Down Expand Up @@ -391,6 +393,17 @@ def format_duration(self, duration):
formatted_duration = f"{int(days)}d {int(hours)}h {int(minutes)}m {int(seconds)}s"
return formatted_duration

def init_time_logging(self, job):
    """Record and log the start time of *job*.

    Writes a START entry to the chain status log and returns the
    timestamp so the caller can later compute the job's duration
    via ``finish_time_logging``.
    """
    start = datetime.now()
    self.log_job_status(job, 'START', start)
    return start

def finish_time_logging(self, job, launch_time):
    """Log a FINISH entry for *job* with its wall-clock duration.

    *launch_time* is the timestamp previously returned by
    ``init_time_logging``; the duration is computed from it.
    """
    now = datetime.now()
    self.log_job_status(job, 'FINISH', now, now - launch_time)

def get_dep_ids(self, job_name, add_dep=None):
"""Get dependency job ids for `job_name`"""

Expand Down Expand Up @@ -435,7 +448,7 @@ def get_dep_cmd(self, job_name, add_dep=None):
# sequential case
return '--wait'

def submit(self, job_name, script, add_dep=None):
def submit(self, job_name, script, add_dep=None, logfile=None):
"""Submit job with dependencies"""

script_path = Path(script)
Expand All @@ -455,15 +468,44 @@ def submit(self, job_name, script, add_dep=None):
else:
self.job_ids['current'][job_name].append(job_id)

# If needed internally in a multi-job task like prepare_data,
# the returned job ID can then be passed as the add_dep keyword
return result, job_id

def check_submitted_job(self, script, result, logfile=None):
    """Validate the outcome of a submitted batch script.

    Parameters
    ----------
    script : path-like
        Path of the submitted batch script (kept for interface
        compatibility / error context).
    result : subprocess.CompletedProcess
        Result object returned by the ``sbatch`` invocation.
    logfile : path-like, optional
        Job log file, forwarded to ``check_job`` for the ICON-ART
        "invalid pointer" workaround.

    Raises
    ------
    RuntimeError
        If the job's exit code (after ``check_job``'s corrections)
        is non-zero.
    """
    # BUG FIX: 'logfile' was used without ever being defined, raising
    # NameError; it is now an explicit optional parameter. The stray
    # 'return job_id' (also an undefined name) has been removed.
    self.check_job(result.returncode, logfile=logfile)

def check_job(self, exitcode, logfile=None):
    """Check a job's exit code and raise on failure.

    ICON-ART runs can terminate with a spurious ``free(): invalid
    pointer`` error even when the simulation finished cleanly; when
    *logfile* shows an ART run that reached "clean-up finished", the
    non-zero exit code is ignored.

    Parameters
    ----------
    exitcode : int
        Exit code returned by the job.
    logfile : path-like, optional
        Job log file inspected for the ICON-ART workaround.

    Raises
    ------
    RuntimeError
        If the (possibly corrected) exit code is non-zero.
    """
    # In case of ICON-ART, ignore the "invalid pointer" error on successful run
    if logfile and tools.grep("ART: ", logfile)['success'] and \
            tools.grep("free(): invalid pointer", logfile)['success'] and \
            tools.grep("clean-up finished", logfile)['success']:
        exitcode = 0

    if exitcode != 0:
        raise RuntimeError(f"sbatch returned exitcode {exitcode}")
    # BUG FIX: previously logged f"{script} successfully executed." where
    # 'script' was undefined, raising NameError on every successful check.
    logging.info("Job successfully executed.")

def create_sbatch_script(self, job_name, log_file):
    """Write a minimal sbatch wrapper that re-runs a single chain job.

    The generated script activates the ``proc-chain`` conda
    environment and re-invokes ``run_chain.py`` for *job_name* only,
    appending its output to *log_file*.

    Returns the path of the script, ``<chain_root>/<job_name>.sh``.
    """
    directives = (
        '#!/usr/bin/env bash',
        f'#SBATCH --job-name="{job_name}_{self.job_id}"',
        '#SBATCH --nodes=1',
        f'#SBATCH --output={log_file}',
        '#SBATCH --open-mode=append',
        f'#SBATCH --account={self.compute_account}',
        f'#SBATCH --partition={self.compute_queue}',
        f'#SBATCH --constraint={self.constraint}',
        '',
        f'cd {self.chain_src_dir}',
        'eval "$(conda shell.bash hook)"',
        'conda activate proc-chain',
        f'./run_chain.py {self.casename} -j {job_name} -c {self.job_id} -f -s --no-logging',
        '',
    )

    script_path = self.chain_root / f'{job_name}.sh'
    script_path.write_text('\n'.join(directives))

    return script_path

def wait_for_previous(self):
"""wait for all jobs of the previous stage to be finished
Expand All @@ -476,8 +518,8 @@ def wait_for_previous(self):
for ids in self.job_ids['previous'].values():
dep_ids.extend(ids)
if dep_ids:
job_file = self.chain_root / 'submit.wait.slurm'
log_file = self.chain_root / 'wait.log'
job_file = self.case_root / 'submit.wait.slurm'
log_file = self.case_root / 'wait.log'
dep_str = ':'.join(map(str, dep_ids))
script_lines = [
'#!/usr/bin/env bash', f'#SBATCH --job-name="wait"',
Expand All @@ -493,10 +535,6 @@ def wait_for_previous(self):

subprocess.run(['sbatch', '--wait', job_file], check=True)

# Remove sbatch script and log file after execution
os.remove(job_file)
os.remove(log_file)

def get_job_info(jobid, slurm_keys=['Elapsed']):
"""Return information from slurm job as given by sacct
Expand Down
3 changes: 3 additions & 0 deletions jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from . import photo_rate
from . import post_cosmo
from . import post_int2lm
from . import prepare_art
from . import prepare_art_oem
from . import prepare_art_global
from . import prepare_data
from . import reduce_output
from . import verify_chain
10 changes: 5 additions & 5 deletions jobs/geosp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@
import os
from pathlib import Path
import logging
from datetime import timedelta
import xarray as xr
import numpy as np
from . import tools, prepare_data


def main(cfg):
"""
Add GEOSP
Add GEOSP to all meteo files
"""
prepare_data.set_cfg_variables(cfg)
launch_time = cfg.init_time_logging("geosp")
logging.info("Add GEOSP to all meteo files")

#-----------------------------------------------------
# Add GEOSP to all meteo files
#-----------------------------------------------------
for time in tools.iter_hours(cfg.startdate_sim, cfg.enddate_sim,
cfg.meteo['inc']):
# Specify file names
Expand Down Expand Up @@ -58,3 +56,5 @@ def main(cfg):
logging.info("Added GEOSP to file {}".format(merged_file))
# Rename file to get original file name
tools.rename_file(merged_file, src_file)

cfg.finish_time_logging("geosp", launch_time)
17 changes: 3 additions & 14 deletions jobs/icon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# -*- coding: utf-8 -*-

import logging
import subprocess
from . import tools, prepare_data


Expand Down Expand Up @@ -33,6 +32,7 @@ def main(cfg):
Object holding all user-configuration parameters as attributes.
"""
prepare_data.set_cfg_variables(cfg)
launch_time = cfg.init_time_logging("icon")

logfile = cfg.log_working_dir / "icon"
logfile_finish = cfg.log_finished_dir / "icon"
Expand Down Expand Up @@ -66,17 +66,6 @@ def main(cfg):
script.write_text(script_str)

# Submit run script
result, job_id = cfg.submit('icon', script)
job_id = cfg.submit('icon', script, logfile=logfile)

# Anything happening after submission only makes sense in sequential mode
if not cfg.is_async:
exitcode = result.returncode

# In case of ICON-ART, ignore the "invalid pointer" error on successful run
if cfg.workflow_name.startswith('icon-art'):
if tools.grep("free(): invalid pointer", logfile)['success'] and \
tools.grep("clean-up finished", logfile)['success']:
exitcode = 0

if exitcode != 0:
raise RuntimeError("sbatch returned exitcode {}".format(exitcode))
cfg.finish_time_logging("icon", launch_time)
2 changes: 1 addition & 1 deletion jobs/int2lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def main(cfg):
input_art_filename)

# Change of soil model from TERRA to TERRA multi-layer on 2 Aug 2007
if cfg.startdate_sim < datetime(2007, 8, 2, tzinfo=pytz.UTC):
if cfg.startdate_sim < datetime(2007, 8, 2):
multi_layer = ".FALSE."
else:
multi_layer = ".TRUE."
Expand Down
Loading

0 comments on commit d1fb2f7

Please sign in to comment.