From 9260a7adb34927c94fafea6b6147cdbafb02fd37 Mon Sep 17 00:00:00 2001
From: Willy Lulciuc
Date: Thu, 31 Oct 2024 15:01:15 -0700
Subject: [PATCH] continued: Deprecate airflow quickstart

Signed-off-by: Willy Lulciuc
---
 examples/airflow/README.md  |   6 -
 examples/airflow/airflow.md | 314 +-----------------------------------
 2 files changed, 4 insertions(+), 316 deletions(-)
 delete mode 100644 examples/airflow/README.md

diff --git a/examples/airflow/README.md b/examples/airflow/README.md
deleted file mode 100644
index 56dd8ce0c6..0000000000
--- a/examples/airflow/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# `DEPRECATED`
-
-[![Project status](https://img.shields.io/badge/status-deprecated-orange.svg)]()
-
-This example has been moved to [`https://openlineage.io/docs/guides/airflow-quickstart`](https://openlineage.io/docs/guides/airflow-quickstart).
-We also suggest [_Integrate OpenLineage and Airflow with Marquez_](https://www.astronomer.io/docs/learn/marquez) by [Astronomer](https://www.astronomer.io).
\ No newline at end of file

diff --git a/examples/airflow/airflow.md b/examples/airflow/airflow.md
index 3f8cb504af..56dd8ce0c6 100644
--- a/examples/airflow/airflow.md
+++ b/examples/airflow/airflow.md
@@ -1,312 +1,6 @@
-# Getting Started with Airflow and OpenLineage+Marquez
+# `DEPRECATED`
-
-> **Note:** For a modified version of this guide that uses [Astro](https://www.astronomer.io/try-astro/?referral=docs-what-astro-banner) instead of vanilla Airflow, see the OpenLineage [docs](https://openlineage.io/docs/guides/airflow-quickstart).
+
+[![Project status](https://img.shields.io/badge/status-deprecated-orange.svg)]()
-
-In this example, we'll walk you through how to enable Airflow DAGs to send lineage metadata to [Marquez](https://marquezproject.ai/) using OpenLineage.
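Concretely, what the integration sends are OpenLineage *run events*: small JSON documents that tie a run of a job to a namespace and, eventually, to the datasets it reads and writes. Just to show the shape of that metadata, here is a rough sketch that builds a minimal `START` event by hand (field names follow the OpenLineage spec, but the `producer` URI is a placeholder and real events carry more detail):

```python
import json
import uuid
from datetime import datetime, timezone

def make_run_event(event_type: str, job_name: str, namespace: str = "example") -> dict:
    """Build a minimal OpenLineage run event as a plain dict."""
    return {
        "eventType": event_type,  # START, COMPLETE, or FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},  # unique per task run
        "job": {"namespace": namespace, "name": job_name},
        "producer": "https://example.com/placeholder-producer",  # stand-in URI
    }

# An event for one run of the counter DAG's 'inc' task:
event = make_run_event("START", "counter.inc")
print(json.dumps(event, indent=2))
```

You won't construct these yourself in this guide: the Airflow integration emits them automatically to the Marquez endpoint configured below.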
-
-### You’ll learn how to:
-
-* enable OpenLineage in Airflow
-* write your very first OpenLineage-enabled DAG
-* troubleshoot a failing DAG using Marquez
-
-# Prerequisites
-
-Before you begin, make sure you have installed:
-
-* [Docker 17.05](https://docs.docker.com/install)+
-* [Docker Compose](https://docs.docker.com/compose/install)
-
-> **Note:** We recommend that you have allocated at least **2 CPUs** and **8 GB** of memory to Docker.
-
-# Step 1: Setup
-
-First, if you haven't already, clone the Marquez repository and change into the [`examples/airflow`](https://github.com/MarquezProject/marquez/tree/main/examples/airflow) directory:
-
-```bash
-git clone https://github.com/MarquezProject/marquez && cd marquez/examples/airflow
-```
-
-To make sure the latest OpenLineage provider ([`apache-airflow-providers-openlineage`](https://pypi.org/project/apache-airflow-providers-openlineage)) is downloaded and installed when starting Airflow, you'll need to create a `requirements.txt` file with the following content:
-
-```
-apache-airflow-providers-openlineage
-```
-
-Next, we'll need to specify where we want Airflow to send DAG metadata. To do so, create a config file named `openlineage.env` with the following environment variables and values:
-
-```bash
-OPENLINEAGE_URL=http://marquez:5000  # The URL of the HTTP backend
-OPENLINEAGE_NAMESPACE=example        # The namespace associated with the dataset, job, and run metadata collected
-```
-> **Note:** The `openlineage.env` config file will be used by the `airflow`, `airflow_scheduler`, and `airflow_worker` containers to send lineage metadata to Marquez.
-
-> **Note:** The namespace referred to above is the namespace of the jobs that model your DAG runs. The datasets themselves reside in namespaces tied to their data sources, compatible with [OpenLineage naming](https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md).
-
-Your `examples/airflow/` directory should now contain the following:
-
- ```
- .
- ├── README.md
- ├── docker
- ├── docker-compose.yml
- ├── docs
- ├── openlineage.env
- └── requirements.txt
- ```
-
-# Step 2: Write Airflow DAGs using OpenLineage
-
-In this step, we'll create two new Airflow DAGs that perform simple tasks. The `counter` DAG generates a random number every minute, while the `sum` DAG calculates a sum every five minutes. This will result in a simple pipeline containing two jobs and two datasets.
-
-First, let's create the `dags/` folder where our example DAGs will be located:
-
-```bash
-$ mkdir dags
-```
-
-When writing our DAGs, we'll use [`openlineage-airflow`](https://pypi.org/project/openlineage-airflow), enabling OpenLineage to observe the DAG and automatically collect task-level metadata. If you're using Airflow 2.3+, no further changes to your DAG code or configuration are needed. If you're using an older version of Airflow, please read [this](https://github.com/OpenLineage/OpenLineage/blob/main/integration/airflow/README.md#setup) to understand how to configure the Airflow integration.
-
-## Step 2.1: Create `counter` DAG
-
-Under `dags/`, create a file named `counter.py` and add the following code:
-
-```python
-import random
-
-from airflow import DAG
-from airflow.providers.postgres.operators.postgres import PostgresOperator
-from airflow.utils.dates import days_ago
-
-default_args = {
-    'owner': 'datascience',
-    'depends_on_past': False,
-    'start_date': days_ago(1),
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'email': ['datascience@example.com']
-}
-
-dag = DAG(
-    'counter',
-    schedule_interval='*/1 * * * *',
-    catchup=False,
-    is_paused_upon_creation=False,
-    max_active_runs=1,
-    default_args=default_args,
-    description='DAG that generates a new count value between 1-10.'
-)
-
-t1 = PostgresOperator(
-    task_id='if_not_exists',
-    postgres_conn_id='example_db',
-    sql='''
-    CREATE TABLE IF NOT EXISTS counts (
-      value INTEGER
-    );''',
-    dag=dag
-)
-
-t2 = PostgresOperator(
-    task_id='inc',
-    postgres_conn_id='example_db',
-    sql='''
-    INSERT INTO counts (value)
-         VALUES (%(value)s)
-    ''',
-    parameters={
-      'value': random.randint(1, 10)
-    },
-    dag=dag
-)
-
-t1 >> t2
-```
-
-## Step 2.2: Create `sum` DAG
-
-In `dags/`, create a file named `sum.py` and add the following code:
-
-```python
-from airflow import DAG
-from airflow.providers.postgres.operators.postgres import PostgresOperator
-from airflow.utils.dates import days_ago
-
-default_args = {
-    'owner': 'datascience',
-    'depends_on_past': False,
-    'start_date': days_ago(1),
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'email': ['datascience@example.com']
-}
-
-dag = DAG(
-    'sum',
-    schedule_interval='*/5 * * * *',
-    catchup=False,
-    is_paused_upon_creation=False,
-    max_active_runs=1,
-    default_args=default_args,
-    description='DAG that sums the total of generated count values.'
-)
-
-t1 = PostgresOperator(
-    task_id='if_not_exists',
-    postgres_conn_id='example_db',
-    sql='''
-    CREATE TABLE IF NOT EXISTS sums (
-      value INTEGER
-    );''',
-    dag=dag
-)
-
-t2 = PostgresOperator(
-    task_id='total',
-    postgres_conn_id='example_db',
-    sql='''
-    INSERT INTO sums (value)
-        SELECT SUM(c.value) FROM counts AS c;
-    ''',
-    dag=dag
-)
-
-t1 >> t2
-```
-
-At this point, your `examples/airflow/` directory should look like this:
-
-```
-.
-├── README.md
-├── dags
-│   ├── counter.py
-│   └── sum.py
-├── docker/
-├── docker-compose.yml
-├── docs/
-├── openlineage.env
-└── requirements.txt
-```
-
-# Step 3: Start Airflow with Marquez
-
-Now that we have our DAGs defined and OpenLineage is enabled in Airflow, we can run the example! To start Airflow, run:
-
-```bash
-$ docker-compose up
-```
-
-> **Tip:** Use `-d` to run in detached mode.
-
-**The above command will:**
-
-* start Airflow and install the OpenLineage Airflow provider
-* start Marquez
-* start Postgres
-
-To view the Airflow UI and verify it's running, open [http://localhost:8080](http://localhost:8080). Then, log in using the username and password `airflow` / `airflow`. You can also browse to [http://localhost:3000](http://localhost:3000) to view the Marquez UI.
-
-# Step 4: View Collected Metadata
-
-To ensure that Airflow is executing `counter` and `sum`, navigate to the DAGs tab in Airflow and verify that they are both enabled and are in a _running_ state:
-
-![](./docs/airflow-view-dag.png)
-
-To view DAG metadata collected by Marquez from Airflow, browse to the Marquez UI by visiting [http://localhost:3000](http://localhost:3000). Then, use the _search_ bar in the upper right side of the page and search for the `counter.inc` job. To view lineage metadata for `counter.inc`, click on the job from the drop-down list:
-
-> **Note:** If the `counter.inc` job is not in the drop-down list, check to see if Airflow has successfully executed the DAG.
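You can also ask Marquez directly over its HTTP API instead of using the UI. The sketch below lists the job names collected in the `example` namespace; it assumes the list-jobs response is a JSON object with a `jobs` array, which is worth double-checking against the Marquez API reference:

```python
import json
import urllib.request

def jobs_url(namespace: str = "example", base_url: str = "http://localhost:5000") -> str:
    """URL of Marquez's list-jobs endpoint for a namespace."""
    return f"{base_url}/api/v1/namespaces/{namespace}/jobs"

def list_job_names(namespace: str = "example") -> list:
    """Fetch the job names Marquez has collected for a namespace."""
    with urllib.request.urlopen(jobs_url(namespace), timeout=5) as resp:
        return [job["name"] for job in json.load(resp)["jobs"]]

# Once the DAGs have run, expect names like 'counter.inc' and 'sum.total'.
```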

-
-If you take a quick look at the lineage graph for `counter.if_not_exists`, you should see `example.public.counts` as an output dataset and `sum.total` as a downstream job!
-
-![](./docs/current-lineage-view-job.png)
-
-# Step 5: Troubleshoot a Failing DAG with Marquez
-
-In this step, let's quickly walk through a simple troubleshooting scenario where DAG `sum` begins to fail as the result of an upstream schema change for table `counts`. So, let's get to it!
-
-> **Tip:** It's helpful to also apply the same code changes outlined below to your Airflow DAGs defined in **Step 2**.
-
-Let's say team `A` owns the DAG `counter`. Team `A` decides to update the `t1` task in `counter` to rename the `value` column in the `counts` table to `value_1_to_10` (without properly communicating the schema change!):
-
-```diff
-t1 = PostgresOperator(
--   task_id='if_not_exists',
-+   task_id='alter_name_of_column',
-    postgres_conn_id='example_db',
-    sql='''
--   CREATE TABLE IF NOT EXISTS counts (
--     value INTEGER
--   );''',
-+   DO $$
-+   BEGIN
-+     IF EXISTS(SELECT *
-+               FROM information_schema.columns
-+               WHERE table_name='counts' and column_name='value')
-+     THEN
-+       ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
-+     END IF;
-+   END $$;
-    ''',
-    dag=dag
-)
-```
-
-```diff
-t2 = PostgresOperator(
-    task_id='inc',
-    postgres_conn_id='example_db',
-    sql='''
--   INSERT INTO counts (value)
-+   INSERT INTO counts (value_1_to_10)
-         VALUES (%(value)s)
-    ''',
-    parameters={
-      'value': random.randint(1, 10)
-    },
-    dag=dag
-)
-```
-
-Team `B`, unaware of the schema change, owns DAG `sum` and begins to see DAG run metadata with _failed_ run states:
-
-![](./docs/search-job-failure.png)
-
-But, team `B` isn't sure what might have caused the DAG failure, as no recent code changes have been made to DAG `sum`. So, team `B` decides to check the schema of the input dataset:
-
-![](./docs/lineage-view-dataset.png)
-
-Team `B` soon realizes that the schema has changed recently for the `counts` table!
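Since Marquez keeps every version of a dataset's schema, the check team `B` just did by eye can also be scripted. Here is a toy sketch that diffs two schema versions and flags a likely rename (the list-of-`{name, type}` shape is a simplification for illustration, not Marquez's exact response format):

```python
def renamed_columns(old_fields, new_fields):
    """Pair columns that disappeared with same-typed columns that appeared."""
    old = {f["name"]: f["type"] for f in old_fields}
    new = {f["name"]: f["type"] for f in new_fields}
    dropped = {n: t for n, t in old.items() if n not in new}
    added = {n: t for n, t in new.items() if n not in old}
    # A dropped column plus an added column of the same type suggests a rename.
    return [(o, n) for o, t in dropped.items() for n, u in added.items() if t == u]

before = [{"name": "value", "type": "INTEGER"}]
after = [{"name": "value_1_to_10", "type": "INTEGER"}]
print(renamed_columns(before, after))  # [('value', 'value_1_to_10')]
```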
-To fix the DAG `sum`, team `B` updates the `t2` task that calculates the count total to use the new column name:
-
-```diff
-t2 = PostgresOperator(
-    task_id='total',
-    postgres_conn_id='example_db',
-    sql='''
-    INSERT INTO sums (value)
--       SELECT SUM(c.value) FROM counts AS c;
-+       SELECT SUM(c.value_1_to_10) FROM counts AS c;
-    ''',
-    dag=dag
-)
-```
-
-With the code change, the DAG `sum` begins to run successfully:
-
-![](./docs/lineage-view-job-successful.png)
-
-_Congrats_! You successfully stepped through a troubleshooting scenario of a failing DAG using metadata collected with Marquez! You can now add your own DAGs to `dags/` to build more complex data lineage graphs.
-
-# Next Steps
-
-* Review the Marquez [HTTP API](https://marquezproject.github.io/marquez/openapi.html) used to collect Airflow DAG metadata and learn how to build your own integrations using OpenLineage
-* Take a look at the [`openlineage-spark`](https://openlineage.io/docs/integrations/spark/) integration that can be used with Airflow
-
-# Feedback
-
-What did you think of this example? You can reach out to us on [slack](https://bit.ly/Marquez_Slack_invite) and leave us feedback, or [open a pull request](https://github.com/MarquezProject/marquez/blob/main/CONTRIBUTING.md#submitting-a-pull-request) with your suggestions!
+
+This example has been moved to [`https://openlineage.io/docs/guides/airflow-quickstart`](https://openlineage.io/docs/guides/airflow-quickstart).
+We also suggest [_Integrate OpenLineage and Airflow with Marquez_](https://www.astronomer.io/docs/learn/marquez) by [Astronomer](https://www.astronomer.io).
\ No newline at end of file