Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job_limits to config; limit the number jobs in the job queue #1010

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ def filepath_completion_input(*pargs, **kwargs):
# Task manager
VALIDATOR.section('task_manager',
info='Task manager configuration and config of container to use.')
VALIDATOR.option('task_manager', 'jobs_limit', default='', prompt=True,
info='The number of jobs that can be in the job queue.')
VALIDATOR.option('task_manager', 'container_runtime', default='Charliecloud',
choices=('Charliecloud', 'Singularity'), prompt=False,
info='container runtime to use for configuration')
Expand Down
16 changes: 13 additions & 3 deletions beeflow/task_manager/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,29 @@
"""
import traceback
import jsonpickle
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.task_manager import utils
from beeflow.common import log as bee_logging
from beeflow.common.build.utils import ContainerBuildError
from beeflow.common.build_interfaces import build_main
from beeflow.common.worker import WorkerError


JOBS_MAX = 1000
log = bee_logging.setup(__name__)
jobs_limit = bc.get('task_manager', 'jobs_limit')
if not jobs_limit:
jobs_limit = JOBS_MAX
else:
try:
jobs_limit = int(jobs_limit)
except ValueError:
log.info(f'Value for jobs_limit in bee.conf not an integer, setting it to {JOBS_MAX}')
jobs_limit = JOBS_MAX
log.info(f'The number of jobs queued will be limited to {jobs_limit}.')

# States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
COMPLETED_STATES = {'UNKNOWN', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'}


def resolve_environment(task):
"""Use build interface to create a valid environment.

Expand Down Expand Up @@ -57,7 +67,7 @@ def submit_task(db, worker, task):
def submit_jobs(db):
"""Submit all jobs currently in submit queue to the workload scheduler."""
worker = utils.worker_interface()
while db.submit_queue.count() >= 1:
while db.submit_queue.count() >= 1 and db.job_queue.count() < jobs_limit:
# Single value dictionary
task = db.submit_queue.pop()
job_state = submit_task(db, worker, task)
Expand Down
8 changes: 6 additions & 2 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,12 @@ def submit_tasks_tm(wf_id, tasks, allocation):
except requests.exceptions.ConnectionError:
log.error('Unable to connect to task manager to submit tasks.')
return

if resp.status_code != 200:
# Change state of any tasks sent to the submit queue
if resp.status_code == 200:
for task in tasks:
log.info(f"change state of {task.name} to SUBMIT")
wfi.set_task_state(task, 'SUBMIT')
else:
log.info(f"Submit task to TM returned bad status: {resp.status_code}")


Expand Down
Loading