Merge pull request #47 from sul-dlss-labs/dimensions-pubs
Adding dimensions publications
edsu authored Jun 24, 2024
2 parents a0c3380 + c74ba6a commit 6836995
Showing 5 changed files with 133 additions and 31 deletions.
8 changes: 5 additions & 3 deletions rialto_airflow/dags/harvest.py
@@ -78,11 +78,13 @@ def doi_set(dimensions, openalex, sul_pub):
     return create_doi_set(dimensions, openalex, sul_pub)

 @task()
-def dimensions_harvest_pubs(dois):
+def dimensions_harvest_pubs(dois, snapshot_dir):
     """
     Harvest publication metadata from Dimensions using the dois from doi_set.
     """
-    return True
+    csv_file = Path(snapshot_dir) / "dimensions-pubs.csv"
+    dimensions.publications_csv(dois, csv_file)
+    return str(csv_file)

 @task()
 def openalex_harvest_pubs(dois):
@@ -131,7 +133,7 @@ def publish(dataset):

     dois = doi_set(dimensions_dois, openalex_dois, sul_pub)

-    dimensions_pubs = dimensions_harvest_pubs(dois)
+    dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir)

     openalex_pubs = openalex_harvest_pubs(dois)

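The task now writes its harvest into the snapshot directory and returns the CSV path (passed to downstream tasks via Airflow's XCom) instead of a bare True. A minimal sketch of the same data flow outside of Airflow; the snapshot directory and DOI below are made up, and Dimensions credentials must be set in the environment:

from pathlib import Path

from rialto_airflow.harvest import dimensions

snapshot_dir = "/tmp/snapshots/20240624"  # hypothetical snapshot directory
Path(snapshot_dir).mkdir(parents=True, exist_ok=True)

dois = ["10.48550/arxiv.1706.03762"]  # normally produced by the doi_set task

csv_file = Path(snapshot_dir) / "dimensions-pubs.csv"
dimensions.publications_csv(dois, csv_file)  # writes one row per DOI
print(str(csv_file))  # the value handed to downstream tasks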
102 changes: 82 additions & 20 deletions rialto_airflow/harvest/dimensions.py
@@ -1,29 +1,20 @@
-import os
-import dimcli
-import dotenv
+import csv
 import logging
-import pandas as pd
+import os
 import pickle
 import time
 import re
+from functools import cache

+import dimcli
+import pandas as pd
 import requests
 from more_itertools import batched

 from rialto_airflow.utils import invert_dict
-
-dotenv.load_dotenv()
-
-dimcli.login(
-    os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_USER"),
-    os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_PASS"),
-    "https://app.dimensions.ai",
-)
-
-dsl = dimcli.Dsl(verbose=False)


 def dois_from_orcid(orcid):
     logging.info(f"looking up dois for orcid {orcid}")
     orcid = re.sub(r"^https://orcid.org/", "", orcid)
     q = """
     search publications where researchers.orcid_id = "{}"
     return publications [doi]
@@ -35,9 +26,9 @@ def dois_from_orcid(orcid):
     while try_count < 20:
         try_count += 1
         try:
-            result = dsl.query(q)
+            result = dsl().query(q)
             break
-        except requests.exceptions.HTTPError as e:
+        except requests.exceptions.RequestException as e:
             logging.error("Dimensions API call %s resulted in error: %s", try_count, e)
             time.sleep(try_count * 10)

@@ -48,8 +39,12 @@ def dois_from_orcid(orcid):
         yield pub["doi"]


-def doi_orcids_pickle(org_data_file, pickle_file, limit=None):
-    df = pd.read_csv(org_data_file)
+def doi_orcids_pickle(authors_csv, pickle_file, limit=None) -> None:
+    """
+    Read the ORCIDs in the provided rialto-orgs authors.csv file and write a
+    dictionary mapping of DOI -> [ORCID] to a pickle file at the provided path.
+    """
+    df = pd.read_csv(authors_csv)
     orcids = df[df["orcidid"].notna()]["orcidid"]
     orcid_dois = {}

@@ -60,3 +55,70 @@ def doi_orcids_pickle(org_data_file, pickle_file, limit=None):

     with open(pickle_file, "wb") as handle:
         pickle.dump(invert_dict(orcid_dois), handle, protocol=pickle.HIGHEST_PROTOCOL)
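For context, invert_dict (imported from rialto_airflow.utils) flips the ORCID -> [DOI] mapping accumulated in the loop above into the DOI -> [ORCID] mapping the docstring promises, so a DOI shared by several tracked researchers lists all of their ORCIDs. A sketch of the expected shape, with made-up values (the second ORCID and DOI are hypothetical):

orcid_dois = {
    "0000-0002-0770-2940": ["10.1109/lra.2018.2890209"],
    "0000-0001-2345-6789": ["10.1109/lra.2018.2890209", "10.1234/example"],
}

# invert_dict(orcid_dois) should then be keyed by DOI:
# {
#     "10.1109/lra.2018.2890209": ["0000-0002-0770-2940", "0000-0001-2345-6789"],
#     "10.1234/example": ["0000-0001-2345-6789"],
# }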


+def publications_csv(dois, csv_file) -> None:
+    """
+    Get the publication metadata for the provided DOIs and write it as a CSV
+    file at the provided path.
+    """
+    with open(csv_file, "w") as output:
+        writer = csv.DictWriter(output, publication_fields())
+        writer.writeheader()
+        for pub in publications_from_dois(dois):
+            logging.info(f"writing metadata for {pub.get('doi')}")
+            writer.writerow(pub)


+def publications_from_dois(dois: list, batch_size=200):
+    """
+    Yield the publication metadata for the provided list of DOIs, querying
+    the Dimensions API in batches of batch_size DOIs at a time.
+    """
+    fields = " + ".join(publication_fields())
+    for doi_batch in batched(dois, batch_size):
+        doi_list = ",".join(['"{}"'.format(doi) for doi in doi_batch])

+        q = f"""
+        search publications where doi in [{doi_list}]
+        return publications [{fields}]
+        limit 1000
+        """

+        result = dsl().query(q)

+        for pub in result["publications"]:
+            yield normalize_publication(pub)
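more_itertools.batched chunks the DOI list into tuples of at most batch_size, so each DSL query stays bounded; the limit 1000 clause is simply a ceiling comfortably above the default batch size of 200. For example:

from more_itertools import batched

list(batched(["a", "b", "c", "d", "e"], 2))
# [('a', 'b'), ('c', 'd'), ('e',)]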


+@cache
+def publication_fields():
+    """
+    Get a list of all possible fields for publications.
+    """
+    result = dsl().query("describe schema")
+    return list(result.data["sources"]["publications"]["fields"].keys())


+def normalize_publication(pub) -> dict:
+    for field in publication_fields():
+        if field not in pub:
+            pub[field] = None

+    return pub
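A record returned by the DSL only includes the fields it has values for, so normalize_publication pads each record out to the full schema; that keeps every row aligned with the header that publications_csv writes from publication_fields(). A small illustration, assuming "abstract" is one of the schema fields:

pub = {"doi": "10.1234/example", "title": "An Example"}  # hypothetical sparse record
pub = normalize_publication(pub)

assert pub["title"] == "An Example"  # present values are untouched
assert pub["abstract"] is None       # missing fields are filled with None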


+@cache  # TODO: maybe the login should expire after some time?
+def login():
+    """
+    Log in to the Dimensions API and cache the result.
+    """
+    dimcli.login(
+        os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_USER"),
+        os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_PASS"),
+        "https://app.dimensions.ai",
+    )


+def dsl():
+    """
+    Get the Dimensions DSL for querying.
+    """
+    login()
+    return dimcli.Dsl(verbose=False)
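Because login() is wrapped in @cache, importing the module no longer authenticates eagerly: the first dsl() call logs in once per process, and later calls reuse that session. A hedged sketch of using the reworked module-level API, assuming AIRFLOW_VAR_DIMENSIONS_API_USER and AIRFLOW_VAR_DIMENSIONS_API_PASS are set in the environment:

from rialto_airflow.harvest import dimensions

# the first query triggers the cached login(); subsequent calls skip it
fields = dimensions.publication_fields()
print(len(fields))

dois = list(dimensions.dois_from_orcid("https://orcid.org/0000-0002-0770-2940"))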
2 changes: 2 additions & 0 deletions rialto_airflow/harvest/doi_set.py
@@ -1,4 +1,5 @@
 import csv
+import logging
 import pickle


@@ -8,6 +9,7 @@ def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list:
     openalex_dois = dois_from_pickle(openalex)
     sul_pub_dois = get_sul_pub_dois(sul_pub_csv)
     unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois))
+    logging.info(f"found {len(unique_dois)} unique dois")

     return unique_dois

2 changes: 1 addition & 1 deletion rialto_airflow/harvest/openalex.py
@@ -16,9 +16,9 @@ def doi_orcids_pickle(authors_csv, pickle_file, limit=None):
         orcid_dois = {}
         count = 0
         for row in csv.DictReader(csv_input):
-            count += 1
             orcid = row["orcidid"].replace("https://orcid.org/", "")
             if orcid:
+                count += 1
                 orcid_dois[orcid] = list(dois_from_orcid(orcid))
                 if limit is not None and count > limit:
                     break
50 changes: 43 additions & 7 deletions test/harvest/test_dimensions.py
@@ -1,27 +1,63 @@
 import os
-import dotenv
 import pickle
 import pytest

-from rialto_airflow.harvest.dimensions import doi_orcids_pickle
+import dotenv
+import pandas

+from rialto_airflow.harvest import dimensions

 dotenv.load_dotenv()

 dimensions_user = os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_USER")
 dimensions_password = os.environ.get("AIRFLOW_VAR_DIMENSIONS_API_PASS")

 no_auth = not (dimensions_user and dimensions_password)


 @pytest.mark.skipif(no_auth, reason="no dimensions key")
 def test_doi_orcids_dict(tmpdir):
     pickle_file = tmpdir / "dimensions.pickle"
-    doi_orcids_pickle("test/data/authors.csv", pickle_file, limit=5)
+    dimensions.doi_orcids_pickle("test/data/authors.csv", pickle_file, limit=5)
     assert pickle_file.isfile()

     with open(pickle_file, "rb") as handle:
         doi_orcids = pickle.load(handle)

     assert len(doi_orcids) > 0
     assert doi_orcids["10.1109/lra.2018.2890209"] == ["0000-0002-0770-2940"]


+def test_publications_from_dois():
+    # use batch_size=1 to test paging for two DOIs
+    pubs = list(
+        dimensions.publications_from_dois(
+            ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], batch_size=1
+        )
+    )
+    assert len(pubs) == 2
+    assert len(pubs[0].keys()) == 74, "first publication has 74 fields"
+    assert len(pubs[1].keys()) == 74, "second publication has 74 fields"


+def test_publication_fields():
+    fields = dimensions.publication_fields()
+    assert len(fields) == 74
+    assert "title" in fields


+def test_publications_csv(tmpdir):
+    pubs_csv = tmpdir / "dimensions-pubs.csv"
+    dimensions.publications_csv(
+        ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv
+    )

+    df = pandas.read_csv(pubs_csv)

+    assert len(df) == 2

+    # the order of the results isn't guaranteed, but make sure the expected
+    # titles and DOIs come back
+    assert set(df.title.tolist()) == set(
+        ["On the Dangers of Stochastic Parrots", "Attention Is All You Need"]
+    )

+    assert set(df.doi.tolist()) == set(
+        ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"]
+    )
