Skip to content

Commit

Permalink
fix: skips geometry and processes wkb (#858)
Browse files Browse the repository at this point in the history
* fix: skips geometry and processes wkb

* removed repeated test

* fix: skipped bytes

* fix: skipped bytes
  • Loading branch information
nazarfil authored and qgerome committed Dec 2, 2024
1 parent 27b29ca commit 2fe2399
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 28 deletions.
5 changes: 3 additions & 2 deletions hexa/datasets/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import logging
import math
import secrets
Expand Down Expand Up @@ -356,14 +355,16 @@ class DataframeJsonEncoder(DjangoJSONEncoder):
def encode(self, obj):
# Recursively replace NaN with None (since it's a float, it does not call 'default' method)
def custom_encoding(item):
    """Recursively sanitize *item* for JSON encoding.

    Replaces NaN floats with None (NaN is a float, so the encoder's
    'default' hook never sees it) and replaces raw bytes values
    (e.g. WKB geometry) with a sentinel string, recursing into dicts
    and lists. All other values pass through unchanged.
    """
    # Sentinel substituted for bytes payloads that are not
    # JSON-serializable and not useful to show in a sample.
    SKIPPED_FIELD = "<SKIPPED_BYTES>"

    if isinstance(item, float) and math.isnan(item):
        return None
    elif isinstance(item, dict):
        return {key: custom_encoding(value) for key, value in item.items()}
    elif isinstance(item, list):
        return [custom_encoding(element) for element in item]
    elif isinstance(item, bytes):
        return SKIPPED_FIELD
    return item

# Preprocess the object to replace NaN values with None and encode bytes to base64
Expand Down
54 changes: 32 additions & 22 deletions hexa/datasets/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,38 @@ def generate_sample(

def generate_profile(df: pd.DataFrame) -> list:
    """Compute per-column profiling metadata for *df*.

    Returns a list of dicts, one per column, each carrying the column's
    dtype name and missing/unique/distinct value counts plus a flag for
    constant columns. Profiling is best-effort: any failure is logged and
    an empty list is returned instead of raising.
    """
    logger.info("Calculating profiling per column")
    try:
        # WKB 'geometry' columns hold raw bytes that break the generic
        # string conversion and aggregations below, so exclude them.
        if "geometry" in df.columns:
            logger.warning("Skipping the 'geometry' column from profiling.")
            df = df.drop(columns=["geometry"])

        # Normalise generic 'object' columns to pandas' string dtype so the
        # aggregations behave consistently; tolerate columns that cannot be
        # converted (e.g. mixed or binary content).
        for col in df.select_dtypes(include=["object"]).columns:
            try:
                df[col] = df[col].astype("string")
            except Exception as e:
                logger.warning(f"Failed to convert column '{col}' to string: {e}")

        data_types = df.dtypes.apply(str).to_dict()
        missing_values = df.isnull().sum().to_dict()
        # nunique() excludes NaN by default; the dropna=False variant below
        # counts NaN as its own distinct value.
        unique_values = df.nunique().to_dict()
        distinct_values = df.apply(lambda x: x.nunique(dropna=False)).to_dict()
        constant_values = df.apply(lambda x: x.nunique() == 1).astype("bool").to_dict()

        metadata_per_column = [
            {
                "column_name": column,
                "data_type": data_types.get(column),
                "missing_values": missing_values.get(column),
                "unique_values": unique_values.get(column),
                "distinct_values": distinct_values.get(column),
                "constant_values": constant_values.get(column),
            }
            for column in df.columns
        ]

        return metadata_per_column
    except Exception as e:
        # Profiling feeds optional metadata; never let it crash the task.
        logger.exception("Failed to calculate profiling", exc_info=e)
        return []


def get_previous_version_file(
Expand Down Expand Up @@ -192,7 +202,7 @@ def generate_file_metadata_task(version_file: DatasetVersionFile) -> None:
f"Failed to load dataframe for file {version_file.id}", exc_info=e
)
return

logger.info("Finished sample generation, calculating profiling")
add_system_attributes(version_file, df)


Expand Down
Binary file not shown.
14 changes: 14 additions & 0 deletions hexa/datasets/tests/fixtures/wkb_geometry_encoded.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Expected sample row for the WKB-geometry parquet fixture: the raw
# 'geometry' bytes are replaced by the encoder's "<SKIPPED_BYTES>" sentinel.
wkb_geometry = dict(
    bbox=dict(
        xmax=-66.96465999999998,
        xmin=-171.79111060289122,
        ymax=71.35776357694175,
        ymin=18.91619,
    ),
    geometry="<SKIPPED_BYTES>",
    name="United States of America",
    iso_a3="USA",
    pop_est=328239523.0,
    continent="North America",
    gdp_md_est=21433226,
)
53 changes: 49 additions & 4 deletions hexa/datasets/tests/test_generate_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
get_previous_version_file,
load_df,
)
from hexa.datasets.tests.fixtures.wkb_geometry_encoded import wkb_geometry
from hexa.datasets.tests.testutils import DatasetTestMixin
from hexa.files import storage
from hexa.metadata.models import MetadataAttribute
Expand Down Expand Up @@ -147,19 +148,19 @@ def test_generate_sample(
DatasetFileSample.STATUS_FINISHED,
[
{
"geometry": "AQMAAAACAAAAoQEAAQ==",
"geometry": "<SKIPPED_BYTES>",
"id": "2",
"name": "District B",
"value": 2.5,
},
{
"geometry": "AQMAAAABAAAAoAAAAA==",
"geometry": "<SKIPPED_BYTES>",
"id": "1",
"name": "District A",
"value": None,
},
{
"geometry": "AQMAAAABAAAAoAAAAA==",
"geometry": "<SKIPPED_BYTES>",
"id": "1",
"name": "District A",
"value": None,
Expand All @@ -176,7 +177,51 @@ def test_generate_sample(
) in CASES:
with self.subTest(fixture_name=fixture_name):
fixture_file_path = os.path.join(
os.path.dirname(__file__), f"./fixtures/{fixture_name}"
os.path.dirname(__file__), "fixtures", fixture_name
)
version_file = DatasetVersionFile.objects.create_if_has_perm(
self.USER_SERENA,
self.DATASET_VERSION,
uri=fixture_file_path,
content_type="application/octet-stream",
)

with patch(
"hexa.datasets.queue.generate_download_url"
) as mock_generate_download_url:
mock_generate_download_url.return_value = fixture_file_path
df = load_df(version_file)
generate_sample(version_file, df)
sample_entry = version_file.sample_entry
self.assertEqual(sample_entry.status, expected_status)
self.assertEqual(sample_entry.sample, expected_sample)

if expected_status_reason:
self.assertEqual(
sample_entry.status_reason, expected_status_reason
)

@override_settings(WORKSPACE_DATASETS_FILE_SNAPSHOT_SIZE=1)
def test_generate_sample_wkb(
self,
):
CASES = [
(
"example_with_wkb_geometry.parquet",
DatasetFileSample.STATUS_FINISHED,
[wkb_geometry],
None,
),
]
for (
fixture_name,
expected_status,
expected_sample,
expected_status_reason,
) in CASES:
with self.subTest(fixture_name=fixture_name):
fixture_file_path = os.path.join(
os.path.dirname(__file__), "fixtures", fixture_name
)
version_file = DatasetVersionFile.objects.create_if_has_perm(
self.USER_SERENA,
Expand Down

0 comments on commit 2fe2399

Please sign in to comment.