From 0973653890cc70ded404e2be58e5ce15aca69a3b Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 12 Dec 2024 09:46:35 +0200 Subject: [PATCH 1/4] Implement date_partition_column Signed-off-by: niklasvm --- .../contrib/spark_offline_store/spark.py | 10 ++++++++++ .../contrib/spark_offline_store/spark_source.py | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index aeb9e3cd68b..3ea970e323f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -99,6 +99,8 @@ def pull_latest_from_table_or_query( fields_as_string = ", ".join(fields_with_aliases) aliases_as_string = ", ".join(aliases) + date_partition_column = data_source.date_partition_column + start_date_str = _format_datetime(start_date) end_date_str = _format_datetime(end_date) query = f""" @@ -110,6 +112,7 @@ def pull_latest_from_table_or_query( ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_ FROM {from_expression} t1 WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}') + {"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''} ) t2 WHERE feast_row_ = 1 """ @@ -641,8 +644,15 @@ def _cast_data_frame( {% endfor %} FROM {{ featureview.table_subquery }} WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' + {% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %} + AND {{ featureview.date_partition_column }} <= '{{ featureview.max_event_timestamp[:10] }}' + {% endif %} + {% if featureview.ttl == 0 %}{% else %} AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' + {% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %} + AND {{ featureview.date_partition_column }} >= '{{ featureview.min_event_timestamp[:10] }}' + {% endif %} {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 209e3b87e8b..37af91e9a17 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -45,6 +45,7 @@ def __init__( tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", timestamp_field: Optional[str] = None, + date_partition_column: Optional[str] = None, ): """Creates a SparkSource object. 
@@ -77,6 +78,7 @@ def __init__(
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping,
             description=description,
+            date_partition_column=date_partition_column,
             tags=tags,
             owner=owner,
         )
@@ -135,6 +137,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
             query=spark_options.query,
             path=spark_options.path,
             file_format=spark_options.file_format,
+            date_partition_column=data_source.date_partition_column,
             timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             description=data_source.description,
@@ -148,6 +151,7 @@ def to_proto(self) -> DataSourceProto:
             type=DataSourceProto.BATCH_SPARK,
             data_source_class_type="feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource",
             field_mapping=self.field_mapping,
+            date_partition_column=self.date_partition_column,
             spark_options=self.spark_options.to_proto(),
             description=self.description,
             tags=self.tags,

From 4d3a5dececd591a7b74a6e1c663fa77112d2292e Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Thu, 12 Dec 2024 10:53:54 +0200
Subject: [PATCH 2/4] fix unit tests

Signed-off-by: niklasvm

---
 .../infra/offline_stores/contrib/spark_offline_store/spark.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index 3ea970e323f..2fdbffb1ecb 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -111,8 +111,7 @@ def pull_latest_from_table_or_query(
                 SELECT {fields_as_string},
                 ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_
                 FROM {from_expression} t1
-                WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}')
-                {"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''}
+                WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''}
             ) t2
             WHERE feast_row_ = 1
             """

From 434af7347d850bd4322911e149527e8d6b77dea8 Mon Sep 17 00:00:00 2001
From: niklasvm
Date: Thu, 12 Dec 2024 11:13:01 +0200
Subject: [PATCH 3/4] add arg to docstring

Signed-off-by: niklasvm

---
 .../offline_stores/contrib/spark_offline_store/spark_source.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
index 37af91e9a17..7ad331239ff 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
@@ -65,6 +65,8 @@ def __init__(
                 maintainer.
             timestamp_field: Event timestamp field used for point-in-time
                 joins of feature values.
+            date_partition_column: The column to partition the data on for faster
+                retrieval. This is useful for large tables and will limit the number of files scanned.
         """
         # If no name, use the table as the default name.
if name is None and table is None: From 1b046e365993d5623f5f795ecbedbeaad88d6d14 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 12 Dec 2024 11:31:05 +0200 Subject: [PATCH 4/4] add unit tests that include a date_partition_column set Signed-off-by: niklasvm --- .../contrib/spark_offline_store/spark.py | 2 +- .../contrib/spark_offline_store/test_spark.py | 121 ++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 2fdbffb1ecb..4b501886327 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -111,7 +111,7 @@ def pull_latest_from_table_or_query( SELECT {fields_as_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_ FROM {from_expression} t1 - WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''} + WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){" AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''} ) t2 WHERE feast_row_ = 1 """ diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py index b8f8cc42474..307ba4058c1 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py @@ -71,6 +71,68 @@ def test_pull_latest_from_table_with_nested_timestamp_or_query(mock_get_spark_se assert retrieval_job.query.strip() == expected_query.strip() +@patch( + "feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig" +) +def test_pull_latest_from_table_with_nested_timestamp_or_query_and_date_partition_column_set( + mock_get_spark_session, +): + mock_spark_session = MagicMock() + mock_get_spark_session.return_value = mock_spark_session + + test_repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=SparkOfflineStoreConfig(type="spark"), + ) + + test_data_source = SparkSource( + name="test_nested_batch_source", + description="test_nested_batch_source", + table="offline_store_database_name.offline_store_table_name", + timestamp_field="nested_timestamp", + field_mapping={ + "event_header.event_published_datetime_utc": "nested_timestamp", + }, + date_partition_column="effective_date", + ) + + # Define the parameters for the method + join_key_columns = ["key1", "key2"] + feature_name_columns = ["feature1", "feature2"] + timestamp_field = "event_header.event_published_datetime_utc" + created_timestamp_column = "created_timestamp" + start_date = datetime(2021, 1, 1) + end_date = datetime(2021, 1, 2) + + # Call the method + retrieval_job = SparkOfflineStore.pull_latest_from_table_or_query( + config=test_repo_config, + 
data_source=test_data_source,
+        join_key_columns=join_key_columns,
+        feature_name_columns=feature_name_columns,
+        timestamp_field=timestamp_field,
+        created_timestamp_column=created_timestamp_column,
+        start_date=start_date,
+        end_date=end_date,
+    )
+
+    expected_query = """SELECT
+                key1, key2, feature1, feature2, nested_timestamp, created_timestamp
+                
+            FROM (
+                SELECT key1, key2, feature1, feature2, event_header.event_published_datetime_utc AS nested_timestamp, created_timestamp,
+                ROW_NUMBER() OVER(PARTITION BY key1, key2 ORDER BY event_header.event_published_datetime_utc DESC, created_timestamp DESC) AS feast_row_
+                FROM `offline_store_database_name`.`offline_store_table_name` t1
+                WHERE event_header.event_published_datetime_utc BETWEEN TIMESTAMP('2021-01-01 00:00:00.000000') AND TIMESTAMP('2021-01-02 00:00:00.000000') AND effective_date >= '2021-01-01' AND effective_date <= '2021-01-02' 
+            ) t2
+            WHERE feast_row_ = 1"""  # noqa: W293, W291
+
+    assert isinstance(retrieval_job, RetrievalJob)
+    assert retrieval_job.query.strip() == expected_query.strip()
+
+
 @patch(
     "feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig"
 )
@@ -127,3 +189,62 @@ def test_pull_latest_from_table_without_nested_timestamp_or_query(
 
     assert isinstance(retrieval_job, RetrievalJob)
     assert retrieval_job.query.strip() == expected_query.strip()
+
+
+@patch(
+    "feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig"
+)
+def test_pull_latest_from_table_without_nested_timestamp_or_query_and_date_partition_column_set(
+    mock_get_spark_session,
+):
+    mock_spark_session = MagicMock()
+    mock_get_spark_session.return_value = mock_spark_session
+
+    test_repo_config = RepoConfig(
+        project="test_project",
+        registry="test_registry",
+        provider="local",
+        offline_store=SparkOfflineStoreConfig(type="spark"),
+    )
+
+    test_data_source = SparkSource(
+        name="test_batch_source",
+        description="test_batch_source",
+        table="offline_store_database_name.offline_store_table_name",
+        timestamp_field="event_published_datetime_utc",
+        date_partition_column="effective_date",
+    )
+
+    # Define the parameters for the method
+    join_key_columns = ["key1", "key2"]
+    feature_name_columns = ["feature1", "feature2"]
+    timestamp_field = "event_published_datetime_utc"
+    created_timestamp_column = "created_timestamp"
+    start_date = datetime(2021, 1, 1)
+    end_date = datetime(2021, 1, 2)
+
+    # Call the method
+    retrieval_job = SparkOfflineStore.pull_latest_from_table_or_query(
+        config=test_repo_config,
+        data_source=test_data_source,
+        join_key_columns=join_key_columns,
+        feature_name_columns=feature_name_columns,
+        timestamp_field=timestamp_field,
+        created_timestamp_column=created_timestamp_column,
+        start_date=start_date,
+        end_date=end_date,
+    )
+
+    expected_query = """SELECT
+                key1, key2, feature1, feature2, event_published_datetime_utc, created_timestamp
+                
+            FROM (
+                SELECT key1, key2, feature1, feature2, event_published_datetime_utc, created_timestamp,
+                ROW_NUMBER() OVER(PARTITION BY key1, key2 ORDER BY event_published_datetime_utc DESC, created_timestamp DESC) AS feast_row_
+                FROM `offline_store_database_name`.`offline_store_table_name` t1
+                WHERE event_published_datetime_utc BETWEEN TIMESTAMP('2021-01-01 00:00:00.000000') AND TIMESTAMP('2021-01-02 00:00:00.000000') AND effective_date >= '2021-01-01' AND effective_date <= '2021-01-02' 
+            ) t2
+            WHERE feast_row_ = 1"""  # noqa: W293, W291
+
+    assert isinstance(retrieval_job, RetrievalJob)
+    
assert retrieval_job.query.strip() == expected_query.strip()
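
Taken together, these patches make date_partition_column flow from the SparkSource definition through the proto round-trip and into the SQL emitted by pull_latest_from_table_or_query, which appends "AND <column> >= '<start>' AND <column> <= '<end>'" (dates formatted as %Y-%m-%d) so Spark can prune partitions instead of scanning the whole table. A minimal usage sketch follows, assuming feast with these patches is installed; the table and column names below are illustrative and not taken from the patches:

    from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
        SparkSource,
    )

    # Hypothetical source backed by a table partitioned on "effective_date".
    driver_stats = SparkSource(
        name="driver_hourly_stats",
        table="analytics.driver_hourly_stats",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
        # New in this series: bounds generated queries by partition date so
        # only partitions inside the pulled date range are scanned.
        date_partition_column="effective_date",
    )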