diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 9890e81..787cdfe 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -38,6 +38,7 @@ jobs:
         env:
           AIRFLOW_VAR_DIMENSIONS_API_USER: ${{ secrets.AIRFLOW_VAR_DIMENSIONS_API_USER }}
           AIRFLOW_VAR_DIMENSIONS_API_PASS: ${{ secrets.AIRFLOW_VAR_DIMENSIONS_API_PASS }}
+          AIRFLOW_VAR_WOS_KEY: ${{ secrets.AIRFLOW_VAR_WOS_KEY }}
 
       - name: Upload coverage reports to Codecov
         uses: codecov/codecov-action@v5
diff --git a/compose.yaml b/compose.yaml
index a472fe7..54793d2 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -82,6 +82,7 @@ x-airflow-common:
     AIRFLOW_VAR_MAIS_SECRET: ${AIRFLOW_VAR_MAIS_SECRET}
     AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
     AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
+    AIRFLOW_VAR_WOS_KEY: ${AIRFLOW_VAR_WOS_KEY}
     AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
     AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
     AIRFLOW_VAR_PUBLISH_DIR: /opt/airflow/data/latest
diff --git a/pyproject.toml b/pyproject.toml
index e671151..27e1575 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,7 +28,8 @@ dependencies = [
 
 [tool.pytest.ini_options]
 pythonpath = ["."]
 markers = "mais_tests: Tests requiring MAIS access"
-addopts = "-v --cov --cov-report=html --cov-report=term"
+addopts = "-v --cov --cov-report=html --cov-report=term --log-level INFO --log-file test.log"
+
 [tool.coverage.run]
 omit = ["test/*"]
diff --git a/rialto_airflow/dags/harvest.py b/rialto_airflow/dags/harvest.py
index 57fe0d9..5cfe33b 100644
--- a/rialto_airflow/dags/harvest.py
+++ b/rialto_airflow/dags/harvest.py
@@ -6,7 +6,7 @@
 from airflow.decorators import dag, task
 from airflow.models import Variable
 
-from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex
+from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex, wos
 from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
 from rialto_airflow.harvest import sul_pub
 from rialto_airflow.harvest.contribs import create_contribs
@@ -74,6 +74,15 @@ def openalex_harvest(snapshot):
 
         return jsonl_file
 
+    @task()
+    def wos_harvest(snapshot):
+        """
+        Fetch the data by ORCID from Web of Science.
+        """
+        jsonl_file = wos.harvest(snapshot, limit=dev_limit)
+
+        return jsonl_file
+
     @task()
     def sul_pub_harvest(snapshot):
         """
@@ -150,6 +159,9 @@ def publish(pubs_to_contribs, merge_publications):
 
     openalex_jsonl = openalex_harvest(snapshot)
 
+    # TODO: use the return value here to hook into the workflow
+    wos_harvest(snapshot)
+
     doi_sunet = create_doi_sunet(
         dimensions_dois,
         openalex_jsonl,
diff --git a/rialto_airflow/harvest/wos.py b/rialto_airflow/harvest/wos.py
new file mode 100644
index 0000000..6d9f914
--- /dev/null
+++ b/rialto_airflow/harvest/wos.py
@@ -0,0 +1,191 @@
+import json
+import logging
+import os
+import re
+from pathlib import Path
+
+import requests
+from typing import Generator, Optional, Dict, Union
+from sqlalchemy.dialects.postgresql import insert
+
+from rialto_airflow.database import (
+    Author,
+    Publication,
+    get_session,
+    pub_author_association,
+)
+from rialto_airflow.snapshot import Snapshot
+from rialto_airflow.utils import normalize_doi
+
+Params = Dict[str, Union[int, str]]
+
+
+def harvest(snapshot: Snapshot, limit=None) -> Path:
+    """
+    Walk through all the Author ORCIDs and generate publications for them.
+    """
+    jsonl_file = snapshot.path / "wos.jsonl"
+    count = 0
+    stop = False
+
+    with jsonl_file.open("w") as jsonl_output:
+        with get_session(snapshot.database_name).begin() as select_session:
+            # get all authors that have an ORCID
+            # TODO: should we just pull the relevant bits back into memory since
+            # that's what's going on with our client-side buffering connection
+            # and there aren't that many of them?
+            for author in (
+                select_session.query(Author).where(Author.orcid.is_not(None)).all()  # type: ignore
+            ):
+                if stop is True:
+                    logging.info(f"Reached limit of {limit} publications stopping")
+                    break
+
+                for wos_pub in orcid_publications(author.orcid):
+                    count += 1
+                    if limit is not None and count > limit:
+                        stop = True
+                        break
+
+                    doi = get_doi(wos_pub)
+
+                    with get_session(snapshot.database_name).begin() as insert_session:
+                        # if there's a DOI constraint violation we need to update instead of insert
+                        pub_id = insert_session.execute(
+                            insert(Publication)
+                            .values(
+                                doi=doi,
+                                wos_json=wos_pub,
+                            )
+                            .on_conflict_do_update(
+                                constraint="publication_doi_key",
+                                set_=dict(wos_json=wos_pub),
+                            )
+                            .returning(Publication.id)
+                        ).scalar_one()
+
+                        # a constraint violation is ok here, since it means we
+                        # already know that the publication is by the author
+                        insert_session.execute(
+                            insert(pub_author_association)
+                            .values(publication_id=pub_id, author_id=author.id)
+                            .on_conflict_do_nothing()
+                        )
+
+                        jsonl_output.write(json.dumps(wos_pub) + "\n")
+
+    return jsonl_file
+
+
+def orcid_publications(orcid) -> Generator[dict, None, None]:
+    """
+    A generator that returns publications associated with a given ORCID.
+    """
+
+    # For API details see: https://api.clarivate.com/swagger-ui/
+
+    # WoS doesn't recognize the ORCID URIs that are stored in the Author table
+    if m := re.match(r"^https?://orcid.org/(.+)$", orcid):
+        orcid = m.group(1)
+
+    wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
+    base_url = "https://wos-api.clarivate.com/api/wos"
+    headers = {"Accept": "application/json", "X-ApiKey": wos_key}
+
+    # the number of records to get in each request (100 is max)
+    batch_size = 100
+
+    params: Params = {
+        "databaseId": "WOK",
+        "usrQuery": f"AI=({orcid})",
+        "count": batch_size,
+        "firstRecord": 1,
+    }
+
+    http = requests.Session()
+
+    # get the initial set of results, which also gives us a Query ID to fetch
+    # subsequent pages of results if there are any
+
+    logging.info(f"fetching {base_url} with {params}")
+    resp: requests.Response = http.get(base_url, params=params, headers=headers)
+
+    if not check_status(resp):
+        return
+
+    results = get_json(resp)
+    if results is None:
+        return
+
+    if results["QueryResult"]["RecordsFound"] == 0:
+        logging.info(f"No results found for ORCID {orcid}")
+        return
+
+    yield from results["Data"]["Records"]["records"]["REC"]
+
+    # get subsequent results using the Query ID
+
+    query_id = results["QueryResult"]["QueryID"]
+    records_found = results["QueryResult"]["RecordsFound"]
+    first_record = batch_size + 1  # since the initial set included 100
+
+    # if there aren't any more results to fetch this loop will never be entered
+
+    logging.info(f"{records_found} records found")
+    while first_record < records_found:
+        page_params: Params = {"firstRecord": first_record, "count": batch_size}
+        logging.info(f"fetching {base_url}/query/{query_id} with {page_params}")
+
+        resp = http.get(
+            f"{base_url}/query/{query_id}", params=page_params, headers=headers
+        )
+
+        if not check_status(resp):
+            return
+
+        records = get_json(resp)
+        if records is None:
+            break
+
+        yield from records["Records"]["records"]["REC"]
+
+        # move the offset along in the results list
+        first_record += batch_size
+
+
+def get_json(resp: requests.Response) -> Optional[dict]:
+    try:
+        return resp.json()
+    except requests.exceptions.JSONDecodeError as e:
+        # see https://github.com/sul-dlss/rialto-airflow/issues/207 for why
+        if resp.text == "":
+            logging.error(
+                f"got empty string instead of JSON when looking up {resp.url}"
+            )
+            return None
+        else:
+            logging.error(f"uhoh, instead of JSON we got: {resp.text}")
+            raise e
+
+
+def check_status(resp):
+    # see https://github.com/sul-dlss/rialto-airflow/issues/208
+    if (
+        resp.status_code == 500
+        and resp.headers.get("Content-Type") == "application/json"
+        and "Customization error" in resp.json().get("message", "")
+    ):
+        logging.error(f"got a 500 Customization Error when looking up {resp.url}")
+        return False
+    else:
+        resp.raise_for_status()
+        return True
+
+
+def get_doi(pub) -> Optional[str]:
+    ids = pub.get("cluster_related", {}).get("identifiers", {}).get("identifier", [])
+    for id in ids:
+        if id["type"] == "doi":
+            return normalize_doi(id["value"])
+
+    return None
diff --git a/test/harvest/test_wos.py b/test/harvest/test_wos.py
new file mode 100644
index 0000000..0295323
--- /dev/null
+++ b/test/harvest/test_wos.py
@@ -0,0 +1,200 @@
+import logging
+import os
+import re
+from dataclasses import dataclass
+
+import dotenv
+import pytest
+import requests
+
+from rialto_airflow.database import Publication
+from rialto_airflow.harvest import wos
+from rialto_airflow.snapshot import Snapshot
+from test.utils import num_jsonl_objects
+
+dotenv.load_dotenv()
+
+
+wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
+
+
+@pytest.fixture
+def mock_wos(monkeypatch):
+    """
+    Mock our function for fetching publications by ORCID from Web of Science.
+    """
+
+    def f(*args, **kwargs):
+        yield {
+            "static_data": {
+                "summary": {
+                    "titles": {
+                        "title": [{"type": "source", "content": "An example title"}]
+                    }
+                }
+            },
+            "cluster_related": {
+                "identifiers": {
+                    "identifier": [
+                        {"type": "issn", "value": "2211-9124"},
+                        {
+                            "type": "doi",
+                            "value": "https://doi.org/10.1515/9781503624153",
+                        },
+                    ]
+                }
+            }
+        }
+
+    monkeypatch.setattr(wos, "orcid_publications", f)
+
+
+@pytest.fixture
+def mock_many_wos(monkeypatch):
+    """
+    A fixture that returns 1000 fake documents from Web of Science.
+    """
+
+    def f(*args, **kwargs):
+        for n in range(1, 1000):
+            yield {"UID": f"mock:{n}"}
+
+    monkeypatch.setattr(wos, "orcid_publications", f)
+
+
+@pytest.fixture
+def existing_publication(test_session):
+    with test_session.begin() as session:
+        pub = Publication(
+            doi="10.1515/9781503624153",
+            sulpub_json={"sulpub": "data"},
+            wos_json={"wos": "data"},
+        )
+        session.add(pub)
+    return pub
+
+
+@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
+def test_orcid_publications_with_paging():
+    """
+    This is a live test of the WoS API to ensure that paging works properly.
+ """ + + # https://www-webofscience-com.stanford.idm.oclc.org/wos/alldb/advanced-search + # The ORCID that is tested should return more than 200 results to exercise paging + orcid = "https://orcid.org/0000-0002-0673-5257" + + uids = set() + for pub in wos.orcid_publications(orcid): + assert pub + assert pub["UID"] not in uids, "haven't seen publication before" + uids.add(pub["UID"]) + + assert len(uids) > 200, "found more than 200 publications" + + +@pytest.mark.skipif(wos_key is None, reason="no Web of Science key") +def test_orcid_publications_with_bad_orcid(): + """ + This is a live test of the WoS API to ensure that a search for an invalid ORCID yields no results. + """ + assert ( + len(list(wos.orcid_publications("https://orcid.org/0000-0003-0784-7987-XXX"))) + == 0 + ) + + +def test_harvest(tmp_path, test_session, mock_authors, mock_wos): + """ + With some authors loaded and a mocked WoS API make sure that a + publication is matched up to the authors using the ORCID. + """ + # harvest from Web of Science + snapshot = Snapshot(path=tmp_path, database_name="rialto_test") + wos.harvest(snapshot) + + # the mocked Web of Science api returns the same publication for both authors + assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2 + + # make sure a publication is in the database and linked to the author + with test_session.begin() as session: + assert session.query(Publication).count() == 1, "one publication loaded" + + pub = session.query(Publication).first() + assert pub.doi == "10.1515/9781503624153", "doi was normalized" + + assert len(pub.authors) == 2, "publication has two authors" + assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001" + assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002" + + +def test_harvest_when_doi_exists( + tmp_path, test_session, existing_publication, mock_authors, mock_wos +): + """ + When a publication and its authors already exist in the database make sure that the wos_json is updated. + """ + # harvest from web of science + snapshot = Snapshot(path=tmp_path, database_name="rialto_test") + wos.harvest(snapshot) + + # jsonl file is there and has two lines (one for each author) + assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2 + + # ensure that the existing publication for the DOI was updated + with test_session.begin() as session: + assert session.query(Publication).count() == 1, "one publication loaded" + pub = session.query(Publication).first() + + assert pub.wos_json + assert pub.sulpub_json == {"sulpub": "data"}, "sulpub data the same" + assert pub.pubmed_json is None + + assert len(pub.authors) == 2, "publication has two authors" + assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001" + assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002" + + +def test_log_message(tmp_path, mock_authors, mock_many_wos, caplog): + caplog.set_level(logging.INFO) + snapshot = Snapshot(tmp_path, "rialto_test") + wos.harvest(snapshot, limit=50) + assert "Reached limit of 50 publications stopping" in caplog.text + + +@dataclass +class MockResponse: + status_code: int = 500 + content: str = "" + + +def test_customization_error( + test_session, tmp_path, caplog, mock_authors, requests_mock +): + """ + A 500 error from WoS with a specific JSON error payload should be skipped over. 
+ """ + requests_mock.get( + re.compile(".*"), + json={"message": "Customization error"}, + status_code=500, + headers={"Content-Type": "application/json"}, + ) + + snapshot = Snapshot(tmp_path, "rialto_test") + wos.harvest(snapshot, limit=50) + assert test_session().query(Publication).count() == 0, "no publications loaded" + assert "got a 500 Customization Error" in caplog.text + + +def test_empty_payload(test_session, tmp_path, caplog, mock_authors, requests_mock): + """ + A 200 OK from WoS with an empty JSON payload should be skipped over. + """ + requests_mock.get(re.compile(".*"), text="", status_code=200) + + snapshot = Snapshot(tmp_path, "rialto_test") + wos.harvest(snapshot, limit=50) + + assert test_session().query(Publication).count() == 0, "no publications loaded" + assert "got empty string instead of JSON" in caplog.text