From 1d73fc9cc1c42b4b735c4344fecc09420fac4065 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Tue, 11 Feb 2025 17:01:39 -0700 Subject: [PATCH 1/3] Add job_limits to config and allow only that number of jobs to be submitted to platform scheduler --- beeflow/common/config_driver.py | 2 ++ beeflow/task_manager/background.py | 5 +++-- beeflow/wf_manager/resources/wf_utils.py | 8 ++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index 554bcfd15..fdd0254a8 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -281,6 +281,8 @@ def filepath_completion_input(*pargs, **kwargs): # Task manager VALIDATOR.section('task_manager', info='Task manager configuration and config of container to use.') +VALIDATOR.option('task_manager', 'jobs_limit', default=2, prompt=True, + info='The number of jobs that can be in the job queue.') VALIDATOR.option('task_manager', 'container_runtime', default='Charliecloud', choices=('Charliecloud', 'Singularity'), prompt=False, info='container runtime to use for configuration') diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 58da80036..2500302e9 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -5,6 +5,7 @@ """ import traceback import jsonpickle +from beeflow.common.config_driver import BeeConfig as bc from beeflow.task_manager import utils from beeflow.common import log as bee_logging from beeflow.common.build.utils import ContainerBuildError @@ -13,11 +14,11 @@ log = bee_logging.setup(__name__) +jobs_limit = int(bc.get('task_manager', 'jobs_limit')) # States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES COMPLETED_STATES = {'UNKNOWN', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'} - def resolve_environment(task): """Use build interface to create a valid environment. @@ -57,7 +58,7 @@ def submit_task(db, worker, task): def submit_jobs(db): """Submit all jobs currently in submit queue to the workload scheduler.""" worker = utils.worker_interface() - while db.submit_queue.count() >= 1: + while db.submit_queue.count() >= 1 and db.job_queue.count() < jobs_limit: # Single value dictionary task = db.submit_queue.pop() job_state = submit_task(db, worker, task) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index ced30eb87..b540c9396 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -209,8 +209,12 @@ def submit_tasks_tm(wf_id, tasks, allocation): except requests.exceptions.ConnectionError: log.error('Unable to connect to task manager to submit tasks.') return - - if resp.status_code != 200: + # Change state of any tasks sent to the submit queue + if resp.status_code == 200: + for task in tasks: + log.info(f"change state of {task.name} to SUBMIT") + wfi.set_task_state(task, 'SUBMIT') + else: log.info(f"Submit task to TM returned bad status: {resp.status_code}") From 49743c3d7cda2641ffa1d1cb8d56369a0d7536da Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Wed, 12 Feb 2025 09:24:41 -0700 Subject: [PATCH 2/3] Fixed default jobs_limit --- beeflow/common/config_driver.py | 2 +- beeflow/task_manager/background.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index fdd0254a8..18239e41d 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -281,7 +281,7 @@ def filepath_completion_input(*pargs, **kwargs): # Task manager VALIDATOR.section('task_manager', info='Task manager configuration and config of container to use.') -VALIDATOR.option('task_manager', 'jobs_limit', default=2, prompt=True, +VALIDATOR.option('task_manager', 'jobs_limit', default='', prompt=True, info='The number of jobs that can be in the job queue.') VALIDATOR.option('task_manager', 'container_runtime', default='Charliecloud', choices=('Charliecloud', 'Singularity'), prompt=False, diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 2500302e9..49627a817 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -12,9 +12,18 @@ from beeflow.common.build_interfaces import build_main from beeflow.common.worker import WorkerError - +JOBS_MAX = 1000 log = bee_logging.setup(__name__) -jobs_limit = int(bc.get('task_manager', 'jobs_limit')) +jobs_limit = bc.get('task_manager', 'jobs_limit') +if not jobs_limit: + jobs_limit = JOBS_MAX +else: + try: + jobs_limit = int(bc.get('task_manager', 'jobs_limit')) + except ValueError: + log.info(f'Value for jobs_limit in bee.conf not an integer, setting it to {JOBS_MAX}') + jobs_limit = JOBS_MAX +log.info(f'The number of jobs queued will be limited to {jobs_limit}.') # States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES COMPLETED_STATES = {'UNKNOWN', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'} From b4b8c6530d1a1cdff29f4bec69fc982d2a309d68 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Wed, 12 Feb 2025 09:28:24 -0700 Subject: [PATCH 3/3] Minor fix --- beeflow/task_manager/background.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 49627a817..3e59e41e0 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -19,7 +19,7 @@ jobs_limit = JOBS_MAX else: try: - jobs_limit = int(bc.get('task_manager', 'jobs_limit')) + jobs_limit = int(jobs_limit) except ValueError: log.info(f'Value for jobs_limit in bee.conf not an integer, setting it to {JOBS_MAX}') jobs_limit = JOBS_MAX