Merge pull request #68 from opensafely-core/jobrunner-2.36.0
jobrunner 2.36.0
bloodearnest authored Oct 21, 2021
2 parents a3af70c + 1cf085a commit c57d604
Showing 21 changed files with 1,232 additions and 460 deletions.
15 changes: 1 addition & 14 deletions opensafely/_vendor/jobrunner/cli/add_job.py
@@ -3,10 +3,8 @@
job-server
"""
import argparse
import base64
import dataclasses
import pprint
import secrets
import textwrap
from pathlib import Path
from urllib.parse import urlparse
@@ -15,7 +13,7 @@
from opensafely._vendor.jobrunner.lib.database import find_where
from opensafely._vendor.jobrunner.lib.git import get_sha_from_remote_ref
from opensafely._vendor.jobrunner.lib.log_utils import configure_logging
from opensafely._vendor.jobrunner.models import Job
from opensafely._vendor.jobrunner.models import Job, random_id
from opensafely._vendor.jobrunner.sync import job_request_from_remote_format


@@ -59,17 +57,6 @@ def display_obj(obj):
print()


def random_id():
"""
Return a random 16 character lowercase alphanumeric string
We used to use UUID4's but they are unnecessarily long for our purposes
(particularly the hex representation) and shorter IDs make debugging
and inspecting the job-runner a bit more ergonomic.
"""
return base64.b32encode(secrets.token_bytes(10)).decode("ascii").lower()


def run():
configure_logging()
parser = argparse.ArgumentParser(description=__doc__.partition("\n\n")[0])
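
The removed helper is otherwise unchanged; it now lives in jobrunner.models and is imported from there above. As a minimal standalone sketch of its behaviour (not part of this diff): 10 random bytes are 80 bits, and base32 encodes 5 bits per character, so the result is always exactly 16 lowercase characters with no padding.

    import base64
    import secrets

    def random_id():
        # 10 bytes = 80 bits -> 80 / 5 = 16 base32 characters, no '=' padding
        return base64.b32encode(secrets.token_bytes(10)).decode("ascii").lower()

    print(random_id())       # e.g. 'mfzgc3tjnzswc2lu'
    print(len(random_id()))  # always 16
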
72 changes: 57 additions & 15 deletions opensafely/_vendor/jobrunner/cli/local_run.py
@@ -35,25 +35,26 @@
from datetime import datetime, timedelta
from pathlib import Path

from opensafely._vendor.jobrunner import config
from opensafely._vendor.jobrunner import config, manifest_to_database_migration
from opensafely._vendor.jobrunner.create_or_update_jobs import (
RUN_ALL_COMMAND,
JobRequestError,
NothingToDoError,
ProjectValidationError,
assert_new_jobs_created,
get_latest_job_for_each_action,
get_new_jobs_to_run,
insert_into_database,
parse_and_validate_project_file,
)
from opensafely._vendor.jobrunner.lib import docker
from opensafely._vendor.jobrunner.lib import database, docker
from opensafely._vendor.jobrunner.lib.database import find_where
from opensafely._vendor.jobrunner.lib.log_utils import configure_logging
from opensafely._vendor.jobrunner.lib.string_utils import tabulate
from opensafely._vendor.jobrunner.lib.subprocess_utils import subprocess_run
from opensafely._vendor.jobrunner.manage_jobs import METADATA_DIR
from opensafely._vendor.jobrunner.models import Job, JobRequest, State, StatusCode
from opensafely._vendor.jobrunner.models import Job, JobRequest, State, StatusCode, random_id
from opensafely._vendor.jobrunner.project import UnknownActionError, get_all_actions
from opensafely._vendor.jobrunner.queries import calculate_workspace_state
from opensafely._vendor.jobrunner.reusable_actions import (
ReusableActionError,
resolve_reusable_action_references,
@@ -200,11 +201,7 @@ def create_and_run_jobs(
# It's more helpful in this context to have things consistent
config.RANDOMISE_JOB_ORDER = False
config.HIGH_PRIVACY_WORKSPACES_DIR = project_dir.parent
# Append a random value so that multiple runs in the same process will each
# get their own unique in-memory database. This is only really relevant
# during testing as we never do mutliple runs in the same process
# otherwise.
config.DATABASE_FILE = f":memory:{random.randrange(sys.maxsize)}"
config.DATABASE_FILE = project_dir / "metadata" / "db.sqlite"
config.TMP_DIR = temp_dir
config.JOB_LOG_DIR = temp_dir / "logs"
config.BACKEND = "expectations"
@@ -227,6 +224,24 @@
config.MEDIUM_PRIVACY_STORAGE_BASE = None
config.MEDIUM_PRIVACY_WORKSPACES_DIR = None

# This is a temporary migration step to avoid unnecessarily re-running actions as we migrate away from the manifest.
manifest_to_database_migration.migrate_one(
project_dir, write_medium_privacy_manifest=False, batch_size=1000, log=False
)

# Any jobs that are running or pending must be left over from a previous run that was aborted either by an
# unexpected and unhandled exception or by the researcher abruptly terminating the process. We can't reasonably
# recover them (and the researcher may not want to -- maybe that's why they terminated), so we mark them as
# cancelled. This causes the rest of the system to effectively ignore them.
#
# We do this here at the beginning rather than trying to catch these cases when the process exits because the
# latter couldn't ever completely guarantee to catch every possible termination case correctly.
database.update_where(
Job,
{"cancelled": True, "state": State.FAILED},
state__in=[State.RUNNING, State.PENDING],
)
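
A sketch of the effect of the call above (an illustration reusing the find_where/update_where style and imports from this file, not part of the change): any jobs still marked running or pending from an aborted earlier run stop looking active to later queries.

    leftover = find_where(Job, state__in=[State.RUNNING, State.PENDING])
    database.update_where(
        Job,
        {"cancelled": True, "state": State.FAILED},
        state__in=[State.RUNNING, State.PENDING],
    )
    # Afterwards nothing in this workspace database is in an active state:
    assert find_where(Job, state__in=[State.RUNNING, State.PENDING]) == []
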

try:
job_request, jobs = create_job_request_and_jobs(
project_dir, actions, force_run_dependencies
@@ -317,7 +332,9 @@ def create_and_run_jobs(
if format_output_for_github:
print("::endgroup::")

final_jobs = find_where(Job, state__in=[State.FAILED, State.SUCCEEDED])
final_jobs = find_where(
Job, state__in=[State.FAILED, State.SUCCEEDED], job_request_id=job_request.id
)
# Always show failed jobs last, otherwise show in order run
final_jobs.sort(
key=lambda job: (
@@ -384,7 +401,7 @@

def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):
job_request = JobRequest(
id="local",
id=random_id(),
repo_url=str(project_dir),
commit=None,
requested_actions=actions,
@@ -398,6 +415,7 @@ def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):
branch="",
original={"created_by": getpass.getuser()},
)

project_file_path = project_dir / "project.yaml"
if not project_file_path.exists():
raise ProjectValidationError(f"No project.yaml file found in {project_dir}")
@@ -406,14 +424,38 @@ def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):
# changes below then consider what, if any, the appropriate corresponding
# changes might be for production jobs.
project = parse_and_validate_project_file(project_file_path.read_bytes())
current_jobs = get_latest_job_for_each_action(job_request.workspace)
new_jobs = get_new_jobs_to_run(job_request, project, current_jobs)
assert_new_jobs_created(new_jobs, current_jobs)
latest_jobs = calculate_workspace_state(job_request.workspace)

# On the server out-of-band deletion of an existing output is considered an error, so we ignore that case when
# scheduling and allow jobs with missing dependencies to fail loudly when they are actually run. However for local
    # running we should allow researchers to delete outputs on disk and automatically rerun the actions that create them
# if they are needed. So here we check whether any files are missing for completed actions and, if so, treat them
# as though they had not been run -- this will automatically trigger a rerun.
latest_jobs_with_files_present = [
job for job in latest_jobs if all_output_files_present(project_dir, job)
]

try:
if not actions:
raise UnknownActionError("At least one action must be supplied")
new_jobs = get_new_jobs_to_run(
job_request, project, latest_jobs_with_files_present
)
except UnknownActionError as e:
# Annotate the exception with a list of valid action names so we can
# show them to the user
e.valid_actions = [RUN_ALL_COMMAND] + get_all_actions(project)
raise e
assert_new_jobs_created(new_jobs, latest_jobs_with_files_present)
resolve_reusable_action_references(new_jobs)
insert_into_database(job_request, new_jobs)
return job_request, new_jobs


def all_output_files_present(project_dir, job):
return all(project_dir.joinpath(f).exists() for f in job.output_files)
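
A worked illustration of the helper above (a sketch: FakeJob and the output path are hypothetical, and Job.output_files is taken to be a list of workspace-relative paths, as the expression implies):

    from pathlib import Path

    class FakeJob:
        # Hypothetical record of the outputs a previously successful action produced
        output_files = ["output/input.csv"]

    project_dir = Path("my-study")
    # True while output/input.csv exists under my-study/; False once the
    # researcher deletes it, so the filtering above treats the action as not
    # yet run and get_new_jobs_to_run schedules it again.
    print(all_output_files_present(project_dir, FakeJob()))
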


def no_jobs_remaining(active_jobs):
return len(active_jobs) == 0

@@ -504,7 +546,7 @@ def get_stata_license(repo=config.STATA_LICENSE_REPO):
def git_clone(repo_url, cwd):
cmd = ["git", "clone", "--depth=1", repo_url, "repo"]
# GIT_TERMINAL_PROMPT=0 means it will fail if it requires auth. This
# alows us to retry with an ssh url on linux/mac, as they would
# allows us to retry with an ssh url on linux/mac, as they would
# generally prompt given an https url.
result = subprocess_run(
cmd,
44 changes: 44 additions & 0 deletions opensafely/_vendor/jobrunner/cli/manifest_migration.py
@@ -0,0 +1,44 @@
"""
Tool for migrating data from manifests to the database.
Usage:
python -m jobrunner.cli.manifest_migration --batch-size 50
Reads job records from manifest files in all workspaces and creates records in the database for any that are missing.
For jobs that already exist in the database, it doesn't make any attempt to check that they are consistent with the
record in the manifest.
The number of jobs that are created is limited by the --batch-size argument. If there are more jobs that could have
been migrated then a message is displayed when the tool exits. The operator is expected to run the tool once with a
small batch size to check that it's working correctly and then use a large batch size to complete the operation. The
tool will report when there are no further jobs to be migrated.
"""
import argparse
import sys

from opensafely._vendor.jobrunner import manifest_to_database_migration


def main(args=None):
args = args or []
parser = argparse.ArgumentParser(description=__doc__.partition("\n\n")[0])
parser.add_argument(
"--batch-size", type=int, default=1, help="maximum number of jobs to create"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="do a dry run migration without modifying data and carrying on if there is an error with a single job or workspace",
)
parsed = parser.parse_args(args)

manifest_to_database_migration.migrate_all(
batch_size=parsed.batch_size,
dry_run=parsed.dry_run,
ignore_errors=parsed.dry_run,
)


if __name__ == "__main__":
main(sys.argv[1:])
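
A usage sketch for the new tool (the module path follows the vendored layout used throughout this diff; batch sizes are illustrative):

    from opensafely._vendor.jobrunner.cli import manifest_migration

    # Small dry run first to sanity-check the output; --dry-run also makes the
    # migration carry on past errors in individual jobs or workspaces.
    manifest_migration.main(["--batch-size", "50", "--dry-run"])

    # Then a larger real run to complete the migration.
    manifest_migration.main(["--batch-size", "5000"])
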
18 changes: 4 additions & 14 deletions opensafely/_vendor/jobrunner/cli/retry_job.py
@@ -4,10 +4,9 @@
This only applies to (and only works for) jobs which failed with an
Internal Error during the "finalise" step where we copy output files,
dump logs, update the manifest etc. When this happens we don't
automatically clean up the job's container and volume giving us an
opportunity to fix the bug and try again without having to re-run the
entire job.
dump logs etc. When this happens we don't automatically clean up the
job's container and volume giving us an opportunity to fix the bug and
try again without having to re-run the entire job.
To do this we simply put the job back into the RUNNING state and let the
jobrunner pick it up again. We also need to update the job-server when we do
@@ -34,16 +33,7 @@ def main(partial_job_id):
job.updated_at = int(time.time())
print("\nUpdating job in database:")
print(job)
update(
job,
update_fields=[
"state",
"status_message",
"status_code",
"completed_at",
"updated_at",
],
)
update(job)
print("\nPOSTing update to job-server")
api_post("jobs", json=[job_to_remote_format(job)])
print("\nDone")
7 changes: 7 additions & 0 deletions opensafely/_vendor/jobrunner/config.py
@@ -9,6 +9,10 @@ class ConfigException(Exception):
pass


def _is_valid_backend_name(name):
return bool(re.match(r"^[A-Za-z0-9][A-Za-z0-9_\-]*[A-Za-z0-9]$", name))


default_work_dir = Path(__file__) / "../../workdir"

WORK_DIR = Path(os.environ.get("WORK_DIR", default_work_dir)).resolve()
@@ -43,6 +47,8 @@ class ConfigException(Exception):
JOB_LOOP_INTERVAL = float(os.environ.get("JOB_LOOP_INTERVAL", "1.0"))

BACKEND = os.environ.get("BACKEND", "expectations")
if not _is_valid_backend_name(BACKEND):
raise RuntimeError(f"BACKEND not in valid format: '{BACKEND}'")

truthy = ("true", "1", "yes")

@@ -173,3 +179,4 @@

# feature flag to enable new API abstraction
EXECUTION_API = os.environ.get("EXECUTION_API", "false").lower() == "true"
EXECUTOR = os.environ.get("EXECUTOR", "opensafely._vendor.jobrunner.executors.local:LocalDockerAPI")
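
A quick sketch of how the new backend-name check behaves (a standalone restatement of the regex above, with illustrative names):

    import re

    def _is_valid_backend_name(name):
        # Starts and ends with an alphanumeric character; only alphanumerics,
        # underscores and hyphens in between (so the minimum length is two).
        return bool(re.match(r"^[A-Za-z0-9][A-Za-z0-9_\-]*[A-Za-z0-9]$", name))

    assert _is_valid_backend_name("expectations")
    assert _is_valid_backend_name("my-backend_01")
    assert not _is_valid_backend_name("-tpp")  # leading hyphen rejected
    assert not _is_valid_backend_name("x")     # too short: needs a first and a last alphanumeric
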