Skip to content

Commit

Permalink
Use config instead of local variables
Browse files Browse the repository at this point in the history
  • Loading branch information
mjaehn committed Dec 20, 2023
1 parent 5a1ccde commit 6dd552a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
2 changes: 1 addition & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def wait_for_previous(self):

subprocess.run(['sbatch', '--wait', job_file], check=True)

# Remove sbatch script after execution
# Remove sbatch script and log file after execution
os.remove(job_file)
os.remove(log_file)

Expand Down
26 changes: 12 additions & 14 deletions run_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,13 @@ def run_chunk(cfg, force, resume):
cfg.meteo.prefix = 'lffd'

# Logging
log_working_dir = cfg.chain_root / 'checkpoints' / 'working'
log_finished_dir = cfg.chain_root / 'checkpoints' / 'finished'
cfg.log_working_dir = log_working_dir
cfg.log_finished_dir = log_finished_dir
cfg.log_working_dir = cfg.chain_root / 'checkpoints' / 'working'
cfg.log_finished_dir = cfg.chain_root / 'checkpoints' / 'finished'

# Create working directories
tools.create_dir(cfg.chain_root, "chain_root")
tools.create_dir(log_working_dir, "log_working")
tools.create_dir(log_finished_dir, "log_finished")
tools.create_dir(cfg.log_working_dir, "log_working")
tools.create_dir(cfg.log_finished_dir, "log_finished")

# Number of levels and switch for unit conversion for 'reduce_output' job
if not hasattr(cfg, 'output_levels'):
Expand All @@ -201,7 +199,7 @@ def run_chunk(cfg, force, resume):
if cfg.is_async:
# Submit current chunk
for job in cfg.jobs:
if (log_finished_dir / job).exists() and not force:
if (cfg.log_finished_dir / job).exists() and not force:
# Skip job if already finished
print(f' └── Skip "{job}" for chunk "{cfg.job_id}"')
skip = True
Expand All @@ -224,10 +222,10 @@ def run_chunk(cfg, force, resume):
skip = False

# if exists job is currently worked on or has been finished
if (log_working_dir / job).exists():
if (cfg.log_working_dir / job).exists():
if not force:
while True:
if (log_finished_dir / job).exists():
if (cfg.log_finished_dir / job).exists():
print(f"Skip {job} for chunk {cfg.job_id}")
skip = True
break
Expand All @@ -240,8 +238,8 @@ def run_chunk(cfg, force, resume):
for _ in range(3000):
time.sleep(0.1)
else:
(log_working_dir / job).unlink()
(log_finished_dir / job).unlink(missing_ok=True)
(cfg.log_working_dir / job).unlink()
(cfg.log_finished_dir / job).unlink(missing_ok=True)

if not skip:
print(f' └── Process "{job}" for chunk "{cfg.job_id}"')
Expand Down Expand Up @@ -269,19 +267,19 @@ def run_chunk(cfg, force, resume):
job, cfg.job_id)
logging.exception(subject)
if cfg.user_mail:
message = tools.prepare_message(log_working_dir /
message = tools.prepare_message(cfg.log_working_dir /
job)
logging.info('Sending log file to %s' %
cfg.user_mail)
tools.send_mail(cfg.user_mail, subject, message)
if try_count == 0:
raise RuntimeError(subject)

if exitcode != 0 or not (log_finished_dir / job).exists():
if exitcode != 0 or not (cfg.log_finished_dir / job).exists():
subject = "ERROR or TIMEOUT in job '%s' for chain '%s'" % (
job, cfg.job_id)
if cfg.user_mail:
message = tools.prepare_message(log_working_dir / job)
message = tools.prepare_message(cfg.log_working_dir / job)
logging.info('Sending log file to %s' % cfg.user_mail)
tools.send_mail(cfg.user_mail, subject, message)
raise RuntimeError(subject)
Expand Down

0 comments on commit 6dd552a

Please sign in to comment.