Skip to content

Commit

Permalink
Merge pull request #13 from crowemi/update-validate
Browse files Browse the repository at this point in the history
updated validate method to handle list
  • Loading branch information
crowemi authored Apr 27, 2023
2 parents 7be1bca + 5a3a7ee commit 24b9d10
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "target-s3"
version = "1.0.1"
version = "1.0.3"
description = "`target-s3` is a Singer target for s3, built with the Meltano Singer SDK."
authors = ["crowemi"]
keywords = [
Expand Down
62 changes: 53 additions & 9 deletions target_s3/formats/format_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,31 @@ def create_filesystem(
raise e

def validate(self, schema: dict, field, value) -> dict:
def unpack_dict(record):
"""Validates data elements."""

def unpack_dict(record) -> dict:
ret = dict()
for field in record:
if isinstance(value[field], dict):
ret[field] = unpack_dict(value[field])
if isinstance(record[field], dict):
ret[field] = unpack_dict(record[field])
elif isinstance(record[field], list):
ret[field] = unpack_list(record[field])
else:
ret[field] = {"type": type(record[field])}
return ret

def unpack_list(record) -> dict:
ret = dict()
for idx, value in enumerate(record):
if isinstance(record[idx], dict):
ret[idx] = unpack_dict(value)
elif isinstance(record[idx], list):
ret[idx] = unpack_list(value)
else:
ret[field] = {"type": type(value[field])}
ret[idx] = {"type": type(value)}
return ret

def validate_dict(value):
fields = schema[field].get("fields")
def validate_dict(value, fields):
for v in value:
# make sure value is in fields
if not v in fields:
Expand All @@ -62,21 +76,47 @@ def validate_dict(value):
else:
# check data type
if isinstance(value[v], dict):
value[v] = unpack_dict(value[field])
value[v] = validate_dict(value[v], fields[v])
if isinstance(value[v], list):
value[v] = validate_list(value[v], fields[v])
else:
expected_type = fields[v].get("type")
if not isinstance(value[v], expected_type):
value[v] = expected_type(value[v])
return value

def validate_list(value, fields):
    """Validate the elements of a list against per-index field metadata.

    ``fields`` maps a list index to the metadata recorded for that position:
    ``{"type": <type>}`` for scalars, or the nested mapping produced by
    ``unpack_dict`` / ``unpack_list`` for dict and list elements.

    Indexes not yet present in ``fields`` are registered from the element
    found at that position. Indexes already present are validated: dict and
    list elements are validated recursively, and a scalar that is not an
    instance of the recorded type is coerced by calling that type on it.

    Both ``value`` and ``fields`` are modified in place; ``value`` is
    returned.
    """
    for i, v in enumerate(value):
        if i not in fields:
            # add field and type
            if isinstance(v, dict):
                fields[i] = unpack_dict(v)
            # `elif`, not a second `if`: otherwise a dict element also falls
            # into the scalar `else` below and its nested mapping is
            # overwritten with {"type": dict}.
            elif isinstance(v, list):
                fields[i] = unpack_list(v)
            else:
                fields[i] = {"type": type(v)}
        else:
            # validate
            if isinstance(v, dict):
                value[i] = validate_dict(v, fields[i])
            # `elif` for the same reason: a dict element must not also be
            # run through the scalar type check after being validated.
            elif isinstance(v, list):
                value[i] = validate_list(v, fields[i])
            else:
                expected_type = fields[i].get("type")
                if not isinstance(v, expected_type):
                    value[i] = expected_type(v)
    return value

if field in schema:
# make sure datatypes align
if isinstance(value, dict):
if not value:
# pyarrow can't process empty struct, return None
return None
else:
validate_dict(value)
validate_dict(value, schema[field].get("fields"))
elif isinstance(value, list):
validate_list(value, schema[field].get("fields"))
else:
expected_type = schema[field].get("type")
if not isinstance(value, expected_type):
Expand All @@ -88,6 +128,8 @@ def validate_dict(value):
# add new entry for field
if isinstance(value, dict):
schema[field] = {"type": type(value), "fields": unpack_dict(value)}
elif isinstance(value, list):
schema[field] = {"type": type(value), "fields": unpack_list(value)}
else:
schema[field] = {"type": type(value)}

Expand All @@ -100,7 +142,8 @@ def create_dataframe(self) -> Table:
for d in self.records:
fields = fields.union(d.keys())

if self.format.get("format_parquet", None).get("validate", None):
format_parquet = self.format.get("format_parquet", None)
if format_parquet and format_parquet.get("validate", None) == True:
schema = dict()
input = {
f: [self.validate(schema, f, row.get(f)) for row in self.records]
Expand All @@ -111,6 +154,7 @@ def create_dataframe(self) -> Table:

ret = Table.from_pydict(mapping=input)
except Exception as e:
self.logger.info(self.records)
self.logger.error("Failed to create parquet dataframe.")
self.logger.error(e)
raise e
Expand Down

0 comments on commit 24b9d10

Please sign in to comment.