Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Jan 30, 2025
1 parent 5266f30 commit af6933d
Show file tree
Hide file tree
Showing 23 changed files with 4,216 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -881,3 +881,13 @@ metric:
offset_window: 1 martian_day
alias: bookings_offset
- name: bookings
---
metric:
name: bookings_offset_one_martian_day_then_2_martian_days
description: tests a metric with nested custom offset windows
type: derived
type_params:
expr: bookings_offset_one_martian_day
metrics:
- name: bookings_offset_one_martian_day
offset_window: 2 martian_day
28 changes: 21 additions & 7 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,10 @@ def _build_time_spine_join_node_for_nested_offset(
time_range_constraint: Optional[TimeRangeConstraint],
metric_source_node: DataflowPlanNode,
) -> DataflowPlanNode:
# TODO: nested custom offset window plans
# use_offset_custom_granularity_node = self._should_use_offset_custom_granularity_node(
# join_description=before_aggregation_time_spine_join_description,
# queried_agg_time_dimension_specs=queried_agg_time_dimension_specs,
# )
join_spec = self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0]
time_spine_node = self._build_time_spine_node(
queried_time_spine_specs=queried_agg_time_dimension_specs,
Expand All @@ -1588,6 +1591,7 @@ def _build_time_spine_join_node_for_nested_offset(
)

# TODO: fix bug here where filter specs are being included when aggregating.
# After that, can this code be combined with basic offset code?
if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or time_range_constraint:
# FilterElementsNode will only be needed if there are where filter specs that were selected in the group by.
specs_in_filters = set(
Expand Down Expand Up @@ -1659,6 +1663,18 @@ def _get_base_grain_for_custom_grain(self, custom_grain: str) -> TimeGranularity
raise ValueError(LazyFormat("Custom grain not found in semantic model.", custom_grain=custom_grain))
return self._semantic_model_lookup.custom_granularities[custom_grain].base_granularity

def _should_use_offset_custom_granularity_node(
    self,
    join_description: Optional[JoinToTimeSpineDescription],
    queried_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
) -> bool:
    """Decide whether the queried grains exactly match the custom offset window's grain.

    Args:
        join_description: The time spine join description, if any.
        queried_agg_time_dimension_specs: The agg time dimension specs requested in the query.

    Returns:
        True only when `join_description` is set, it carries a custom offset window, and the set of
        granularity names across the queried specs equals the single granularity of that window.
        An empty sequence of specs therefore yields False.
    """
    if not join_description:
        return False

    custom_offset_window = join_description.custom_offset_window
    if not custom_offset_window:
        return False

    # Compare as sets so duplicate specs at the same grain still count as a match.
    queried_grain_names = {spec.time_granularity_name for spec in queried_agg_time_dimension_specs}
    return queried_grain_names == {custom_offset_window.granularity}

def _build_aggregated_measure_from_measure_source_node(
self,
metric_input_measure_spec: MetricInputMeasureSpec,
Expand Down Expand Up @@ -1783,11 +1799,9 @@ def _build_aggregated_measure_from_measure_source_node(
)

# If querying an offset metric, join to time spine before aggregation.
use_offset_custom_granularity_node = bool(
before_aggregation_time_spine_join_description
and before_aggregation_time_spine_join_description.custom_offset_window
and {spec.time_granularity_name for spec in queried_agg_time_dimension_specs}
== {before_aggregation_time_spine_join_description.custom_offset_window.granularity}
use_offset_custom_granularity_node = self._should_use_offset_custom_granularity_node(
join_description=before_aggregation_time_spine_join_description,
queried_agg_time_dimension_specs=queried_agg_time_dimension_specs,
)
if before_aggregation_time_spine_join_description and queried_agg_time_dimension_specs:
unaggregated_measure_node = self._build_time_spine_join_node_for_offset(
Expand Down Expand Up @@ -2048,7 +2062,7 @@ def build_custom_offset_time_spine_node(
use_offset_custom_granularity_node: bool,
required_time_spine_sources: Tuple[TimeSpineSource, ...],
) -> DataflowPlanNode:
"""Builds an OffsetByCustomGranularityNode used for custom offset windows."""
"""Builds a time spine node used for custom offset windows."""
custom_time_spine_read_node = self._get_time_spine_read_node_for_custom_grain(offset_window.granularity)
if use_offset_custom_granularity_node:
return OffsetCustomGranularityNode.create(
Expand Down
9 changes: 6 additions & 3 deletions metricflow/dataflow/nodes/join_to_time_spine.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def create( # noqa: D102
offset_to_grain: Optional[TimeGranularity] = None,
) -> JoinToTimeSpineNode:
return JoinToTimeSpineNode(
# Note: parent nodes must be in the order of metric_source_node, time_spine_node
parent_nodes=(metric_source_node, time_spine_node),
metric_source_node=metric_source_node,
time_spine_node=time_spine_node,
Expand Down Expand Up @@ -107,10 +108,12 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
)

def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> JoinToTimeSpineNode: # noqa: D102
assert len(new_parent_nodes) == 1
assert len(new_parent_nodes) == 2, "JoinToTimeSpineNode must have exactly 2 parent nodes."
# Note: parent nodes remain in the order of metric_source_node, time_spine_node

return JoinToTimeSpineNode.create(
metric_source_node=self.metric_source_node,
time_spine_node=self.time_spine_node,
metric_source_node=new_parent_nodes[0],
time_spine_node=new_parent_nodes[1],
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
standard_offset_window=self.standard_offset_window,
offset_to_grain=self.offset_to_grain,
Expand Down
18 changes: 14 additions & 4 deletions tests_metricflow/integration/query_output/test_offset_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@ def test_custom_offset_window_with_base_grain(
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Gives a side by side comparison of bookings and bookings_offset_one_martian_day."""
"""Gives a side by side comparison of the normal bookings metric with related custom offset metrics, using base grain."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=["bookings", "bookings_offset_one_martian_day"],
metric_names=[
"bookings",
"bookings_offset_one_martian_day",
"bookings_martian_day_over_martian_day",
"bookings_offset_one_martian_day_then_2_martian_days",
],
group_by_names=["metric_time__day", "metric_time__martian_day"],
order_by_names=["metric_time__day", "metric_time__martian_day"],
)
Expand Down Expand Up @@ -129,10 +134,15 @@ def test_custom_offset_window_with_matching_custom_grain(
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Gives a side by side comparison of bookings and bookings_offset_one_martian_day."""
"""Gives a side by side comparison of the normal bookings metric with related custom offset metrics, using matching grain."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=["bookings", "bookings_offset_one_martian_day"],
metric_names=[
"bookings",
"bookings_offset_one_martian_day",
"bookings_martian_day_over_martian_day",
# "bookings_offset_one_martian_day_then_2_martian_days",
],
group_by_names=["booking__ds__martian_day", "metric_time__martian_day"],
order_by_names=["booking__ds__martian_day", "metric_time__martian_day"],
)
Expand Down
155 changes: 150 additions & 5 deletions tests_metricflow/query_rendering/test_custom_granularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ def test_custom_offset_window_with_granularity_and_date_part( # noqa: D103


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_only_window_grain( # noqa: D103
def test_custom_offset_window_with_where_filter_not_in_group_by( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
Expand All @@ -677,7 +677,10 @@ def test_custom_offset_window_with_only_window_grain( # noqa: D103
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day",),
group_by_names=("metric_time__martian_day", "booking__ds__martian_day"),
group_by_names=("metric_time__day",),
where_constraints=[
PydanticWhereFilter(where_sql_template=("{{ TimeDimension('metric_time', 'martian_day') }} = '2020-01-01'"))
],
).query_spec

render_and_check(
Expand All @@ -690,9 +693,28 @@ def test_custom_offset_window_with_only_window_grain( # noqa: D103
)


# TODO: add more tests
# - with where filter not included in group by
# - nested custom offset
@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_only_window_grain(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render `bookings_offset_one_martian_day` grouped only by `martian_day` time dimensions.

    Both group-by items use the `martian_day` grain: one on `metric_time`, one on `booking__ds`.
    """
    metric_names = ("bookings_offset_one_martian_day",)
    group_by_names = ("metric_time__martian_day", "booking__ds__martian_day")
    parse_result = query_parser.parse_and_validate_query(
        metric_names=metric_names,
        group_by_names=group_by_names,
    )

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=parse_result.query_spec,
    )


@pytest.mark.sql_engine_snapshot
Expand Down Expand Up @@ -765,3 +787,126 @@ def test_custom_offset_window_with_multiple_time_spines( # noqa: D103
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_matching_grain_where_filter_not_in_group_by(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render `bookings_offset_one_martian_day` by `booking__ds__martian_day` with a `metric_time` filter.

    The where filter references `metric_time` at the `martian_day` grain, which is not one of the
    group-by items in this query.
    """
    # The filtered time dimension is intentionally absent from the group by below.
    where_filter = PydanticWhereFilter(
        where_sql_template="{{ TimeDimension('metric_time', 'martian_day') }} = '2020-01-01'",
    )
    parse_result = query_parser.parse_and_validate_query(
        metric_names=("bookings_offset_one_martian_day",),
        group_by_names=("booking__ds__martian_day",),
        where_constraints=[where_filter],
    )

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=parse_result.query_spec,
    )


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_time_over_time(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render `bookings_martian_day_over_martian_day` grouped by `metric_time__week`."""
    parse_result = query_parser.parse_and_validate_query(
        metric_names=("bookings_martian_day_over_martian_day",),
        group_by_names=("metric_time__week",),
    )
    spec_to_render = parse_result.query_spec

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=spec_to_render,
    )


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_time_over_time_with_matching_grain(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render `bookings_martian_day_over_martian_day` grouped by `metric_time__martian_day`."""
    parse_result = query_parser.parse_and_validate_query(
        metric_names=("bookings_martian_day_over_martian_day",),
        group_by_names=("metric_time__martian_day",),
    )
    spec_to_render = parse_result.query_spec

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=spec_to_render,
    )


@pytest.mark.sql_engine_snapshot
def test_nested_custom_offset_window(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render `bookings_offset_one_martian_day_then_2_martian_days` grouped by `metric_time__day`."""
    nested_offset_metric = "bookings_offset_one_martian_day_then_2_martian_days"
    parse_result = query_parser.parse_and_validate_query(
        metric_names=(nested_offset_metric,),
        group_by_names=("metric_time__day",),
    )

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=parse_result.query_spec,
    )


@pytest.mark.sql_engine_snapshot
def test_nested_custom_offset_window_matching_grain(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
) -> None:
    """Render `bookings_offset_one_martian_day_then_2_martian_days` grouped by `metric_time__martian_day`."""
    nested_offset_metric = "bookings_offset_one_martian_day_then_2_martian_days"
    parse_result = query_parser.parse_and_validate_query(
        metric_names=(nested_offset_metric,),
        group_by_names=("metric_time__martian_day",),
    )

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=parse_result.query_spec,
    )
Loading

0 comments on commit af6933d

Please sign in to comment.