Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-vguttha committed Feb 26, 2025
1 parent 7b7808a commit c1954ab
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 16 deletions.
10 changes: 8 additions & 2 deletions core/dbt/clients/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jinja2.nodes
import jinja2.parser
import jinja2.sandbox
from opentelemetry import trace

from dbt.contracts.graph.nodes import GenericTestNode
from dbt.exceptions import (
Expand Down Expand Up @@ -57,6 +58,7 @@ def __init__(
super().__init__(macro, context)
self.node = node
self.stack = stack
self.macro_tracer = trace.get_tracer("dbt.runner")

# This adds the macro's unique id to the node's 'depends_on'
@contextmanager
Expand All @@ -78,8 +80,12 @@ def track_call(self):

# this makes MacroGenerator objects callable like functions
def __call__(self, *args, **kwargs):
with self.track_call():
return self.call_macro(*args, **kwargs)
if "run_hooks" == self.get_name() and "span_name" in kwargs and len(*args) > 0:
with self.track_call(), self.macro_tracer.start_as_current_span(kwargs["span_name"]):
return self.call_macro(*args, **kwargs)
else:
with self.track_call():
return self.call_macro(*args, **kwargs)


class UnitTestMacroGenerator(MacroGenerator):
Expand Down
9 changes: 0 additions & 9 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# These modules are added to the context. Consider alternative
# approaches which will extend well to potentially many modules
import pytz
from opentelemetry import trace

import dbt.flags as flags_module
from dbt import tracking, utils
Expand Down Expand Up @@ -87,20 +86,12 @@ def get_itertools_module_context() -> Dict[str, Any]:
return {name: getattr(itertools, name) for name in context_exports}


def get_otel_trace_module_context() -> Dict[str, Dict[str, Any]]:
context_exports = trace.__all__
return {name: getattr(trace, name) for name in context_exports}


def get_context_modules() -> Dict[str, Dict[str, Any]]:
return {
"pytz": get_pytz_module_context(),
"datetime": get_datetime_module_context(),
"re": get_re_module_context(),
"itertools": get_itertools_module_context(),
"opentelemetry": {
"trace": get_otel_trace_module_context(),
},
}


Expand Down
4 changes: 2 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ def __init__(
) -> None:
super().__init__(args, config, manifest)
self.batch_map = batch_map
self._dbt_tracer = trace.get_tracer("com.dbt.runner")
self._dbt_tracer = trace.get_tracer("dbt.runner")

def raise_on_first_error(self) -> bool:
return False
Expand Down Expand Up @@ -1000,7 +1000,7 @@ def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> R
with adapter.connection_named("master"):
self.defer_to_manifest()
required_schemas = self.get_model_schemas(adapter, selected_uids)
with self._dbt_tracer.start_as_current_span("metadata setup") as _:
with self._dbt_tracer.start_as_current_span("metadata.setup"):
self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
self.populate_microbatch_batches(selected_uids)
Expand Down
5 changes: 2 additions & 3 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No
self.run_count: int = 0
self.started_at: float = 0
self._node_span_context_mapping: Dict[str, SpanContext] = {}
self._dbt_tracer = trace.get_tracer("com.dbt.runner")
self._dbt_tracer = trace.get_tracer("dbt.runner")

if self.args.state:
self.previous_state = PreviousState(
Expand Down Expand Up @@ -527,7 +527,7 @@ def populate_adapter_cache(
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
self.defer_to_manifest()
with self._dbt_tracer.start_as_current_span("metadata setup") as _:
with self._dbt_tracer.start_as_current_span("metadata.setup"):
self.populate_adapter_cache(adapter)
return RunStatus.Success

Expand Down Expand Up @@ -713,7 +713,6 @@ def create_schema(relation: BaseRelation) -> None:
create_futures = []
# TODO: following has a mypy issue because profile and project config
# defines threads as int and HasThreadingConfig defines it as Optional[int]

with dbt_common.utils.executor(self.config) as tpe: # type: ignore
for req in required_databases:
if req.database is None:
Expand Down

0 comments on commit c1954ab

Please sign in to comment.