Skip to content

Commit

Permalink
Rename base_output_node -> optimized_branch.
Browse files: browse the repository at this point in the history
  • Loading branch information
plypaul committed May 15, 2024
1 parent 055cd72 commit f251ff3
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@

@dataclass(frozen=True)
class OptimizeBranchResult: # noqa: D101
- base_output_node: Optional[DataflowPlanNode] = None
+ optimized_branch: Optional[DataflowPlanNode] = None
sink_node: Optional[DataflowPlanNode] = None

@property
def checked_base_output(self) -> DataflowPlanNode: # noqa: D102
- assert self.base_output_node, f"Expected the result of traversal to produce a {DataflowPlanNode}"
- return self.base_output_node
+ assert self.optimized_branch, f"Expected the result of traversal to produce a {DataflowPlanNode}"
+ return self.optimized_branch

@property
def checked_sink_node(self) -> DataflowPlanNode: # noqa: D102
Expand Down Expand Up @@ -133,7 +133,7 @@ def _default_base_output_handler(
)
# Parents should always be DataflowPlanNode
return OptimizeBranchResult(
- base_output_node=node.with_new_parents(tuple(x.checked_base_output for x in optimized_parents))
+ optimized_branch=node.with_new_parents(tuple(x.checked_base_output for x in optimized_parents))
)

def _default_sink_node_handler(
Expand Down Expand Up @@ -164,17 +164,17 @@ def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> OptimizeBranch
self._log_visit_node_type(node)
# Run the optimizer on the parent branch to handle derived metrics, which are defined recursively in the DAG.
optimized_parent_result: OptimizeBranchResult = node.parent_node.accept(self)
- if optimized_parent_result.base_output_node is not None:
+ if optimized_parent_result.optimized_branch is not None:
return OptimizeBranchResult(
- base_output_node=ComputeMetricsNode(
- parent_node=optimized_parent_result.base_output_node,
+ optimized_branch=ComputeMetricsNode(
+ parent_node=optimized_parent_result.optimized_branch,
metric_specs=node.metric_specs,
for_group_by_source_node=node.for_group_by_source_node,
aggregated_to_elements=node.aggregated_to_elements,
)
)

- return OptimizeBranchResult(base_output_node=node)
+ return OptimizeBranchResult(optimized_branch=node)

def visit_order_by_limit_node(self, node: OrderByLimitNode) -> OptimizeBranchResult: # noqa: D102
self._log_visit_node_type(node)
Expand Down Expand Up @@ -255,9 +255,9 @@ def visit_combine_aggregated_outputs_node( # noqa: D102
)

assert (
- result.base_output_node is not None
+ result.optimized_branch is not None
), f"Traversing the parents of a CombineAggregatedOutputsNode should always produce a DataflowPlanNode. Got: {result}"
- optimized_parent_branches.append(result.base_output_node)
+ optimized_parent_branches.append(result.optimized_branch)

# Try to combine (using ComputeMetricsBranchCombiner) as many parent branches as possible in a
# greedy N^2 approach. The optimality of this approach needs more thought to prove conclusively, but given
Expand Down Expand Up @@ -288,10 +288,10 @@ def visit_combine_aggregated_outputs_node( # noqa: D102
# If we were able to reduce the parent branches of the CombineAggregatedOutputsNode into a single one, there's no need
# for a CombineAggregatedOutputsNode.
if len(combined_parent_branches) == 1:
- return OptimizeBranchResult(base_output_node=combined_parent_branches[0])
+ return OptimizeBranchResult(optimized_branch=combined_parent_branches[0])

return OptimizeBranchResult(
- base_output_node=CombineAggregatedOutputsNode(parent_nodes=combined_parent_branches)
+ optimized_branch=CombineAggregatedOutputsNode(parent_nodes=combined_parent_branches)
)

def visit_constrain_time_range_node(self, node: ConstrainTimeRangeNode) -> OptimizeBranchResult: # noqa: D102
Expand Down

0 comments on commit f251ff3

Please sign in to comment.