From 19f228709aa3df6de8b22609d7ee513dbe31c083 Mon Sep 17 00:00:00 2001 From: David Wood Date: Wed, 23 Oct 2024 14:19:48 -0400 Subject: [PATCH 01/11] add polars to try and read some troublesome parquet files to arrow tables Signed-off-by: David Wood --- data-processing-lib/python/pyproject.toml | 1 + .../src/data_processing/utils/transform_utils.py | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/data-processing-lib/python/pyproject.toml b/data-processing-lib/python/pyproject.toml index f00d45a0a..774f6714c 100644 --- a/data-processing-lib/python/pyproject.toml +++ b/data-processing-lib/python/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "argparse", "mmh3", "psutil", + "polars", ] [project_urls] diff --git a/data-processing-lib/python/src/data_processing/utils/transform_utils.py b/data-processing-lib/python/src/data_processing/utils/transform_utils.py index e2d37581c..4688373d6 100644 --- a/data-processing-lib/python/src/data_processing/utils/transform_utils.py +++ b/data-processing-lib/python/src/data_processing/utils/transform_utils.py @@ -11,6 +11,7 @@ ################################################################################ import hashlib +import io import os import string import sys @@ -144,8 +145,13 @@ def convert_binary_to_arrow(data: bytes, schema: pa.schema = None) -> pa.Table: table = pq.read_table(reader, schema=schema) return table except Exception as e: - logger.error(f"Failed to convert byte array to arrow table, exception {e}. 
Skipping it")
-            return None
+            logger.warning(f"Could not convert bytes to pyarrow: {e}")
+
+        logger.info(f"Attempting read of pyarrow Table using polars")
+        import polars
+
+        df = polars.read_parquet(io.BytesIO(data))
+        table = df.to_arrow()
 
     @staticmethod
     def convert_arrow_to_binary(table: pa.Table) -> bytes:

From 0116b9efaeab41e5d7f24e2f97a9eea84948a78f Mon Sep 17 00:00:00 2001
From: David Wood
Date: Wed, 23 Oct 2024 20:43:53 -0400
Subject: [PATCH 02/11] fix bug in convert_binary_to_arrow() by returning table from polars

Signed-off-by: David Wood

---
 .../python/src/data_processing/utils/transform_utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/data-processing-lib/python/src/data_processing/utils/transform_utils.py b/data-processing-lib/python/src/data_processing/utils/transform_utils.py
index 4688373d6..e9a10f396 100644
--- a/data-processing-lib/python/src/data_processing/utils/transform_utils.py
+++ b/data-processing-lib/python/src/data_processing/utils/transform_utils.py
@@ -152,6 +152,7 @@ def convert_binary_to_arrow(data: bytes, schema: pa.schema = None) -> pa.Table:
 
         df = polars.read_parquet(io.BytesIO(data))
         table = df.to_arrow()
+        return table
 
     @staticmethod
     def convert_arrow_to_binary(table: pa.Table) -> bytes:

From ffe87d0b15b618c65f32fb132a88db290027f8f1 Mon Sep 17 00:00:00 2001
From: David Wood
Date: Wed, 23 Oct 2024 20:45:52 -0400
Subject: [PATCH 03/11] update convert_binary_to_arrow() by catching exceptions from polars

Signed-off-by: David Wood

---
 .../src/data_processing/utils/transform_utils.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/data-processing-lib/python/src/data_processing/utils/transform_utils.py b/data-processing-lib/python/src/data_processing/utils/transform_utils.py
index e9a10f396..e2fc6e2e7 100644
--- a/data-processing-lib/python/src/data_processing/utils/transform_utils.py
+++ b/data-processing-lib/python/src/data_processing/utils/transform_utils.py
@@ -148,10 +148,14 @@ def 
convert_binary_to_arrow(data: bytes, schema: pa.schema = None) -> pa.Table: logger.warning(f"Could not convert bytes to pyarrow: {e}") logger.info(f"Attempting read of pyarrow Table using polars") - import polars + try: + import polars - df = polars.read_parquet(io.BytesIO(data)) - table = df.to_arrow() + df = polars.read_parquet(io.BytesIO(data)) + table = df.to_arrow() + except Exception as e: + logger.warning(f"Could not convert bytes to pyarrow using polars: {e}. Skipping.") + table = None return table @staticmethod From ca3560a1edb50f5d8e2db8b2df463e29f73327a9 Mon Sep 17 00:00:00 2001 From: David Wood Date: Tue, 29 Oct 2024 14:19:27 -0400 Subject: [PATCH 04/11] change filter's duckdb setting to allow large buffers on arrow tables Signed-off-by: David Wood --- transforms/universal/filter/python/src/filter_transform.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/transforms/universal/filter/python/src/filter_transform.py b/transforms/universal/filter/python/src/filter_transform.py index fe002e497..9a521659e 100644 --- a/transforms/universal/filter/python/src/filter_transform.py +++ b/transforms/universal/filter/python/src/filter_transform.py @@ -67,6 +67,10 @@ def __init__(self, config: dict): self.logical_operator = config.get(filter_logical_operator_key, filter_logical_operator_default) self.columns_to_drop = config.get(filter_columns_to_drop_key, filter_columns_to_drop_default) + # Temporarily here to test if this can allow use to process files that are required to be read by polars for mm + # If this works, we should add as a configurable or always enable (not sure of the downside of enabling this). 
+ duckdb.execute("SET arrow_large_buffer_size = true") + def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict]: """ This implementation filters the input table using a SQL statement and From 2f0f22bbfa2b24d68242bf8831cc45becd3566ab Mon Sep 17 00:00:00 2001 From: David Wood Date: Wed, 20 Nov 2024 09:48:54 -0500 Subject: [PATCH 05/11] turn off changes to filter for now Signed-off-by: David Wood --- transforms/universal/filter/python/src/filter_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transforms/universal/filter/python/src/filter_transform.py b/transforms/universal/filter/python/src/filter_transform.py index 9a521659e..7a09bd389 100644 --- a/transforms/universal/filter/python/src/filter_transform.py +++ b/transforms/universal/filter/python/src/filter_transform.py @@ -69,7 +69,7 @@ def __init__(self, config: dict): # Temporarily here to test if this can allow use to process files that are required to be read by polars for mm # If this works, we should add as a configurable or always enable (not sure of the downside of enabling this). 
-        duckdb.execute("SET arrow_large_buffer_size = true")
+        # duckdb.execute("SET arrow_large_buffer_size = true")
 
     def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict]:
         """

From 6c7ed53feece038360ade1ec1fc813205228a456 Mon Sep 17 00:00:00 2001
From: David Wood
Date: Wed, 20 Nov 2024 10:02:28 -0500
Subject: [PATCH 06/11] add polars to core library

Signed-off-by: David Wood

---
 data-processing-lib/python/requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/data-processing-lib/python/requirements.txt b/data-processing-lib/python/requirements.txt
index 7b363f2b5..532401f35 100644
--- a/data-processing-lib/python/requirements.txt
+++ b/data-processing-lib/python/requirements.txt
@@ -4,3 +4,4 @@
 argparse
 mmh3
 psutil
+polars

From dd957e37f93125b27f9e1de3bbabf789af28b022 Mon Sep 17 00:00:00 2001
From: David Wood
Date: Thu, 21 Nov 2024 14:44:48 -0500
Subject: [PATCH 07/11] add comment to say why we're adding polars for reading some parquet files

Signed-off-by: David Wood

---
 .../python/src/data_processing/utils/transform_utils.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/data-processing-lib/python/src/data_processing/utils/transform_utils.py b/data-processing-lib/python/src/data_processing/utils/transform_utils.py
index e2fc6e2e7..3e7cd1e30 100644
--- a/data-processing-lib/python/src/data_processing/utils/transform_utils.py
+++ b/data-processing-lib/python/src/data_processing/utils/transform_utils.py
@@ -147,6 +147,9 @@ def convert_binary_to_arrow(data: bytes, schema: pa.schema = None) -> pa.Table:
         except Exception as e:
             logger.warning(f"Could not convert bytes to pyarrow: {e}")
 
+        # We have seen this exception before when using pyarrow, but polars does not throw it.
+        # "Nested data conversions not implemented for chunked array outputs"
+        # See issue 816 https://github.com/IBM/data-prep-kit/issues/816.
logger.info(f"Attempting read of pyarrow Table using polars") try: import polars From fc95c6e4585f6b2f36f868e4cb02f1566448b1a0 Mon Sep 17 00:00:00 2001 From: David Wood Date: Mon, 2 Dec 2024 10:18:07 -0500 Subject: [PATCH 08/11] pin core lib polars>=1.16.0 Signed-off-by: David Wood --- data-processing-lib/python/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing-lib/python/requirements.txt b/data-processing-lib/python/requirements.txt index 532401f35..846bf48c8 100644 --- a/data-processing-lib/python/requirements.txt +++ b/data-processing-lib/python/requirements.txt @@ -4,4 +4,4 @@ argparse mmh3 psutil - polars + polars>=1.16.0 From 6123a6efcd9e5e0da33efd37964481051ebbb12d Mon Sep 17 00:00:00 2001 From: David Wood Date: Mon, 2 Dec 2024 10:20:42 -0500 Subject: [PATCH 09/11] change failure on polars read from warning to error Signed-off-by: David Wood --- .../python/src/data_processing/utils/transform_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing-lib/python/src/data_processing/utils/transform_utils.py b/data-processing-lib/python/src/data_processing/utils/transform_utils.py index 3e7cd1e30..ccb7f3fe8 100644 --- a/data-processing-lib/python/src/data_processing/utils/transform_utils.py +++ b/data-processing-lib/python/src/data_processing/utils/transform_utils.py @@ -157,7 +157,7 @@ def convert_binary_to_arrow(data: bytes, schema: pa.schema = None) -> pa.Table: df = polars.read_parquet(io.BytesIO(data)) table = df.to_arrow() except Exception as e: - logger.warning(f"Could not convert bytes to pyarrow using polars: {e}. Skipping.") + logger.error(f"Could not convert bytes to pyarrow using polars: {e}. Skipping.") table = None return table From 0eb6a29024a6c01f1101a2588f6fb24f916d196f Mon Sep 17 00:00:00 2001 From: David Wood Date: Mon, 2 Dec 2024 10:24:35 -0500 Subject: [PATCH 10/11] remove comments on duckdb settings for multimodal in FilterTransform.init(). 
Signed-off-by: David Wood --- transforms/universal/filter/python/src/filter_transform.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/transforms/universal/filter/python/src/filter_transform.py b/transforms/universal/filter/python/src/filter_transform.py index 7a09bd389..fe002e497 100644 --- a/transforms/universal/filter/python/src/filter_transform.py +++ b/transforms/universal/filter/python/src/filter_transform.py @@ -67,10 +67,6 @@ def __init__(self, config: dict): self.logical_operator = config.get(filter_logical_operator_key, filter_logical_operator_default) self.columns_to_drop = config.get(filter_columns_to_drop_key, filter_columns_to_drop_default) - # Temporarily here to test if this can allow use to process files that are required to be read by polars for mm - # If this works, we should add as a configurable or always enable (not sure of the downside of enabling this). - # duckdb.execute("SET arrow_large_buffer_size = true") - def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict]: """ This implementation filters the input table using a SQL statement and From bdd77e66b5223901b8582a6e29776989ec8370d2 Mon Sep 17 00:00:00 2001 From: David Wood Date: Mon, 2 Dec 2024 10:40:39 -0500 Subject: [PATCH 11/11] downgrade polars to >=1.9.0 Signed-off-by: David Wood --- data-processing-lib/python/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-processing-lib/python/requirements.txt b/data-processing-lib/python/requirements.txt index 846bf48c8..318d715d5 100644 --- a/data-processing-lib/python/requirements.txt +++ b/data-processing-lib/python/requirements.txt @@ -4,4 +4,4 @@ argparse mmh3 psutil - polars>=1.16.0 + polars>=1.9.0