
Adding Web of Science harvest
Closes #165
edsu committed Mar 3, 2025
1 parent f3943b0 commit a8aceec
Showing 4 changed files with 309 additions and 1 deletion.
1 change: 1 addition & 0 deletions compose.yaml
@@ -82,6 +82,7 @@ x-airflow-common:
AIRFLOW_VAR_MAIS_SECRET: ${AIRFLOW_VAR_MAIS_SECRET}
AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
AIRFLOW_VAR_WOS_KEY: ${AIRFLOW_VAR_WOS_KEY}
AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
AIRFLOW_VAR_PUBLISH_DIR: /opt/airflow/data/latest
14 changes: 13 additions & 1 deletion rialto_airflow/dags/harvest.py
@@ -6,7 +6,7 @@
from airflow.decorators import dag, task
from airflow.models import Variable

-from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex
+from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex, wos
from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
from rialto_airflow.harvest import sul_pub
from rialto_airflow.harvest.contribs import create_contribs
@@ -74,6 +74,15 @@ def openalex_harvest(snapshot):

return jsonl_file

@task()
def wos_harvest(snapshot):
"""
Fetch the data by ORCID from Web of Science.
"""
jsonl_file = wos.harvest(snapshot, limit=dev_limit)

return jsonl_file

@task()
def sul_pub_harvest(snapshot):
"""
@@ -150,6 +159,9 @@ def publish(pubs_to_contribs, merge_publications):

openalex_jsonl = openalex_harvest(snapshot)

# TODO: use the return value here to hook into the workflow
wos_harvest(snapshot)
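# One way the TODO above could be resolved (a sketch, not part of this commit):
# capture the returned JSONL path, e.g. wos_jsonl = wos_harvest(snapshot), and
# pass wos_jsonl into create_doi_sunet or the later merge step once those
# tasks are updated to accept Web of Science input.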

doi_sunet = create_doi_sunet(
dimensions_dois,
openalex_jsonl,
136 changes: 136 additions & 0 deletions rialto_airflow/harvest/wos.py
@@ -0,0 +1,136 @@
import json
import logging
import os
import re
from pathlib import Path

import requests
from typing import Generator, Optional
from sqlalchemy.dialects.postgresql import insert

from rialto_airflow.database import (
Author,
Publication,
get_session,
pub_author_association,
)
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.utils import normalize_doi


def harvest(snapshot: Snapshot, limit=None) -> Path:
"""
Walk through all the Author ORCIDs and generate publications for them.
"""
jsonl_file = snapshot.path / "wos.jsonl"
count = 0
stop = False

with jsonl_file.open("w") as jsonl_output:
with get_session(snapshot.database_name).begin() as select_session:
# get all authors that have an ORCID
# TODO: should we just pull the relevant bits back into memory since
# that's what's going on with our client-side buffering connection
# and there aren't that many of them?
for author in (
select_session.query(Author).where(Author.orcid.is_not(None)).all() # type: ignore
):
if stop is True:
logging.info(f"Reached limit of {limit} publications, stopping")
break

for wos_pub in orcid_publications(author.orcid):
count += 1
if limit is not None and count > limit:
stop = True
break

doi = get_doi(wos_pub)

with get_session(snapshot.database_name).begin() as insert_session:
# if there's a DOI constraint violation we need to update instead of insert
pub_id = insert_session.execute(
insert(Publication)
.values(
doi=doi,
wos_json=wos_pub,
)
.on_conflict_do_update(
constraint="publication_doi_key",
set_=dict(wos_json=wos_pub),
)
.returning(Publication.id)
).scalar_one()

# a constraint violation is ok here, since it means we
# already know that the publication is by the author
insert_session.execute(
insert(pub_author_association)
.values(publication_id=pub_id, author_id=author.id)
.on_conflict_do_nothing()
)

jsonl_output.write(json.dumps(wos_pub) + "\n")

return jsonl_file


def orcid_publications(orcid) -> Generator[dict, None, None]:
# For API details see: https://api.clarivate.com/swagger-ui/

# WoS doesn't recognize ORCID URIs
if m := re.match(r"^https?://orcid.org/(.+)$", orcid):
orcid = m.group(1)

wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
base_url = "https://wos-api.clarivate.com/api/wos"

headers = {"Accept": "application/json", "X-ApiKey": wos_key}

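# "AI" is the Web of Science Author Identifiers field tag, which matches
# ORCID iDs (and ResearcherIDs) in the advanced search query syntax.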
params = {
"databaseId": "WOK",
"usrQuery": f"AI=({orcid})",
"count": 100,
"firstRecord": 1,
}

http = requests.Session()

# get initial set of results
logging.info(f"fetching {base_url} with {params}")
resp = http.get(base_url, params=params, headers=headers)
resp.raise_for_status()
results = resp.json()

if results["QueryResult"]["RecordsFound"] == 0:
logging.info(f"No results found for ORCID {orcid}")
return

yield from results["Data"]["Records"]["records"]["REC"]

# get subsequent results using the query ID

query_id = results["QueryResult"]["QueryID"]
records_found = results["QueryResult"]["RecordsFound"]
first_record = 101 # since the initial set included 100

while first_record <= records_found:
params = {"firstRecord": first_record, "count": 100}
logging.info(f"fetching {base_url}/query/{query_id} with {params}")

resp = http.get(f"{base_url}/query/{query_id}", params=params, headers=headers)
resp.raise_for_status()

records = resp.json()["Records"]["records"]["REC"]
yield from records

first_record += len(records)


def get_doi(pub) -> Optional[str]:
ids = pub.get("cluster_related", {}).get("identifiers", {}).get("identifier", [])
for id in ids:
if id["type"] == "doi":
return normalize_doi(id["value"])

return None
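
For reference, a minimal usage sketch (not part of the commit) showing how the two helpers above compose: fetch Web of Science records for a single ORCID and print any DOIs that can be extracted. It assumes AIRFLOW_VAR_WOS_KEY is set in the environment; the ORCID is the one used in the live paging test below.

from rialto_airflow.harvest import wos

for pub in wos.orcid_publications("https://orcid.org/0000-0002-0673-5257"):
    # get_doi returns a normalized DOI, or None if the record has no DOI
    doi = wos.get_doi(pub)
    if doi:
        print(doi)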
159 changes: 159 additions & 0 deletions test/harvest/test_wos.py
@@ -0,0 +1,159 @@
import dotenv
import logging
import os
import pytest

from rialto_airflow.harvest import wos
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.database import Publication

from test.utils import num_jsonl_objects

dotenv.load_dotenv()


wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")


@pytest.fixture
def mock_wos(monkeypatch):
"""
Mock our function for fetching publications by ORCID from Web of Science.
"""

def f(*args, **kwargs):
yield {
"static_data": {
"summary": {
"titles": {
"title": [{"type": "source", "content": "An example title"}]
}
}
},
"cluster_related": {
"identifiers": {
"identifier": [
{"type": "issn", "value": "2211-9124"},
{
"type": "doi",
"value": "https://doi.org/10.1515/9781503624153",
},
]
}
},
}

monkeypatch.setattr(wos, "orcid_publications", f)


@pytest.fixture
def mock_many_wos(monkeypatch):
"""
A fixture that yields 1,000 fake documents from Web of Science.
"""

def f(*args, **kwargs):
for n in range(1, 1001):
yield {"UID": f"mock:{n}"}

monkeypatch.setattr(wos, "orcid_publications", f)


@pytest.fixture
def existing_publication(test_session):
with test_session.begin() as session:
pub = Publication(
doi="10.1515/9781503624153",
sulpub_json={"sulpub": "data"},
wos_json={"wos": "data"},
)
session.add(pub)
return pub


@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
def test_orcid_publications_with_paging():
"""
This is a live test of the WoS API to ensure paging works properly.
"""

# https://www-webofscience-com.stanford.idm.oclc.org/wos/alldb/advanced-search
# The ORCID that is tested should return more than 200 results to exercise paging
orcid = "https://orcid.org/0000-0002-0673-5257"

uids = set()
for pub in wos.orcid_publications(orcid):
assert pub
assert pub["UID"] not in uids, "haven't seen publication before"
uids.add(pub["UID"])

assert len(uids) > 200, "found more than 200 publications"


@pytest.mark.skipif(wos_key is None, reason="no Web of Science key")
def test_orcid_publications_with_bad_orcid():
"""
This is a live test of the WoS API to ensure that a search for an invalid ORCID yields no results.
"""
assert (
len(list(wos.orcid_publications("https://orcid.org/0000-0003-0784-7987-XXX")))
== 0
)


def test_harvest(tmp_path, test_session, mock_authors, mock_wos):
"""
With some authors loaded and a mocked WoS API make sure that a
publication is matched up to the authors using the ORCID.
"""
# harvest from Web of Science
snapshot = Snapshot(path=tmp_path, database_name="rialto_test")
wos.harvest(snapshot)

# the mocked Web of Science api returns the same publication for both authors
assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2

# make sure a publication is in the database and linked to the author
with test_session.begin() as session:
assert session.query(Publication).count() == 1, "one publication loaded"

pub = session.query(Publication).first()
assert pub.doi == "10.1515/9781503624153", "doi was normalized"

assert len(pub.authors) == 2, "publication has two authors"
assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001"
assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002"


def test_harvest_when_doi_exists(
tmp_path, test_session, existing_publication, mock_authors, mock_wos
):
"""
When a publication and its authors already exist in the database make sure that the wos_json is updated.
"""
# harvest from web of science
snapshot = Snapshot(path=tmp_path, database_name="rialto_test")
wos.harvest(snapshot)

# jsonl file is there and has two lines (one for each author)
assert num_jsonl_objects(snapshot.path / "wos.jsonl") == 2

# ensure that the existing publication for the DOI was updated
with test_session.begin() as session:
assert session.query(Publication).count() == 1, "one publication loaded"
pub = session.query(Publication).first()

assert pub.wos_json
assert pub.sulpub_json == {"sulpub": "data"}, "sulpub data the same"
assert pub.pubmed_json is None

assert len(pub.authors) == 2, "publication has two authors"
assert pub.authors[0].orcid == "https://orcid.org/0000-0000-0000-0001"
assert pub.authors[1].orcid == "https://orcid.org/0000-0000-0000-0002"


def test_log_message(tmp_path, mock_authors, mock_many_wos, caplog):
caplog.set_level(logging.INFO)
snapshot = Snapshot(tmp_path, "rialto_test")
wos.harvest(snapshot, limit=50)
assert "Reached limit of 50 publications, stopping" in caplog.text
