From 75ab71e1745aa4cedf6d2ae5012df52a6441ee59 Mon Sep 17 00:00:00 2001 From: Vengal Rao Guttha Date: Mon, 3 Feb 2025 16:31:31 +0530 Subject: [PATCH] DBT model instrumentation --- core/dbt/task/runnable.py | 28 ++++++++++++++++++++++++---- dev-requirements.txt | 1 + 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index bbab544d0c9..b98bd08a179 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -7,6 +7,9 @@ from pathlib import Path from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union +from opentelemetry import context, trace +from opentelemetry.trace import SpanContext, StatusCode + import dbt.exceptions import dbt.tracking import dbt.utils @@ -91,6 +94,7 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No self.previous_defer_state: Optional[PreviousState] = None self.run_count: int = 0 self.started_at: float = 0 + self._node_span_context_mapping: Dict[str, SpanContext] = {} if self.args.state: self.previous_state = PreviousState( @@ -223,13 +227,24 @@ def get_runner(self, node) -> BaseRunner: return cls(self.config, adapter, node, run_count, num_nodes) def call_runner(self, runner: BaseRunner) -> RunResult: - with log_contextvars(node_info=runner.node.node_info): + tracer = trace.get_tracer("dbt-runner") + node_info = runner.node.node_info + model_span = tracer.start_span(node_info["unique_id"]) + ctx = trace.set_span_in_context(model_span) + token = context.attach(ctx) + self._node_span_context_mapping[node_info["unique_id"]] = model_span.get_span_context() + for parent_node in runner.node.depends_on.nodes: + if parent_node in self._node_span_context_mapping: + model_span.add_link( + self._node_span_context_mapping[parent_node], {"model_name": parent_node} + ) + with log_contextvars(node_info=node_info): runner.node.update_event_status( started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started ) fire_event( NodeStart( - node_info=runner.node.node_info, + node_info=node_info, ) ) try: @@ -242,10 +257,12 @@ def call_runner(self, runner: BaseRunner) -> RunResult: result = None thread_exception = e finally: + context.detach(token) + model_span.end() if result is not None: fire_event( NodeFinished( - node_info=runner.node.node_info, + node_info=node_info, run_result=result.to_msg_dict(), ) ) @@ -256,7 +273,7 @@ def call_runner(self, runner: BaseRunner) -> RunResult: GenericExceptionOnRun( unique_id=runner.node.unique_id, exc=str(thread_exception), - node_info=runner.node.node_info, + node_info=node_info, ) ) @@ -278,6 +295,9 @@ def call_runner(self, runner: BaseRunner) -> RunResult: fail_fast = get_flags().FAIL_FAST + if result.status in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.PartialSuccess): + model_span.set_status(StatusCode.ERROR) + if ( result.status in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.PartialSuccess) and fail_fast diff --git a/dev-requirements.txt b/dev-requirements.txt index 0f5f6cd8a7f..42a276efcd0 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -38,3 +38,4 @@ types-pytz types-requests types-setuptools mocker +opentelemetry-api