Merge pull request #66 from opensafely-core/cohortextractor-v2-update
Update job-runner version
rebkwok authored Sep 30, 2021
2 parents bd1c02b + 07f7826 commit a3af70c
Showing 14 changed files with 750 additions and 294 deletions.
78 changes: 52 additions & 26 deletions opensafely/_vendor/jobrunner/cli/local_run.py
@@ -11,8 +11,10 @@
  match any of the output patterns in the project. We then copy in just the
  explicit dependencies of the action.
- This is achieved by setting a LOCAL_RUN_MODE flag in the config which, in two
- key places, tells the code not to talk to git but do something else instead.
+ The job creation logic is also slightly different here (compare the
+ `create_job_request_and_jobs` function below with the `create_jobs` function in
+ `jobrunner.create_or_update_jobs`) as things like validating the repo URL don't
+ apply locally.
  Other than that, everything else runs entirely as it would in production. A
  temporary database and log directory is created for each run and then thrown
@@ -39,7 +41,11 @@
      JobRequestError,
      NothingToDoError,
      ProjectValidationError,
-     create_jobs,
+     assert_new_jobs_created,
+     get_latest_job_for_each_action,
+     get_new_jobs_to_run,
+     insert_into_database,
      parse_and_validate_project_file,
  )
from opensafely._vendor.jobrunner.lib import docker
from opensafely._vendor.jobrunner.lib.database import find_where
@@ -48,6 +54,10 @@
  from opensafely._vendor.jobrunner.lib.subprocess_utils import subprocess_run
  from opensafely._vendor.jobrunner.manage_jobs import METADATA_DIR
  from opensafely._vendor.jobrunner.models import Job, JobRequest, State, StatusCode
+ from opensafely._vendor.jobrunner.reusable_actions import (
+     ReusableActionError,
+     resolve_reusable_action_references,
+ )
  from opensafely._vendor.jobrunner.run import main as run_main

# First paragraph of docstring
@@ -185,15 +195,15 @@ def create_and_run_jobs(
      log_format=LOCAL_RUN_FORMAT,
      format_output_for_github=False,
  ):
-     # Configure
+     # Fiddle with the configuration to suit what we need for running local jobs
      docker.LABEL = docker_label
-     config.LOCAL_RUN_MODE = True
      # It's more helpful in this context to have things consistent
      config.RANDOMISE_JOB_ORDER = False
      config.HIGH_PRIVACY_WORKSPACES_DIR = project_dir.parent
      # Append a random value so that multiple runs in the same process will each
      # get their own unique in-memory database. This is only really relevant
-     # during testing.
+     # during testing as we never do multiple runs in the same process
+     # otherwise.
      config.DATABASE_FILE = f":memory:{random.randrange(sys.maxsize)}"
      config.TMP_DIR = temp_dir
      config.JOB_LOG_DIR = temp_dir / "logs"
@@ -217,29 +227,15 @@ def create_and_run_jobs(
config.MEDIUM_PRIVACY_STORAGE_BASE = None
config.MEDIUM_PRIVACY_WORKSPACES_DIR = None

-     # Create job_request and jobs
-     job_request = JobRequest(
-         id="local",
-         repo_url=str(project_dir),
-         commit=None,
-         requested_actions=actions,
-         cancelled_actions=[],
-         workspace=project_dir.name,
-         database_name="dummy",
-         force_run_dependencies=force_run_dependencies,
-         # The default behaviour of refusing to run if a dependency has failed
-         # makes for an awkward workflow when iterating in development
-         force_run_failed=True,
-         branch="",
-         original={"created_by": getpass.getuser()},
-     )
      try:
-         create_jobs(job_request)
+         job_request, jobs = create_job_request_and_jobs(
+             project_dir, actions, force_run_dependencies
+         )
      except NothingToDoError:
          print("=> All actions already completed successfully")
          print("   Use -f option to force everything to re-run")
          return True
-     except (ProjectValidationError, JobRequestError) as e:
+     except (ProjectValidationError, ReusableActionError, JobRequestError) as e:
          print(f"=> {type(e).__name__}")
          print(textwrap.indent(str(e), "   "))
          if hasattr(e, "valid_actions"):
@@ -251,8 +247,6 @@ def create_and_run_jobs(
print(f" {action} (runs all actions in project)")
return False

-     jobs = find_where(Job)
-
docker_images = get_docker_images(jobs)

uses_stata = any(
@@ -388,6 +382,38 @@ def create_and_run_jobs(
return success_flag


+ def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):
+     job_request = JobRequest(
+         id="local",
+         repo_url=str(project_dir),
+         commit=None,
+         requested_actions=actions,
+         cancelled_actions=[],
+         workspace=project_dir.name,
+         database_name="dummy",
+         force_run_dependencies=force_run_dependencies,
+         # The default behaviour of refusing to run if a dependency has failed
+         # makes for an awkward workflow when iterating in development
+         force_run_failed=True,
+         branch="",
+         original={"created_by": getpass.getuser()},
+     )
+     project_file_path = project_dir / "project.yaml"
+     if not project_file_path.exists():
+         raise ProjectValidationError(f"No project.yaml file found in {project_dir}")
+     # NOTE: Similar but non-identical logic is implemented for running jobs in
+     # production in `jobrunner.create_or_update_jobs.create_jobs`. If you make
+     # changes below then consider what, if any, the appropriate corresponding
+     # changes might be for production jobs.
+     project = parse_and_validate_project_file(project_file_path.read_bytes())
+     current_jobs = get_latest_job_for_each_action(job_request.workspace)
+     new_jobs = get_new_jobs_to_run(job_request, project, current_jobs)
+     assert_new_jobs_created(new_jobs, current_jobs)
+     resolve_reusable_action_references(new_jobs)
+     insert_into_database(job_request, new_jobs)
+     return job_request, new_jobs


  def no_jobs_remaining(active_jobs):
      return len(active_jobs) == 0

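The whole of `create_job_request_and_jobs` above is new in this version: it pulls the job-creation steps out of `create_and_run_jobs` so that the local flow mirrors the production pipeline in `create_or_update_jobs.py` step for step. As a rough usage sketch (not part of the commit; the study path and action name are hypothetical, and the config fiddling from `create_and_run_jobs` must already have been applied so the in-memory database exists):

    from pathlib import Path
    from opensafely._vendor.jobrunner.cli.local_run import create_job_request_and_jobs

    # Build the JobRequest plus its Job rows for a local study directory,
    # without talking to git or validating any repo URL.
    job_request, jobs = create_job_request_and_jobs(
        project_dir=Path("/path/to/study"),  # hypothetical checkout location
        actions=["generate_cohort"],         # hypothetical action name
        force_run_dependencies=False,
    )
    for job in jobs:
        print(job.action, job.run_command)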
7 changes: 4 additions & 3 deletions opensafely/_vendor/jobrunner/config.py
@@ -107,9 +107,6 @@ class ConfigException(Exception):
  # locally
  RANDOMISE_JOB_ORDER = True

- # See `local_run.py` for more detail
- LOCAL_RUN_MODE = False

  # Automatically delete containers and volumes after they have been used
  CLEAN_UP_DOCKER_OBJECTS = True

@@ -172,3 +169,7 @@ def parse_job_resource_weights(config_file):
STATS_DATABASE_FILE = Path(STATS_DATABASE_FILE)

  STATS_POLL_INTERVAL = float(os.environ.get("STATS_POLL_INTERVAL", "10"))


+ # feature flag to enable new API abstraction
+ EXECUTION_API = os.environ.get("EXECUTION_API", "false").lower() == "true"
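The new `EXECUTION_API` flag switches on only for the literal string "true" in any casing; other truthy-looking values such as "1" or "yes" leave it off. A minimal standalone sketch of the same environment-variable pattern (the `env_flag` helper is illustrative, not part of the vendored config):

    import os

    def env_flag(name, default="false"):
        # Mirrors the parsing above: only the string "true",
        # case-insensitively, enables the flag.
        return os.environ.get(name, default).lower() == "true"

    # EXECUTION_API=True -> flag enabled
    # EXECUTION_API=1    -> flag still disabled
    EXECUTION_API = env_flag("EXECUTION_API")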
47 changes: 22 additions & 25 deletions opensafely/_vendor/jobrunner/create_or_update_jobs.py
@@ -9,7 +9,6 @@
  import logging
  import re
  import time
- from pathlib import Path

  from opensafely._vendor.jobrunner import config
  from opensafely._vendor.jobrunner.lib.database import (
@@ -34,6 +33,10 @@
      get_all_actions,
      parse_and_validate_project_file,
  )
+ from opensafely._vendor.jobrunner.reusable_actions import (
+     ReusableActionError,
+     resolve_reusable_action_references,
+ )

log = logging.getLogger(__name__)

@@ -72,6 +75,7 @@ def create_or_update_jobs(job_request):
          GitError,
          GithubValidationError,
          ProjectValidationError,
+         ReusableActionError,
          JobRequestError,
      ) as e:
log.info(f"JobRequest failed:\n{e}")
@@ -90,12 +94,17 @@ def create_or_update_jobs(job_request):


  def create_jobs(job_request):
+     # NOTE: Similar but non-identical logic is implemented for running jobs
+     # locally in `jobrunner.cli.local_run.create_job_request_and_jobs`. If you
+     # make changes below then consider what the appropriate corresponding
+     # changes are for locally run jobs.
      validate_job_request(job_request)
      project_file = get_project_file(job_request)
      project = parse_and_validate_project_file(project_file)
      current_jobs = get_latest_job_for_each_action(job_request.workspace)
      new_jobs = get_new_jobs_to_run(job_request, project, current_jobs)
      assert_new_jobs_created(new_jobs, current_jobs)
+     resolve_reusable_action_references(new_jobs)
      # There is a delay between getting the current jobs (which we fetch from
      # the database and the disk) and inserting our new jobs below. This means
      # the state of the world may have changed in the meantime. Why is this OK?
@@ -115,22 +124,16 @@


  def validate_job_request(job_request):
-     if config.ALLOWED_GITHUB_ORGS and not config.LOCAL_RUN_MODE:
+     if config.ALLOWED_GITHUB_ORGS:
          validate_repo_url(job_request.repo_url, config.ALLOWED_GITHUB_ORGS)
-     if not job_request.workspace:
-         raise JobRequestError("Workspace name cannot be blank")
      if not job_request.requested_actions:
          raise JobRequestError("At least one action must be supplied")
-     # In local run mode the workspace name is whatever the user's working
-     # directory happens to be called, which we don't want or need to place any
-     # restrictions on. Otherwise, as these are externally supplied strings that
-     # end up as paths, we want to be much more restrictive.
-     if not config.LOCAL_RUN_MODE:
-         if re.search(r"[^a-zA-Z0-9_\-]", job_request.workspace):
-             raise JobRequestError(
-                 "Invalid workspace name (allowed are alphanumeric, dash and underscore)"
-             )
+     if not job_request.workspace:
+         raise JobRequestError("Workspace name cannot be blank")
+     if re.search(r"[^a-zA-Z0-9_\-]", job_request.workspace):
+         raise JobRequestError(
+             "Invalid workspace name (allowed are alphanumeric, dash and underscore)"
+         )
      if not config.USING_DUMMY_DATA_BACKEND:
          database_name = job_request.database_name
          valid_names = config.DATABASE_URLS.keys()
@@ -148,7 +151,7 @@ def validate_job_request(job_request):
)
      # If we're not restricting to specific Github organisations then there's no
      # point in checking the provenance of the supplied commit
-     if config.ALLOWED_GITHUB_ORGS and not config.LOCAL_RUN_MODE:
+     if config.ALLOWED_GITHUB_ORGS:
          # As this involves talking to the remote git server we only do it at
          # the end once all other checks have passed
          validate_branch_and_commit(
@@ -158,15 +161,11 @@

  def get_project_file(job_request):
      try:
-         if not config.LOCAL_RUN_MODE:
-             project_file = read_file_from_repo(
-                 job_request.repo_url, job_request.commit, "project.yaml"
-             )
-         else:
-             project_file = (Path(job_request.repo_url) / "project.yaml").read_bytes()
-     except (GitFileNotFoundError, FileNotFoundError):
+         return read_file_from_repo(
+             job_request.repo_url, job_request.commit, "project.yaml"
+         )
+     except GitFileNotFoundError:
          raise JobRequestError(f"No project.yaml file found in {job_request.repo_url}")
-     return project_file


def get_latest_job_for_each_action(workspace):
@@ -271,8 +270,6 @@ def recursively_build_jobs(jobs_by_action, job_request, project, action):
          workspace=job_request.workspace,
          database_name=job_request.database_name,
          action=action,
-         action_repo_url=action_spec.repo_url,
-         action_commit=action_spec.commit,
          wait_for_job_ids=wait_for_job_ids,
          requires_outputs_from=action_spec.needs,
          run_command=action_spec.run,
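With `LOCAL_RUN_MODE` gone, the workspace-name rule in `validate_job_request` now applies unconditionally on this code path; local runs sidestep it entirely by building jobs through `create_job_request_and_jobs` instead. A standalone sketch of the rule for reference (the `check_workspace_name` helper is illustrative, not part of the commit):

    import re

    WORKSPACE_NAME_RE = re.compile(r"[^a-zA-Z0-9_\-]")

    def check_workspace_name(name):
        # Mirrors validate_job_request: reject blank names and any character
        # outside alphanumerics, dash and underscore, since these externally
        # supplied strings end up as filesystem paths.
        if not name:
            raise ValueError("Workspace name cannot be blank")
        if WORKSPACE_NAME_RE.search(name):
            raise ValueError(
                "Invalid workspace name (allowed are alphanumeric, dash and underscore)"
            )

    check_workspace_name("my-study_2021")  # passes silently
    # check_workspace_name("my study")     # would raise: contains a space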
[Diff truncated: the remaining 11 changed files are not shown.]
