diff --git a/pyproject.toml b/pyproject.toml index 2673f8f..214729f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "target-s3" -version = "1.0.1" +version = "1.0.3" description = "`target-s3` is a Singer target for s3, built with the Meltano Singer SDK." authors = ["crowemi"] keywords = [ diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index 1802adf..1b56363 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -40,17 +40,31 @@ def create_filesystem( raise e def validate(self, schema: dict, field, value) -> dict: - def unpack_dict(record): + """Validates data elements.""" + + def unpack_dict(record) -> dict: ret = dict() for field in record: - if isinstance(value[field], dict): - ret[field] = unpack_dict(value[field]) + if isinstance(record[field], dict): + ret[field] = unpack_dict(record[field]) + elif isinstance(record[field], list): + ret[field] = unpack_list(record[field]) + else: + ret[field] = {"type": type(record[field])} + return ret + + def unpack_list(record) -> dict: + ret = dict() + for idx, value in enumerate(record): + if isinstance(record[idx], dict): + ret[idx] = unpack_dict(value) + elif isinstance(record[idx], list): + ret[idx] = unpack_list(value) else: - ret[field] = {"type": type(value[field])} + ret[idx] = {"type": type(value)} return ret - def validate_dict(value): - fields = schema[field].get("fields") + def validate_dict(value, fields): for v in value: # make sure value is in fields if not v in fields: @@ -62,13 +76,37 @@ def validate_dict(value): else: # check data type if isinstance(value[v], dict): - value[v] = unpack_dict(value[field]) + value[v] = validate_dict(value[v], fields[v]) + if isinstance(value[v], list): + value[v] = validate_list(value[v], fields[v]) else: expected_type = fields[v].get("type") if not isinstance(value[v], expected_type): value[v] = expected_type(value[v]) return value + def validate_list(value, fields): + for i, v in enumerate(value): + if not i in fields: + # add field and type + if isinstance(v, dict): + fields[i] = unpack_dict(v) + if isinstance(v, list): + fields[i] = unpack_list(v) + else: + fields[i] = {"type": type(v)} + else: + # validate + if isinstance(v, dict): + value[i] = validate_dict(v, fields[i]) + if isinstance(v, list): + value[i] = validate_list(v, fields[i]) + else: + expected_type = fields[i].get("type") + if not isinstance(v, expected_type): + value[i] = expected_type(v) + return value + if field in schema: # make sure datatypes align if isinstance(value, dict): @@ -76,7 +114,9 @@ def validate_dict(value): # pyarrow can't process empty struct, return None return None else: - validate_dict(value) + validate_dict(value, schema[field].get("fields")) + elif isinstance(value, list): + validate_list(value, schema[field].get("fields")) else: expected_type = schema[field].get("type") if not isinstance(value, expected_type): @@ -88,6 +128,8 @@ def validate_dict(value): # add new entry for field if isinstance(value, dict): schema[field] = {"type": type(value), "fields": unpack_dict(value)} + elif isinstance(value, list): + schema[field] = {"type": type(value), "fields": unpack_list(value)} else: schema[field] = {"type": type(value)} @@ -100,7 +142,8 @@ def create_dataframe(self) -> Table: for d in self.records: fields = fields.union(d.keys()) - if self.format.get("format_parquet", None).get("validate", None): + format_parquet = self.format.get("format_parquet", None) + if format_parquet and format_parquet.get("validate", None) == True: schema = dict() input = { f: [self.validate(schema, f, row.get(f)) for row in self.records] @@ -111,6 +154,7 @@ def create_dataframe(self) -> Table: ret = Table.from_pydict(mapping=input) except Exception as e: + self.logger.info(self.records) self.logger.error("Failed to create parquet dataframe.") self.logger.error(e) raise e