diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index 554bcfd1..18239e41 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -281,6 +281,8 @@ def filepath_completion_input(*pargs, **kwargs): # Task manager VALIDATOR.section('task_manager', info='Task manager configuration and config of container to use.') +VALIDATOR.option('task_manager', 'jobs_limit', default='', prompt=True, + info='The number of jobs that can be in the job queue.') VALIDATOR.option('task_manager', 'container_runtime', default='Charliecloud', choices=('Charliecloud', 'Singularity'), prompt=False, info='container runtime to use for configuration') diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 58da8003..3e59e41e 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -5,19 +5,29 @@ """ import traceback import jsonpickle +from beeflow.common.config_driver import BeeConfig as bc from beeflow.task_manager import utils from beeflow.common import log as bee_logging from beeflow.common.build.utils import ContainerBuildError from beeflow.common.build_interfaces import build_main from beeflow.common.worker import WorkerError - +JOBS_MAX = 1000 log = bee_logging.setup(__name__) +jobs_limit = bc.get('task_manager', 'jobs_limit') +if not jobs_limit: + jobs_limit = JOBS_MAX +else: + try: + jobs_limit = int(jobs_limit) + except ValueError: + log.info(f'Value for jobs_limit in bee.conf not an integer, setting it to {JOBS_MAX}') + jobs_limit = JOBS_MAX +log.info(f'The number of jobs queued will be limited to {jobs_limit}.') # States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES COMPLETED_STATES = {'UNKNOWN', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'} - def resolve_environment(task): """Use build interface to create a valid environment. @@ -57,7 +67,7 @@ def submit_task(db, worker, task): def submit_jobs(db): """Submit all jobs currently in submit queue to the workload scheduler.""" worker = utils.worker_interface() - while db.submit_queue.count() >= 1: + while db.submit_queue.count() >= 1 and db.job_queue.count() < jobs_limit: # Single value dictionary task = db.submit_queue.pop() job_state = submit_task(db, worker, task) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index ced30eb8..b540c939 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -209,8 +209,12 @@ def submit_tasks_tm(wf_id, tasks, allocation): except requests.exceptions.ConnectionError: log.error('Unable to connect to task manager to submit tasks.') return - - if resp.status_code != 200: + # Change state of any tasks sent to the submit queue + if resp.status_code == 200: + for task in tasks: + log.info(f"change state of {task.name} to SUBMIT") + wfi.set_task_state(task, 'SUBMIT') + else: log.info(f"Submit task to TM returned bad status: {resp.status_code}")