Skip to content

Commit

Permalink
Add an out table with the differences for each Zenodo field
Browse files Browse the repository at this point in the history
  • Loading branch information
e-belfer committed Oct 25, 2024
1 parent c656a31 commit 05d984a
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 1 deletion.
59 changes: 59 additions & 0 deletions migrations/versions/f5dbba621bb1_add_out_zenodo_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Add out_zenodo_logs
Revision ID: f5dbba621bb1
Revises: b4fee22b4a4d
Create Date: 2024-10-25 13:30:27.163055
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'f5dbba621bb1'
down_revision: Union[str, None] = 'b4fee22b4a4d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('out_zenodo_logs',
sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date when the metadata was reported.'),
sa.Column('version', sa.String(), nullable=True, comment='The version (e.g. 10.0.0) of the dataset record.'),
sa.Column('dataset_slug', sa.String(), nullable=True, comment='The shorthand for the dataset being archived. Matches the pudl_archiver repository dataset slugs when the dataset is archived by the PUDL archiver.'),
sa.Column('new_dataset_downloads', sa.Integer(), nullable=True, comment='The number of new daily downloads for the entire dataset. A download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.'),
sa.Column('new_dataset_unique_downloads', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the entire dataset. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.'),
sa.Column('new_dataset_views', sa.Integer(), nullable=True, comment='The number of new daily views for the entire dataset. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.'),
sa.Column('new_dataset_unique_views', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the entire dataset. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.'),
sa.Column('new_version_downloads', sa.Integer(), nullable=True, comment='The number of new daily downloads for the version. A total download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.'),
sa.Column('new_version_unique_downloads', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the version. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.'),
sa.Column('new_version_views', sa.Integer(), nullable=True, comment='The number of new daily views for the version. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.'),
sa.Column('new_version_unique_views', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the version. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.'),
sa.Column('version_title', sa.String(), nullable=True, comment='The name of the version in Zenodo.'),
sa.Column('version_id', sa.Integer(), nullable=False, comment='The unique ID of the Zenodo version. This is identical to the version DOI.'),
sa.Column('version_record_id', sa.Integer(), nullable=True, comment='The record ID of the Zenodo version. This is identical to the version ID.'),
sa.Column('concept_record_id', sa.Integer(), nullable=True, comment='The concept record ID. This is shared between all versions of a record.'),
sa.Column('version_creation_date', sa.DateTime(), nullable=True, comment='The datetime the record was created.'),
sa.Column('version_last_modified_date', sa.DateTime(), nullable=True, comment='The datetime the record was last modified.'),
sa.Column('version_last_updated_date', sa.DateTime(), nullable=True, comment='The datetime the record was last updated.'),
sa.Column('version_publication_date', sa.Date(), nullable=True, comment='The date that the version was published.'),
sa.Column('version_doi', sa.String(), nullable=True, comment='The DOI of the Zenodo version.'),
sa.Column('concept_record_doi', sa.String(), nullable=True, comment='The DOI of the Zenodo concept record.'),
sa.Column('version_doi_url', sa.String(), nullable=True, comment='The DOI link of the Zenodo version.'),
sa.Column('version_status', sa.String(), nullable=True, comment='The status of the Zenodo version.'),
sa.Column('version_state', sa.String(), nullable=True, comment='The state of the Zenodo version.'),
sa.Column('version_submitted', sa.Boolean(), nullable=True, comment='Is the version submitted?'),
sa.Column('version_description', sa.String(), nullable=True, comment='The description of the version.'),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date', 'version_id')
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('out_zenodo_logs')
# ### end Alembic commands ###
1 change: 1 addition & 0 deletions src/usage_metrics/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

out_module_groups = {
"out_s3": [usage_metrics.out.s3],
"out_zenodo": [usage_metrics.out.zenodo],
}

non_partitioned_module_groups = {
Expand Down
138 changes: 138 additions & 0 deletions src/usage_metrics/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,144 @@
Column("partition_key", String),
)

out_zenodo_logs = Table(
"out_zenodo_logs",
usage_metrics_metadata,
Column(
"metrics_date",
Date,
primary_key=True,
comment="The date when the metadata was reported.",
),
Column(
"version",
String,
comment="The version (e.g. 10.0.0) of the dataset record.",
),
Column(
"dataset_slug",
String,
comment="The shorthand for the dataset being archived. Matches the pudl_archiver repository dataset slugs when the dataset is archived by the PUDL archiver.",
),
Column(
"new_dataset_downloads",
Integer,
comment="The number of new daily downloads for the entire dataset. A download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.",
),
Column(
"new_dataset_unique_downloads",
Integer,
comment="The number of new daily unique downloads for the entire dataset. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.",
),
Column(
"new_dataset_views",
Integer,
comment="The number of new daily views for the entire dataset. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.",
),
Column(
"new_dataset_unique_views",
Integer,
comment="The number of new daily unique downloads for the entire dataset. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.",
),
Column(
"new_version_downloads",
Integer,
comment="The number of new daily downloads for the version. A total download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.",
),
Column(
"new_version_unique_downloads",
Integer,
comment="The number of new daily unique downloads for the version. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.",
),
Column(
"new_version_views",
Integer,
comment="The number of new daily views for the version. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.",
),
Column(
"new_version_unique_views",
Integer,
comment="The number of new daily unique downloads for the version. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.",
),
Column(
"version_title",
String,
comment="The name of the version in Zenodo.",
),
Column(
"version_id",
Integer,
primary_key=True,
comment="The unique ID of the Zenodo version. This is identical to the version DOI.",
),
Column(
"version_record_id",
Integer,
comment="The record ID of the Zenodo version. This is identical to the version ID.",
),
Column(
"concept_record_id",
Integer,
comment="The concept record ID. This is shared between all versions of a record.",
),
Column(
"version_creation_date",
DateTime,
comment="The datetime the record was created.",
),
Column(
"version_last_modified_date",
DateTime,
comment="The datetime the record was last modified.",
),
Column(
"version_last_updated_date",
DateTime,
comment="The datetime the record was last updated.",
),
Column(
"version_publication_date",
Date,
comment="The date that the version was published.",
),
Column(
"version_doi",
String,
comment="The DOI of the Zenodo version.",
),
Column(
"concept_record_doi",
String,
comment="The DOI of the Zenodo concept record.",
),
Column(
"version_doi_url",
String,
comment="The DOI link of the Zenodo version.",
),
Column(
"version_status",
String,
comment="The status of the Zenodo version.",
),
Column(
"version_state",
String,
comment="The state of the Zenodo version.",
),
Column(
"version_submitted",
Boolean,
comment="Is the version submitted?",
),
Column(
"version_description",
String,
comment="The description of the version.",
),
Column("partition_key", String),
)

intake_logs = Table(
"intake_logs",
usage_metrics_metadata,
Expand Down
2 changes: 1 addition & 1 deletion src/usage_metrics/out/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Module contains assets that transform data into core assets."""

from . import s3
from . import s3, zenodo
61 changes: 61 additions & 0 deletions src/usage_metrics/out/zenodo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Create outputs from Zenodo logs."""

import pandas as pd
from dagster import (
AssetExecutionContext,
WeeklyPartitionsDefinition,
asset,
)


@asset(
partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"),
io_manager_key="database_manager",
tags={"source": "zenodo"},
)
def out_zenodo_logs(
context: AssetExecutionContext,
core_zenodo_logs: pd.DataFrame,
) -> pd.DataFrame:
"""Output daily Zenodo logs.
Calculate differences from the cumulative views and downloads columns.
"""
context.log.info(f"Processing data for the week of {context.partition_key}")

if core_zenodo_logs.empty:
return core_zenodo_logs
metrics_cols = [
"dataset_downloads",
"dataset_unique_downloads",
"dataset_views",
"dataset_unique_views",
"version_downloads",
"version_unique_downloads",
"version_views",
"version_unique_views",
]

# Drop mistaken/deleted archives
df = core_zenodo_logs.loc[~core_zenodo_logs.version_id.isin([13919960, 13920120])]

df["metrics_date"] = pd.to_datetime(df["metrics_date"])
# First, ffill all gaps in between dates in case any daily downloads failed
df = df.set_index(["metrics_date", "version_id"]).unstack().ffill()
# Then, backfill all dates prior to the existence of the datasets with 0 for the metric columns
idx = pd.IndexSlice
df.loc(axis=1)[idx[metrics_cols, :]] = (
df.loc(axis=1)[idx[metrics_cols, :]].fillna(0).astype(int)
)
# Backfill the metadata
df = df.bfill()
# # Convert the cumulative sum columns to diff columns
df.loc(axis=1)[idx[metrics_cols, :]] = (
df.loc(axis=1)[idx[metrics_cols, :]].diff().fillna(0)
)

new_df = df.stack()
# Rename the diff columns
new_df = new_df.rename({col: "new_" + col for col in metrics_cols}, axis=1)

return new_df.reset_index()

0 comments on commit 05d984a

Please sign in to comment.