fix: 🎨 Parquet data type fixed #1456

Closed
87 changes: 45 additions & 42 deletions poetry.lock


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -82,7 +82,7 @@ google-cloud-batch = "~0.17.26"
 duckdb = "~1.0.0"
 google-cloud-storage = "~2.14.0"
 pandas = "~2.2.2"
-pyarrow = "~17.0.0"
+pyarrow = "18.0.0"
 rich = "~13.9.2" # Used for CLI pretty print

 [tool.poetry.dependencies.sentry-sdk]
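The pyproject.toml change bumps pyarrow from the "~17.0.0" range to the exact version "18.0.0". A minimal, hypothetical guard (not part of this PR) to confirm that the runtime environment matches the pin before running the conversion could look like this:

from importlib.metadata import version

# Assumption: the exact "18.0.0" pin is intentional; adjust if a version range is wanted.
installed = version("pyarrow")
assert installed == "18.0.0", f"expected pyarrow 18.0.0, got {installed}"
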
32 changes: 22 additions & 10 deletions robotoff/products.py
@@ -18,6 +18,7 @@
 from robotoff import settings
 from robotoff.types import JSONType, ProductIdentifier, ServerType
 from robotoff.utils import get_logger, gzip_jsonl_iter, http_session, jsonl_iter
+from robotoff.utils import export

 logger = get_logger(__name__)

@@ -581,21 +582,32 @@ def convert_jsonl_to_parquet(
     dataset_path: Path = settings.JSONL_DATASET_PATH,
     query_path: Path = settings.JSONL_TO_PARQUET_SQL_QUERY,
 ) -> None:
+    from pyarrow import ArrowException
     logger.info("Start JSONL to Parquet conversion process.")
     if not dataset_path.exists() or not query_path.exists():
         raise FileNotFoundError(
             f"{str(dataset_path)} or {str(query_path)} was not found."
         )
-    query = (
-        query_path.read_text()
-        .replace("{dataset_path}", str(dataset_path))
-        .replace("{output_path}", output_file_path)
-    )
-    try:
-        duckdb.sql(query)
-    except duckdb.Error as e:
-        logger.error(f"Error executing query: {query}\nError message: {e}")
-        raise
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        try:
+            tmp_parquet_path = os.path.join(tmp_dir, "temp.parquet")
+            query = (
+                query_path.read_text()
+                .replace("{dataset_path}", str(dataset_path))
+                .replace("{output_path}", tmp_parquet_path)
+            )
+            logger.info("Query the JSONL using DuckDB.")
+            duckdb.sql(query)
+            # logger.info("Post-process extracted data using Arrow")
+            # arrow_batches = export.load_parquet()
+            logger.info("Write post-processed data into Parquet.")
+            export.sink_to_parquet(tmp_parquet_path, output_file_path)
+        except duckdb.Error as e:
+            logger.error("Error executing query: %s\nError message: %s", query, e)
+            raise
+        except ArrowException as e:
+            logger.error(e)
+            raise
     logger.info("JSONL successfully converted into Parquet file.")

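In the rewritten convert_jsonl_to_parquet, DuckDB still executes a SQL template whose {dataset_path} and {output_path} placeholders are substituted at runtime; the difference is that the result now lands in a temporary Parquet file that export.sink_to_parquet post-processes into the final output. A rough, self-contained sketch of that first step follows; the query below is a hypothetical stand-in for the real JSONL_TO_PARQUET_SQL_QUERY file, and the paths are made up.

import duckdb

# Hypothetical stand-in for the template stored at settings.JSONL_TO_PARQUET_SQL_QUERY.
QUERY_TEMPLATE = """
COPY (
    SELECT code, images
    FROM read_ndjson('{dataset_path}', ignore_errors = true)
) TO '{output_path}' (FORMAT PARQUET);
"""

query = (
    QUERY_TEMPLATE
    .replace("{dataset_path}", "/tmp/products.jsonl.gz")  # made-up input path
    .replace("{output_path}", "/tmp/temp.parquet")  # intermediate file, post-processed afterwards
)
duckdb.sql(query)
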
137 changes: 137 additions & 0 deletions robotoff/utils/export.py
@@ -0,0 +1,137 @@
"Functions to postprocess the database conversion into Parquet."

import json

import pyarrow as pa
import pyarrow.parquet as pq

################
# Schemas
################
## Images field
_size_schema = pa.struct(
[
pa.field("h", pa.int32(), nullable=True),
pa.field("w", pa.int32(), nullable=True),
]
)


IMAGES_DATATYPE = pa.list_(
pa.struct(
[
pa.field("key", pa.string(), nullable=True),
pa.field("imgid", pa.string(), nullable=True),
pa.field(
"sizes",
pa.struct(
[
pa.field("100", _size_schema, nullable=True),
pa.field("200", _size_schema, nullable=True),
pa.field("400", _size_schema, nullable=True),
pa.field("full", _size_schema, nullable=True),
]
),
nullable=True,
),
pa.field("uploaded_t", pa.string(), nullable=True),
pa.field("uploader", pa.string(), nullable=True),
]
)
)

SCHEMAS = {"images": IMAGES_DATATYPE}


################
# Functions
################
def sink_to_parquet(parquet_path: str, output_path: str):
parquet_file = pq.ParquetFile(parquet_path)
updated_schema = update_schema(parquet_file.schema.to_arrow_schema())
with pq.ParquetWriter(output_path, schema=updated_schema) as writer:
for batch in parquet_file.iter_batches(batch_size=1000):
batch = _postprocess_arrow_batch(batch)
writer.write_batch(batch)


# def postprocess_arrow_batches(batches: pa.RecordBatchReader) -> pa.RecordBatchReader:
# schema = _udpate_schema_by_field(
# schema=batches.schema,
# field_name="images",
# field_datatype=IMAGES_DATATYPE,
# )
# batches = [_postprocess_arrow_batch(batch) for batch in batches]
# return pa.RecordBatchReader.from_batches(
# schema=schema,
# batches=batches,
# )


def _postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
batch = _postprocess_images(batch)
return batch


def _postprocess_images(batch: pa.RecordBatch, datatype: pa.DataType = IMAGES_DATATYPE):
postprocessed_images = []
images: list[dict | None] = [
json.loads(image) if image else None for image in batch["images"].to_pylist()
]
for image in images:
if image:
postprocessed_images.append(
[
{
"key": str(key),
"imgid": str(value.get("imgid", "unknown")),
"sizes": {
"100": {
"h": int(value.get("sizes", {}).get("100", {}).get("h", 0) or 0), # (or 0) because "h" or "w" can be none, leading to an error with int
"w": int(value.get("sizes", {}).get("100", {}).get("w", 0) or 0),
},
"200": {
"h": int(value.get("sizes", {}).get("200", {}).get("h", 0) or 0),
"w": int(value.get("sizes", {}).get("200", {}).get("w", 0) or 0),
},
"400": {
"h": int(value.get("sizes", {}).get("400", {}).get("h", 0) or 0),
"w": int(value.get("sizes", {}).get("400", {}).get("w", 0) or 0),
},
"full": {
"h": int(value.get("sizes", {}).get("full", {}).get("h", 0) or 0),
"w": int(value.get("sizes", {}).get("full", {}).get("w", 0) or 0),
},
},
"uploaded_t": str(value.get("uploaded_t", "unknown")),
"uploader": str(value.get("uploader", "unknown")),
}
for key, value in image.items()
]
)
else:
postprocessed_images.append([])
images_col_index = batch.schema.get_field_index("images")
batch = batch.set_column(
images_col_index,
pa.field("images", datatype),
pa.array(postprocessed_images, type=datatype)
)
return batch


def update_schema(schema: pa.Schema) -> pa.Schema:
for field_name, field_datatype in SCHEMAS.items():
schema = _udpate_schema_by_field(
schema=schema, field_name=field_name, field_datatype=field_datatype
)
return schema


def _udpate_schema_by_field(
schema: pa.Schema, field_name: str, field_datatype: pa.DataType
) -> pa.schema:
field_index = schema.get_field_index(field_name)
schema = schema.remove(field_index)
schema = schema.insert(field_index, pa.field(field_name, field_datatype))
return schema
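
To illustrate what sink_to_parquet is expected to do end to end, here is a hedged, self-contained sketch (not part of the PR): it writes a tiny intermediate Parquet file whose "images" column is still a raw JSON string, as the DuckDB step produces it, runs the sink, and checks that the rewritten file carries the typed IMAGES_DATATYPE column. Paths and the sample barcode are made up.

import json

import pyarrow as pa
import pyarrow.parquet as pq

from robotoff.utils.export import IMAGES_DATATYPE, sink_to_parquet

# One product row whose "images" value is still JSON-encoded text.
images_json = json.dumps(
    {
        "1": {
            "imgid": "1",
            "sizes": {"100": {"h": 100, "w": 56}, "full": {"h": 1360, "w": 762}},
            "uploaded_t": "1620000000",
            "uploader": "some-user",
        }
    }
)
table = pa.table({"code": ["3017620422003"], "images": [images_json]})
pq.write_table(table, "/tmp/temp.parquet")  # made-up intermediate path

sink_to_parquet("/tmp/temp.parquet", "/tmp/products.parquet")

# The output schema should now expose "images" with the typed datatype.
assert pq.read_schema("/tmp/products.parquet").field("images").type == IMAGES_DATATYPE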