Fix dup code #373

Merged (2 commits) on Aug 21, 2024
9 changes: 9 additions & 0 deletions butterfree/load/writers/historical_feature_store_writer.py
@@ -93,6 +93,15 @@ class HistoricalFeatureStoreWriter(Writer):
improve query performance. The data is stored in partition folders in AWS S3
based on time (per year, month and day).

>>> spark_client = SparkClient()
>>> writer = HistoricalFeatureStoreWriter()
>>> writer.write(feature_set=feature_set,
...              dataframe=dataframe,
...              spark_client=spark_client,
...              merge_on=["id", "timestamp"])

This procedure skips the regular dataframe write and activates a Delta Merge instead.
Use it when the table already exists.
"""

PARTITION_BY = [
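For readers skimming the new docstring above, here is a minimal sketch of what an upsert keyed on the merge_on columns can look like with the delta-spark package. The function name, the table path argument, and the direct use of DeltaTable are illustrative assumptions for this sketch and are not taken from the writer's actual implementation.

from delta.tables import DeltaTable  # assumes the delta-spark package is installed

def merge_into_existing_table(spark, dataframe, table_path, merge_on):
    """Upsert `dataframe` into an existing Delta table, matching rows on `merge_on`."""
    target = DeltaTable.forPath(spark, table_path)  # hypothetical table location
    # Build an equality condition, e.g. "target.id = source.id AND target.timestamp = source.timestamp"
    condition = " AND ".join(f"target.{col} = source.{col}" for col in merge_on)
    (
        target.alias("target")
        .merge(dataframe.alias("source"), condition)
        .whenMatchedUpdateAll()      # overwrite matching rows with the incoming values
        .whenNotMatchedInsertAll()   # append rows whose keys are not present yet
        .execute()
    )

With merge_on=["id", "timestamp"] as in the docstring example, rows sharing an id and timestamp are updated in place and new keys are appended, so the year/month/day partition layout described above should be preserved.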
53 changes: 17 additions & 36 deletions tests/unit/butterfree/transform/conftest.py
@@ -16,6 +16,15 @@
from butterfree.transform.utils import Function


def create_dataframe(data, timestamp_col="ts"):
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(
TIMESTAMP_COLUMN, df[timestamp_col].cast(DataType.TIMESTAMP.spark)
)
return df


def make_dataframe(spark_context, spark_session):
data = [
{
@@ -54,11 +63,7 @@ def make_dataframe(spark_context, spark_session):
"nonfeature": 0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_filtering_dataframe(spark_context, spark_session):
@@ -71,11 +76,7 @@
{"id": 1, "ts": 6, "feature1": None, "feature2": None, "feature3": None},
{"id": 1, "ts": 7, "feature1": None, "feature2": None, "feature3": None},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_output_filtering_dataframe(spark_context, spark_session):
@@ -86,11 +87,7 @@
{"id": 1, "ts": 4, "feature1": 0, "feature2": 1, "feature3": 1},
{"id": 1, "ts": 6, "feature1": None, "feature2": None, "feature3": None},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn(TIMESTAMP_COLUMN, df.ts.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data)


def make_rolling_windows_agg_dataframe(spark_context, spark_session):
@@ -126,11 +123,7 @@ def make_rolling_windows_agg_dataframe(spark_context, spark_session):
"feature2__avg_over_1_week_rolling_windows": None,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
@@ -154,11 +147,7 @@ def make_rolling_windows_hour_slide_agg_dataframe(spark_context, spark_session):
"feature2__avg_over_1_day_rolling_windows": 500.0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_multiple_rolling_windows_hour_slide_agg_dataframe(
@@ -202,11 +191,7 @@ def make_multiple_rolling_windows_hour_slide_agg_dataframe(
"feature2__avg_over_3_days_rolling_windows": 500.0,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))

return df
return create_dataframe(data, timestamp_col="timestamp")


def make_fs(spark_context, spark_session):
@@ -253,9 +238,7 @@ def make_fs_dataframe_with_distinct(spark_context, spark_session):
"h3": "86a8100efffffff",
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
df = create_dataframe(data, "timestamp")

return df

@@ -283,9 +266,7 @@ def make_target_df_distinct(spark_context, spark_session):
"feature__sum_over_3_days_rolling_windows": None,
},
]
pdf = ps.DataFrame.from_dict(data)
df = pdf.to_spark()
df = df.withColumn("timestamp", df.timestamp.cast(DataType.TIMESTAMP.spark))
df = create_dataframe(data, "timestamp")

return df
