diff --git a/compose.yaml b/compose.yaml
index a472fe7..54793d2 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -82,6 +82,7 @@ x-airflow-common:
       AIRFLOW_VAR_MAIS_SECRET: ${AIRFLOW_VAR_MAIS_SECRET}
       AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
       AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
+      AIRFLOW_VAR_WOS_KEY: ${AIRFLOW_VAR_WOS_KEY}
       AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
       AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
       AIRFLOW_VAR_PUBLISH_DIR: /opt/airflow/data/latest
diff --git a/rialto_airflow/dags/harvest.py b/rialto_airflow/dags/harvest.py
index 57fe0d9..9aecca3 100644
--- a/rialto_airflow/dags/harvest.py
+++ b/rialto_airflow/dags/harvest.py
@@ -6,7 +6,7 @@
 from airflow.decorators import dag, task
 from airflow.models import Variable
 
-from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex
+from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex, wos
 from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
 from rialto_airflow.harvest import sul_pub
 from rialto_airflow.harvest.contribs import create_contribs
@@ -74,6 +74,15 @@ def openalex_harvest(snapshot):
 
         return jsonl_file
 
+    @task()
+    def wos_harvest(snapshot):
+        """
+        Fetch the data by ORCID from Web of Science.
+        """
+        jsonl_file = wos.harvest(snapshot, limit=dev_limit)
+
+        return jsonl_file
+
     @task()
     def sul_pub_harvest(snapshot):
         """
@@ -150,6 +159,8 @@ def publish(pubs_to_contribs, merge_publications):
 
     openalex_jsonl = openalex_harvest(snapshot)
 
+    wos_harvest(snapshot)
+
     doi_sunet = create_doi_sunet(
         dimensions_dois,
         openalex_jsonl,
diff --git a/rialto_airflow/harvest/wos.py b/rialto_airflow/harvest/wos.py
new file mode 100644
index 0000000..a16950f
--- /dev/null
+++ b/rialto_airflow/harvest/wos.py
@@ -0,0 +1,135 @@
+import json
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Generator
+
+import requests
+from sqlalchemy.dialects.postgresql import insert
+
+from rialto_airflow.database import (
+    Author,
+    Publication,
+    get_session,
+    pub_author_association,
+)
+from rialto_airflow.snapshot import Snapshot
+from rialto_airflow.utils import normalize_doi
+
+
+def harvest(snapshot: Snapshot, limit=None) -> Path:
+    """
+    Walk through all the Author ORCIDs and generate publications for them.
+    """
+    jsonl_file = snapshot.path / "wos.jsonl"
+    count = 0
+    stop = False
+
+    with jsonl_file.open("w") as jsonl_output:
+        with get_session(snapshot.database_name).begin() as select_session:
+            # get all authors that have an ORCID
+            # TODO: should we just pull the relevant bits back into memory since
+            # that's what's going on with our client-side buffering connection
+            # and there aren't that many of them?
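+            # (note: .all() below already materializes the full result set in
+            # memory rather than streaming rows from the server)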
+            for author in (
+                select_session.query(Author).where(Author.orcid.is_not(None)).all()  # type: ignore
+            ):
+                if stop:
+                    logging.info(f"Reached limit of {limit} publications, stopping")
+                    break
+
+                for wos_pub in orcid_publications(author.orcid):
+                    count += 1
+                    if limit is not None and count > limit:
+                        stop = True
+                        break
+
+                    doi = get_doi(wos_pub)
+
+                    with get_session(snapshot.database_name).begin() as insert_session:
+                        # if there's a DOI constraint violation we need to update instead of insert
+                        pub_id = insert_session.execute(
+                            insert(Publication)
+                            .values(
+                                doi=doi,
+                                wos_json=wos_pub,
+                            )
+                            .on_conflict_do_update(
+                                constraint="publication_doi_key",
+                                set_=dict(wos_json=wos_pub),
+                            )
+                            .returning(Publication.id)
+                        ).scalar_one()
+
+                        # a constraint violation is ok here, since it means we
+                        # already know that the publication is by the author
+                        insert_session.execute(
+                            insert(pub_author_association)
+                            .values(publication_id=pub_id, author_id=author.id)
+                            .on_conflict_do_nothing()
+                        )
+
+                    jsonl_output.write(json.dumps(wos_pub) + "\n")
+
+    return jsonl_file
+
+
+def orcid_publications(orcid) -> Generator[dict, None, None]:
+    """
+    Yield the Web of Science records for a given ORCID, paging through the
+    results as needed. For API details see: https://api.clarivate.com/swagger-ui/
+    """
+    # WoS doesn't recognize ORCID URIs
+    if m := re.match(r"^https?://orcid\.org/(.+)$", orcid):
+        orcid = m.group(1)
+
+    wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
+    base_url = "https://wos-api.clarivate.com/api/wos"
+
+    headers = {"Accept": "application/json", "X-ApiKey": wos_key}
+
+    params = {
+        "databaseId": "WOK",
+        "usrQuery": f"AI=({orcid})",
+        "count": 100,
+        "firstRecord": 1,
+    }
+
+    http = requests.Session()
+
+    # get the initial set of results
+    logging.info(f"fetching {base_url} with {params}")
+    resp = http.get(base_url, params=params, headers=headers)
+    resp.raise_for_status()
+    results = resp.json()
+
+    if results["QueryResult"]["RecordsFound"] == 0:
+        logging.info(f"No results found for ORCID {orcid}")
+        return
+
+    yield from results["Data"]["Records"]["records"]["REC"]
+
+    # get subsequent results using the query ID
+    query_id = results["QueryResult"]["QueryID"]
+    records_found = results["QueryResult"]["RecordsFound"]
+    first_record = 101  # since the initial set included 100
+
+    while first_record <= records_found:  # <= so the final record isn't skipped
+        params = {"firstRecord": first_record, "count": 100}
+        logging.info(f"fetching {base_url}/query/{query_id} with {params}")
+
+        resp = http.get(f"{base_url}/query/{query_id}", params=params, headers=headers)
+        resp.raise_for_status()
+
+        records = resp.json()["Records"]["records"]["REC"]
+        yield from records
+
+        first_record += len(records)
+
+
+def get_doi(pub):
+    """
+    Return the normalized DOI for a WoS record, or None if it doesn't have one.
+    """
+    ids = pub.get("cluster_related", {}).get("identifiers", {}).get("identifier", [])
+    for identifier in ids:
+        if identifier["type"] == "doi":
+            return normalize_doi(identifier["value"])
+    return None
diff --git a/test/harvest/test_wos.py b/test/harvest/test_wos.py
new file mode 100644
index 0000000..4e96d43
--- /dev/null
+++ b/test/harvest/test_wos.py
@@ -0,0 +1,156 @@
+import dotenv
+import logging
+import os
+import pytest
+
+from rialto_airflow.harvest import wos
+from rialto_airflow.snapshot import Snapshot
+from rialto_airflow.database import Publication
+
+from test.utils import num_jsonl_objects
+
+dotenv.load_dotenv()
+
+
+wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
+
+
+@pytest.fixture
+def mock_wos(monkeypatch):
+    """
+    Mock our function for fetching publications by ORCID from Web of Science.
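+
+    The single record mimics the shape of a WoS REC and includes a DOI URL
+    that the harvester is expected to normalize.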
+ """ + + def f(*args, **kwargs): + yield { + "static_data": { + "summary": { + "titles": { + "title": [{"type": "source", "content": "An example title"}] + } + } + }, + "cluster_related": { + "identifiers": { + "identifier": [ + {"type": "issn", "value": "2211-9124"}, + { + "type": "doi", + "value": "https://doi.org/10.1515/9781503624153", + }, + ] + } + }, + } + + monkeypatch.setattr(wos, "orcid_publications", f) + + +@pytest.fixture +def mock_many_wos(monkeypatch): + """ + A fixture for that returns 1000 fake documents from Web of Science. + """ + + def f(*args, **kwargs): + for n in range(1, 1000): + yield {"UID": f"mock:{n}"} + + monkeypatch.setattr(wos, "orcid_publications", f) + + +@pytest.fixture +def existing_publication(test_session): + with test_session.begin() as session: + pub = Publication( + doi="10.1515/9781503624153", + sulpub_json={"sulpub": "data"}, + wos_json={"wos": "data"}, + ) + session.add(pub) + return pub + + +@pytest.mark.skipif(wos_key is None, reason="no Web of Science key") +def test_orcid_publications_with_paging(): + """ + This is a live test of WoS API to ensure paging works properly. + """ + + # https://www-webofscience-com.stanford.idm.oclc.org/wos/alldb/advanced-search + # The ORCID that is tested should return more than 200 results to exercise paging + orcid = "https://orcid.org/0000-0002-0673-5257" + + uids = set() + for pub in wos.orcid_publications(orcid): + assert pub + assert pub["UID"] not in uids, "haven't seen publication before" + uids.add(pub["UID"]) + + assert len(uids) > 200, "found more than 200 publications" + + +@pytest.mark.skipif(wos_key is None, reason="no Web of Science key") +def test_orcid_publications_with_bad_orcid(): + """ + This is a live test of the WoS API to ensure that a search for an invalid ORCID yields no results. + """ + assert len(list(wos.orcid_publications("https://orcid.org/0000-0003-0784-7987-XXX"))) == 0 + + +def test_harvest(tmp_path, test_session, mock_authors, mock_wos): + """ + With some authors loaded and a mocked WoS API make sure that a + publication is matched up to the authors using the ORCID. + """ + # harvest from Web of Science + snapshot = Snapshot(path=tmp_path, database_name="rialto_test") + wos.harvest(snapshot) + + # the mocked Web of Science api returns the same publication for both authors + assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2 + + # make sure a publication is in the database and linked to the author + with test_session.begin() as session: + assert session.query(Publication).count() == 1, "one publication loaded" + + pub = session.query(Publication).first() + assert pub.doi == "10.1515/9781503624153", "doi was normalized" + + assert len(pub.authors) == 2, "publication has two authors" + assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001" + assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002" + + +def test_harvest_when_doi_exists( + tmp_path, test_session, existing_publication, mock_authors, mock_wos +): + """ + When a publication and its authors already exist in the database make sure that the wos_json is updated. 
+ """ + # harvest from web of science + snapshot = Snapshot(path=tmp_path, database_name="rialto_test") + wos.harvest(snapshot) + + # jsonl file is there and has two lines (one for each author) + assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2 + + # ensure that the existing publication for the DOI was updated + with test_session.begin() as session: + assert session.query(Publication).count() == 1, "one publication loaded" + pub = session.query(Publication).first() + + assert pub.wos_json + assert pub.sulpub_json == {"sulpub": "data"}, "sulpub data the same" + assert pub.pubmed_json is None + + assert len(pub.authors) == 2, "publication has two authors" + assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001" + assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002" + + +def test_log_message(tmp_path, mock_authors, mock_many_wos, caplog): + caplog.set_level(logging.INFO) + snapshot = Snapshot(tmp_path, "rialto_test") + wos.harvest(snapshot, limit=50) + assert "Reached limit of 50 publications stopping" in caplog.text