Use Google Batch for full ETL runs #3211

Merged
merged 6 commits into from Jan 29, 2024

Changes from all commits
46 changes: 20 additions & 26 deletions .github/workflows/build-deploy-pudl.yml
@@ -14,6 +14,7 @@ env:
GCE_INSTANCE: pudl-deployment-tag # This is changed to pudl-deployment-dev if running on a schedule
GCE_INSTANCE_ZONE: ${{ secrets.GCE_INSTANCE_ZONE }}
GCS_OUTPUT_BUCKET: gs://builds.catalyst.coop
BATCH_JOB_JSON: batch_job.json

jobs:
build_and_deploy_pudl:
@@ -115,17 +116,14 @@ jobs:
uses: google-github-actions/setup-gcloud@v2

# Deploy PUDL image to GCE
- name: Deploy
# Dagster Postgres connection configured in docker/dagster.yaml - otherwise we get a str for port num
- name: Make GCP Batch config file
if: ${{ env.SKIP_BUILD != 'true' }}
env:
DAGSTER_PG_PASSWORD: ${{ secrets.DAGSTER_PG_PASSWORD }}
PUDL_GCS_OUTPUT: ${{ env.GCS_OUTPUT_BUCKET }}/${{ env.BUILD_ID }}
run: |-
gcloud compute instances add-metadata "$GCE_INSTANCE" \
--zone "$GCE_INSTANCE_ZONE" \
--metadata-from-file startup-script=./docker/vm_startup_script.sh
gcloud compute instances update-container "$GCE_INSTANCE" \
--zone "$GCE_INSTANCE_ZONE" \
./devtools/generate_batch_config.py \
--output ${{ env.BATCH_JOB_JSON }} \
--container-image "docker.io/catalystcoop/pudl-etl@${{ steps.docker-build.outputs.digest }}" \
--container-command "micromamba" \
--container-arg="run" \
@@ -135,40 +133,36 @@
--container-arg='' \
--container-arg="bash" \
--container-arg="./docker/gcp_pudl_etl.sh" \
--container-env BUILD_REF=${{ github.ref_name }} \
--container-env AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
--container-env AWS_DEFAULT_REGION=${{ secrets.AWS_DEFAULT_REGION }} \
--container-env AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
--container-env BUILD_ID=${{ env.BUILD_ID }} \
--container-env NIGHTLY_TAG=${{ env.NIGHTLY_TAG }} \
--container-env GITHUB_ACTION_TRIGGER=${{ github.event_name }} \
--container-env SLACK_TOKEN=${{ secrets.PUDL_DEPLOY_SLACK_TOKEN }} \
--container-env BUILD_REF=${{ env.BUILD_REF }} \
--container-env FLY_ACCESS_TOKEN=${{ secrets.FLY_ACCESS_TOKEN }} \
--container-env GCE_INSTANCE=${{ env.GCE_INSTANCE }} \
--container-env GCE_INSTANCE_ZONE=${{ env.GCE_INSTANCE_ZONE }} \
--container-env GCP_BILLING_PROJECT=${{ secrets.GCP_BILLING_PROJECT }} \
--container-env AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
--container-env AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
--container-env AWS_DEFAULT_REGION=${{ secrets.AWS_DEFAULT_REGION }} \
--container-env DAGSTER_PG_USERNAME="postgres" \
--container-env DAGSTER_PG_PASSWORD="$DAGSTER_PG_PASSWORD" \
--container-env DAGSTER_PG_HOST="104.154.182.24" \
--container-env DAGSTER_PG_DB="dagster-storage" \
Comment on lines -149 to -152 (Member Author):
These guys got moved into docker/dagster.yaml because:

  • I wanted all the dagster postgres configs in one place
  • dagster postgres port configuration has to be in dagster.yaml, otherwise we run into "port is a string, not an int" issues

--container-env FLY_ACCESS_TOKEN=${{ secrets.FLY_ACCESS_TOKEN }} \
--container-env GITHUB_ACTION_TRIGGER=${{ github.event_name }} \
--container-env NIGHTLY_TAG=${{ env.NIGHTLY_TAG }} \
--container-env PUDL_BOT_PAT=${{ secrets.PUDL_BOT_PAT }} \
--container-env PUDL_GCS_OUTPUT=${{ env.PUDL_GCS_OUTPUT }} \
--container-env PUDL_SETTINGS_YML="/home/mambauser/pudl/src/pudl/package_data/settings/etl_full.yml" \
--container-env SLACK_TOKEN=${{ secrets.PUDL_DEPLOY_SLACK_TOKEN }} \
--container-env ZENODO_SANDBOX_TOKEN_PUBLISH=${{ secrets.ZENODO_SANDBOX_TOKEN_PUBLISH }} \
--container-env ZENODO_TARGET_ENV=${{ (startsWith(github.ref_name, 'v20') && 'production') || 'sandbox' }} \
--container-env ZENODO_TOKEN_UPLOAD=${{ secrets.ZENODO_TOKEN_UPLOAD }} \
--container-env PUDL_SETTINGS_YML="/home/mambauser/pudl/src/pudl/package_data/settings/etl_full.yml" \
--container-env PUDL_GCS_OUTPUT=${{ env.PUDL_GCS_OUTPUT }}
--output ${{ env.BATCH_JOB_JSON }}

# Start the VM
- name: Start the deploy-pudl-vm
if: ${{ env.SKIP_BUILD != 'true' }}
run: gcloud compute instances start "$GCE_INSTANCE" --zone="$GCE_INSTANCE_ZONE"
# Start the batch job
- name: Kick off batch job
run: gcloud batch jobs submit run-etl-${{ env.BUILD_ID }} --config ${{ env.BATCH_JOB_JSON }} --location us-west1

- name: Post to a pudl-deployments channel
id: slack
uses: slackapi/slack-github-action@v1
with:
channel-id: "C03FHB9N0PQ"
slack-message: "build-deploy-pudl status: ${{ (env.SKIP_BUILD == 'true') && 'skipped' || job.status }}\n${{ env.GCS_OUTPUT_BUCKET }}/${{ env.BUILD_ID }}"
slack-message: "`${{ env.BUILD_ID }}` build-deploy-pudl status: ${{ (env.SKIP_BUILD == 'true') && 'skipped' || job.status }}\n${{ env.GCS_OUTPUT_BUCKET }}/${{ env.BUILD_ID }}"
env:
channel-id: "C03FHB9N0PQ"
SLACK_BOT_TOKEN: ${{ secrets.PUDL_DEPLOY_SLACK_TOKEN }}
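For orientation, the batch_job.json emitted by devtools/generate_batch_config.py (below) takes roughly this shape. The digest and env values here are illustrative placeholders, not real ones:

    {
      "taskGroups": [
        {
          "taskSpec": {
            "runnables": [
              {
                "container": {
                  "imageUri": "docker.io/catalystcoop/pudl-etl@sha256:<digest>",
                  "commands": ["micromamba", "run", "bash", "./docker/gcp_pudl_etl.sh"]
                },
                "environment": { "variables": { "BUILD_ID": "<build-id>" } }
              }
            ],
            "computeResource": { "cpuMilli": 8000, "memoryMib": 32768, "bootDiskMib": 81920 },
            "maxRunDuration": "43200s"
          }
        }
      ],
      "allocationPolicy": {
        "serviceAccount": {
          "email": "deploy-pudl-vm-service-account@catalyst-cooperative-pudl.iam.gserviceaccount.com"
        }
      },
      "logsPolicy": { "destination": "CLOUD_LOGGING" }
    }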
102 changes: 102 additions & 0 deletions devtools/generate_batch_config.py
@@ -0,0 +1,102 @@
#! /usr/bin/env python

"""Generate a Google Batch Job configuration file.

Since we're running this from a GHA runner that doesn't have our whole
environment installed, this script uses only the standard library.

Current shape is intended as a drop-in replacement for `gcloud compute
instances update-container`.
"""

import argparse
import itertools
import json
import logging
from collections import OrderedDict
from pathlib import Path

logging.basicConfig()
logger = logging.getLogger()


def _flat(ls: list[list]) -> list:
return list(itertools.chain.from_iterable(ls))


def to_config(
*,
container_image: str,
container_env: list[list[str]],
container_command: str,
container_arg: str,
) -> dict:
"""Munge arguments into a configuration dictionary."""
complete_env = sorted(_flat(container_env))
env_dict = OrderedDict(
(name, value.strip('"'))
for name, value in (pair.split("=", maxsplit=1) for pair in complete_env)
)

# NOTE (daz): the best documentation of the actual data structure I've found is at
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Job
config = {
Comment (Member Author):
I briefly thought about doing some dataclass thing here instead of a dictionary, but that seemed super verbose so I abandoned that idea.

"taskGroups": [
{
"taskSpec": {
"runnables": [
{
"container": {
"imageUri": container_image,
"commands": [container_command] + _flat(container_arg),
},
"environment": {"variables": env_dict},
}
],
"computeResource": {
"cpuMilli": 8000,
"memoryMib": 32 * 1024,
"bootDiskMib": 80 * 1024,
},
"maxRunDuration": f"{60 * 60 * 12}s",
}
}
],
"allocationPolicy": {
"serviceAccount": {
"email": "deploy-pudl-vm-service-account@catalyst-cooperative-pudl.iam.gserviceaccount.com"
},
},
"logsPolicy": {"destination": "CLOUD_LOGGING"},
}
return config


def generate_batch_config():
"""Generate a Batch configuration file.

Take almost all the same arguments as `gcloud compute instances
update-container`, but output a Batch configuration json instead.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--container-image")
parser.add_argument("--container-command")
parser.add_argument("--container-env", action="append", nargs="*", default=[])
parser.add_argument("--container-arg", action="append", nargs="*", default=[])
parser.add_argument("--output", type=Path)
args = parser.parse_args()

config = to_config(
container_image=args.container_image,
container_command=args.container_command,
container_arg=args.container_arg,
container_env=args.container_env,
)

logger.info(f"Writing to {args.output}")
with args.output.open("w") as f:
f.write(json.dumps(config, indent=2))


if __name__ == "__main__":
generate_batch_config()
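As a quick sanity check, here is a hypothetical invocation and the equivalent direct call to to_config(); the digest and env values are made up:

    # Hypothetical CLI invocation, mirroring the workflow step above:
    #   ./devtools/generate_batch_config.py \
    #     --container-image docker.io/catalystcoop/pudl-etl@sha256:abc123 \
    #     --container-command micromamba \
    #     --container-arg=run \
    #     --container-arg=bash \
    #     --container-env BUILD_ID=2024-01-29-0000-deadbee-main \
    #     --output batch_job.json
    #
    # argparse's action="append" + nargs="*" yields a list of lists for the
    # repeated flags, which is why to_config() flattens them with _flat():
    config = to_config(
        container_image="docker.io/catalystcoop/pudl-etl@sha256:abc123",
        container_command="micromamba",
        container_arg=[["run"], ["bash"]],
        container_env=[["BUILD_ID=2024-01-29-0000-deadbee-main"]],
    )
    runnable = config["taskGroups"][0]["taskSpec"]["runnables"][0]
    assert runnable["container"]["commands"] == ["micromamba", "run", "bash"]
    assert runnable["environment"]["variables"]["BUILD_ID"] == "2024-01-29-0000-deadbee-main"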
13 changes: 12 additions & 1 deletion docker/Dockerfile
@@ -1,5 +1,7 @@
FROM mambaorg/micromamba:1.5.6

ENV PGDATA=${CONTAINER_HOME}/pgdata

USER root

SHELL [ "/bin/bash", "-exo", "pipefail", "-c" ]
@@ -8,14 +10,23 @@ SHELL [ "/bin/bash", "-exo", "pipefail", "-c" ]
# awscli requires unzip, less, groff and mandoc
# hadolint ignore=DL3008
RUN apt-get update && \
apt-get install --no-install-recommends -y git jq unzip less groff mandoc && \
apt-get install --no-install-recommends -y git jq unzip less groff mandoc postgresql && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Configure gsutil authentication
# hadolint ignore=DL3059
RUN printf '[GoogleCompute]\nservice_account = default' > /etc/boto.cfg

# Add mamba user to postgres group
# hadolint ignore=DL3059
RUN usermod -aG postgres "$MAMBA_USER"

# Create new cluster for Dagster usage that's owned by $MAMBA_USER.
# When the PG major version changes we'll have to update this from 15 to 16
# hadolint ignore=DL3059
RUN pg_createcluster 15 dagster -u "$MAMBA_USER" -- -A trust

# Switch back to being non-root user and get into the home directory
USER $MAMBA_USER
ENV CONTAINER_HOME=/home/$MAMBA_USER
14 changes: 5 additions & 9 deletions docker/dagster.yaml
@@ -1,12 +1,8 @@
storage:
postgres:
postgres_db:
username:
env: DAGSTER_PG_USERNAME
password:
env: DAGSTER_PG_PASSWORD
hostname:
env: DAGSTER_PG_HOST
db_name:
env: DAGSTER_PG_DB
port: 5432
username: dagster
password: dagster_password
hostname: 127.0.0.1
db_name: dagster
port: 5433
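A minimal sketch of the str-vs-int port issue that motivated hardcoding these values (see the review comment above); this is illustrative only, not PUDL code:

    import os

    # Environment variables are always strings, so configuring the port via
    # `env:` indirection hands Dagster "5433" rather than the int 5433.
    os.environ["DAGSTER_PG_PORT"] = "5433"
    port = os.environ["DAGSTER_PG_PORT"]
    assert isinstance(port, str)
    # Writing the literal port into dagster.yaml sidesteps the cast entirely.
    assert int(port) == 5433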
45 changes: 30 additions & 15 deletions docker/gcp_pudl_etl.sh
@@ -21,9 +21,28 @@ function authenticate_gcp() {
gcloud config set project "$GCP_BILLING_PROJECT"
}

function initialize_postgres() {
echo "initializing postgres."
# This is sort of a fiddly set of postgres admin tasks:
#
# 1. start the dagster cluster, which is set to be owned by mambauser in the Dockerfile
# 2. create a db within this cluster so we can do things
# 3. pass ON_ERROR_STOP so psql actually fails when we mess up, instead of continuing blithely
# 4. create a *dagster* user, whose creds correspond with those in docker/dagster.yaml
# 5. make a database for dagster, which is owned by the dagster user
#
# When the PG major version changes we'll have to update this from 15 to 16
pg_ctlcluster 15 dagster start && \
createdb -h127.0.0.1 -p5433 && \
psql -v "ON_ERROR_STOP=1" -c "CREATE USER dagster WITH SUPERUSER PASSWORD 'dagster_password'" -h127.0.0.1 -p5433 && \
psql -v "ON_ERROR_STOP=1" -c "CREATE DATABASE dagster OWNER dagster" -h127.0.0.1 -p5433
}

function run_pudl_etl() {
echo "Running PUDL ETL"
send_slack_msg ":large_yellow_circle: Deployment started for $BUILD_ID :floppy_disk:"
initialize_postgres && \
authenticate_gcp && \
alembic upgrade head && \
ferc_to_sqlite \
@@ -45,19 +64,10 @@ function run_pudl_etl() {
--gcs-cache-path gs://internal-zenodo-cache.catalyst.coop \
--etl-settings "$PUDL_SETTINGS_YML" \
--live-dbs test/validate \
&& pg_ctlcluster 15 dagster stop \
&& touch "$PUDL_OUTPUT/success"
}

function shutdown_vm() {
upload_file_to_slack "$LOGFILE" "pudl_etl logs for $BUILD_ID:"
# Shut down the vm instance when the etl is done.
echo "Shutting down VM."
ACCESS_TOKEN=$(curl \
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" \
-H "Metadata-Flavor: Google" | jq -r '.access_token')
curl -X POST -H "Content-Length: 0" -H "Authorization: Bearer ${ACCESS_TOKEN}" "https://compute.googleapis.com/compute/v1/projects/catalyst-cooperative-pudl/zones/$GCE_INSTANCE_ZONE/instances/$GCE_INSTANCE/stop"
}

function save_outputs_to_gcs() {
echo "Copying outputs to GCP bucket $PUDL_GCS_OUTPUT" && \
gsutil -m cp -r "$PUDL_OUTPUT" "$PUDL_GCS_OUTPUT" && \
@@ -115,10 +125,11 @@ function zenodo_data_release() {
function notify_slack() {
# Notify pudl-deployment slack channel of deployment status
echo "Notifying Slack about deployment status"
message="# \`${BUILD_ID}\` status\n\n"
if [[ "$1" == "success" ]]; then
message=":large_green_circle: :sunglasses: :unicorn_face: :rainbow: The deployment succeeded!! :partygritty: :database_parrot: :blob-dance: :large_green_circle:\n\n"
message+=":large_green_circle: :sunglasses: :unicorn_face: :rainbow: deployment succeeded!! :partygritty: :database_parrot: :blob-dance: :large_green_circle:\n\n"
elif [[ "$1" == "failure" ]]; then
message=":x: Oh bummer the deployment failed :fiiiiine: :sob: :cry_spin: :x:\n\n"
message+=":x: Oh bummer the deployment failed :fiiiiine: :sob: :cry_spin: :x:\n\n"
else
echo "Invalid deployment status"
exit 1
@@ -133,9 +144,14 @@ function notify_slack() {
message+="DISTRIBUTION_BUCKET_SUCCESS: $DISTRIBUTION_BUCKET_SUCCESS\n"
message+="ZENODO_SUCCESS: $ZENODO_SUCCESS\n\n"

message+="See https://console.cloud.google.com/storage/browser/builds.catalyst.coop/$BUILD_ID for logs and outputs."
message+="*Query* logs on <https://console.cloud.google.com/batch/jobsDetail/regions/us-west1/jobs/run-etl-$BUILD_ID/logs?project=catalyst-cooperative-pudl|Google Batch Console>.\n\n"

message+="*Download* logs at <https://console.cloud.google.com/storage/browser/_details/builds.catalyst.coop/$BUILD_ID/$BUILD_ID-pudl-etl.log|gs://builds.catalyst.coop/${BUILD_ID}/${BUILD_ID}-pudl-etl.log>\n\n"

message+="Get *full outputs* at <https://console.cloud.google.com/storage/browser/builds.catalyst.coop/$BUILD_ID|gs://builds.catalyst.coop/${BUILD_ID}>."

send_slack_msg "$message"
upload_file_to_slack "$LOGFILE" "$BUILD_ID logs:"
}

function update_nightly_branch() {
@@ -232,6 +248,5 @@ if [[ $ETL_SUCCESS == 0 && \
notify_slack "success"
else
notify_slack "failure"
exit 1
fi

shutdown_vm
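For a quick manual check that the cluster initialize_postgres sets up matches the credentials in docker/dagster.yaml, something like this hypothetical one-liner (run inside the container) should return a single row:

    psql "postgresql://dagster:dagster_password@127.0.0.1:5433/dagster" -c "SELECT 1;"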