Create S3 production/cloud processing pipeline and update dagster infrastructure #163

Merged
merged 22 commits on Sep 17, 2024

7 changes: 4 additions & 3 deletions .github/workflows/load-metrics.yml
@@ -2,13 +2,14 @@ name: load-metrics

on:
workflow_dispatch:
# schedule:
# - cron: "14 0 * * *" # Every day at 12:14 AM UTC
schedule:
- cron: "14 7 * * 1" # Every Monday at 7:14 AM UTC

env:
PRESET_IP1: 44.193.153.196
PRESET_IP2: 52.70.123.52
PRESET_IP3: 54.83.88.93
METRICS_PROD_ENV: "prod"

jobs:
load-metrics:
@@ -57,7 +58,7 @@ jobs:
run: |
gcloud sql instances patch ${{ secrets.GCSQL_INSTANCE_NAME }} --authorized-networks=${{ steps.ip.outputs.ipv4 }},${{ env.PRESET_IP1 }},${{ env.PRESET_IP2 }},${{ env.PRESET_IP3 }}

- name: Run ETL
- name: Run ETL on the latest full week of data
env:
IPINFO_TOKEN: ${{ secrets.IPINFO_TOKEN }}
POSTGRES_IP: ${{ secrets.POSTGRES_IP }}
75 changes: 47 additions & 28 deletions README.md
@@ -16,32 +16,32 @@ This is the project structure generated by the [dagster cli](https://docs.dagste

# Setup

## Conda Environment
## Mamba Environment

We use the conda package manager to specify and update our development environment. We recommend using [miniconda](https://docs.conda.io/en/latest/miniconda.html) rather than the large pre-defined collection of scientific packages bundled together in the Anaconda Python distribution. You may also want to consider using [mamba](https://github.com/mamba-org/mamba) – a faster drop-in replacement for conda written in C++.
We use the mamba package manager to specify and update our development environment.

```
conda update conda
conda env create --name pudl-usage-metrics --file environment.yml
conda activate pudl-usage-metrics
mamba update mamba
mamba env create --name pudl-usage-metrics --file environment.yml
mamba activate pudl-usage-metrics
```

## Environment Variables

The ETL uses [ipinfo](https://ipinfo.io/) to geocode ip addresses. You need to obtain an ipinfo API token and store it in the `IPINFO_TOKEN` environment variable.

If you want to take advantage of caching raw logs, rather than redownloading them for each run, you can set the optional ``DATA_DIR`` environment variable. If this is not set, the script will save files to a temporary directory by default. This is highly recommended to avoid unnecessary egress charges.
If you want to take advantage of caching raw logs, rather than redownloading them for each run, you can set the optional ``DATA_DIR`` environment variable. If this is not set, the script will save files to a temporary directory by default.

Dagster stores run logs and caches in the directory specified by the `DAGSTER_HOME` environment variable. The `usage_metrics/dagster_home/dagster.yaml` file contains configuration for the dagster instance. **Note:** The `usage_metrics/dagster_home/storage` directory can grow to a couple of GB because all op outputs for every run are stored there. You can read more about the dagster_home directory in the [dagster docs](https://docs.dagster.io/deployment/dagster-instance#default-local-behavior).

To set these environment variables, run these commands:

```
conda activate pudl-usage-metrics
conda env config vars set IPINFO_TOKEN="{your_api_key_here}"
conda env config vars set DAGSTER_HOME="$(pwd)/dagster_home/"
conda env config vars set DATA_DIR="$(pwd)/data/"
conda activate pudl-usage-metrics
mamba activate pudl-usage-metrics
mamba env config vars set IPINFO_TOKEN="{your_api_key_here}"
mamba env config vars set DAGSTER_HOME="$(pwd)/dagster_home/"
mamba env config vars set DATA_DIR="$(pwd)/data/"
mamba activate pudl-usage-metrics
```

## Google Cloud Permissions
@@ -85,7 +85,7 @@ When running backfills, this prevents you from kicking off 80 concurrent runs th
In one terminal window start the dagster-daemon and UI by running these commands:

```
conda activate pudl-usage-metrics
mamba activate pudl-usage-metrics
dagster dev -m usage_metrics.etl
```

@@ -109,42 +109,61 @@ You can run the ETL via the dagit UI or the [dagster CLI](https://docs.dagster.i

To run a complete backfill from the Dagit UI, go to the job's partitions tab, then click the "Launch Backfill" button in the upper left corner of the window. This brings up a new window with a list of partitions. Click "Select All" and then click the "Submit" button to submit a run for each partition. You can follow the runs on the ["Runs" tab](http://localhost:3000/instance/runs).
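
A backfill can also be kicked off from the command line. This is a minimal sketch: the job name is a placeholder and the backfill flags vary across Dagster versions, so confirm them with `dagster job backfill --help` first.

```
mamba activate pudl-usage-metrics
# <job_name> is a placeholder; flags below are indicative, check `dagster job backfill --help`.
dagster job backfill -m usage_metrics.etl -j <job_name> --all
```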

### Databases
### Local vs. production development

#### SQLite
Whether the ETL writes to a local SQLite database or to a production Google Cloud SQL Postgres database is determined by the `METRICS_PROD_ENV` environment variable. If it is not set, you will develop locally by default. To develop against production, run the following:

Jobs in the `local_usage_metrics` dagster repository create a sqlite database called `usage_metrics.db` in the `usage_metrics/data/` directory. A primary key constraint error will be thrown if you rerun the ETL for a partition. If you want to recreate the entire database just delete the sqlite database and rerun the ETL.
```
mamba env config vars set METRICS_PROD_ENV='prod'
mamba activate pudl-usage-metrics
```

To revert to local development, set `METRICS_PROD_ENV='local'`.
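
For example, mirroring the commands above:

```
mamba env config vars set METRICS_PROD_ENV='local'
mamba activate pudl-usage-metrics
```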

#### Schema management
We use Alembic to manage the schemas of both the local and production databases. Whenever a new column or table is added, run the following commands to create a new schema migration and then upgrade the database schema to match:

```
alembic revision --autogenerate -m "Add my cool new table"
alembic upgrade head
```

Because of the primary key constraints, if you need to rerun a partition that has already been processed, you'll need to delete the database and start over. If you're adding a new table or data source, run a backfill for just that dataset's job to avoid this constraint.

#### Local development (SQLite)

Local development will create a SQLite database called `usage_metrics.db` in the `usage_metrics/data/` directory. A primary key constraint error will be thrown if you rerun the ETL for a partition that has already been loaded. If you want to recreate the entire database, just delete the SQLite database and rerun the ETL.
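
A minimal sketch of that reset, assuming the default paths described in this README:

```
rm usage_metrics/data/usage_metrics.db   # delete the local SQLite database
# Depending on how your tables are created, you may also need to re-apply migrations:
# alembic upgrade head
dagster dev -m usage_metrics.etl         # relaunch dagster and rerun the ETL or backfill from the UI
```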

#### Google Cloud SQL Postgres
#### Production development (Google Cloud SQL Postgres)

Jobs in the `gcp_usage_metrics` dagster repository append new partitions to tables in a Cloud SQL postgres database. A primary key constraint error will be thrown if you rerun the ETL for a partition. The `load-metrics` GitHub action is responsible for updating the database with new partitioned data.
Production runs will append new partitions to tables in a Cloud SQL postgres database. A primary key constraint error will be thrown if you rerun the ETL for a partition. The `load-metrics` GitHub action is responsible for updating the database with new partitioned data.

If a new column is added or data is processed in a new way, you'll have to delete the table in the database and rerun a complete backfill. **Note: The Preset dashboard will be unavailable during the complete backfill.**
If a new column is added or data is processed in a new way, you'll have to delete the table in the database and rerun a complete backfill.
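
One way to drop the affected table before rerunning the backfill is with `psql`, using the connection variables described below; the table name here is a placeholder:

```
# psql prompts for the password (the value stored in POSTGRES_PASSWORD).
psql -h "$POSTGRES_IP" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$POSTGRES_DB" \
  -c 'DROP TABLE IF EXISTS <table_name>;'
```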

To run jobs against the production database, you need to whitelist your IP address for the database:

```
gcloud sql instances patch pudl-usage-metrics-db --authorized-networks={YOUR_IP_ADDRESS}
```

Then add the connection details as environment variables to your conda environment:
Then add the connection details as environment variables to your mamba environment:

```
conda activate pudl-usage-metrics
conda env config vars set POSTGRES_IP={PUDL_USAGE_METRICS_DB_IP}
conda env config vars set POSTGRES_USER={PUDL_USAGE_METRICS_DB_USER}
conda env config vars set POSTGRES_PASSWORD={PUDL_USAGE_METRICS_DB_PASSWORD}
conda env config vars set POSTGRES_DB={PUDL_USAGE_METRICS_DB_DB}
conda env config vars set POSTGRES_PORT={PUDL_USAGE_METRICS_DB_PORT}
conda activate pudl-usage-metrics
mamba activate pudl-usage-metrics
mamba env config vars set POSTGRES_IP={PUDL_USAGE_METRICS_DB_IP}
mamba env config vars set POSTGRES_USER={PUDL_USAGE_METRICS_DB_USER}
mamba env config vars set POSTGRES_PASSWORD={PUDL_USAGE_METRICS_DB_PASSWORD}
mamba env config vars set POSTGRES_DB={PUDL_USAGE_METRICS_DB_DB}
mamba env config vars set POSTGRES_PORT={PUDL_USAGE_METRICS_DB_PORT}
mamba activate pudl-usage-metrics
```

You can find the connection details in the
Ask a member of Inframundo for the connection details.

### IP Geocoding with ipinfo

The ETL uses [ipinfo](https://ipinfo.io/), which provides 50k free API requests a month, to geocode user IP addresses. The `usage_metrics.helpers.geocode_ip()` function uses [joblib](https://joblib.readthedocs.io/en/latest/#main-features) to cache API calls so we don't call the API multiple times for a single IP address. The first time you run the ETL, no API calls will be cached, so the `geocode_ips()` op will take a while to complete.

## Add new data sources

To add a new data source to the dagster repo, add new modules to the `raw` and `core` and `out` directories and add these modules to the corresponding jobs. Once the dataset has been tested locally, run a complete backfill for the job that uses the `PostgresManager` to populate the Cloud SQL database.
To add a new data source to the dagster repo, add new modules to the `raw`, `core`, and `out` directories and add these modules to the corresponding jobs. Once the dataset has been tested locally, run a complete backfill for the job with `METRICS_PROD_ENV="prod"` to populate the Cloud SQL database.
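
A hedged sketch of that final step (the job name is a placeholder, and the exact backfill flags vary across Dagster versions, so confirm them with `dagster job backfill --help`):

```
mamba env config vars set METRICS_PROD_ENV='prod'
mamba activate pudl-usage-metrics
dagster job backfill -m usage_metrics.etl -j <new_dataset_job> --all
```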
107 changes: 107 additions & 0 deletions alembic.ini
@@ -0,0 +1,107 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = migrations

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = INFO
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = WARN
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
1 change: 1 addition & 0 deletions migrations/README
@@ -0,0 +1 @@
Generic single-database configuration.
96 changes: 96 additions & 0 deletions migrations/env.py
@@ -0,0 +1,96 @@
"""Environment configuration for alembic."""

import logging
import os
from logging.config import fileConfig

from alembic import context
from sqlalchemy import engine_from_config, pool

from usage_metrics.models import usage_metrics_metadata
from usage_metrics.resources.postgres import PostgresIOManager # TO DO: Set up paths!!
from usage_metrics.resources.sqlite import SQLiteIOManager

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
config.include_schemas = True

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)

logger = logging.getLogger("root")


# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = usage_metrics_metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
dev_envr = os.getenv("METRICS_PROD_ENV", "local")
engines = {"prod": PostgresIOManager().engine, "local": SQLiteIOManager().engine}

logger.info(f"Configuring database for {dev_envr} database")

# Get the URL from the engine selected by METRICS_PROD_ENV and escape "%" characters for configparser
db_location = (
engines[dev_envr].url.render_as_string(hide_password=False).replace("%", "%%")
)
config.set_main_option("sqlalchemy.url", db_location)


def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.

This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.

Calls to context.execute() here emit the given string to the
script output.

"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)

with context.begin_transaction():
context.run_migrations()


def run_migrations_online() -> None:
"""Run migrations in 'online' mode.

In this scenario we need to create an Engine
and associate a connection with the context.

"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
26 changes: 26 additions & 0 deletions migrations/script.py.mako
@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
${upgrades if upgrades else "pass"}


def downgrade() -> None:
${downgrades if downgrades else "pass"}