Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement running Prefect task after models #52

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion prefect_dbt_flow/dbt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pathlib import Path
from typing import List, Optional, Union

from prefect import Task


class DbtResourceType(Enum):
"""
Expand Down Expand Up @@ -73,13 +75,16 @@ class DbtDagOptions:
Args:
select: dbt module to include in the run
exclude: dbt module to exclude in the run
run_test_after_model: run test after run model
run_test_after_model: If True, run dbt tests after running each model.
run_task_after_model: A Prefect task to run after each model.
Must have exactly one required parameter: model_id (str).
vars: dbt vars
install_deps: install dbt dependencies, default behavior install deps
"""

select: Optional[str] = None
exclude: Optional[str] = None
run_test_after_model: bool = False
run_task_after_model: Optional[Task] = None
vars: Optional[dict[str, str]] = None
install_deps: bool = True
43 changes: 30 additions & 13 deletions prefect_dbt_flow/dbt/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ def generate_tasks_dag(
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
dbt_graph: List[DbtNode],
run_test_after_model: bool = False,
) -> None:
"""
Generate a Prefect DAG for running and testing dbt models.
Expand All @@ -224,13 +223,10 @@ def generate_tasks_dag(
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.
dbt_graph: A list of dbt nodes (models) to include in the DAG.
run_test_after_model: If True, run tests after running each model.

Returns:
None
"""

# TODO: refactor this
all_tasks = {
dbt_node.unique_id: RESOURCE_TYPE_TO_TASK[dbt_node.resource_type](
project=project,
Expand All @@ -243,25 +239,46 @@ def generate_tasks_dag(

submitted_tasks: Dict[str, PrefectFuture] = {}
while node := _get_next_node(dbt_graph, list(submitted_tasks.keys())):
# Get dbt dependencies for current run task
run_task = all_tasks[node.unique_id]
task_dependencies = [
submitted_tasks[node_unique_id] for node_unique_id in node.depends_on
submitted_tasks[node_unique_id]
for node_unique_id in node.depends_on
]

run_task_future = run_task.submit(wait_for=task_dependencies)

if run_test_after_model and node.has_tests:
# Future for the run task
future = run_task.submit(wait_for=task_dependencies)

# Check if we should run a dbt test task
should_run_tests = (
dag_options
and dag_options.run_test_after_model
and node.has_tests
)
if should_run_tests:
test_task = _task_dbt_test(
project=project,
profile=profile,
dag_options=dag_options,
dbt_node=node,
)
test_task_future = test_task.submit(wait_for=run_task_future)
# Future for the test task, depending on the run future
future = test_task.submit(wait_for=future)

# Check if we should run a Prefect task
should_run_prefect_task = (
dag_options
and dag_options.run_task_after_model
and node.resource_type is DbtResourceType.MODEL
)
if should_run_prefect_task:
# Future for the Prefect task, depending on the previous future
future = dag_options.run_task_after_model.submit(
model_id=node.unique_id,
wait_for=future,
)

submitted_tasks[node.unique_id] = test_task_future
else:
submitted_tasks[node.unique_id] = run_task_future
# Record the last future of this node's chain (run, then optional test, then optional Prefect task) so downstream nodes wait on it
submitted_tasks[node.unique_id] = future


def _get_next_node(
Expand Down
1 change: 0 additions & 1 deletion prefect_dbt_flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def dbt_flow():
profile,
dag_options,
dbt_graph,
dag_options.run_test_after_model if dag_options else False,
)

return dbt_flow