
Create S3 production/cloud processing pipeline and update dagster infrastructure #163

Merged (22 commits) on Sep 17, 2024

Conversation

@e-belfer (Member) commented Sep 11, 2024

Overview

Closes #147.

What problem does this address?

What did you change in this PR?

  • Update PostgresIOManager to use dagster's io_manager decorator, and create a SQLIOManager class to hold methods common to both SQL database types
  • Add partition_key to the load_input and handle_output functions of both IO managers
  • Add parse_dates handling to prevent dtypes from changing when reading from and writing to SQL databases
  • Add alembic to handle database schema changes
  • Add an environment variable to distinguish between local and prod database runs
  • Update the README
  • When writing partitions to SQLite or Postgres databases, only add records whose primary keys aren't in the dataset yet (this marks re-run partitions as successful rather than failed)
  • Update the GHA to run all_metrics_etl weekly and send updates to pudl-deployments

Testing

How did you make sure this worked? How can a reviewer verify this?
Run all_etl locally.

To-do list


@e-belfer e-belfer linked an issue Sep 11, 2024 that may be closed by this pull request
@e-belfer e-belfer self-assigned this Sep 11, 2024
@e-belfer e-belfer added the s3 Relating to S3 usage metrics label Sep 11, 2024
README.md (outdated; resolved)
@e-belfer e-belfer changed the title Create S3 processing pipeline and update dagster infrastructure Create S3 production/cloud processing pipeline and update dagster infrastructure Sep 11, 2024
@e-belfer e-belfer marked this pull request as ready for review September 13, 2024 14:59
@e-belfer e-belfer requested a review from bendnorman September 13, 2024 14:59
@bendnorman (Member) left a comment:

Looking good! I left a few requests and questions for ya.

.github/workflows/load-metrics.yml (resolved)
.github/workflows/load-metrics.yml (resolved)
README.md (resolved)
migrations/env.py (outdated; resolved)
migrations/env.py (outdated; resolved)
src/usage_metrics/etl/__init__.py (outdated; resolved)
src/usage_metrics/resources/postgres.py (outdated; resolved)
src/usage_metrics/resources/postgres.py (outdated; resolved)
src/usage_metrics/resources/sqlite.py (outdated; resolved)
src/usage_metrics/resources/sqlite.py (outdated; resolved)
@e-belfer e-belfer requested a review from bendnorman September 17, 2024 14:28
@bendnorman (Member) left a comment:

Great! Just a few non-blocking requests about comments.

.github/workflows/load-metrics.yml (resolved)
README.md (resolved)
Comment on lines 92 to 102
resources_by_env = { # STILL TO DO!
"prod": {"database_manager": postgres_manager},
"local": {"database_manager": sqlite_manager},
}

resources = resources_by_env[os.getenv("METRICS_PROD_ENV", "local")]

defs: Definitions = Definitions(
assets=default_assets,
# asset_checks=default_asset_checks,
resources={"database_manager": sqlite_manager}, # TODO: How to handle this?
resources=resources, # TODO: How to handle this?
Can these comments be removed?

@@ -73,4 +75,6 @@ def raw_s3_logs(context: AssetExecutionContext) -> pd.DataFrame:
weekly_dfs.append(pd.read_csv(path, delimiter=" ", header=None))
except pd.errors.EmptyDataError:
context.log.warnings(f"{path} is an empty file, couldn't read.")
return pd.concat(weekly_dfs)
if weekly_dfs: # If data
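For context on why the `if weekly_dfs:` guard the reviewer is commenting on matters: `pd.concat` raises a `ValueError` when given an empty list, so a week with only empty log files would crash the asset. A minimal standalone illustration of the guard (not the PR's actual code; the function name is invented):

```python
import pandas as pd


def concat_weekly(weekly_dfs: list[pd.DataFrame]) -> pd.DataFrame:
    """Concatenate weekly log DataFrames, tolerating an empty week.

    pd.concat([]) raises ValueError("No objects to concatenate"),
    so return an empty frame when nothing was read.
    """
    if weekly_dfs:
        return pd.concat(weekly_dfs)
    return pd.DataFrame()
```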
Can you remove this comment or make it more descriptive, please?

@@ -12,7 +13,9 @@
AssetsDefinition,
AssetSelection,
Definitions,
ExperimentalWarning,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like this is being used anywhere in the module.

src/usage_metrics/resources/sqlite.py (outdated; resolved)
@e-belfer e-belfer merged commit ff0f41f into main Sep 17, 2024
5 checks passed
@e-belfer e-belfer deleted the duckdb_io branch September 17, 2024 19:43
Successfully merging this pull request may close these issues.

Automate local S3 ETL in cloud