From 7ba8d1afe01e04cc4f56876ef1fcf7baf8e793c0 Mon Sep 17 00:00:00 2001
From: Ralph Rassweiler
Date: Tue, 22 Aug 2023 10:58:58 -0300
Subject: [PATCH] Release/1.2.2 (#346)

* Less strict requirements (#333)

* bump a few requirements; increase lower bound for h3 version range;
  adding pyarrow dev dependency

* fix type repr for spark types; fix: broken tests (pyspark 3.4)

---------

Co-authored-by: Ralph Rassweiler

* feat: optional row count validation (#340)

* fix: parameter, libs (#341)

---------
---
 CHANGELOG.md                               |  7 ++++++
 Makefile                                   |  6 ++---
 butterfree/configs/db/cassandra_config.py  |  2 +-
 butterfree/load/sink.py                    | 17 +++++++------
 .../historical_feature_store_writer.py     |  7 +++++-
 butterfree/load/writers/writer.py          |  2 ++
 butterfree/reports/metadata.py             |  4 +--
 requirements.dev.txt                       | 10 ++++----
 requirements.txt                           |  7 +++---
 setup.cfg                                  |  1 +
 setup.py                                   |  4 +--
 .../pipelines/test_feature_set_pipeline.py | 25 +++++++++++--------
 12 files changed, 56 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6482ea7b1..27b680bf2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,8 +5,14 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
 
 ## [Unreleased]
 
+## [1.2.2](https://github.com/quintoandar/butterfree/releases/tag/1.2.2)
+
+### Changed
+* Optional row count validation ([#284](https://github.com/quintoandar/butterfree/pull/284))
+* Bump several libs versions ([#333](https://github.com/quintoandar/butterfree/pull/333))
 
 ## [1.2.1](https://github.com/quintoandar/butterfree/releases/tag/1.2.1)
+
 ### Changed
 * Update README.md ([#331](https://github.com/quintoandar/butterfree/pull/331))
 * Update Github Actions Workflow runner ([#332](https://github.com/quintoandar/butterfree/pull/332))
@@ -16,6 +22,7 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
 * Add the missing link for H3 geohash ([#330](https://github.com/quintoandar/butterfree/pull/330))
 
 ## [1.2.0](https://github.com/quintoandar/butterfree/releases/tag/1.2.0)
+
 ### Added
 * [MLOP-636] Create migration classes ([#282](https://github.com/quintoandar/butterfree/pull/282))
 * [MLOP-635] Rebase Incremental Job/Interval Run branch for test on selected feature sets ([#278](https://github.com/quintoandar/butterfree/pull/278))
diff --git a/Makefile b/Makefile
index 95cc6e3a6..4109504f6 100644
--- a/Makefile
+++ b/Makefile
@@ -9,8 +9,8 @@ VERSION := $(shell grep __version__ setup.py | head -1 | cut -d \" -f2 | cut -d
 .PHONY: environment
 ## create virtual environment for butterfree
 environment:
-	@pyenv install -s 3.7.6
-	@pyenv virtualenv 3.7.6 butterfree
+	@pyenv install -s 3.7.13
+	@pyenv virtualenv 3.7.13 butterfree
 	@pyenv local butterfree
 	@PYTHONPATH=. python -m pip install --upgrade pip
@@ -221,4 +221,4 @@ help:
 	} \
 	printf "\n"; \
 	}' \
-	| more $(shell test $(shell uname) = Darwin && echo '--no-init --raw-control-chars')
\ No newline at end of file
+	| more $(shell test $(shell uname) = Darwin && echo '--no-init --raw-control-chars')
diff --git a/butterfree/configs/db/cassandra_config.py b/butterfree/configs/db/cassandra_config.py
index 3d94e7567..a038cb177 100644
--- a/butterfree/configs/db/cassandra_config.py
+++ b/butterfree/configs/db/cassandra_config.py
@@ -246,7 +246,7 @@ def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
             cassandra_schema.append(
                 {
                     "column_name": features["column_name"],
-                    "type": cassandra_mapping[str(features["type"])],
+                    "type": cassandra_mapping[str(features["type"]).replace("()", "")],
                     "primary_key": features["primary_key"],
                 }
             )
diff --git a/butterfree/load/sink.py b/butterfree/load/sink.py
index 0b0c10c9e..7c0328d6f 100644
--- a/butterfree/load/sink.py
+++ b/butterfree/load/sink.py
@@ -69,14 +69,15 @@ def validate(
         """
         failures = []
         for writer in self.writers:
-            try:
-                writer.validate(
-                    feature_set=feature_set,
-                    dataframe=dataframe,
-                    spark_client=spark_client,
-                )
-            except AssertionError as e:
-                failures.append(e)
+            if writer.row_count_validation:
+                try:
+                    writer.validate(
+                        feature_set=feature_set,
+                        dataframe=dataframe,
+                        spark_client=spark_client,
+                    )
+                except AssertionError as e:
+                    failures.append(e)
 
         if failures:
             raise RuntimeError(
diff --git a/butterfree/load/writers/historical_feature_store_writer.py b/butterfree/load/writers/historical_feature_store_writer.py
index 489f22be1..0ea9b50c8 100644
--- a/butterfree/load/writers/historical_feature_store_writer.py
+++ b/butterfree/load/writers/historical_feature_store_writer.py
@@ -113,9 +113,14 @@ def __init__(
         debug_mode: bool = False,
         interval_mode: bool = False,
         check_schema_hook: Hook = None,
+        row_count_validation: bool = True,
     ):
         super(HistoricalFeatureStoreWriter, self).__init__(
-            db_config or MetastoreConfig(), debug_mode, interval_mode
+            db_config or MetastoreConfig(),
+            debug_mode,
+            interval_mode,
+            False,
+            row_count_validation,
         )
         self.database = database or environment.get_variable(
             "FEATURE_STORE_HISTORICAL_DATABASE"
diff --git a/butterfree/load/writers/writer.py b/butterfree/load/writers/writer.py
index e12a4317e..5073f4726 100644
--- a/butterfree/load/writers/writer.py
+++ b/butterfree/load/writers/writer.py
@@ -26,6 +26,7 @@ def __init__(
         debug_mode: bool = False,
         interval_mode: bool = False,
         write_to_entity: bool = False,
+        row_count_validation: bool = True,
     ) -> None:
         super().__init__()
         self.db_config = db_config
@@ -33,6 +34,7 @@ def __init__(
         self.debug_mode = debug_mode
         self.interval_mode = interval_mode
         self.write_to_entity = write_to_entity
+        self.row_count_validation = row_count_validation
 
     def with_(
         self, transformer: Callable[..., DataFrame], *args: Any, **kwargs: Any
diff --git a/butterfree/reports/metadata.py b/butterfree/reports/metadata.py
index d54bbba9d..dc1f7cbb4 100644
--- a/butterfree/reports/metadata.py
+++ b/butterfree/reports/metadata.py
@@ -162,7 +162,7 @@ def to_json(self) -> Any:
                 "features": [
                     {
                         "column_name": c["column_name"],
-                        "data_type": str(c["type"]),
+                        "data_type": str(c["type"]).replace("()", ""),
                         "description": desc,
                     }
                     for c, desc in params._features
@@ -208,7 +208,7 @@ def to_markdown(self) -> Any:
         features = ["Column name", "Data type", "Description"]
 
         for c, desc in params._features:
-            features.extend([c["column_name"], str(c["type"]), desc])
+            features.extend([c["column_name"], str(c["type"]).replace("()", ""), desc])
 
         count_rows = len(features) // 3
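Note: the .replace("()", "") calls above normalize how Spark type instances stringify. A minimal illustration, assuming pyspark is installed; the commit message attributes the behavior change to pyspark 3.4, where str() on a type instance gained trailing parentheses:

    from pyspark.sql.types import LongType, StringType

    # On pyspark 3.4, str(LongType()) yields "LongType()"; older releases
    # yield "LongType". Stripping "()" keeps the string keys used by the
    # Cassandra and metadata mappings stable on both versions.
    for spark_type in (LongType(), StringType()):
        print(str(spark_type).replace("()", ""))  # LongType / StringType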
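Note: the sink and writer changes above make row count validation opt-out per writer. A minimal usage sketch based on the new parameter; all other writer arguments are left at their defaults here:

    from butterfree.load import Sink
    from butterfree.load.writers import HistoricalFeatureStoreWriter

    # Writers still validate row counts by default (row_count_validation=True);
    # Sink.validate() now skips the count check for writers that opt out.
    writer = HistoricalFeatureStoreWriter(row_count_validation=False)
    sink = Sink(writers=[writer])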
features.extend([c["column_name"], str(c["type"]).replace("()", ""), desc]) count_rows = len(features) // 3 diff --git a/requirements.dev.txt b/requirements.dev.txt index 96ddefc18..4e164c83f 100644 --- a/requirements.dev.txt +++ b/requirements.dev.txt @@ -1,11 +1,11 @@ -cmake==3.18.4 -h3==3.7.0 -pyarrow==0.15.1 +h3==3.7.4 jupyter==1.0.0 twine==3.1.1 mypy==0.790 -pyspark-stubs==3.0.0 sphinx==3.5.4 sphinxemoji==0.1.8 sphinx-rtd-theme==0.5.2 -recommonmark==0.7.1 \ No newline at end of file +recommonmark==0.7.1 +pyarrow>=1.0.0 +setuptools +wheel diff --git a/requirements.txt b/requirements.txt index 9548edb31..d61d125bc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,8 @@ cassandra-driver>=3.22.0,<4.0 mdutils>=1.2.2,<2.0 -pandas>=0.24,<1.1 +pandas>=0.24,<2.0 parameters-validation>=1.1.5,<2.0 pyspark==3.* typer>=0.3,<0.4 -setuptools>=41,<42 -typing-extensions==3.7.4.3 -boto3==1.17.* \ No newline at end of file +typing-extensions>3.7.4,<5 +boto3==1.17.* diff --git a/setup.cfg b/setup.cfg index 255fff848..cff001224 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ spark_options = spark.sql.session.timeZone: UTC spark.driver.bindAddress: 127.0.0.1 spark.sql.legacy.timeParserPolicy: LEGACY + spark.sql.legacy.createHiveTableByDefault: false [mypy] # suppress errors about unsatisfied imports diff --git a/setup.py b/setup.py index 07d476302..57a4b9825 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import find_packages, setup __package_name__ = "butterfree" -__version__ = "1.2.1" +__version__ = "1.2.2" __repository_url__ = "https://github.com/quintoandar/butterfree" with open("requirements.txt") as f: @@ -34,7 +34,7 @@ license="Copyright", author="QuintoAndar", install_requires=requirements, - extras_require={"h3": ["cmake==3.16.3", "h3==3.4.2"]}, + extras_require={"h3": ["h3>=3.7.4,<4"]}, python_requires=">=3.7, <4", entry_points={"console_scripts": ["butterfree=butterfree._cli.main:app"]}, include_package_data=True, diff --git a/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py b/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py index 753dfe7c2..d67e0a387 100644 --- a/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py +++ b/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py @@ -77,9 +77,11 @@ def test_feature_set_pipeline( self, mocked_df, spark_session, fixed_windows_output_feature_set_dataframe, ): # arrange + table_reader_id = "a_source" table_reader_table = "table" table_reader_db = environment.get_variable("FEATURE_STORE_HISTORICAL_DATABASE") + create_temp_view(dataframe=mocked_df, name=table_reader_id) create_db_and_table( spark=spark_session, @@ -88,14 +90,16 @@ def test_feature_set_pipeline( table_reader_table=table_reader_table, ) - dbconfig = Mock() - dbconfig.mode = "overwrite" - dbconfig.format_ = "parquet" + path = "test_folder/historical/entity/feature_set" + + dbconfig = MetastoreConfig() dbconfig.get_options = Mock( - return_value={"path": "test_folder/historical/entity/feature_set"} + return_value={"mode": "overwrite", "format_": "parquet", "path": path} ) - historical_writer = HistoricalFeatureStoreWriter(db_config=dbconfig) + historical_writer = HistoricalFeatureStoreWriter( + db_config=dbconfig, debug_mode=True + ) # act test_pipeline = FeatureSetPipeline( @@ -151,9 +155,13 @@ def test_feature_set_pipeline( ) test_pipeline.run() + # act and assert + dbconfig.get_path_with_partitions = Mock( + return_value=["historical/entity/feature_set"] + ) + # assert - path = 
diff --git a/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py b/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py
index 753dfe7c2..d67e0a387 100644
--- a/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py
+++ b/tests/integration/butterfree/pipelines/test_feature_set_pipeline.py
@@ -77,9 +77,11 @@ def test_feature_set_pipeline(
         self, mocked_df, spark_session, fixed_windows_output_feature_set_dataframe,
     ):
         # arrange
+        table_reader_id = "a_source"
         table_reader_table = "table"
         table_reader_db = environment.get_variable("FEATURE_STORE_HISTORICAL_DATABASE")
+
         create_temp_view(dataframe=mocked_df, name=table_reader_id)
         create_db_and_table(
             spark=spark_session,
@@ -88,14 +90,16 @@ def test_feature_set_pipeline(
             table_reader_table=table_reader_table,
         )
 
-        dbconfig = Mock()
-        dbconfig.mode = "overwrite"
-        dbconfig.format_ = "parquet"
+        path = "test_folder/historical/entity/feature_set"
+
+        dbconfig = MetastoreConfig()
         dbconfig.get_options = Mock(
-            return_value={"path": "test_folder/historical/entity/feature_set"}
+            return_value={"mode": "overwrite", "format_": "parquet", "path": path}
         )
 
-        historical_writer = HistoricalFeatureStoreWriter(db_config=dbconfig)
+        historical_writer = HistoricalFeatureStoreWriter(
+            db_config=dbconfig, debug_mode=True
+        )
 
         # act
         test_pipeline = FeatureSetPipeline(
@@ -151,9 +155,13 @@ def test_feature_set_pipeline(
         )
         test_pipeline.run()
 
+        # act and assert
+        dbconfig.get_path_with_partitions = Mock(
+            return_value=["historical/entity/feature_set"]
+        )
+
         # assert
-        path = dbconfig.get_options("historical/entity/feature_set").get("path")
-        df = spark_session.read.parquet(path).orderBy(TIMESTAMP_COLUMN)
+        df = spark_session.sql("select * from historical_feature_store__feature_set")
 
         target_df = fixed_windows_output_feature_set_dataframe.orderBy(
             test_pipeline.feature_set.timestamp_column
@@ -162,9 +170,6 @@ def test_feature_set_pipeline(
         # assert
         assert_dataframe_equality(df, target_df)
 
-        # tear down
-        shutil.rmtree("test_folder")
-
     def test_feature_set_pipeline_with_dates(
         self,
         mocked_date_df,
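Note: the rewritten assertion reads the pipeline output back through Spark SQL instead of parquet files. This leans on the writer's debug mode, which, judging by the query in the diff above, registers the result as a temp view named historical_feature_store__<feature set name> rather than writing to disk, hence the removed shutil teardown. A sketch of the pattern inside such a test; result_df is an illustrative name:

    # With debug_mode=True no files are written, so no filesystem teardown
    # is needed; the result is queried straight from the session's temp view.
    result_df = spark_session.sql(
        "select * from historical_feature_store__feature_set"
    )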