Move batch print eventing methods to MicrobatchBatchRunner
QMalcolm committed Feb 24, 2025
1 parent 046e08a commit 86a8439
Showing 1 changed file with 38 additions and 56 deletions.
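For orientation, a rough sketch of where the two eventing methods end up after this change; apart from MicrobatchBatchRunner and the two method signatures visible in the diff below, the class and base-class names are assumptions inferred from the TODO comments, not the literal contents of core/dbt/task/run.py.

# Sketch only; names other than MicrobatchBatchRunner are assumed.
class MicrobatchModelRunner(ModelRunner):  # assumed name of the orchestration runner
    # Orchestrates all batches of a microbatch model; no longer owns the
    # per-batch start/result eventing.
    ...

class MicrobatchBatchRunner(ModelRunner):  # base class assumed
    # Runs a single batch, so batch_idx and batches are always populated and
    # the old "if self.batch_idx is None: return" guards are no longer needed.
    def print_batch_start_line(self) -> None: ...
    def print_batch_result_line(self, result: RunResult) -> None: ...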
94 changes: 38 additions & 56 deletions core/dbt/task/run.py
@@ -378,62 +378,6 @@ def describe_node(self) -> str:
# TODO: Move to microbatch orchestration runner AND batch runner
return f"{self.node.language} microbatch model {self.get_node_representation()}"

def print_batch_result_line(
self,
result: RunResult,
):
# TODO: Move to batch runner

# TODO: This should go away
if self.batch_idx is None:
return

description = self.describe_batch()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
elif result.status == NodeStatus.Skipped:
status = result.status
level = EventLevel.INFO
else:
status = result.message
level = EventLevel.INFO
fire_event(
LogBatchResult(
description=description,
status=status,
batch_index=self.batch_idx + 1,
total_batches=len(self.batches),
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)

def print_batch_start_line(self) -> None:
# TODO: Move to batch runner

# TODO: This should go away
if self.batch_idx is None:
return

# TODO: This should go away as the batch start is now guaranteed
batch_start = self.batches[self.batch_idx][0]
if batch_start is None:
return

batch_description = self.describe_batch()
fire_event(
LogStartBatch(
description=batch_description,
batch_index=self.batch_idx + 1,
total_batches=len(self.batches),
node_info=self.node.node_info,
)
)

def before_execute(self) -> None:
# TODO: Split into two methods
# The first part of the if statement should move to the microbatch orchestration runner
@@ -602,6 +546,44 @@ def describe_batch(self) -> str:
)
return f"batch {formatted_batch_start} of {self.get_node_representation()}"

def print_batch_result_line(self, result: RunResult):
description = self.describe_batch()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
elif result.status == NodeStatus.Skipped:
status = result.status
level = EventLevel.INFO
else:
status = result.message
level = EventLevel.INFO
fire_event(
LogBatchResult(
description=description,
status=status,
batch_index=self.batch_idx + 1,
total_batches=len(self.batches),
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)

def print_batch_start_line(self) -> None:
batch_description = self.describe_batch()
fire_event(
LogStartBatch(
description=batch_description,
batch_index=self.batch_idx + 1,
total_batches=len(self.batches),
node_info=self.node.node_info,
)
)

def should_run_in_parallel(self) -> bool:
if not self.adapter.supports(Capability.MicrobatchConcurrency):
run_in_parallel = False