Skip to content

Commit

Permalink
Adding Web of Science harvest
Browse files Browse the repository at this point in the history
Closes #165
  • Loading branch information
edsu committed Mar 4, 2025
1 parent f3943b0 commit becde5b
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 2 deletions.
1 change: 1 addition & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ x-airflow-common:
AIRFLOW_VAR_MAIS_SECRET: ${AIRFLOW_VAR_MAIS_SECRET}
AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
AIRFLOW_VAR_WOS_KEY: ${AIRFLOW_VAR_WOS_KEY}
AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
AIRFLOW_VAR_PUBLISH_DIR: /opt/airflow/data/latest
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ dependencies = [
[tool.pytest.ini_options]
pythonpath = ["."]
markers = "mais_tests: Tests requiring MAIS access"
addopts = "-v --cov --cov-report=html --cov-report=term"
addopts = "-v --cov --cov-report=html --cov-report=term --log-level INFO --log-file test.log"


[tool.coverage.run]
omit = ["test/*"]
Expand Down
14 changes: 13 additions & 1 deletion rialto_airflow/dags/harvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow.decorators import dag, task
from airflow.models import Variable

from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex
from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex, wos
from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
from rialto_airflow.harvest import sul_pub
from rialto_airflow.harvest.contribs import create_contribs
Expand Down Expand Up @@ -74,6 +74,15 @@ def openalex_harvest(snapshot):

return jsonl_file

@task()
def wos_harvest(snapshot):
    """
    Fetch the data by ORCID from Web of Science.
    """
    jsonl_file = wos.harvest(snapshot, limit=dev_limit)

    return jsonl_file

@task()
def sul_pub_harvest(snapshot):
"""
Expand Down Expand Up @@ -150,6 +159,9 @@ def publish(pubs_to_contribs, merge_publications):

openalex_jsonl = openalex_harvest(snapshot)

# TODO: use the return value here to hook into the workflow
wos_harvest(snapshot)

doi_sunet = create_doi_sunet(
dimensions_dois,
openalex_jsonl,
Expand Down
174 changes: 174 additions & 0 deletions rialto_airflow/harvest/wos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import json
import logging
import os
import re
from pathlib import Path

import requests
from typing import Generator, Optional, Dict, Union
from sqlalchemy.dialects.postgresql import insert

from rialto_airflow.database import (
Author,
Publication,
get_session,
pub_author_association,
)
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.utils import normalize_doi

Params = Dict[str, Union[int, str]]


def harvest(snapshot: Snapshot, limit=None) -> Path:
    """
    Walk through all the Author ORCIDs and generate publications for them.

    Each Web of Science record is upserted into the Publication table (keyed
    on DOI), linked to the author via the pub_author_association table, and
    appended to a wos.jsonl file in the snapshot directory.

    The optional limit caps the total number of publications harvested across
    all authors (useful in development). Returns the path to the jsonl file.
    """
    jsonl_file = snapshot.path / "wos.jsonl"
    count = 0
    stop = False

    with jsonl_file.open("w") as jsonl_output:
        with get_session(snapshot.database_name).begin() as select_session:
            # get all authors that have an ORCID
            # TODO: should we just pull the relevant bits back into memory since
            # that's what's going on with our client-side buffering connection
            # and there aren't that many of them?
            for author in (
                select_session.query(Author).where(Author.orcid.is_not(None)).all() # type: ignore
            ):
                if stop is True:
                    logging.info(f"Reached limit of {limit} publications stopping")
                    break

                for wos_pub in orcid_publications(author.orcid):
                    count += 1
                    if limit is not None and count > limit:
                        stop = True
                        break

                    doi = get_doi(wos_pub)

                    # NOTE(review): a fresh session is opened per record,
                    # presumably so writes commit independently of the
                    # long-lived select session above — confirm
                    with get_session(snapshot.database_name).begin() as insert_session:
                        # if there's a DOI constraint violation we need to update instead of insert
                        pub_id = insert_session.execute(
                            insert(Publication)
                            .values(
                                doi=doi,
                                wos_json=wos_pub,
                            )
                            .on_conflict_do_update(
                                constraint="publication_doi_key",
                                set_=dict(wos_json=wos_pub),
                            )
                            .returning(Publication.id)
                        ).scalar_one()

                        # a constraint violation is ok here, since it means we
                        # already know that the publication is by the author
                        insert_session.execute(
                            insert(pub_author_association)
                            .values(publication_id=pub_id, author_id=author.id)
                            .on_conflict_do_nothing()
                        )

                    jsonl_output.write(json.dumps(wos_pub) + "\n")

    return jsonl_file


def orcid_publications(orcid) -> Generator[dict, None, None]:
    """
    A generator that returns publications associated with a given ORCID.

    Pages through the Web of Science API in batches of batch_size using the
    QueryID returned by the initial request, yielding each publication record
    dict. Yields nothing when no records are found or the API returns an
    empty body.
    """

    # For API details see: https://api.clarivate.com/swagger-ui/

    # WoS doesn't recognize ORCID URIs which are stored in User table
    # (dots escaped so the pattern only matches orcid.org literally)
    if m := re.match(r"^https?://orcid\.org/(.+)$", orcid):
        orcid = m.group(1)

    wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
    base_url = "https://wos-api.clarivate.com/api/wos"
    headers = {"Accept": "application/json", "X-ApiKey": wos_key}

    # the number of records to get in each request (100 is max)
    batch_size = 100

    params: Params = {
        "databaseId": "WOK",
        "usrQuery": f"AI=({orcid})",
        "count": batch_size,
        "firstRecord": 1,
    }

    http = requests.Session()

    # get the initial set of results, which also gives us a Query ID to fetch
    # subsequent pages of results if there are any

    logging.info(f"fetching {base_url} with {params}")
    resp: requests.Response = http.get(base_url, params=params, headers=headers)
    resp.raise_for_status()

    results = get_json(resp)
    if results is None:
        return

    if results["QueryResult"]["RecordsFound"] == 0:
        logging.info(f"No results found for ORCID {orcid}")
        return

    yield from results["Data"]["Records"]["records"]["REC"]

    # get subsequent results using the Query ID

    query_id = results["QueryResult"]["QueryID"]
    records_found = results["QueryResult"]["RecordsFound"]
    first_record = batch_size + 1  # since the initial set included batch_size

    # if there aren't any more results to fetch this loop will never be entered

    logging.info(f"{records_found} records found")

    # <= (not <) so a final partial page isn't skipped: e.g. with 101 records
    # found, first_record is 101 after the initial page and record 101 must
    # still be fetched
    while first_record <= records_found:
        page_params: Params = {"firstRecord": first_record, "count": batch_size}
        logging.info(f"fetching {base_url}/query/{query_id} with {page_params}")

        resp = http.get(
            f"{base_url}/query/{query_id}", params=page_params, headers=headers
        )
        resp.raise_for_status()

        records = get_json(resp)
        if records is None:
            break

        yield from records["Records"]["records"]["REC"]

        # move the offset along in the results list
        first_record += batch_size


def get_json(resp: requests.Response) -> Optional[dict]:
    """
    Decode a Web of Science API response as JSON.

    Returns the parsed dict, or None when the API responded with an empty
    body. Any other undecodable payload is logged and the decoding error is
    re-raised.
    """
    try:
        return resp.json()
    except requests.exceptions.JSONDecodeError as e:
        if resp.text != "":
            logging.error(f"uhoh, instead of JSON we got: {resp.text}")
            raise e

        # some items seem to return 200 OK but with empty JSON payloads, e.g.
        # curl -i --header "application/json" --header 'X-ApiKey: API_KEY' 'https://wos-api.clarivate.com/api/wos?databaseId=WOK&usrQuery=AI%3D%280000-0002-3271-7861%29&count=100&firstRecord=1'
        logging.error(
            f"got empty string instead of JSON when looking up {resp.url}"
        )
        return None


def get_doi(pub) -> Optional[str]:
    """
    Extract and normalize the DOI from a Web of Science publication record.

    Returns None when the record carries no identifier of type "doi".
    """
    identifiers = (
        pub.get("cluster_related", {}).get("identifiers", {}).get("identifier", [])
    )
    for identifier in identifiers:
        if identifier["type"] == "doi":
            return normalize_doi(identifier["value"])

    return None
159 changes: 159 additions & 0 deletions test/harvest/test_wos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import dotenv
import logging
import os
import pytest

from rialto_airflow.harvest import wos
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.database import Publication

from test.utils import num_jsonl_objects

dotenv.load_dotenv()


wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")


@pytest.fixture
def mock_wos(monkeypatch):
    """
    Mock our function for fetching publications by orcid from Web of Science.
    """
    record = {
        "static_data": {
            "summary": {
                "titles": {
                    "title": [{"type": "source", "content": "An example title"}]
                }
            }
        },
        "cluster_related": {
            "identifiers": {
                "identifier": [
                    {"type": "issn", "value": "2211-9124"},
                    {
                        "type": "doi",
                        "value": "https://doi.org/10.1515/9781503624153",
                    },
                ]
            }
        },
    }

    def fake_orcid_publications(*args, **kwargs):
        # every ORCID looked up gets the same single publication back
        yield record

    monkeypatch.setattr(wos, "orcid_publications", fake_orcid_publications)


@pytest.fixture
def mock_many_wos(monkeypatch):
    """
    A fixture that returns many (999) fake documents from Web of Science.
    """

    def f(*args, **kwargs):
        # range(1, 1000) yields 999 documents: mock:1 through mock:999
        for n in range(1, 1000):
            yield {"UID": f"mock:{n}"}

    monkeypatch.setattr(wos, "orcid_publications", f)


@pytest.fixture
def existing_publication(test_session):
    """
    A fixture that loads a Publication into the test database with a DOI that
    collides with the one returned by the mocked WoS API, so upsert behavior
    can be exercised.
    """
    with test_session.begin() as session:
        pub = Publication(
            doi="10.1515/9781503624153",
            sulpub_json={"sulpub": "data"},
            wos_json={"wos": "data"},
        )
        session.add(pub)
    return pub


@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
def test_orcid_publications_with_paging():
    """
    This is a live test of WoS API to ensure paging works properly.
    """

    # https://www-webofscience-com.stanford.idm.oclc.org/wos/alldb/advanced-search
    # The ORCID that is tested should return more than 200 results to exercise paging
    orcid = "https://orcid.org/0000-0002-0673-5257"

    # collect the UIDs, checking for duplicates which would indicate that
    # paging returned overlapping result windows
    uids = set()
    for pub in wos.orcid_publications(orcid):
        assert pub
        assert pub["UID"] not in uids, "haven't seen publication before"
        uids.add(pub["UID"])

    assert len(uids) > 200, "found more than 200 publications"


@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
def test_orcid_publications_with_bad_orcid():
    """
    This is a live test of the WoS API to ensure that a search for an invalid ORCID yields no results.
    """
    pubs = list(wos.orcid_publications("https://orcid.org/0000-0003-0784-7987-XXX"))
    assert len(pubs) == 0


def test_harvest(tmp_path, test_session, mock_authors, mock_wos):
    """
    With some authors loaded and a mocked WoS API make sure that a
    publication is matched up to the authors using the ORCID.
    """
    # harvest from Web of Science
    snapshot = Snapshot(path=tmp_path, database_name="rialto_test")
    wos.harvest(snapshot)

    # the mocked Web of Science api returns the same publication for both authors
    assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2

    # make sure a publication is in the database and linked to the author
    with test_session.begin() as session:
        assert session.query(Publication).count() == 1, "one publication loaded"

        pub = session.query(Publication).first()
        assert pub.doi == "10.1515/9781503624153", "doi was normalized"

        # both mocked authors should be associated with the single publication
        assert len(pub.authors) == 2, "publication has two authors"
        assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001"
        assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002"


def test_harvest_when_doi_exists(
    tmp_path, test_session, existing_publication, mock_authors, mock_wos
):
    """
    When a publication and its authors already exist in the database make sure that the wos_json is updated.
    """
    # harvest from web of science
    snapshot = Snapshot(path=tmp_path, database_name="rialto_test")
    wos.harvest(snapshot)

    # jsonl file is there and has two lines (one for each author)
    assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2

    # ensure that the existing publication for the DOI was updated
    with test_session.begin() as session:
        assert session.query(Publication).count() == 1, "one publication loaded"
        pub = session.query(Publication).first()

        # wos_json is overwritten by the upsert, other harvest payloads are untouched
        assert pub.wos_json
        assert pub.sulpub_json == {"sulpub": "data"}, "sulpub data the same"
        assert pub.pubmed_json is None

        assert len(pub.authors) == 2, "publication has two authors"
        assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001"
        assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002"


def test_log_message(tmp_path, mock_authors, mock_many_wos, caplog):
    """
    Ensure that reaching the harvest limit is logged.
    """
    caplog.set_level(logging.INFO)
    wos.harvest(Snapshot(tmp_path, "rialto_test"), limit=50)
    assert "Reached limit of 50 publications stopping" in caplog.text

0 comments on commit becde5b

Please sign in to comment.