From 14dc7866d65422b47622657824bfb363cfe73d7a Mon Sep 17 00:00:00 2001
From: Matthieu
Date: Tue, 5 Dec 2023 15:36:04 +0100
Subject: [PATCH] ref:add:fix: some more slurm integration

- use `is_async` instead of the python keyword `async`
- add an optional user-provided dependency to the `get_dep_xxx` methods.
  This allows for internal dependencies between subjobs of a particular
  task, like prepare_data.
- add a `submit` method to the `Config` class
- start Slurm integration for prepare_data; continue the same way for
  the rest of the jobs
- replace some `setattr` calls with direct assignments
- move `jobs`, `startdate_sim` and `enddate_sim` to the `Config` object
  and remove them from the arguments of `run_chain` and the
  `restart_runs_xxx` functions
---
 config.py            | 54 ++++++++++++++++++++++------
 jobs/icon.py         | 26 ++++----------
 jobs/prepare_data.py | 37 ++++++++++++++-----
 run_chain.py         | 89 ++++++++++++++++++--------------------------
 4 files changed, 117 insertions(+), 89 deletions(-)

diff --git a/config.py b/config.py
index 9878d0ec..299f7220 100644
--- a/config.py
+++ b/config.py
@@ -193,7 +193,7 @@ def set_workflow(self):
         with open('workflows.yaml') as file:
             workflows = yaml.safe_load(file)
         self.workflow = workflows[self.workflow_name]
-        self.async = 'dependencies' in self.workflow
+        self.is_async = 'dependencies' in self.workflow

         # Initiate empty job ids dictionary so that it can be filled in later
         self.job_ids = {'current': {}, 'previous': {}}
@@ -324,29 +324,41 @@ def create_vars_from_dicts(self, dct=None, key=None):
             else:
                 setattr(self, subkey, v)

-    def get_dep_ids(self, job_name):
+    def get_dep_ids(self, job_name, add_dep=None):
         """Get dependency job ids for `job_name`"""
-        deps_ids = []
-        if self.async:
-            # Couls be that job has no dependency, even in an async config,
+        # Initial list of dependencies
+        if add_dep is not None:
+            if isinstance(add_dep, int):
+                dep_id_list = [add_dep]
+            else:
+                try:
+                    dep_id_list = list(add_dep)
+                except TypeError:
+                    print('add_dep must be an int or an iterable of ints')
+                    dep_id_list = []
+        else:
+            dep_id_list = []
+
+        # Add job dependencies
+        if self.is_async:
+            # Could be that job has no dependency, even in an async config,
             # e.g., prepare_data
             if deps := self.workflow['dependencies'].get(job_name):
                 for stage in 'previous', 'current':
                     if dep_stage := deps.get(stage):
                         for job in dep_stage:
                             # Could be that dep job id does not exist, e.g.,
-                            # if it dep job is deactivated or it's the first chunk
+                            # if the dep job is deactivated or it's the first chunk
                             if dep_id := self.job_ids[stage].get(job):
-                                deps_ids.extend(dep_id)
-        return dep_ids
+                                dep_id_list.extend(dep_id)
+        return dep_id_list

-    def get_dep_cmd(self, job_name):
+    def get_dep_cmd(self, job_name, add_dep=None):
         """Generate the part of the sbatch command that specifies
         dependencies for job_name."""
-        if self.async:
+        if self.is_async:
             # async case
-            if dep_ids := self.get_dep_ids(job_name):
-                dep_str = ':'.join(map(str, deps_ids))
+            if dep_ids := self.get_dep_ids(job_name, add_dep=add_dep):
+                dep_str = ':'.join(map(str, dep_ids))
                 return f'--dependency=afterok:{dep_str}'
             else:
@@ -357,6 +369,26 @@ def get_dep_cmd(self, job_name):
             # sequential case
             return '--wait'

+    def submit(self, job_name, script, add_dep=None):
+        """Submit job with dependencies"""
+
+        script_path = Path(script)
+        sbatch_cmd = ['sbatch', '--parsable']
+        if dep_cmd := self.get_dep_cmd(job_name, add_dep=add_dep):
+            sbatch_cmd.append(dep_cmd)
+        sbatch_cmd.append(script_path.name)
+
+        result = subprocess.run(sbatch_cmd,
+                                cwd=script_path.parent,
+                                capture_output=True)
+        job_id = int(result.stdout)
+        if job_name not in self.job_ids['current']:
+            self.job_ids['current'][job_name] = [job_id]
+        else:
+            self.job_ids['current'][job_name].append(job_id)
+
+        # If needed internally in a multi-job task like prepare_data,
+        # the returned id can be passed back in through the add_dep keyword
+        return job_id
+
     def wait_for_previous(self):
         """wait for all jobs of the previous stage to be finished
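
For review purposes, here is a small self-contained sketch (not part of the patch; all names and ids invented) of how the reworked `get_dep_ids`/`get_dep_cmd` logic composes a Slurm dependency flag: ids already recorded in `job_ids['previous']`/`job_ids['current']` for the jobs listed in the workflow's `dependencies` section are flattened together with an optional `add_dep`, then joined into a single `--dependency=afterok:...` string.

    # Standalone mock of the dependency-string logic; data is invented.
    def get_dep_ids(job_ids, dependencies, job_name, add_dep=None):
        """Collect the Slurm job ids that `job_name` must wait for."""
        dep_id_list = [add_dep] if isinstance(add_dep, int) else list(add_dep or [])
        for stage in ('previous', 'current'):
            for job in dependencies.get(job_name, {}).get(stage, []):
                dep_id_list.extend(job_ids[stage].get(job, []))
        return dep_id_list

    def get_dep_cmd(job_ids, dependencies, job_name, add_dep=None):
        """Return the sbatch dependency flag, or None if there is nothing to wait for."""
        if dep_ids := get_dep_ids(job_ids, dependencies, job_name, add_dep):
            return '--dependency=afterok:' + ':'.join(map(str, dep_ids))
        return None

    job_ids = {'previous': {'icon': [1001]}, 'current': {'prepare_data': [1010, 1011]}}
    dependencies = {'icon': {'previous': ['icon'], 'current': ['prepare_data']}}
    print(get_dep_cmd(job_ids, dependencies, 'icon', add_dep=1042))
    # --dependency=afterok:1042:1001:1010:1011
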
diff --git a/jobs/icon.py b/jobs/icon.py
index d2c68364..4e6b8dcb 100644
--- a/jobs/icon.py
+++ b/jobs/icon.py
@@ -77,28 +77,18 @@ def main(cfg):
                                          cfg.meteo['nameformat']) + '.nc')

     # Write run script (run_icon.job)
-    icon_runjob = os.path.join(cfg.case_path, cfg.icon_runjob_filename)
-    with open(icon_runjob) as input_file:
-        to_write = input_file.read()
-    output_file = os.path.join(cfg.icon_work, "run_icon.job")
-    with open(output_file, "w") as outf:
-        outf.write(
-            to_write.format(cfg=cfg,
-                            inidata_filename=inidata_filename,
-                            logfile=logfile,
-                            logfile_finish=logfile_finish))
+    template = (cfg.case_path / cfg.icon_runjob_filename).read_text()
+    script_str = template.format(cfg=cfg,
+                                 inidata_filename=inidata_filename,
+                                 logfile=logfile,
+                                 logfile_finish=logfile_finish)
+    script = cfg.icon_work / 'run_icon.job'
+    script.write_text(script_str)

     # Submit run script
-    sbatch_cmd = ['sbatch', '--parsable']
-    if dep_cmd := cfg.get_dep_cmd('icon'):
-        sbatch_cmd.append(dep_cmd)
-    sbatch_cmd.append(os.path.join(cfg.icon_work, 'run_icon.job'))
-
-    result = subprocess.run(sbatch_cmd, capture_output=True)
-    cfg.job_ids['current']['icon'] = int(result.stdout),
+    cfg.submit('icon', script)

     # Anything happening after submission only makes sense in sequential mode
-    if not cfg.async:
+    if not cfg.is_async:
         exitcode = result.returncode

         # In case of ICON-ART, ignore the "invalid pointer" error on successful run
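
As a side note on the icon.py hunk above: the pathlib-based rendering is just `read_text` → `str.format(cfg=cfg, ...)` → `write_text`, and the script path is kept in its own variable because `Path.write_text` returns the number of characters written rather than the path. A minimal sketch of the pattern, with an invented account name, template and paths:

    # Minimal sketch of the run-script templating; all values are invented.
    from pathlib import Path
    from types import SimpleNamespace

    cfg = SimpleNamespace(compute_account='s1234', icon_work=Path('/tmp'))
    template = ('#!/usr/bin/env bash\n'
                '#SBATCH --account={cfg.compute_account}\n'
                'srun icon > {logfile}\n')
    script = cfg.icon_work / 'run_icon.job'
    script.write_text(template.format(cfg=cfg, logfile=cfg.icon_work / 'icon.log'))
    print(script.read_text())
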
diff --git a/jobs/prepare_data.py b/jobs/prepare_data.py
index 3f1733e3..7f9e1533 100644
--- a/jobs/prepare_data.py
+++ b/jobs/prepare_data.py
@@ -80,7 +80,12 @@ def set_cfg_variables(cfg):
         cfg.restart_file = cfg.icon_restart_in / cfg.restart_filename
         cfg.restart_file_scratch = cfg.icon_work / cfg.restart_filename

-    return cfg
+    cfg.job_ids['current']['prepare_data'] = []
+
+
+def async_error(cfg, part="This part"):
+    if cfg.is_async:
+        raise NotImplementedError(f"{part} isn't ready for async execution yet")


 def main(cfg):
@@ -123,7 +128,7 @@ def main(cfg):
         Object holding all user-configuration parameters as attributes
     """
-    cfg = set_cfg_variables(cfg)
+    set_cfg_variables(cfg)

     if cfg.workflow_name.startswith('icon'):
         logging.info('ICON input data (IC/BC)')
@@ -139,13 +144,29 @@ def main(cfg):
         #-----------------------------------------------------
         # Copy input files
         #-----------------------------------------------------
-        for varname in cfg.input_files:
-            varname_scratch = f'{varname}_scratch'
-            tools.copy_file(cfg.input_files[varname],
-                            cfg.input_files_scratch[varname],
-                            output_log=True)
+        walltime = getattr(cfg, 'copy_input_walltime', '00:01:00')
+        queue = getattr(cfg, 'copy_input_queue', 'normal')
+
+        script_lines = ['#!/usr/bin/env bash',
+                        f'#SBATCH --job-name="copy_input_{cfg.casename}_{cfg.startdate_sim_yyyymmddhh}_{cfg.enddate_sim_yyyymmddhh}"',
+                        f'#SBATCH --account={cfg.compute_account}',
+                        f'#SBATCH --time={walltime}',
+                        f'#SBATCH --partition={queue}',
+                        '#SBATCH --constraint=gpu',
+                        '#SBATCH --nodes=1',
+                        '']
+        for target, destination in zip(cfg.input_files.values(),
+                                       cfg.input_files_scratch.values()):
+            script_lines.append(f'rsync -av {target} {destination}')
+
+        with (script := cfg.icon_base / 'copy_input.job').open('w') as f:
+            f.write('\n'.join(script_lines))
+
+        cfg.submit('prepare_data', script)

     if cfg.workflow_name == 'icon-art-global':
+        async_error(cfg, part='global ICON-ART')
         # -- Download ERA5 data and create the inicond file
         if cfg.era5_inicond and cfg.lrestart == '.FALSE.':
             # -- Fetch ERA5 data
@@ -306,6 +327,7 @@ def main(cfg):
             process.communicate()

     else:  # non-global ICON-ART
+        async_error(cfg, part='non-global ICON-ART')
         #-----------------------------------------------------
         # Create LBC datafile lists (each at 00 UTC and others)
         #-----------------------------------------------------
@@ -523,6 +545,7 @@ def main(cfg):

     # If COSMO (and not ICON):
     else:
+        async_error(cfg, part='COSMO')
         logging.info('COSMO analysis data for IC/BC')

         dest_path = os.path.join(cfg.int2lm_input, 'meteo')
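
The reason `Config.submit` returns the job id, and `get_dep_xxx` accepts `add_dep`, is the internal chaining mentioned in the commit message: a multi-step task such as prepare_data can make one of its own sub-steps wait for another. A toy illustration of that pattern follows; `submit` is mocked rather than calling sbatch, and the `era5_inicond.job` name and the ids are invented.

    import itertools

    _fake_ids = itertools.count(1000)

    def submit(job_name, script, add_dep=None):
        """Stand-in for Config.submit: prints the sbatch call and returns a fake id."""
        dep = f' --dependency=afterok:{add_dep}' if add_dep is not None else ''
        print(f'sbatch --parsable{dep} {script}')
        return next(_fake_ids)

    copy_id = submit('prepare_data', 'copy_input.job')
    submit('prepare_data', 'era5_inicond.job', add_dep=copy_id)
    # sbatch --parsable copy_input.job
    # sbatch --parsable --dependency=afterok:1000 era5_inicond.job
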
diff --git a/run_chain.py b/run_chain.py
index 096c26e7..7c5f1cb4 100755
--- a/run_chain.py
+++ b/run_chain.py
@@ -84,7 +84,7 @@ def parse_arguments():
     return args


-def run_chain(cfg, startdate_sim, enddate_sim, job_names, force, resume):
+def run_chain(cfg, force, resume):
     """Run the processing chain, managing job execution and logging.

     This function sets up and manages the execution of a processing chain, handling
@@ -94,12 +94,6 @@ def run_chain(cfg, startdate_sim, enddate_sim, job_names, force, resume):
     ----------
     cfg : Config
         Object holding user-defined configuration parameters as attributes.
-    startdate_sim : datetime-object
-        The start date of the simulation.
-    enddate_sim : datetime-object
-        The end date of the simulation.
-    job_names : list of str
-        List of names of jobs to execute on every timeslice.
     force : bool
         If True, it will force the execution of jobs regardless of their completion status.
     resume : bool
@@ -115,18 +109,13 @@ def run_chain(cfg, startdate_sim, enddate_sim, job_names, force, resume):
     - This function sets various configuration values based on the provided parameters.
     - It checks for job completion status and resumes or forces execution accordingly.
     - Job log files are managed, and errors or timeouts are handled with notifications.
     """
-    # Write current start and end dates to config variables
-    cfg.startdate_sim = startdate_sim
-    cfg.enddate_sim = enddate_sim
-
     # Set forecast time
     cfg.forecasttime = (cfg.enddate_sim -
                         cfg.startdate_sim).total_seconds() / 3600

     # String variables for startdate_sim
-    cfg.startdate_sim_yyyymmddhh = startdate_sim.strftime('%Y%m%d%H')
-    cfg.enddate_sim_yyyymmddhh = enddate_sim.strftime('%Y%m%d%H')
+    cfg.startdate_sim_yyyymmddhh = cfg.startdate_sim.strftime('%Y%m%d%H')
+    cfg.enddate_sim_yyyymmddhh = cfg.enddate_sim.strftime('%Y%m%d%H')

     # Folder naming and structure
     cfg.job_id = f'{cfg.startdate_sim_yyyymmddhh}_{cfg.enddate_sim_yyyymmddhh}'
@@ -203,14 +192,16 @@ def run_chain(cfg, startdate_sim, enddate_sim, job_names, force, resume):
     tools.create_dir(log_finished_dir, "log_finished")

     # Number of levels and switch for unit conversion for 'reduce_output' job
-    if not hasattr(cfg, 'output_levels'): cfg.output_levels = -1
-    if not hasattr(cfg, 'convert_gas'): cfg.convert_gas = True
+    if not hasattr(cfg, 'output_levels'):
+        cfg.output_levels = -1
+    if not hasattr(cfg, 'convert_gas'):
+        cfg.convert_gas = True

-    if async:
+    if cfg.is_async:
         # Submit current chunk
         # - [ ] This bypasses all the logfile moving/checking
         # - [ ] Still needs a mechanism for resume
-        for job in job_names:
+        for job in cfg.jobs:
             getattr(jobs, job).main(cfg)

         # wait for previous chunk to be done
         cfg.wait_for_previous()
         # cycle
         cfg.job_ids['previous'] = cfg.job_ids['current']
     else:
         # run jobs (if required)
-        for job in job_names:
+        for job in cfg.jobs:
             skip = False

             # if exists job is currently worked on or has been finished
@@ -286,7 +277,7 @@ def run_chain(cfg, startdate_sim, enddate_sim, job_names, force, resume):
         raise RuntimeError(subject)


-def restart_runs(cfg, job_names, force, resume):
+def restart_runs(cfg, force, resume):
     """Start subchains in specified intervals and manage restarts.

     This function slices the total runtime of the processing chain according to the
@@ -297,8 +288,6 @@ def restart_runs(cfg, job_names, force, resume):
     ----------
     cfg : Config
         Object holding all user-configuration parameters as attributes.
-    job_names : list of str
-        List of names of jobs to execute on every timeslice.
     force : bool
         If True, it will force the execution of jobs regardless of their completion status.
     resume : bool
@@ -318,22 +307,19 @@ def restart_runs(cfg, job_names, force, resume):
             continue

         # Set restart variable (only takes effect for ICON)
-        if startdate_sim == cfg.startdate:
-            setattr(cfg, "lrestart", '.FALSE.')
-        else:
-            setattr(cfg, "lrestart", '.TRUE.')
+        cfg.lrestart = '.FALSE.' if startdate_sim == cfg.startdate else '.TRUE.'

-        print("Starting run with startdate {}".format(startdate_sim))
+        print(f"Starting run with startdate {startdate_sim}")

-        run_chain(cfg=cfg,
-                  startdate_sim=startdate_sim,
-                  enddate_sim=enddate_sim,
-                  job_names=job_names,
+        cfg.startdate_sim = startdate_sim
+        cfg.enddate_sim = enddate_sim
+
+        run_chain(cfg=cfg,
                   force=force,
                   resume=resume)


-def restart_runs_spinup(cfg, job_names, force, resume):
+def restart_runs_spinup(cfg, force, resume):
     """Start subchains in specified intervals and manage restarts with spin-up.

     This function slices the total runtime of the processing chain according to the
@@ -343,8 +329,6 @@ def restart_runs_spinup(cfg, job_names, force, resume):
     Parameters
     ----------
     cfg : Config
         Object holding all user-configuration parameters as attributes.
-    job_names : list of str
-        List of names of jobs to execute on every timeslice.
     force : bool
         If True, it will force the execution of jobs regardless of their completion status.
@@ -359,22 +343,22 @@ def restart_runs_spinup(cfg, job_names, force, resume):

     for startdate_sim in tools.iter_hours(cfg.startdate, cfg.enddate,
                                           cfg.restart_step_hours):
         if startdate_sim == cfg.startdate:
-            setattr(cfg, "first_one", True)
-            setattr(cfg, "second_one", False)
-            setattr(cfg, "lrestart", '.FALSE.')
+            cfg.first_one = True
+            cfg.second_one = False
+            cfg.lrestart = '.FALSE.'
             run_time = cfg.restart_step_hours
             startdate_sim_spinup = startdate_sim
         elif startdate_sim == cfg.startdate + timedelta(
                 hours=cfg.restart_step_hours):
-            setattr(cfg, "first_one", False)
-            setattr(cfg, "second_one", True)
-            setattr(cfg, "lrestart", '.TRUE.')
+            cfg.first_one = False
+            cfg.second_one = True
+            cfg.lrestart = '.TRUE.'
             run_time = cfg.restart_step_hours + cfg.spinup
             startdate_sim_spinup = startdate_sim - timedelta(hours=cfg.spinup)
         else:
-            setattr(cfg, "first_one", False)
-            setattr(cfg, "second_one", False)
-            setattr(cfg, "lrestart", '.TRUE.')
+            cfg.first_one = False
+            cfg.second_one = False
+            cfg.lrestart = '.TRUE.'
             run_time = cfg.restart_step_hours + cfg.spinup
             startdate_sim_spinup = startdate_sim - timedelta(hours=cfg.spinup)

@@ -385,10 +369,10 @@ def restart_runs_spinup(cfg, job_names, force, resume):

         print(f'Runtime of sub-simulation: {run_time} h')

+        cfg.startdate_sim = startdate_sim_spinup
+        cfg.enddate_sim = enddate_sim
+
         run_chain(cfg=cfg,
-                  startdate_sim=startdate_sim_spinup,
-                  enddate_sim=enddate_sim,
-                  job_names=job_names,
                   force=force,
                   resume=resume)

@@ -430,7 +414,9 @@ def main():

     # Check if jobs are set or if default ones are used
     if args.job_list is None:
-        args.job_list = cfg.workflow['jobs']
+        cfg.jobs = cfg.workflow['jobs']
+    else:
+        cfg.jobs = args.job_list

     print(
         f"Starting chain for case {casename} and workflow {cfg.workflow_name}"
@@ -441,21 +427,18 @@ def main():
         if hasattr(cfg, 'spinup'):
             print("Using spin-up restarts.")
             restart_runs_spinup(cfg=cfg,
-                                job_names=args.job_list,
                                 force=args.force,
                                 resume=args.resume)
         else:
             print("Using built-in model restarts.")
             restart_runs(cfg=cfg,
-                         job_names=args.job_list,
                          force=args.force,
                          resume=args.resume)
     else:
         print("No restarts are used.")
+        cfg.startdate_sim = cfg.startdate
+        cfg.enddate_sim = cfg.enddate
         run_chain(cfg=cfg,
-                  startdate_sim=cfg.startdate,
-                  enddate_sim=cfg.enddate,
-                  job_names=args.job_list,
                   force=args.force,
                   resume=args.resume)
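
Finally, a toy model (invented stand-ins, not repo code) of the state this patch moves onto the `Config` object: the driver sets `startdate_sim`/`enddate_sim` once per chunk, each job records its ids in `job_ids['current']`, and async runs rotate `current` into `previous` so that the next chunk's sbatch dependencies can point at the chunk that just ran.

    # Toy chunk loop; Cfg and the ids are stand-ins for Config and sbatch output.
    from datetime import datetime, timedelta
    from itertools import count

    class Cfg:
        """Minimal stand-in for Config."""
        def __init__(self):
            self.startdate = datetime(2018, 1, 1)
            self.enddate = datetime(2018, 1, 3)
            self.restart_step_hours = 24
            self.jobs = ['prepare_data', 'icon']
            self.job_ids = {'current': {}, 'previous': {}}

    fake_ids = count(1000)
    cfg = Cfg()
    chunk_start = cfg.startdate
    while chunk_start < cfg.enddate:
        cfg.startdate_sim = chunk_start
        cfg.enddate_sim = min(chunk_start + timedelta(hours=cfg.restart_step_hours),
                              cfg.enddate)
        cfg.job_ids['current'] = {}
        for job in cfg.jobs:  # each job would call cfg.submit(...) here
            cfg.job_ids['current'][job] = [next(fake_ids)]
        print(cfg.startdate_sim.date(), cfg.job_ids['current'],
              'depends on', cfg.job_ids['previous'])
        cfg.job_ids['previous'] = cfg.job_ids['current']
        chunk_start = cfg.enddate_sim
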