ref:add:fix: some more slurm integration
- use `is_async` instead of the Python keyword `async`
- add an optional, user-provided dependency to the `get_dep_xxx` methods.
  This allows for internal dependencies between sub-jobs of a
  particular task, like prepare_data.
- add a `submit` method to the `Config` class (see the usage sketch after this list)
- Start Slurm integration for prepare_data. Continue the same way for
  the rest of the jobs.
- replace some `setattr` calls with direct assignments
- move `jobs`, `startdate` and `enddate` to the Config object and
  remove them from the arguments of `restart_runs_xxx` functions
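
The new `submit` helper together with the `add_dep` keyword is meant to let a multi-step task chain its own sub-jobs while still recording the resulting job ids for workflow-level dependencies. Below is a minimal sketch of the intended call pattern, assuming a `cfg` object set up as in this commit; the second script name and the wrapper function are hypothetical and not part of the diff.

# Sketch only: illustrates the call pattern enabled by this commit.
# `extract_era5.job` and `submit_prepare_data` are hypothetical examples.
def submit_prepare_data(cfg):
    # First sub-job, as done for the copy_input script in jobs/prepare_data.py
    copy_id = cfg.submit('prepare_data', cfg.icon_base / 'copy_input.job')

    # A later sub-job of the same task can depend on the first one internally
    # by passing its job id through the new add_dep keyword.
    cfg.submit('prepare_data', cfg.icon_base / 'extract_era5.job',
               add_dep=copy_id)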
leclairm committed Dec 5, 2023
1 parent dc15d91 commit 14dc786
Showing 4 changed files with 117 additions and 89 deletions.
54 changes: 43 additions & 11 deletions config.py
@@ -193,7 +193,7 @@ def set_workflow(self):
with open('workflows.yaml') as file:
workflows = yaml.safe_load(file)
self.workflow = workflows[self.workflow_name]
self.async = 'dependencies' in self.workflow
self.is_async = 'dependencies' in self.workflow

# Initiate empty job ids dictionary so that it can be filled in later
self.job_ids = {'current': {}, 'previous': {}}
@@ -324,29 +324,41 @@ def create_vars_from_dicts(self, dct=None, key=None):
else:
setattr(self, subkey, v)

def get_dep_ids(self, job_name):
def get_dep_ids(self, job_name, add_dep=None):
"""Get dependency job ids for `job_name`"""

deps_ids = []
if self.async:
# Couls be that job has no dependency, even in an async config,
# Initial list of dependencies
if add_dep is not None:
if type(add_dep) is int:
dep_id_list = [add_dep]
else:
try:
dep_id_list = list(add_dep)
except TypeError:
print('add_dep must be an iterable')
else:
dep_id_list = []

# Add job dependencies
if self.is_async:
# Could be that job has no dependency, even in an async config,
# e.g., prepare_data
if deps := self.workflow['dependencies'].get(job_name):
for stage in 'previous', 'current':
if dep_stage := deps.get(stage):
for job in dep_stage:
# Could be that dep job id does not exist, e.g.,
# if it dep job is deactivated or it's the first chunk
# if dep job is deactivated or it's the first chunk
if dep_id := self.job_ids[stage].get(job):
deps_ids.extend(dep_id)
return dep_ids
dep_id_list.extend(dep_id)
return dep_id_list

def get_dep_cmd(self, job_name):
def get_dep_cmd(self, job_name, add_dep=None):
"""Generate the part of the sbatch command that sepcifies dependencies for job_name."""

if self.async:
if self.is_async:
# async case
if dep_ids := self.get_dep_ids(job_name):
if dep_ids := self.get_dep_ids(job_name, add_dep=add_dep):
dep_str = ':'.join(map(str, dep_ids))
return f'--dependency=afterok:{dep_str}'
else:
@@ -357,6 +369,26 @@ def get_dep_cmd(self, job_name):
# sequential case
return '--wait'

def submit(self, job_name, script, add_dep=None):
"""Submit job with dependencies"""

script_path = Path(script)
sbatch_cmd = ['sbatch', '--parsable']
if dep_cmd := self.get_dep_cmd(job_name, add_dep=add_dep):
sbatch_cmd.append(dep_cmd)
sbatch_cmd.append(script_path.name)

result = subprocess.run(sbatch_cmd, cwd=script_path.parent, capture_output=True)
job_id = int(result.stdout)
if job_name not in self.job_ids['current']:
self.job_ids['current'][job_name] = [job_id]
else:
self.job_ids['current'][job_name].append(job_id)

# If needed internally in a multi-job task like prepare_data,
# the returned job id can then be passed via the add_dep keyword
return job_id

def wait_for_previous(self):
"""wait for all jobs of the previous stage to be finished
26 changes: 8 additions & 18 deletions jobs/icon.py
@@ -77,28 +77,18 @@ def main(cfg):
cfg.meteo['nameformat']) + '.nc')

# Write run script (run_icon.job)
icon_runjob = os.path.join(cfg.case_path, cfg.icon_runjob_filename)
with open(icon_runjob) as input_file:
to_write = input_file.read()
output_file = os.path.join(cfg.icon_work, "run_icon.job")
with open(output_file, "w") as outf:
outf.write(
to_write.format(cfg=cfg,
inidata_filename=inidata_filename,
logfile=logfile,
logfile_finish=logfile_finish))
template = (cfg.case_path / cfg.icon_runjob_filename).read_text()
script_str = template.format(cfg=cfg,
inidata_filename=inidata_filename,
logfile=logfile,
logfile_finish=logfile_finish)
script = cfg.icon_work / 'run_icon.job'
script.write_text(script_str)

# Submit run script
sbatch_cmd = ['sbatch', '--parsable']
if dep_cmd := cfg.get_dep_cmd('icon'):
sbatch_cmd.append(dep_cmd)
sbatch_cmd.append(os.path.join(cfg.icon_work, 'run_icon.job'))

result = subprocess.run(sbatch_cmd, capture_output=True)
cfg.job_ids['current']['icon'] = int(result.stdout),
cfg.submit('icon', script)

# Anything happening after submission only makes sense in sequential mode
if not cfg.async:
if not cfg.is_async:
exitcode = result.returncode

# In case of ICON-ART, ignore the "invalid pointer" error on successful run
37 changes: 30 additions & 7 deletions jobs/prepare_data.py
@@ -80,7 +80,12 @@ def set_cfg_variables(cfg):
cfg.restart_file = cfg.icon_restart_in / cfg.restart_filename
cfg.restart_file_scratch = cfg.icon_work / cfg.restart_filename

return cfg
cfg.job_ids['current']['prepare_data'] = []


def async_error(cfg, part="This part"):
if cfg.is_async:
raise NotImplementedError(f"{part} isn't ready for async execution yet")


def main(cfg):
@@ -123,7 +128,7 @@ def main(cfg):
Object holding all user-configuration parameters as attributes
"""

cfg = set_cfg_variables(cfg)
set_cfg_variables(cfg)

if cfg.workflow_name.startswith('icon'):
logging.info('ICON input data (IC/BC)')
@@ -139,13 +144,29 @@ def main(cfg):
#-----------------------------------------------------
# Copy input files
#-----------------------------------------------------
for varname in cfg.input_files:
varname_scratch = f'{varname}_scratch'
tools.copy_file(cfg.input_files[varname],
cfg.input_files_scratch[varname],
output_log=True)
walltime = getattr(cfg, 'copy_input_walltime', '00:01:00')
queue = getattr(cfg, 'copy_input_queue', 'normal')

script_lines = ['#!/usr/bin/env bash',
f'#SBATCH --job-name="copy_input_{cfg.casename}_{cfg.startdate_sim_yyyymmddhh}_{cfg.enddate_sim_yyyymmddhh}"',
f'#SBATCH --account={cfg.compute_account}',
f'#SBATCH --time={walltime}',
f'#SBATCH --partition={queue}',
'#SBATCH --constraint=gpu',
'#SBATCH --nodes=1',
'']
for target, destination in zip(cfg.input_files.values(),
cfg.input_files_scratch.values()):
script_lines.append(f'rsync -av {target} {destination}')


with (script := cfg.icon_base / 'copy_input.job').open('w') as f:
f.write('\n'.join(script_lines))

cfg.submit('prepare_data', script)

if cfg.workflow_name == 'icon-art-global':
async_error(cfg, part='global ICON-ART')
# -- Download ERA5 data and create the inicond file
if cfg.era5_inicond and cfg.lrestart == '.FALSE.':
# -- Fetch ERA5 data
@@ -306,6 +327,7 @@ def main(cfg):
process.communicate()

else: # non-global ICON-ART
async_error(cfg, part='non-global ICON-ART')
#-----------------------------------------------------
# Create LBC datafile lists (each at 00 UTC and others)
#-----------------------------------------------------
@@ -523,6 +545,7 @@ def main(cfg):

# If COSMO (and not ICON):
else:
async_error(cfg, part='COSMO')
logging.info('COSMO analysis data for IC/BC')

dest_path = os.path.join(cfg.int2lm_input, 'meteo')