Commit
Closes #165
Showing 4 changed files with 330 additions and 1 deletion.
rialto_airflow/harvest/wos.py
@@ -0,0 +1,155 @@
import json
import logging
import os
import re
from pathlib import Path

import requests
from typing import Generator, Optional, Dict, Union
from sqlalchemy.dialects.postgresql import insert

from rialto_airflow.database import (
    Author,
    Publication,
    get_session,
    pub_author_association,
)
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.utils import normalize_doi

Params = Dict[str, Union[int, str]]

def harvest(snapshot: Snapshot, limit=None) -> Path:
    """
    Walk through all the Author ORCIDs and generate publications for them.
    """
    jsonl_file = snapshot.path / "wos.jsonl"
    count = 0
    stop = False

    with jsonl_file.open("w") as jsonl_output:
        with get_session(snapshot.database_name).begin() as select_session:
            # get all authors that have an ORCID
            # TODO: should we just pull the relevant bits back into memory since
            # that's what's going on with our client-side buffering connection
            # and there aren't that many of them?
            for author in (
                select_session.query(Author).where(Author.orcid.is_not(None)).all()  # type: ignore
            ):
                if stop:
                    logging.info(f"Reached limit of {limit} publications, stopping")
                    break

                for wos_pub in orcid_publications(author.orcid):
                    count += 1
                    if limit is not None and count > limit:
                        stop = True
                        break

                    doi = get_doi(wos_pub)

                    with get_session(snapshot.database_name).begin() as insert_session:
                        # if there's a DOI constraint violation we need to update instead of insert
                        pub_id = insert_session.execute(
                            insert(Publication)
                            .values(
                                doi=doi,
                                wos_json=wos_pub,
                            )
                            .on_conflict_do_update(
                                constraint="publication_doi_key",
                                set_=dict(wos_json=wos_pub),
                            )
                            .returning(Publication.id)
                        ).scalar_one()

                        # a constraint violation is ok here, since it means we
                        # already know that the publication is by the author
                        insert_session.execute(
                            insert(pub_author_association)
                            .values(publication_id=pub_id, author_id=author.id)
                            .on_conflict_do_nothing()
                        )

                    jsonl_output.write(json.dumps(wos_pub) + "\n")

    return jsonl_file

def orcid_publications(orcid) -> Generator[dict, None, None]:
    """
    A generator that returns publications associated with a given ORCID.
    """
    # For API details see: https://api.clarivate.com/swagger-ui/

    # WoS doesn't recognize ORCID URIs, which are stored in the User table
    if m := re.match(r"^https?://orcid\.org/(.+)$", orcid):
        orcid = m.group(1)

    wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
    base_url = "https://wos-api.clarivate.com/api/wos"

    headers = {"Accept": "application/json", "X-ApiKey": wos_key}

    params: Params = {
        "databaseId": "WOK",
        "usrQuery": f"AI=({orcid})",
        "count": 100,
        "firstRecord": 1,
    }

    http = requests.Session()

    # get the initial set of results, which also gives us a Query ID to fetch
    # subsequent pages of results if there are any

    logging.info(f"fetching {base_url} with {params}")
    resp: requests.Response = http.get(base_url, params=params, headers=headers)
    resp.raise_for_status()

    results = get_json(resp)
    if results["QueryResult"]["RecordsFound"] == 0:
        logging.info(f"No results found for ORCID {orcid}")
        return

    yield from results["Data"]["Records"]["records"]["REC"]

    # get subsequent results using the Query ID

    query_id = results["QueryResult"]["QueryID"]
    records_found = results["QueryResult"]["RecordsFound"]
    first_record = 101  # since the initial set included 100

    # if there aren't any more results to fetch this loop will never be entered
    while first_record <= records_found:
        page_params: Params = {"firstRecord": first_record, "count": 100}
        logging.info(f"fetching {base_url}/query/{query_id} with {page_params}")

        resp = http.get(
            f"{base_url}/query/{query_id}", params=page_params, headers=headers
        )
        resp.raise_for_status()

        records = get_json(resp)["Records"]["records"]["REC"]
        yield from records

        first_record += len(records)


def get_json(resp: requests.Response) -> dict:
    try:
        return resp.json()
    except requests.exceptions.JSONDecodeError:
        logging.error(f"expected JSON in the response but got: {resp.text}")
        raise


def get_doi(pub) -> Optional[str]:
    ids = pub.get("cluster_related", {}).get("identifiers", {}).get("identifier", [])
    for identifier in ids:
        if identifier["type"] == "doi":
            return normalize_doi(identifier["value"])

    return None
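
For orientation, here is a minimal sketch of how the new harvester might be invoked outside of Airflow. It assumes authors with ORCIDs are already loaded into the database and that AIRFLOW_VAR_WOS_KEY is set in the environment; the snapshot path and database name shown are placeholders, not values from this commit. The Snapshot keyword arguments follow the usage in the tests below.

    from pathlib import Path

    from rialto_airflow.harvest import wos
    from rialto_airflow.snapshot import Snapshot

    # placeholder path and database name; the real values are supplied elsewhere
    snapshot = Snapshot(path=Path("/tmp/rialto-snapshot"), database_name="rialto")

    # walks every Author with an ORCID, upserts publications, and writes one
    # JSON line per harvested record; limit caps the total across all authors
    jsonl_file = wos.harvest(snapshot, limit=100)
    print(jsonl_file)  # <snapshot path>/wos.jsonl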
@@ -0,0 +1,161 @@
import dotenv
import logging
import os
import pytest

from rialto_airflow.harvest import wos
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.database import Publication

from test.utils import num_jsonl_objects

dotenv.load_dotenv()

wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")

@pytest.fixture
def mock_wos(monkeypatch):
    """
    Mock our function for fetching publications by ORCID from Web of Science.
    """

    def f(*args, **kwargs):
        yield {
            "static_data": {
                "summary": {
                    "titles": {
                        "title": [{"type": "source", "content": "An example title"}]
                    }
                }
            },
            "cluster_related": {
                "identifiers": {
                    "identifier": [
                        {"type": "issn", "value": "2211-9124"},
                        {
                            "type": "doi",
                            "value": "https://doi.org/10.1515/9781503624153",
                        },
                    ]
                }
            },
        }

    monkeypatch.setattr(wos, "orcid_publications", f)

@pytest.fixture
def mock_many_wos(monkeypatch):
    """
    A fixture that returns 1,000 fake documents from Web of Science.
    """

    def f(*args, **kwargs):
        for n in range(1, 1001):
            yield {"UID": f"mock:{n}"}

    monkeypatch.setattr(wos, "orcid_publications", f)

@pytest.fixture
def existing_publication(test_session):
    with test_session.begin() as session:
        pub = Publication(
            doi="10.1515/9781503624153",
            sulpub_json={"sulpub": "data"},
            wos_json={"wos": "data"},
        )
        session.add(pub)
        return pub

@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
def test_orcid_publications_with_paging():
    """
    This is a live test of the WoS API to ensure paging works properly.
    """
    # https://www-webofscience-com.stanford.idm.oclc.org/wos/alldb/advanced-search
    # The ORCID that is tested should return more than 200 results to exercise paging
    orcid = "https://orcid.org/0000-0002-0673-5257"

    uids = set()
    for pub in wos.orcid_publications(orcid):
        assert isinstance(pub, dict)
        assert pub["UID"] not in uids, "haven't seen publication before"
        uids.add(pub["UID"])

    assert len(uids) > 200, "found more than 200 publications"

@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
def test_orcid_publications_with_bad_orcid():
    """
    This is a live test of the WoS API to ensure that a search for an invalid ORCID yields no results.
    """
    assert (
        len(list(wos.orcid_publications("https://orcid.org/0000-0003-0784-7987-XXX")))
        == 0
    )

def test_harvest(tmp_path, test_session, mock_authors, mock_wos):
    """
    With some authors loaded and a mocked WoS API, make sure that a
    publication is matched up to the authors using the ORCID.
    """
    # harvest from Web of Science
    snapshot = Snapshot(path=tmp_path, database_name="rialto_test")
    wos.harvest(snapshot)

    # the mocked Web of Science API returns the same publication for both authors
    assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2

    # make sure a publication is in the database and linked to the authors
    with test_session.begin() as session:
        assert session.query(Publication).count() == 1, "one publication loaded"

        pub = session.query(Publication).first()
        assert pub.doi == "10.1515/9781503624153", "doi was normalized"

        assert len(pub.authors) == 2, "publication has two authors"
        assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001"
        assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002"

def test_harvest_when_doi_exists(
    tmp_path, test_session, existing_publication, mock_authors, mock_wos
):
    """
    When a publication and its authors already exist in the database, make sure that the wos_json is updated.
    """
    # harvest from Web of Science
    snapshot = Snapshot(path=tmp_path, database_name="rialto_test")
    wos.harvest(snapshot)

    # the jsonl file is there and has two lines (one for each author)
    assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2

    # ensure that the existing publication for the DOI was updated
    with test_session.begin() as session:
        assert session.query(Publication).count() == 1, "one publication loaded"
        pub = session.query(Publication).first()

        assert pub.wos_json
        assert pub.sulpub_json == {"sulpub": "data"}, "sulpub data the same"
        assert pub.pubmed_json is None

        assert len(pub.authors) == 2, "publication has two authors"
        assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001"
        assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002"

def test_log_message(tmp_path, mock_authors, mock_many_wos, caplog):
    caplog.set_level(logging.INFO)
    snapshot = Snapshot(tmp_path, "rialto_test")
    wos.harvest(snapshot, limit=50)
    assert "Reached limit of 50 publications, stopping" in caplog.text