Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple time spines in one query #1644

Merged
merged 4 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250127-203427.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Changelog entry (changie format) for PR #1644: multi-time-spine query support.
kind: Features
body: Support for multiple time spines in one query.
time: 2025-01-27T20:34:27.68611-08:00
custom:
  Author: courtneyholcomb
  Issue: "1644"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from dataclasses import dataclass
from typing import Dict, Optional, Sequence
from typing import Dict, Optional, Sequence, Tuple

from dbt_semantic_interfaces.implementations.time_spine import PydanticTimeSpineCustomGranularityColumn
from dbt_semantic_interfaces.protocols import SemanticManifest
Expand Down Expand Up @@ -99,10 +99,10 @@ def build_custom_granularities(time_spine_sources: Sequence[TimeSpineSource]) ->
}

@staticmethod
def choose_time_spine_source(
def choose_time_spine_sources(
required_time_spine_specs: Sequence[TimeDimensionSpec],
time_spine_sources: Dict[TimeGranularity, TimeSpineSource],
) -> TimeSpineSource:
) -> Tuple[TimeSpineSource, ...]:
"""Determine which time spine sources to use to satisfy the given specs.

Custom grains can only use the time spine where they are defined. For standard grains, this will choose the time
Expand Down Expand Up @@ -147,15 +147,14 @@ def choose_time_spine_source(
if not required_time_spines.intersection(set(compatible_time_spines_for_standard_grains.values())):
required_time_spines.add(time_spine_sources[max(compatible_time_spines_for_standard_grains)])

if len(required_time_spines) != 1:
raise RuntimeError(
"Multiple time spines are required to satisfy the specs, but only one is supported per query currently. "
f"Multiple will be supported in the future. Time spines required: {required_time_spines}."
)

return required_time_spines.pop()
return tuple(sorted(required_time_spines, key=lambda x: x.base_granularity.to_int()))

@property
def data_set_description(self) -> str:
    """Human-readable description shown when this time spine is used in a data set."""
    description = "Read From Time Spine '{}'".format(self.table_name)
    return description

@property
def custom_grain_names(self) -> Sequence[str]:
    """Names of the custom grains defined in this time spine."""
    names = [custom_grain.name for custom_grain in self.custom_granularities]
    return tuple(names)
61 changes: 41 additions & 20 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,9 +1034,11 @@ def _find_source_node_recipe_non_cached(
)
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from.
if linkable_specs_to_satisfy.metric_time_specs:
time_spine_node = self._choose_time_spine_metric_time_node(linkable_specs_to_satisfy.metric_time_specs)
candidate_nodes_for_right_side_of_join += [time_spine_node]
candidate_nodes_for_left_side_of_join += [time_spine_node]
time_spine_nodes = self._choose_time_spine_metric_time_nodes(
linkable_specs_to_satisfy.metric_time_specs
)
candidate_nodes_for_right_side_of_join += list(time_spine_nodes)
candidate_nodes_for_left_side_of_join += list(time_spine_nodes)
default_join_type = SqlJoinType.FULL_OUTER

logger.debug(
Expand Down Expand Up @@ -1929,19 +1931,24 @@ def _build_semi_additive_join_node(
queried_time_dimension_spec=queried_time_dimension_spec,
)

def _choose_time_spine_sources(
    self, required_time_spine_specs: Sequence[TimeDimensionSpec]
) -> Tuple[TimeSpineSource, ...]:
    """Choose the time spine sources that can satisfy the required time spine specs.

    Delegates to `TimeSpineSource.choose_time_spine_sources`, which returns the chosen
    sources sorted by base granularity (smallest first).
    """
    return TimeSpineSource.choose_time_spine_sources(
        required_time_spine_specs=required_time_spine_specs,
        time_spine_sources=self._source_node_builder.time_spine_sources,
    )

def _choose_time_spine_metric_time_nodes(
    self, required_time_spine_specs: Sequence[TimeDimensionSpec]
) -> Tuple[MetricTimeDimensionTransformNode, ...]:
    """Return the MetricTimeDimensionTransform time spine nodes needed to satisfy the specs."""
    time_spine_sources = self._choose_time_spine_sources(required_time_spine_specs)
    # One transform node per chosen time spine, looked up by that spine's base granularity.
    return tuple(
        self._source_node_set.time_spine_metric_time_nodes[time_spine_source.base_granularity]
        for time_spine_source in time_spine_sources
    )

def _choose_time_spine_read_node(self, time_spine_source: TimeSpineSource) -> ReadSqlSourceNode:
"""Return the MetricTimeDimensionTransform time spine node needed to satisfy the specs."""
Expand Down Expand Up @@ -1969,6 +1976,7 @@ def _build_time_spine_node(
required_specs += (time_dimension_spec,)

should_dedupe = False
custom_grain_specs_to_join: Tuple[TimeDimensionSpec, ...] = ()
if custom_offset_window:
time_spine_node = self.build_custom_offset_time_spine_node(
offset_window=custom_offset_window,
Expand All @@ -1979,13 +1987,24 @@ def _build_time_spine_node(
time_spine_node
).instance_set.spec_set.time_dimension_specs
else:
# For simpler time spine queries, choose the appropriate time spine node and apply requested aliases.
time_spine_source = self._choose_time_spine_source(required_specs)
# TODO: support multiple time spines here. Build node on the one with the smallest base grain.
# Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine.
read_node = self._choose_time_spine_read_node(time_spine_source)
time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node)
time_spine_sources = self._choose_time_spine_sources(required_specs)
smallest_time_spine_source = time_spine_sources[0] # these are already sorted by base grain
read_node = self._choose_time_spine_read_node(smallest_time_spine_source)

# If any custom grains cannot be satisfied by the time spine read node, they will need to be joined later.
updated_required_specs: Tuple[TimeDimensionSpec, ...] = ()
for spec in required_specs:
if (
spec.has_custom_grain
and spec.time_granularity_name not in smallest_time_spine_source.custom_grain_names
):
custom_grain_specs_to_join += (spec,)
updated_required_specs += (spec.with_base_grain(),)
else:
updated_required_specs += (spec,)

# Change the column aliases to match the specs that were requested in the query.
time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node)
time_spine_node = AliasSpecsNode.create(
parent_node=read_node,
change_specs=tuple(
Expand All @@ -1995,27 +2014,29 @@ def _build_time_spine_node(
).spec,
output_spec=required_spec,
)
for required_spec in required_specs
for required_spec in updated_required_specs
),
)
# If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping.
should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in {
spec.time_granularity for spec in filter_to_specs
}
should_dedupe = ExpandedTimeGranularity.from_time_granularity(
smallest_time_spine_source.base_granularity
) not in {spec.time_granularity for spec in filter_to_specs}

return self._build_pre_aggregation_plan(
source_node=time_spine_node,
filter_to_specs=InstanceSpecSet(time_dimension_specs=filter_to_specs),
time_range_constraint=time_range_constraint,
where_filter_specs=where_filter_specs,
custom_granularity_specs=custom_grain_specs_to_join,
distinct=should_dedupe,
)

def _get_time_spine_read_node_for_custom_grain(self, custom_grain: str) -> ReadSqlSourceNode:
    """Return the read node for the time spine where the given custom grain is defined."""
    time_spine_sources = self._choose_time_spine_sources(
        (DataSet.metric_time_dimension_spec(self._semantic_model_lookup.custom_granularities[custom_grain]),)
    )
    # A custom grain can only be satisfied by the time spine where it is defined, so a
    # single-spec request is expected to resolve to exactly one source here.
    time_spine_source = time_spine_sources[0]
    return self._choose_time_spine_read_node(time_spine_source)

def build_custom_offset_time_spine_node(
Expand Down
15 changes: 13 additions & 2 deletions metricflow/plan_conversion/to_sql_plan/dataflow_to_subquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
TimeDimensionInstance,
group_instances_by_type,
)
from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat
from metricflow_semantics.mf_logging.runtime import log_block_runtime
from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup
from metricflow_semantics.specs.column_assoc import ColumnAssociationResolver
Expand Down Expand Up @@ -192,7 +193,7 @@ def copy(self) -> DataflowNodeToSqlSubqueryVisitor:
_node_to_output_data_set=dict(self._node_to_output_data_set),
)

# TODO: replace this with a dataflow plan node for cumulative metrics
# TODO: replace this with a dataflow plan node for cumulative metrics - SL-3324
def _make_time_spine_data_set(
self,
agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...],
Expand All @@ -215,9 +216,19 @@ def _make_time_spine_data_set(
]
required_specs = queried_specs + specs_required_for_where_constraints

time_spine_source = TimeSpineSource.choose_time_spine_source(
time_spine_sources = TimeSpineSource.choose_time_spine_sources(
required_time_spine_specs=required_specs, time_spine_sources=self._time_spine_sources
)
if len(time_spine_sources) != 1:
raise RuntimeError(
str(
LazyFormat(
"Unexpected number of time spine sources required for query - cumulative metrics should require exactly one.",
Reviewer note (PR author): Custom grains are joined later for cumulative metrics, so this should always be true.

time_spine_sources=time_spine_sources,
)
)
)
time_spine_source = time_spine_sources[0]
time_spine_base_granularity = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity)

base_column_expr = SqlColumnReferenceExpression.from_column_reference(
Expand Down
26 changes: 26 additions & 0 deletions tests_metricflow/integration/query_output/test_query_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,29 @@ def test_scd_filter_without_metric_time( # noqa: D103
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
@pytest.mark.duckdb_only
def test_multiple_time_spines(  # noqa: D103
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    # Group by both a custom grain (martian_day) and a sub-daily standard grain (hour),
    # which presumably live on different time spines — TODO confirm against the test
    # manifest — so this query exercises the multi-time-spine support end to end.
    query_result = it_helpers.mf_engine.query(
        MetricFlowQueryRequest.create_with_random_request_id(
            metric_names=["subdaily_join_to_time_spine_metric", "subdaily_cumulative_window_metric"],
            group_by_names=["metric_time__martian_day", "metric_time__hour"],
            order_by_names=["metric_time__martian_day", "metric_time__hour"],
        )
    )
    assert query_result.result_df is not None, "Unexpected empty result."

    # Compare the rendered result table against the stored snapshot for this SQL engine.
    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=query_result.result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )
48 changes: 48 additions & 0 deletions tests_metricflow/query_rendering/test_custom_granularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,3 +693,51 @@ def test_custom_offset_window_with_only_window_grain( # noqa: D103
# TODO: add more tests
# - with where filter not included in group by
# - nested custom offset


@pytest.mark.sql_engine_snapshot
def test_multiple_time_spines_in_query_for_join_to_time_spine_metric(  # noqa: D103
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    # Grouping by a custom grain (martian_day) together with hour requires more than one
    # time spine for a join_to_time_spine metric; snapshot the rendered plan and SQL.
    query_spec = query_parser.parse_and_validate_query(
        metric_names=("subdaily_join_to_time_spine_metric",),
        group_by_names=("metric_time__martian_day", "metric_time__hour"),
    ).query_spec

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=query_spec,
    )


@pytest.mark.sql_engine_snapshot
def test_multiple_time_spines_in_query_for_cumulative_metric(  # noqa: D103
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    # Same multi-time-spine grouping as the join_to_time_spine test above, but for a
    # cumulative metric (custom grains are joined in a later step for cumulative metrics).
    query_spec = query_parser.parse_and_validate_query(
        metric_names=("subdaily_cumulative_window_metric",),
        group_by_names=("metric_time__martian_day", "metric_time__hour"),
    ).query_spec

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=query_spec,
    )
Loading