From 4169b8370de9d64bd7f7a615cd91c5a7f82c9a3f Mon Sep 17 00:00:00 2001
From: Ed Summers
Date: Fri, 21 Jun 2024 18:56:35 -0400
Subject: [PATCH 1/2] Adding dimensions publications

Adds `rialto_airflow.harvest.dimensions.publications_csv()`, which fetches
publication metadata from Dimensions for the supplied list of DOIs and
writes it to a CSV file. All available publication fields are fetched and
written. The new function is also wired into the harvest DAG as the
dimensions_harvest_pubs task.

Closes #7
---
 rialto_airflow/dags/harvest.py       |   8 ++-
 rialto_airflow/harvest/dimensions.py | 102 +++++++++++++++++++++------
 rialto_airflow/harvest/doi_set.py    |   4 +-
 rialto_airflow/harvest/openalex.py   |   2 +-
 test/harvest/test_dimensions.py      |  50 +++++++++++--
 5 files changed, 134 insertions(+), 32 deletions(-)

diff --git a/rialto_airflow/dags/harvest.py b/rialto_airflow/dags/harvest.py
index f4575d3..480a971 100644
--- a/rialto_airflow/dags/harvest.py
+++ b/rialto_airflow/dags/harvest.py
@@ -78,11 +78,13 @@ def doi_set(dimensions, openalex, sul_pub):
         return create_doi_set(dimensions, openalex, sul_pub)
 
     @task()
-    def dimensions_harvest_pubs(dois):
+    def dimensions_harvest_pubs(dois, snapshot_dir):
         """
         Harvest publication metadata from Dimensions using the dois from doi_set.
         """
-        return True
+        csv_file = Path(snapshot_dir) / "dimensions-pubs.csv"
+        dimensions.publications_csv(dois, csv_file)
+        return str(csv_file)
 
     @task()
     def openalex_harvest_pubs(dois):
@@ -131,7 +133,7 @@ def publish(dataset):
 
     dois = doi_set(dimensions_dois, openalex_dois, sul_pub)
 
-    dimensions_pubs = dimensions_harvest_pubs(dois)
+    dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir)
 
     openalex_pubs = openalex_harvest_pubs(dois)
 
diff --git a/rialto_airflow/harvest/dimensions.py b/rialto_airflow/harvest/dimensions.py
index dcf54aa..835e3f9 100644
--- a/rialto_airflow/harvest/dimensions.py
+++ b/rialto_airflow/harvest/dimensions.py
@@ -1,29 +1,20 @@
-import os
-import dimcli
-import dotenv
+import csv
 import logging
-import pandas as pd
+import os
 import pickle
 import time
-import re
+from functools import cache
+
+import dimcli
+import pandas as pd
 import requests
+from more_itertools import batched
 
 from rialto_airflow.utils import invert_dict
 
-dotenv.load_dotenv()
-
-dimcli.login(
-    os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_USER"),
-    os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_PASS"),
-    "https://app.dimensions.ai",
-)
-
-dsl = dimcli.Dsl(verbose=False)
-
 
 def dois_from_orcid(orcid):
     logging.info(f"looking up dois for orcid {orcid}")
-    orcid = re.sub(r"^https://orcid.org/", "", orcid)
     q = """
     search publications where researchers.orcid_id = "{}"
     return publications [doi]
@@ -35,9 +26,9 @@
     while try_count < 20:
         try_count += 1
         try:
-            result = dsl.query(q)
+            result = dsl().query(q)
             break
-        except requests.exceptions.HTTPError as e:
+        except requests.exceptions.RequestException as e:
             logging.error("Dimensions API call %s resulted in error: %s", try_count, e)
             time.sleep(try_count * 10)
 
@@ -48,8 +39,12 @@
         yield pub["doi"]
 
 
-def doi_orcids_pickle(org_data_file, pickle_file, limit=None):
-    df = pd.read_csv(org_data_file)
+def doi_orcids_pickle(authors_csv, pickle_file, limit=None) -> None:
+    """
+    Read the ORCIDs in the provided rialto-orgs authors.csv file and write a
+    dictionary mapping of DOI -> [ORCID] to a pickle file at the provided path.
+ """ + df = pd.read_csv(authors_csv) orcids = df[df["orcidid"].notna()]["orcidid"] orcid_dois = {} @@ -60,3 +55,70 @@ def doi_orcids_pickle(org_data_file, pickle_file, limit=None): with open(pickle_file, "wb") as handle: pickle.dump(invert_dict(orcid_dois), handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def publications_csv(dois, csv_file) -> None: + with open(csv_file, "w") as output: + writer = csv.DictWriter(output, publication_fields()) + writer.writeheader() + for pub in publications_from_dois(dois): + logging.info(f"writing metadata for {pub.get('doi')}") + writer.writerow(pub) + + +def publications_from_dois(dois: list, batch_size=200) -> str: + """ + Get the publications metadata for the provided list of DOIs and write as a + CSV file. + """ + fields = " + ".join(publication_fields()) + for doi_batch in batched(dois, batch_size): + doi_list = ",".join(['"{}"'.format(doi) for doi in doi_batch]) + + q = f""" + search publications where doi in [{doi_list}] + return publications [{fields}] + limit 1000 + """ + + result = dsl().query(q) + + for pub in result["publications"]: + yield normalize_publication(pub) + + +@cache +def publication_fields(): + """ + Get a list of all possible fields for publications. + """ + result = dsl().query("describe schema") + return list(result.data["sources"]["publications"]["fields"].keys()) + + +def normalize_publication(pub) -> dict: + for field in publication_fields(): + if field not in pub: + pub[field] = None + + return pub + + +@cache # TODO: maybe the login should expire after some time? +def login(): + """ + Login to Dimensions API and cache the result. + """ + dimcli.login( + os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_USER"), + os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_PASS"), + "https://app.dimensions.ai", + ) + + +def dsl(): + """ + Get the Dimensions DSL for querying. 
+ """ + login() + return dimcli.Dsl(verbose=False) diff --git a/rialto_airflow/harvest/doi_set.py b/rialto_airflow/harvest/doi_set.py index 5f5f43e..a29f852 100644 --- a/rialto_airflow/harvest/doi_set.py +++ b/rialto_airflow/harvest/doi_set.py @@ -1,4 +1,5 @@ import csv +import logging import pickle @@ -8,7 +9,8 @@ def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list: openalex_dois = dois_from_pickle(openalex) sul_pub_dois = get_sul_pub_dois(sul_pub_csv) unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois)) - + logging.info(f"found {len(unique_dois)}") + return unique_dois diff --git a/rialto_airflow/harvest/openalex.py b/rialto_airflow/harvest/openalex.py index e180b04..53d3ec8 100644 --- a/rialto_airflow/harvest/openalex.py +++ b/rialto_airflow/harvest/openalex.py @@ -16,9 +16,9 @@ def doi_orcids_pickle(authors_csv, pickle_file, limit=None): orcid_dois = {} count = 0 for row in csv.DictReader(csv_input): - count += 1 orcid = row["orcidid"].replace("https://orcid.org/", "") if orcid: + count += 1 orcid_dois[orcid] = list(dois_from_orcid(orcid)) if limit is not None and count > limit: break diff --git a/test/harvest/test_dimensions.py b/test/harvest/test_dimensions.py index df6e748..f7007bd 100644 --- a/test/harvest/test_dimensions.py +++ b/test/harvest/test_dimensions.py @@ -1,23 +1,20 @@ import os -import dotenv import pickle -import pytest -from rialto_airflow.harvest.dimensions import doi_orcids_pickle +import dotenv +import pandas +from rialto_airflow.harvest import dimensions dotenv.load_dotenv() dimensions_user = os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_USER") dimensions_password = os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_PASS") -no_auth = not (dimensions_user and dimensions_password) - -@pytest.mark.skipif(no_auth, reason="no dimensions key") def test_doi_orcids_dict(tmpdir): pickle_file = tmpdir / "dimensions.pickle" - doi_orcids_pickle("test/data/authors.csv", pickle_file, limit=5) + dimensions.doi_orcids_pickle("test/data/authors.csv", pickle_file, limit=5) assert pickle_file.isfile() with open(pickle_file, "rb") as handle: @@ -25,3 +22,42 @@ def test_doi_orcids_dict(tmpdir): assert len(doi_orcids) > 0 assert doi_orcids["10.1109/lra.2018.2890209"] == ["0000-0002-0770-2940"] + + +def test_publications_from_dois(): + # use batch_size=1 to test paging for two DOIs + pubs = list( + dimensions.publications_from_dois( + ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], batch_size=1 + ) + ) + assert len(pubs) == 2 + assert len(pubs[0].keys()) == 74, "first publication has 74 columns" + assert len(pubs[1].keys()) == 74, "second publication has 74 columns" + + +def test_publication_fields(): + fields = dimensions.publication_fields() + assert len(fields) == 74 + assert "title" in fields + + +def test_publications_csv(tmpdir): + pubs_csv = tmpdir / "dimensions-pubs.csv" + dimensions.publications_csv( + ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv + ) + + df = pandas.read_csv(pubs_csv) + + assert len(df) == 2 + + # the order of the results isn't guaranteed but make sure things are coming back + + assert set(df.title.tolist()) == set( + ["On the Dangers of Stochastic Parrots", "Attention Is All You Need"] + ) + + assert set(df.doi.tolist()) == set( + ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"] + ) From c74ba6a34e49b35c0e3f109eebb5ab44f8921dfa Mon Sep 17 00:00:00 2001 From: Ed Summers Date: Mon, 24 Jun 2024 07:01:09 -0400 Subject: [PATCH 2/2] reformat --- rialto_airflow/harvest/doi_set.py | 2 +- 1 file 
diff --git a/rialto_airflow/harvest/doi_set.py b/rialto_airflow/harvest/doi_set.py
index a29f852..6bfc199 100644
--- a/rialto_airflow/harvest/doi_set.py
+++ b/rialto_airflow/harvest/doi_set.py
@@ -10,7 +10,7 @@ def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list:
     sul_pub_dois = get_sul_pub_dois(sul_pub_csv)
     unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois))
     logging.info(f"found {len(unique_dois)} unique dois")
-    
+
     return unique_dois
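
Usage note (not part of the patches above): a minimal sketch of how the new
harvester can be exercised outside of Airflow, mirroring what
test_publications_csv does. The two DOIs and the output path are
illustrative only, and Dimensions credentials are assumed to be available in
a .env file as AIRFLOW_VAR_DIMENSIONS_API_USER and
AIRFLOW_VAR_DIMENSIONS_API_PASS:

    import dotenv

    from rialto_airflow.harvest import dimensions

    # pick up the Dimensions credentials from .env; the dsl() helper in
    # rialto_airflow.harvest.dimensions logs in lazily on first use
    dotenv.load_dotenv()

    # fetch all available publication fields for these (illustrative) DOIs
    # and write them to a CSV, one row per publication
    dois = ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"]
    dimensions.publications_csv(dois, "dimensions-pubs.csv")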