sulpub code + test
Port over sul_pub code from rialto-data, and add some tests.

Closes #3
edsu committed Jun 17, 2024
1 parent 55a2366 commit e08692f
Showing 12 changed files with 317 additions and 10 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,31 @@
name: Test
on:
  - push
  - pull_request
jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.12]
    steps:

      - name: checkout
        uses: actions/checkout@v3

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Lint
        uses: chartboost/ruff-action@v1
        # it may move, see https://github.com/astral-sh/ruff/issues/8400

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run tests
        run: pytest
2 changes: 2 additions & 0 deletions .gitignore
@@ -162,3 +162,5 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

data/
56 changes: 47 additions & 9 deletions README.md
@@ -1,5 +1,20 @@
# rialto-airflow

[![.github/workflows/test.yml](https://github.com/sul-dlss-labs/rialto-airflow/actions/workflows/test.yml/badge.svg)](https://github.com/sul-dlss-labs/rialto-airflow/actions/workflows/test.yml)

Airflow for harvesting data for open access analysis and research intelligence. The workflow integrates data from the [sul_pub](https://github.com/sul-dlss/sul_pub), [rialto-orgs](https://github.com/sul-dlss/rialto-orgs), [OpenAlex](https://openalex.org/) and [Dimensions](https://www.dimensions.ai/) APIs to provide a view of publication data for Stanford University research. The basic workflow is: fetch Stanford research publications from sul_pub, look those publications up in OpenAlex and Dimensions using the DOI, merge the author/department information found in rialto-orgs, and publish the data to our JupyterHub environment.

```mermaid
flowchart TD
last_harvest(Determine last harvest) --> sul_pub(Publications from sul_pub)
sul_pub --> extract_doi(Extract DOIs)
extract_doi -- DOI --> openalex(OpenAlex)
extract_doi -- DOI --> dimensions(Dimensions)
dimensions --> merge_pubs(Merge Publications)
openalex --> merge_pubs(Merge Publications)
merge_pubs -- SUNETID --> join_departments(Join Departments)
join_departments --> publish(Publish)
```
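
The publication merging and department joining steps are still stubs in the DAG, but conceptually they are joins keyed on DOI and SUNETID. A purely illustrative pandas sketch, where the frames and column names such as `doi`, `sunet_id` and `department` are assumptions rather than the real schema:

```python
import pandas as pd

# Hypothetical frames standing in for the harvested data; real column names may differ.
sul_pub = pd.DataFrame({"doi": ["10.0000/example"], "sunet_id": ["jdoe"], "title": ["A paper"]})
openalex = pd.DataFrame({"doi": ["10.0000/example"], "open_access": ["gold"]})
dimensions = pd.DataFrame({"doi": ["10.0000/example"], "times_cited": [12]})
departments = pd.DataFrame({"sunet_id": ["jdoe"], "department": ["English"]})

# "Merge Publications": combine the three publication sources on DOI.
pubs = (
    sul_pub
    .merge(openalex, on="doi", how="left")
    .merge(dimensions, on="doi", how="left")
)

# "Join Departments": attach rialto-orgs department data by SUNETID.
dataset = pubs.merge(departments, on="sunet_id", how="left")
print(dataset)
```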

## Running Locally with Docker

@@ -11,19 +26,22 @@ Based on the documentation, [Running Airflow in Docker](https://airflow.apache.o

3. Create a `.env` file with the `AIRFLOW_UID` and `AIRFLOW_GROUP` values. For local development these can usually be:
```
AIRFLOW_UID=50000
AIRFLOW_GROUP=0
AIRFLOW_VAR_DATA_DIR="data"
```
(See [Airflow docs](https://airflow.apache.org/docs/apache-airflow/2.9.2/howto/docker-compose/index.html#setting-the-right-airflow-user) for more info.)

4. Add values to the `.env` file for any environment variables used by DAGs. These are not in place yet; they will usually be applied to VMs by puppet once productionized.

These environment variables must be prefixed with `AIRFLOW_VAR_` to be accessible to DAGs. (See the [Airflow env var documentation](https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html#storing-variables-in-environment-variables) and `docker-compose.yml`.) They can have placeholder values. The secrets will be in vault, not prefixed by `AIRFLOW_VAR_`: `vault kv list puppet/application/rialto_airflow/{env}`.

Example script to quickly populate your .env file for dev:

```
for i in `vault kv list -format yaml puppet/application/rialto-airflow/dev | sed 's/- //'` ; do \
  val=$(echo $i| tr '[a-z]' '[A-Z]'); \
  echo AIRFLOW_VAR_$val=`vault kv get -field=content puppet/application/rialto-airflow/dev/$i`; \
done
```
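
For a quick check that values from `.env` are visible to DAG code: Airflow resolves `Variable.get("foo")` from an `AIRFLOW_VAR_FOO` environment variable when no value is stored in the metadata database. A minimal sketch using one of the variables from `docker-compose.yaml` (the host value here is just a placeholder for illustration):

```python
import os

from airflow.models import Variable

# Normally this comes from .env via docker-compose; set a placeholder so the
# snippet is self-contained.
os.environ.setdefault("AIRFLOW_VAR_SUL_PUB_HOST", "sulpub.example.edu")

# Airflow falls back to the AIRFLOW_VAR_SUL_PUB_HOST environment variable.
print(Variable.get("sul_pub_host"))
```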

## Development

@@ -56,3 +74,23 @@ uv pip compile pyproject.toml -o requirements.txt
```

Unlike poetry, uv's dependency resolution is not platform-agnostic. If we find we need to generate a requirements.txt for linux, we can use [uv's multi-platform resolution options](https://github.com/astral-sh/uv?tab=readme-ov-file#multi-platform-resolution).

## Run Tests

First enable the virtual environment:

```
source .venv/bin/activate
```

Then ensure the app dependencies and dev dependencies are installed.

```
uv pip install -r requirements.txt -r requirements-dev.txt
```

Then run the tests:

```
pytest
```
Empty file added data/.keep
Empty file.
4 changes: 4 additions & 0 deletions docker-compose.yaml
@@ -78,9 +78,13 @@ x-airflow-common:
    AIRFLOW_VAR_DIMENSIONS_API_PASS: ${AIRFLOW_VAR_DIMENSIONS_API_PASS}
    AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
    AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
    AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/rialto_airflow:/opt/airflow/rialto_airflow
    # TODO: we may want to put logs and data outside of the project directory so
    # they can persist across capistrano deploys?
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    &airflow-common-depends-on
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -7,8 +7,13 @@ requires-python = ">= 3.12"
dependencies = [
    "pandas",
    "requests",
    "python-dotenv"
]

[tool.pytest.ini_options]
pythonpath = ["."]
addopts = "-v"

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
3 changes: 3 additions & 0 deletions requirements-dev.txt
@@ -0,0 +1,3 @@
pytest
python-dotenv
apache-airflow==2.9.2
Empty file added rialto_airflow/__init__.py
Empty file.
100 changes: 100 additions & 0 deletions rialto_airflow/dags/update_data.py
@@ -0,0 +1,100 @@
import datetime
import pathlib

from airflow.models import Variable
from airflow.decorators import dag, task

from rialto_airflow.utils import last_harvest, create_snapshot_dir
from rialto_airflow.harvest.sul_pub import sul_pub_csv

data_dir = Variable.get("data_dir")
sul_pub_host = Variable.get("sul_pub_host")
sul_pub_key = Variable.get("sul_pub_key")

@dag(
    schedule=None,
    start_date=datetime.datetime(2024, 1, 1),
    catchup=False,
)
def update_data():

    @task(multiple_outputs=True)
    def setup():
        """
        Setup the data directory to write to and determine the last harvest.
        """
        return {
            "last_harvest": last_harvest(),
            "snapshot_dir": create_snapshot_dir(data_dir)
        }

    @task()
    def fetch_sul_pub(last_harvest, snapshot_dir):
        """
        Harvest data from sul_pub using the last harvest date.
        """
        csv_file = pathlib.Path(snapshot_dir) / "sulpub.csv"
        sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=last_harvest)

        return str(csv_file)

    @task()
    def extract_doi(sulpub):
        """
        Extract a unique list of DOIs from the new publications data.
        """
        return True

    @task()
    def fetch_openalex(dois):
        """
        Fetch the data by DOI from OpenAlex.
        """
        return True

    @task()
    def fetch_dimensions(dois):
        """
        Fetch the data by DOI from Dimensions.
        """
        return True

    @task()
    def merge_publications(sul_pub, openalex, dimensions):
        """
        Merge the OpenAlex, Dimensions and sul_pub data.
        """
        return True

    @task()
    def merge_contributors(pubs):
        """
        Merge in contributor and departmental data from rialto-orgs.
        """
        return True

    @task
    def create_dataset(pubs, contribs):
        """
        Aggregate the incremental snapshot data into a single dataset.
        """
        return True

    @task()
    def publish(dataset):
        """
        Publish aggregate data to JupyterHub environment.
        """
        return True

    config = setup()
    sul_pub = fetch_sul_pub(config["last_harvest"], config["snapshot_dir"])
    dois = extract_doi(sul_pub)
    openalex = fetch_openalex(dois)
    dimensions = fetch_dimensions(dois)
    pubs = merge_publications(sul_pub, openalex, dimensions)
    contribs = merge_contributors(pubs)
    dataset = create_dataset(pubs, contribs)
    publish(dataset)

update_data()
76 changes: 76 additions & 0 deletions rialto_airflow/harvest/sul_pub.py
@@ -0,0 +1,76 @@
import csv
import logging

import requests


sul_pub_fields = [
    "authorship",
    "title",
    "abstract",
    "author",
    "year",
    "type",
    "mesh_headings",
    "publisher",
    "journal",
    "provenance",
    "doi",
    "issn",
    "sulpubid",
    "sw_id",
    "pmid",
    "identifier",
    "last_updated",
    "pages",
    "date",
    "country",
    "booktitle",
    "edition",
    "series",
    "chapter",
    "editor",
]


def sul_pub_csv(csv_file, host, key, since=None, limit=None):
    with open(csv_file, "w") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=sul_pub_fields)
        writer.writeheader()
        for row in harvest(host, key, since, limit):
            writer.writerow(row)


def harvest(host, key, since, limit):
    url = f"https://{host}/publications.json"

    http_headers = {"CAPKEY": key}

    params = { "per": 1000 }
    if since:
        params["changedSince"] = since.strftime('%Y-%m-%d')

    page = 0
    record_count = 0
    more = True

    while more:
        page += 1
        params['page'] = page

        logging.info(f"fetching sul_pub results {url} {params}")
        resp = requests.get(url, params=params, headers=http_headers)
        resp.raise_for_status()

        records = resp.json()['records']
        if len(records) == 0:
            more = False

        for record in records:
            record_count += 1
            if limit is not None and record_count > limit:
                logging.info(f"stopping with limit={limit}")
                more = False
                break

            yield {field: record[field] for field in record if field in sul_pub_fields}
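
A minimal sketch of running the harvester outside of Airflow, assuming the same `AIRFLOW_VAR_SUL_PUB_HOST` and `AIRFLOW_VAR_SUL_PUB_KEY` values used by the tests are available in a `.env` file or the environment (the output filename is arbitrary):

```python
import os

import dotenv

from rialto_airflow.harvest.sul_pub import sul_pub_csv

# Pick up AIRFLOW_VAR_SUL_PUB_HOST and AIRFLOW_VAR_SUL_PUB_KEY from a local .env file.
dotenv.load_dotenv()

host = os.environ["AIRFLOW_VAR_SUL_PUB_HOST"]
key = os.environ["AIRFLOW_VAR_SUL_PUB_KEY"]

# Write the first 100 publications (columns from sul_pub_fields) to a CSV file.
sul_pub_csv("sul_pub_sample.csv", host, key, limit=100)
```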
13 changes: 13 additions & 0 deletions rialto_airflow/utils/__init__.py
@@ -0,0 +1,13 @@
import os
import datetime

def last_harvest():
    # TODO: look in the data_dir to determine the last harvest
    return datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)

def create_snapshot_dir(data_dir):
    now = datetime.datetime.now()
    snapshot_dir = os.path.join(data_dir, now.strftime('%Y%m%d%H%M%S'))
    os.mkdir(snapshot_dir)

    return snapshot_dir
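
One possible shape for the `last_harvest` TODO, assuming it will eventually scan the data directory for snapshot directories named with the same `%Y%m%d%H%M%S` format that `create_snapshot_dir` uses. This is only a sketch of the idea, not the implemented behavior, and `last_harvest_from_snapshots` is a hypothetical name:

```python
import datetime
import os


def last_harvest_from_snapshots(data_dir):
    """Hypothetical helper: infer the last harvest from snapshot directory names."""
    timestamps = []
    for name in os.listdir(data_dir):
        try:
            # Snapshot directories are named like 20240617123045 by create_snapshot_dir.
            timestamps.append(datetime.datetime.strptime(name, "%Y%m%d%H%M%S"))
        except ValueError:
            continue  # skip .keep and anything else that isn't a snapshot directory

    if not timestamps:
        # No snapshots yet: fall back to the current hard-coded default.
        return datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)

    # Timezone handling is left open here; create_snapshot_dir uses local time.
    return max(timestamps)
```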
35 changes: 35 additions & 0 deletions test/harvest/test_sul_pub.py
@@ -0,0 +1,35 @@
import os
import datetime

import dotenv
import pandas
import pytest

from rialto_airflow.harvest.sul_pub import sul_pub_csv

dotenv.load_dotenv()

sul_pub_host = os.environ.get('AIRFLOW_VAR_SUL_PUB_HOST')
sul_pub_key = os.environ.get('AIRFLOW_VAR_SUL_PUB_KEY')

no_auth = not (sul_pub_host and sul_pub_key)

@pytest.mark.skipif(no_auth, reason="no sul_pub key")
def test_sul_pub_csv(tmpdir):
    csv_file = tmpdir / "sul_pub.csv"
    sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=2000)
    assert csv_file.isfile()

    df = pandas.read_csv(csv_file)
    assert len(df) == 2000
    assert "title" in df.columns

@pytest.mark.skip(reason="sul_pub changeSince broken")
@pytest.mark.skipif(no_auth, reason="no sul_pub key")
def test_sul_pub_csv_since(tmpdir):
    csv_file = tmpdir / "sul_pub.csv"
    since = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
    sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, since=since, limit=100)

    df = pandas.read_csv(csv_file, parse_dates=['last_updated'])
    assert len(df[df['last_updated'] < since]) == 0
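
Both tests above are skipped without real sul_pub credentials. A possible offline variant could stub `requests.get` with `unittest.mock` so the harvester can be exercised without an API key; the fake response shape (`{"records": [...]}`) mirrors what `harvest()` expects. This is a sketch of the approach, not part of the committed test suite:

```python
from unittest import mock

import pandas

from rialto_airflow.harvest.sul_pub import sul_pub_csv


def fake_get(url, params=None, headers=None):
    # Return one page with a single record, then an empty page to stop the loop.
    records = [{"title": "A paper", "doi": "10.0000/example"}] if params["page"] == 1 else []
    resp = mock.Mock()
    resp.json.return_value = {"records": records}
    return resp


@mock.patch("rialto_airflow.harvest.sul_pub.requests.get", side_effect=fake_get)
def test_sul_pub_csv_offline(mock_get, tmpdir):
    csv_file = tmpdir / "sul_pub.csv"
    sul_pub_csv(csv_file, "sulpub.example.edu", "fake-key")

    df = pandas.read_csv(csv_file)
    assert len(df) == 1
    assert df["title"][0] == "A paper"
```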
