Skip to content

Commit

Permalink
feat: codelists add command (#294)
Browse files Browse the repository at this point in the history
* Refactor in advance of adding `add()``

A lot of the logic of `update` is needed by `add`:
* codelist file parsing (indirectly)
* file downloading
* manifest handling

This commit pulls out those needed parts into
their own functions, and pulls out
parse_codelist_file's line-parsing logic into its
own function.

This allows us to check the line we're about to
add to the codelists file is valid before doing
so, check that once we add the line to the
codelists file that it is valid, leave
pre-existing codelists and their corresponding
manifest file entries intact.

* `codelists add` command

Allow users to add OpenCodelists codelists to the
codelists.txt file by URL, allowing for common
variations from the "base" URL
  • Loading branch information
Jongmassey authored Jan 29, 2025
1 parent c62aef9 commit c52e797
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 70 deletions.
225 changes: 155 additions & 70 deletions opensafely/codelists.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ def show_help(**kwargs):
title="available commands", description="", metavar="COMMAND"
)

parser_add = subparsers.add_parser(
"add", help="Add an OpenCodelists codelist to your project"
)
parser_add.add_argument(
"codelist_url", help="URL of the codelist to be added to your project"
)
parser_add.set_defaults(function=add)

parser_update = subparsers.add_parser(
"update",
help=(
Expand Down Expand Up @@ -63,39 +71,93 @@ def main():
pass


def add(codelist_url, codelists_dir=None):
if OPENCODELISTS_BASE_URL not in codelist_url:
exit_with_error(f"Unable to parse URL, {OPENCODELISTS_BASE_URL} not found")

if not codelists_dir:
codelists_dir = get_codelists_dir()

codelists_file = get_codelist_file(codelists_dir)

codelist_handle = codelist_url.replace(f"{OPENCODELISTS_BASE_URL}/codelist/", "")

# trim any anchors from the end
if "#" in codelist_handle:
codelist_handle = codelist_handle[: codelist_handle.find("#")]

# turn a download url into a base codelist version url
if codelist_handle.endswith(".csv"):
codelist_handle = codelist_handle[: codelist_handle.rfind("/")]

# parse the line to make sure it's valid before adding to file
codelist = parse_codelist_file_line(codelist_handle)

with codelists_file.open("r+") as f:
lines = f.readlines()
if lines and not lines[-1].endswith("\n"):
codelist_handle = "\n" + codelist_handle
f.write(codelist_handle + "\n")

# parse the codelists file to make sure it's valid
parse_codelist_file(codelists_dir)

# download to the codelists dir
codelist.filename = codelists_dir / codelist.filename
downloaded_on, sha = fetch_codelist(codelist)

write_manifest(codelists_dir, [(codelist, downloaded_on, sha)], True)


def update(codelists_dir=None):
if not codelists_dir:
codelists_dir = Path.cwd() / CODELISTS_DIR
codelists = parse_codelist_file(codelists_dir)
old_files = set(codelists_dir.glob("*.csv"))
new_files = set()
manifest = {"files": {}}
downloaded_codelists = []
for codelist in codelists:
print(f"Fetching {codelist.id}")
try:
response = requests.get(codelist.download_url)
response.raise_for_status()
except Exception as e:
exit_with_error(
f"Error downloading codelist: {e}\n\n"
f"Check that you can access the codelist at:\n{codelist.url}"
)
codelist.filename.write_bytes(response.content)
downloaded_at, sha = fetch_codelist(codelist)
downloaded_codelists.append((codelist, downloaded_at, sha))
new_files.add(codelist.filename)
write_manifest(codelists_dir, downloaded_codelists, False)
for file in old_files - new_files:
print(f"Deleting {file.name}")
file.unlink()
return True


def write_manifest(codelists_dir, downloaded_codelists, append):
manifest = {"files": {}}
for codelist, downloaded_at, sha in downloaded_codelists:
key = str(codelist.filename.relative_to(codelists_dir))
manifest["files"][key] = {
"id": codelist.id,
"url": codelist.url,
"downloaded_at": f"{datetime.datetime.utcnow()}Z",
"sha": hash_bytes(response.content),
"downloaded_at": f"{downloaded_at}Z",
"sha": sha,
}
manifest_file = codelists_dir / MANIFEST_FILE
preserve_download_dates(manifest, manifest_file)
if append:
old_manifest = get_manifest(codelists_dir)
manifest["files"].update(old_manifest["files"])
else:
preserve_download_dates(manifest, manifest_file)
manifest_file.write_text(json.dumps(manifest, indent=2))
for file in old_files - new_files:
print(f"Deleting {file.name}")
file.unlink()
return True


def fetch_codelist(codelist):
try:
response = requests.get(codelist.download_url)
response.raise_for_status()
except Exception as e:
exit_with_error(
f"Error downloading codelist: {e}\n\n"
f"Check that you can access the codelist at:\n{codelist.url}"
)
codelist.filename.write_bytes(response.content)
return (datetime.datetime.utcnow(), hash_bytes(response.content))


def get_codelists_dir(codelists_dir=None):
Expand All @@ -106,6 +168,15 @@ def get_codelists_dir(codelists_dir=None):
return codelists_dir


def get_codelist_file(codelists_dir):
if not codelists_dir.exists() or not codelists_dir.is_dir():
exit_with_error(f"No '{CODELISTS_DIR}' folder found")
codelists_file = codelists_dir / CODELISTS_FILE
if not codelists_file.exists():
exit_with_error(f"No file found at '{CODELISTS_DIR}/{CODELISTS_FILE}'")
return codelists_file


def check_upstream(codelists_dir=None):
"""
Check currently downloaded codelists against current OpenCodelists data.
Expand Down Expand Up @@ -167,30 +238,7 @@ def check():
return True

codelists = parse_codelist_file(codelists_dir)
manifest_file = codelists_dir / MANIFEST_FILE
if not manifest_file.exists():
# This is here so that switching to use this test in Github Actions
# doesn't cause existing repos which previously passed to start
# failing. It works by creating a temporary manifest file and then
# checking against that. Functionally, this is the same as the old test
# which would check against the OpenCodelists website every time.
if os.environ.get("GITHUB_WORKFLOW"):
print(
"==> WARNING\n"
" Using temporary workaround for Github Actions tests.\n"
" You should run: opensafely codelists update\n"
)
manifest = make_temporary_manifest(codelists_dir)
else:
exit_with_prompt(f"No file found at '{CODELISTS_DIR}/{MANIFEST_FILE}'.")
else:
try:
manifest = json.loads(manifest_file.read_text())
except json.decoder.JSONDecodeError:
exit_with_prompt(
f"'{CODELISTS_DIR}/{MANIFEST_FILE}' is invalid.\n"
"Note that this file is automatically generated and should not be manually edited.\n"
)
manifest = get_manifest(codelists_dir)
all_ids = {codelist.id for codelist in codelists}
ids_in_manifest = {f["id"] for f in manifest["files"].values()}
if all_ids != ids_in_manifest:
Expand Down Expand Up @@ -229,6 +277,35 @@ def check():
return True


def get_manifest(codelists_dir):
manifest_file = codelists_dir / MANIFEST_FILE
if not manifest_file.exists():
# This is here so that switching to use this test in Github Actions
# doesn't cause existing repos which previously passed to start
# failing. It works by creating a temporary manifest file and then
# checking against that. Functionally, this is the same as the old test
# which would check against the OpenCodelists website every time.
if os.environ.get("GITHUB_WORKFLOW"):
print(
"==> WARNING\n"
" Using temporary workaround for Github Actions tests.\n"
" You should run: opensafely codelists update\n"
)
manifest = make_temporary_manifest(codelists_dir)
else:
exit_with_prompt(f"No file found at '{CODELISTS_DIR}/{MANIFEST_FILE}'.")
else:
try:
manifest = json.loads(manifest_file.read_text())
except json.decoder.JSONDecodeError:
exit_with_prompt(
f"'{CODELISTS_DIR}/{MANIFEST_FILE}' is invalid.\n"
"Note that this file is automatically generated and should not be manually edited.\n"
)

return manifest


def make_temporary_manifest(codelists_dir):
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
Expand All @@ -242,50 +319,58 @@ def make_temporary_manifest(codelists_dir):
@dataclasses.dataclass
class Codelist:
id: str # noqa: A003
codelist: str
version: str
url: str
download_url: str
filename: Path


def parse_codelist_file_line(line):
line = line.strip().rstrip("/")
if not line or line.startswith("#"):
return
tokens = line.split("/")
if len(tokens) not in [3, 4]:
exit_with_error(
f"{line} does not match [project]/[codelist]/[version] "
"or user/[username]/[codelist]/[version]"
)
line_without_version = "/".join(tokens[:-1])
line_version = tokens[-1]

url = f"{OPENCODELISTS_BASE_URL}/codelist/{line}/"
filename = "-".join(tokens[:-1]) + ".csv"
return Codelist(
id=line,
codelist=line_without_version,
version=line_version,
url=url,
download_url=f"{url}download.csv",
filename=Path(filename),
)


def parse_codelist_file(codelists_dir):
if not codelists_dir.exists() or not codelists_dir.is_dir():
exit_with_error(f"No '{CODELISTS_DIR}' folder found")
codelists_file = codelists_dir / CODELISTS_FILE
if not codelists_file.exists():
exit_with_error(f"No file found at '{CODELISTS_DIR}/{CODELISTS_FILE}'")
codelists_file = get_codelist_file(codelists_dir)
codelists = []
codelist_versions = {}
for line in codelists_file.read_text().splitlines():
line = line.strip().rstrip("/")
if not line or line.startswith("#"):
codelist = parse_codelist_file_line(line)
if not codelist:
continue
tokens = line.split("/")
if len(tokens) not in [3, 4]:
exit_with_error(
f"{line} does not match [project]/[codelist]/[version] "
"or user/[username]/[codelist]/[version]"
)
line_without_version = "/".join(tokens[:-1])
existing_version = codelist_versions.get(line_without_version)
line_version = tokens[-1]
if existing_version == line_version:
codelist.filename = codelists_dir / codelist.filename
existing_version = codelist_versions.get(codelist.codelist)

if existing_version == codelist.version:
exit_with_error(f"{line} is a duplicate of a previous line")
if existing_version is not None:
exit_with_error(
f"{line} conflicts with a different version of the same codelist: {existing_version}"
)
codelist_versions[line_without_version] = line_version

url = f"{OPENCODELISTS_BASE_URL}/codelist/{line}/"
filename = "-".join(tokens[:-1]) + ".csv"
codelists.append(
Codelist(
id=line,
url=url,
download_url=f"{url}download.csv",
filename=codelists_dir / filename,
)
)
codelist_versions[codelist.codelist] = codelist.version
codelists.append(codelist)

return codelists


Expand Down
90 changes: 90 additions & 0 deletions tests/test_codelists.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,93 @@ def test_codelists_check_with_upstream_changes_in_CI(
os.chdir(codelists_path)
# check doesn't fail in CI if there are upstream errors only
assert codelists.check()


def test_codelists_add(codelists_path, requests_mock):
codelists_path /= "codelists"
codelists_file = codelists_path / "codelists.txt"
prior_codelists = codelists_file.read_text()
for codelist in prior_codelists.splitlines():
requests_mock.get(
f"https://www.opencodelists.org/codelist/{codelist.rstrip('/')}/download.csv",
text="foo",
)
requests_mock.get(
"https://www.opencodelists.org/"
"codelist/project123/codelist456/version1/download.csv",
text="foo",
)

codelists.add(
"https://www.opencodelists.org/codelist/project123/codelist456/version1",
codelists_path,
)

assert (
codelists_file.read_text()
== prior_codelists + "project123/codelist456/version1\n"
)
assert (codelists_path / "project123-codelist456.csv").read_text() == "foo"


def test_codelists_add_with_anchor_url(codelists_path, requests_mock):
codelists_path /= "codelists"
codelists_file = codelists_path / "codelists.txt"
prior_codelists = codelists_file.read_text()
for codelist in prior_codelists.split("\n"):
if codelist:
requests_mock.get(
f"https://www.opencodelists.org/codelist/{codelist.rstrip('/')}/download.csv",
text="foo",
)
requests_mock.get(
"https://www.opencodelists.org/"
"codelist/project123/codelist456/version1/download.csv",
text="foo",
)

codelists.add(
"https://www.opencodelists.org/codelist/project123/codelist456/version1/#full-list",
codelists_path,
)

assert (
codelists_file.read_text()
== prior_codelists + "project123/codelist456/version1/\n"
)
assert (codelists_path / "project123-codelist456.csv").read_text() == "foo"


def test_codelists_add_with_download_url(codelists_path, requests_mock):
codelists_path /= "codelists"
codelists_file = codelists_path / "codelists.txt"
prior_codelists = codelists_file.read_text()
for codelist in prior_codelists.split("\n"):
if codelist:
requests_mock.get(
f"https://www.opencodelists.org/codelist/{codelist.rstrip('/')}/download.csv",
text="foo",
)
requests_mock.get(
"https://www.opencodelists.org/"
"codelist/project123/codelist456/version1/download.csv",
text="foo",
)

codelists.add(
"https://www.opencodelists.org/codelist/project123/codelist456/version1/download.csv",
codelists_path,
)

assert (
codelists_file.read_text()
== prior_codelists + "project123/codelist456/version1\n"
)
assert (codelists_path / "project123-codelist456.csv").read_text() == "foo"


def test_codelists_add_with_invalid_url(codelists_path):
codelists_path /= "codelists"

with pytest.raises(SystemExit):
codelists.add("https://example.com/codelists/test/")

0 comments on commit c52e797

Please sign in to comment.