Skip to content

Commit

Permalink
Merge pull request #61 from opensafely-core/update-jobrunner
Browse files Browse the repository at this point in the history
Update to latest jobrunner
  • Loading branch information
rebkwok authored Aug 4, 2021
2 parents ed193d2 + 398a4fb commit 076df7d
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 177 deletions.
8 changes: 7 additions & 1 deletion opensafely/_vendor/jobrunner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ConfigException(Exception):
else:
USING_DUMMY_DATA_BACKEND = BACKEND == "expectations"

ALLOWED_IMAGES = {"cohortextractor", "stata-mp", "r", "jupyter", "python"}
ALLOWED_IMAGES = {"cohortextractor", "cohortextractor-v2", "stata-mp", "r", "jupyter", "python"}

DOCKER_REGISTRY = "ghcr.io/opensafely-core"

Expand Down Expand Up @@ -93,6 +93,12 @@ class ConfigException(Exception):

# Cap on concurrently-running jobs. Defaults to one fewer than the CPU count
# (but at least 1) so the host keeps a spare core; override via MAX_WORKERS.
MAX_WORKERS = int(os.environ.get("MAX_WORKERS") or max(cpu_count() - 1, 1))

# This is a crude mechanism for preventing a single large JobRequest with lots
# of associated Jobs from hogging all the resources. We want this configurable
# because it's useful to be able to disable this during tests and when running
# locally
RANDOMISE_JOB_ORDER = True

# See `local_run.py` for more detail
LOCAL_RUN_MODE = False

Expand Down
61 changes: 11 additions & 50 deletions opensafely/_vendor/jobrunner/create_or_update_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import logging
from pathlib import Path
import re
from urllib.parse import urlparse
import time

from . import config
Expand All @@ -23,10 +22,14 @@
from .git import (
read_file_from_repo,
get_sha_from_remote_ref,
commit_reachable_from_ref,
GitError,
GitFileNotFoundError,
)
from .github_validators import (
validate_branch_and_commit,
validate_repo_url,
GithubValidationError,
)
from .project import (
parse_and_validate_project_file,
get_action_specification,
Expand Down Expand Up @@ -64,7 +67,12 @@ def create_or_update_jobs(job_request):
log.info(f"Handling new JobRequest:\n{job_request}")
new_job_count = create_jobs(job_request)
log.info(f"Created {new_job_count} new jobs")
except (GitError, ProjectValidationError, JobRequestError) as e:
except (
GitError,
GithubValidationError,
ProjectValidationError,
JobRequestError,
) as e:
log.info(f"JobRequest failed:\n{e}")
create_failed_job(job_request, e)
except Exception:
Expand Down Expand Up @@ -263,53 +271,6 @@ def validate_job_request(job_request):
)


def validate_repo_url(repo_url, allowed_gitub_orgs):
    """
    Check that `repo_url` is an https Github URL of the exact form
    https://github.com/[organisation]/[project-name] and that the
    organisation is one of `allowed_gitub_orgs`.

    Raises JobRequestError on any violation.
    """
    parsed = urlparse(repo_url)
    if not (parsed.scheme == "https" and parsed.netloc == "github.com"):
        raise JobRequestError("Repository URLs must start https://github.com")
    segments = parsed.path.strip("/").split("/")
    if not segments or segments[0] not in allowed_gitub_orgs:
        raise JobRequestError(
            f"Repositories must belong to one of the following Github "
            f"organisations: {' '.join(allowed_gitub_orgs)}"
        )
    # Rebuild the canonical two-segment URL and insist the input matches it
    # exactly (modulo a trailing slash)
    canonical = "https://github.com/" + "/".join(segments[:2])
    if len(segments) != 2 or repo_url.rstrip("/") != canonical:
        raise JobRequestError(
            "Repository URL was not of the expected format: "
            "https://github.com/[organisation]/[project-name]"
        )


def validate_branch_and_commit(repo_url, commit, branch):
    """
    Check that `commit` genuinely belongs to `branch` within `repo_url`.

    Github lets anyone who can open a pull request make a commit *appear*
    to live in a repository even without write access to it. For example,
    this PR against the Linux kernel:
        https://github.com/torvalds/linux/pull/437
    shows up as a commit "in" that repo despite never being merged:
        https://github.com/torvalds/linux/commit/2793ae1df012c7c3f13ea5c0f0adb99017999c3b
    So if we only allow code from certain organisations to run, a bare
    commit SHA is not enough: we must confirm the commit is reachable from
    a branch or tag of the repository, which only someone with write
    access could have created.

    Raises JobRequestError if the branch is missing, malformed, or does
    not contain the commit.
    """
    if not branch:
        raise JobRequestError("A branch name must be supplied")
    # Each PR also gets a ref inside the repo of the form
    # `pull/PR_NUMBER/head`, so we only accept "plain vanilla" branch
    # names with no slashes in them.
    if "/" in branch:
        raise JobRequestError(f"Branch name must not contain slashes: {branch}")
    if not commit_reachable_from_ref(repo_url, commit, branch):
        raise JobRequestError(f"Could not find commit on branch '{branch}': {commit}")


def create_failed_job(job_request, exception):
"""
Sometimes we want to say to the job-server (and the user): your JobRequest
Expand Down
56 changes: 56 additions & 0 deletions opensafely/_vendor/jobrunner/github_validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from .git import commit_reachable_from_ref

from urllib.parse import urlparse


class GithubValidationError(Exception):
    """Raised when a repo URL, branch, or commit fails Github validation."""


def validate_repo_url(repo_url, allowed_gitub_orgs):
    """
    Check that `repo_url` is an https Github URL of the exact form
    https://github.com/[organisation]/[project-name] and that the
    organisation is one of `allowed_gitub_orgs`.

    Raises GithubValidationError on any violation.
    """
    parsed = urlparse(repo_url)
    if not (parsed.scheme == "https" and parsed.netloc == "github.com"):
        raise GithubValidationError("Repository URLs must start https://github.com")
    segments = parsed.path.strip("/").split("/")
    if not segments or segments[0] not in allowed_gitub_orgs:
        raise GithubValidationError(
            f"Repositories must belong to one of the following Github "
            f"organisations: {' '.join(allowed_gitub_orgs)}"
        )
    # Rebuild the canonical two-segment URL and insist the input matches it
    # exactly (modulo a trailing slash)
    canonical = "https://github.com/" + "/".join(segments[:2])
    if len(segments) != 2 or repo_url.rstrip("/") != canonical:
        raise GithubValidationError(
            "Repository URL was not of the expected format: "
            "https://github.com/[organisation]/[project-name]"
        )


def validate_branch_and_commit(repo_url, commit, branch):
    """
    Check that `commit` genuinely belongs to `branch` within `repo_url`.

    Github lets anyone who can open a pull request make a commit *appear*
    to live in a repository even without write access to it. For example,
    this PR against the Linux kernel:
        https://github.com/torvalds/linux/pull/437
    shows up as a commit "in" that repo despite never being merged:
        https://github.com/torvalds/linux/commit/2793ae1df012c7c3f13ea5c0f0adb99017999c3b
    So if we only allow code from certain organisations to run, a bare
    commit SHA is not enough: we must confirm the commit is reachable from
    a branch or tag of the repository, which only someone with write
    access could have created.

    Raises GithubValidationError if the branch is missing, malformed, or
    does not contain the commit.
    """
    if not branch:
        raise GithubValidationError("A branch name must be supplied")
    # Each PR also gets a ref inside the repo of the form
    # `pull/PR_NUMBER/head`, so we only accept "plain vanilla" branch
    # names with no slashes in them.
    if "/" in branch:
        raise GithubValidationError(f"Branch name must not contain slashes: {branch}")
    if not commit_reachable_from_ref(repo_url, commit, branch):
        raise GithubValidationError(
            f"Could not find commit on branch '{branch}': {commit}"
        )
26 changes: 19 additions & 7 deletions opensafely/_vendor/jobrunner/local_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import tempfile
import textwrap

from .run import main as run_main, JobError
from .run import main as run_main
from . import config
from . import docker
from .database import find_where
Expand Down Expand Up @@ -196,6 +196,8 @@ def create_and_run_jobs(
# Configure
docker.LABEL = docker_label
config.LOCAL_RUN_MODE = True
# It's more helpful in this context to have things consistent
config.RANDOMISE_JOB_ORDER = False
config.HIGH_PRIVACY_WORKSPACES_DIR = project_dir.parent
# Append a random value so that multiple runs in the same process will each
# get their own unique in-memory database. This is only really relevant
Expand Down Expand Up @@ -298,14 +300,14 @@ def create_and_run_jobs(
# Github Actions
if format_output_for_github:
print(f"::group::Job Runner Logs {ANSI.Grey}(click to view){ANSI.Reset}")

# Run everything
exit_condition = (
no_jobs_remaining if continue_on_error else job_failed_or_none_remaining
)
try:
run_main(
exit_when_done=True,
shuffle_jobs=False,
raise_on_failure=not continue_on_error,
)
except (JobError, KeyboardInterrupt):
run_main(exit_callback=exit_condition)
except KeyboardInterrupt:
pass
finally:
if format_output_for_github:
Expand Down Expand Up @@ -376,6 +378,16 @@ def create_and_run_jobs(
return success_flag


def no_jobs_remaining(active_jobs):
    """Exit condition for the run loop: true once no active jobs are left."""
    return not active_jobs


def job_failed_or_none_remaining(active_jobs):
    """
    Exit condition for the run loop: true as soon as any job has failed,
    or once no active jobs are left.
    """
    if not active_jobs:
        return True
    return any(job.state == State.FAILED for job in active_jobs)


# Copied from test/conftest.py to avoid a more complex dependency tree
def delete_docker_entities(entity, label, ignore_errors=False):
ls_args = [
Expand Down
130 changes: 79 additions & 51 deletions opensafely/_vendor/jobrunner/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
out to external processes.
"""
import contextlib
import datetime
import logging
import logging.handlers
import os
import subprocess
import sys
Expand All @@ -18,46 +18,48 @@
DEFAULT_FORMAT = "{asctime} {message} {tags}"


def formatting_filter(record):
    """
    Guarantee that the `action` and `tags` attributes used by our format
    strings exist on every log record.
    """
    # Job/JobRequest may be attached to the record directly or provided by
    # the ambient logging context
    context = set_log_context.current_context
    job = getattr(record, "job", None) or context.get("job")
    job_request = getattr(record, "job_request", None) or context.get("job_request")

    record.action = ""
    tags = {}

    if getattr(record, "status_code", None):
        tags["status"] = record.status_code

    if job:
        # Keep the short "action: " prefix used by the local-run format string
        record.action = job.action + ": "
        # A status_code set directly on the record takes precedence
        if job.status_code and "status" not in tags:
            tags["status"] = job.status_code
        tags["project"] = job.project
        tags["action"] = job.action
        tags["id"] = job.id

    if job_request:
        tags["req"] = job_request.id

    record.tags = " ".join(f"{key}={value}" for key, value in tags.items())
    return True


def configure_logging(fmt=DEFAULT_FORMAT, stream=None, status_codes_to_ignore=None):
    """
    Configure the root logger with job-runner specific formatting.

    The primary handler writes to `stream` (stderr by default) at LOGLEVEL
    (default INFO), filtering out any records whose status_code is in
    `status_codes_to_ignore`. If the DEBUG_LOG_FILE environment variable is
    set, a second, daily-rotating file handler additionally captures DEBUG
    output without polluting the primary logs.

    Note: the pasted diff left the pre-change `logging.basicConfig(...)`
    line in place alongside the new one, which would have configured
    logging twice at the wrong level; only the single final call belongs.
    """
    formatter = JobRunnerFormatter(fmt, style="{")
    handler = logging.StreamHandler(stream=stream)
    handler.setFormatter(formatter)
    if status_codes_to_ignore:
        handler.addFilter(IgnoreStatusCodes(status_codes_to_ignore))
    handler.addFilter(formatting_filter)

    log_level = os.environ.get("LOGLEVEL", "INFO")
    handlers = [handler]

    # Support a separate log file at level DEBUG, while leaving the default
    # logs untouched. DEBUG logging can be extremely noisy and so we want a way
    # to capture these that doesn't pollute the primary logs.
    debug_log_file = os.environ.get("DEBUG_LOG_FILE")
    if debug_log_file:
        debug_handler = logging.handlers.TimedRotatingFileHandler(
            debug_log_file,
            encoding="utf-8",
            delay=True,  # don't create the file until first write
            # Rotate daily, keeping 14 days of backups
            when="D",
            interval=1,
            backupCount=14,
            utc=True,
        )
        debug_handler.setFormatter(formatter)
        debug_handler.addFilter(formatting_filter)
        debug_handler.setLevel("DEBUG")
        handlers.append(debug_handler)
        # Set the default handler to the originally specified log level and
        # then increase the base log level to DEBUG
        handler.setLevel(log_level)
        log_level = "DEBUG"

    logging.basicConfig(level=log_level, handlers=handlers)

    if debug_log_file:
        logging.getLogger(__name__).info(f"Writing DEBUG logs to '{debug_log_file}'")

    # We attach a custom handler for uncaught exceptions to display error
    # output from failed subprocesses
    sys.excepthook = show_subprocess_stderr


Expand All @@ -82,19 +84,35 @@ def formatException(self, exc_info):
return message


def show_subprocess_stderr(typ, value, traceback):
    """
    Custom `sys.excepthook`: after printing the standard traceback, dump any
    captured stderr from a failed subprocess (the same CalledProcessError
    formatting as in `formatException` above, but for uncaught exceptions).
    """
    sys.__excepthook__(typ, value, traceback)
    if not isinstance(value, subprocess.CalledProcessError):
        return
    stderr = value.stderr
    if not stderr:
        return
    if isinstance(stderr, bytes):
        stderr = stderr.decode("utf-8", "ignore")
    print("\nstderr:\n", file=sys.stderr)
    print(stderr, file=sys.stderr)
def formatting_filter(record):
    """
    Guarantee that the `tags` and `action` attributes used by our format
    strings exist on every log record.
    """
    # Job/JobRequest may be attached to the record directly or provided by
    # the ambient logging context
    context = set_log_context.current_context
    job = getattr(record, "job", None) or context.get("job")
    job_request = getattr(record, "job_request", None) or context.get("job_request")

    # An explicit status_code on the record wins; otherwise fall back to the job's
    status = getattr(record, "status_code", None)
    if not status and job:
        status = job.status_code

    tags = {}
    if status:
        tags["status"] = status
    if job:
        tags["project"] = job.project
        tags["action"] = job.action
        tags["id"] = job.id
    if job_request:
        tags["req"] = job_request.id
    record.tags = " ".join(f"{key}={value}" for key, value in tags.items())

    # Only the "local_run" format string uses `action`, but we make sure it
    # always exists
    if job:
        record.action = f"{job.action}: "
    else:
        record.action = ""

    return True


class IgnoreStatusCodes:
Expand Down Expand Up @@ -143,10 +161,20 @@ def __call__(self, **kwargs):
finally:
self.current_context = self.context_stack.pop()

def filter(self, record):
    """Drop any record whose status_code appears in the ignore list."""
    sentinel = object()
    code = getattr(record, "status_code", sentinel)
    # Records without a status_code are always let through
    if code is sentinel:
        return True
    return code not in self.status_codes_to_ignore

def show_subprocess_stderr(typ, value, traceback):
    """
    Custom `sys.excepthook`: after printing the standard traceback, dump any
    captured stderr from a failed subprocess (the same CalledProcessError
    formatting as in `JobRunnerFormatter` above, but for uncaught exceptions).
    """
    sys.__excepthook__(typ, value, traceback)
    if not isinstance(value, subprocess.CalledProcessError):
        return
    stderr = value.stderr
    if not stderr:
        return
    if isinstance(stderr, bytes):
        stderr = stderr.decode("utf-8", "ignore")
    print("\nstderr:\n", file=sys.stderr)
    print(stderr, file=sys.stderr)


set_log_context = SetLogContext()
Loading

0 comments on commit 076df7d

Please sign in to comment.