Skip to content

Commit

Permalink
DBT model instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-vguttha committed Feb 3, 2025
1 parent fdabe95 commit 75ab71e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
28 changes: 24 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from pathlib import Path
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union

from opentelemetry import context, trace
from opentelemetry.trace import SpanContext, StatusCode

import dbt.exceptions
import dbt.tracking
import dbt.utils
Expand Down Expand Up @@ -91,6 +94,7 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No
self.previous_defer_state: Optional[PreviousState] = None
self.run_count: int = 0
self.started_at: float = 0
self._node_span_context_mapping: Dict[str, SpanContext] = {}

if self.args.state:
self.previous_state = PreviousState(
Expand Down Expand Up @@ -223,13 +227,24 @@ def get_runner(self, node) -> BaseRunner:
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner: BaseRunner) -> RunResult:
with log_contextvars(node_info=runner.node.node_info):
tracer = trace.get_tracer("dbt-runner")
node_info = runner.node.node_info
model_span = tracer.start_span(node_info["unique_id"])
ctx = trace.set_span_in_context(model_span)
token = context.attach(ctx)
self._node_span_context_mapping[node_info["unique_id"]] = model_span.get_span_context()
for parent_node in runner.node.depends_on.nodes:
if parent_node in self._node_span_context_mapping:
model_span.add_link(
self._node_span_context_mapping[parent_node], {"model_name": parent_node}
)
with log_contextvars(node_info=node_info):
runner.node.update_event_status(
started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started
)
fire_event(
NodeStart(
node_info=runner.node.node_info,
node_info=node_info,
)
)
try:
Expand All @@ -242,10 +257,12 @@ def call_runner(self, runner: BaseRunner) -> RunResult:
result = None
thread_exception = e
finally:
context.detach(token)
model_span.end()
if result is not None:
fire_event(
NodeFinished(
node_info=runner.node.node_info,
node_info=node_info,
run_result=result.to_msg_dict(),
)
)
Expand All @@ -256,7 +273,7 @@ def call_runner(self, runner: BaseRunner) -> RunResult:
GenericExceptionOnRun(
unique_id=runner.node.unique_id,
exc=str(thread_exception),
node_info=runner.node.node_info,
node_info=node_info,
)
)

Expand All @@ -278,6 +295,9 @@ def call_runner(self, runner: BaseRunner) -> RunResult:

fail_fast = get_flags().FAIL_FAST

if result.status in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.PartialSuccess):
model_span.set_status(StatusCode.ERROR)

if (
result.status in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.PartialSuccess)
and fail_fast
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ types-pytz
types-requests
types-setuptools
mocker
opentelemetry-api

0 comments on commit 75ab71e

Please sign in to comment.