Updated sync all columns in single json #237

Open · wants to merge 6 commits into master
target_postgres/denest.py: 115 changes (8 additions, 107 deletions)
@@ -1,8 +1,6 @@
from copy import deepcopy

from target_postgres import json_schema, singer


def to_table_batches(schema, key_properties, records):
"""
Given a schema, and records, get all table schemas and records and prep them
@@ -98,6 +96,7 @@ def _literal_only_schema(schema):


def _create_subtable(table_path, table_json_schema, key_prop_schemas, subtables, level):

if json_schema.is_object(table_json_schema['items']):
new_properties = table_json_schema['items']['properties']
else:
@@ -156,7 +155,7 @@ def _denest_schema_helper(
key_prop_schemas,
subtables,
level):

for prop, item_json_schema in _denest_schema__singular_schemas(table_json_schema):

if json_schema.is_object(item_json_schema):
@@ -196,30 +195,10 @@ def _denest_schema(

new_properties = {}
for prop, item_json_schema in _denest_schema__singular_schemas(table_json_schema):

if json_schema.is_object(item_json_schema):
_denest_schema_helper(table_path + (prop,),
(prop,),
item_json_schema,
json_schema.is_nullable(item_json_schema),
new_properties,
key_prop_schemas,
subtables,
level)

elif json_schema.is_iterable(item_json_schema):
_create_subtable(table_path + (prop,),
item_json_schema,
key_prop_schemas,
subtables,
level + 1)

elif json_schema.is_literal(item_json_schema):
if (prop,) in new_properties:
new_properties[(prop,)]['anyOf'].append(item_json_schema)
else:
new_properties[(prop,)] = {'anyOf': [item_json_schema]}

if (prop,) in new_properties:
new_properties[(prop,)]['anyOf'].append(item_json_schema)
else:
new_properties[(prop,)] = {'anyOf': [item_json_schema]}

table_json_schema['properties'] = new_properties

@@ -245,60 +224,6 @@ def _get_streamed_table_records(key_properties, records):
return records_map


def _denest_subrecord(table_path,
prop_path,
parent_record,
record,
records_map,
key_properties,
pk_fks,
level):
""""""
"""
{...}
"""
for prop, value in record.items():
"""
str : {...} | [...] | ???None??? | <literal>
"""

if isinstance(value, dict):
"""
{...}
"""
_denest_subrecord(table_path + (prop,),
prop_path + (prop,),
parent_record,
value,
records_map,
key_properties,
pk_fks,
level)

elif isinstance(value, list):
"""
[...]
"""
_denest_records(table_path + (prop,),
value,
records_map,
key_properties,
pk_fks=pk_fks,
level=level + 1)

elif value is None:
"""
None
"""
continue

else:
"""
<literal>
"""
parent_record[prop_path + (prop,)] = (json_schema.python_type(value), value)


def _denest_record(table_path, record, records_map, key_properties, pk_fks, level):
""""""
"""
@@ -309,32 +234,7 @@ def _denest_record(table_path, record, records_map, key_properties, pk_fks, level):
"""
str : {...} | [...] | None | <literal>
"""

if isinstance(value, dict):
"""
{...}
"""
_denest_subrecord(table_path + (prop,),
(prop,),
denested_record,
value,
records_map,
key_properties,
pk_fks,
level)

elif isinstance(value, list):
"""
[...]
"""
_denest_records(table_path + (prop,),
value,
records_map,
key_properties,
pk_fks=pk_fks,
level=level + 1)

elif value is None:
if value is None:
"""
None
"""
@@ -348,6 +248,7 @@ def _denest_record(table_path, record, records_map, key_properties, pk_fks, level):

if table_path not in records_map:
records_map[table_path] = []

records_map[table_path].append(denested_record)


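Reading the denest.py hunks together with the file stats (8 additions, 107 deletions), the change appears to drop the recursive handling of nested objects and arrays: the object/iterable branches in `_denest_schema` and `_denest_record` and the whole `_denest_subrecord` helper go away, and every property is instead collected into an `anyOf` entry on the root table (note the `elif value is None` that becomes a plain `if`). A minimal sketch of the resulting schema behavior, under that reading of the diff:

```python
# Hypothetical sketch, not the PR's exact code: every property, nested or
# not, stays on the root table as a single anyOf entry instead of being
# denested into child tables.
def denest_schema_sketch(table_json_schema):
    new_properties = {}
    for prop, item_json_schema in table_json_schema['properties'].items():
        key = (prop,)
        if key in new_properties:
            new_properties[key]['anyOf'].append(item_json_schema)
        else:
            new_properties[key] = {'anyOf': [item_json_schema]}
    return new_properties

schema = {'properties': {
    'id': {'type': 'integer'},
    'address': {'type': 'object', 'properties': {'city': {'type': 'string'}}}}}

# Both the literal 'id' and the nested 'address' object survive as root-table
# columns: {('id',): {'anyOf': [...]}, ('address',): {'anyOf': [...]}}
print(denest_schema_sketch(schema))
```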
target_postgres/json_schema.py: 9 changes (6 additions, 3 deletions)
@@ -22,10 +22,11 @@
bool: BOOLEAN,
str: STRING,
type(None): NULL,
decimal.Decimal: NUMBER
decimal.Decimal: NUMBER,
dict: OBJECT,
list: ARRAY
}


def python_type(x):
"""
Given a value `x`, return its Python Type as a JSONSchema type.
@@ -559,7 +560,9 @@ def validation_errors(schema):
'number': 'f',
'integer': 'i',
'boolean': 'b',
'date-time': 't'
'date-time': 't',
'object': 'o',
'array': 'a'
}


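The json_schema.py changes extend both mappings so nested Python values and the new column types are representable: `dict` and `list` now map to the JSON Schema `object` and `array` types, and the shorthand table gains `'o'` and `'a'` entries. Assuming `python_type` simply looks up `type(x)` in the mapping shown above and the OBJECT/ARRAY constants are the plain type names, the effect is roughly:

```python
# Hypothetical usage sketch; previously dict and list values had no mapping.
from target_postgres import json_schema

assert json_schema.python_type({'a': 1}) == 'object'
assert json_schema.python_type([1, 2, 3]) == 'array'
```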
target_postgres/postgres.py: 90 changes (86 additions, 4 deletions)
@@ -16,11 +16,76 @@
from target_postgres import json_schema, singer
from target_postgres.exceptions import PostgresError
from target_postgres.sql_base import SEPARATOR, SQLInterface
from decimal import Decimal


RESERVED_NULL_DEFAULT = 'NULL'

@lru_cache(maxsize=128)
def transform_dict(input_dict,id_columns):
id_columns=eval(id_columns)
sdc_columns = [key for key in input_dict['properties'] if key.startswith("_sdc_")]
tenant_id = [key for key in input_dict['properties'] if key == "tenant_id"]

# Create a copy of the original dictionary
result_dict = {
'type': input_dict['type'],
'properties': {}
}

# Move id_columns to the result dictionary
for column in id_columns:
if column in input_dict['properties']:
result_dict['properties'][column] = input_dict['properties'][column]

# Move sdc_columns to the result dictionary
for column in sdc_columns:
if column in input_dict['properties']:
result_dict['properties'][column] = input_dict['properties'][column]

# Move tenant_id to the result dictionary
for column in tenant_id:
if column in input_dict['properties']:
result_dict['properties'][column] = input_dict['properties'][column]

# Move other columns to the 'result' dictionary
result_dict['properties']['record'] = {
'type': ['object'],
'properties': {
column: input_dict['properties'][column]
for column in input_dict['properties']
if column not in id_columns + sdc_columns + tenant_id
}
}

return result_dict


def transform_data_dict(input_string,id_columns):
id_columns=eval(id_columns)
data=eval(input_string)
result_list = []

for entry in data:
result_entry = {'record': {}}

# Separate id_columns and columns starting with "_sdc_"
id_columns = {key: entry.pop(key) for key in id_columns}
sdc_columns = {key: entry.pop(key) for key in entry.copy() if key.startswith("_sdc_")}
tenant_id = {key: entry.pop(key) for key in entry.copy() if key == "tenant_id"}

# Move remaining columns to the 'result' dictionary
result_entry['record'] = entry

# Add id_columns and sdc_columns back to the result_entry
result_entry.update(id_columns)
result_entry.update(sdc_columns)
result_entry.update(tenant_id)

result_list.append(result_entry)

return result_list

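`transform_dict` and `transform_data_dict` reshape the stream's schema and records before they are written: key properties, `_sdc_*` columns, and `tenant_id` stay top-level, while every other column is folded under a single `record` object (the stringified arguments are presumably there so `transform_dict`'s `lru_cache` receives hashable inputs). A worked before/after illustration with invented column names, not taken from the PR:

```python
# Hypothetical input schema and key properties.
schema = {
    'type': ['object'],
    'properties': {
        'id': {'type': ['integer']},
        'tenant_id': {'type': ['integer']},
        '_sdc_received_at': {'type': ['string'], 'format': 'date-time'},
        'name': {'type': ['string']},
        'amount': {'type': ['number']},
    },
}
key_properties = ['id']

# transform_dict(schema, str(key_properties)) is expected to yield roughly:
reshaped_schema = {
    'type': ['object'],
    'properties': {
        'id': {'type': ['integer']},
        '_sdc_received_at': {'type': ['string'], 'format': 'date-time'},
        'tenant_id': {'type': ['integer']},
        'record': {
            'type': ['object'],
            'properties': {
                'name': {'type': ['string']},
                'amount': {'type': ['number']},
            },
        },
    },
}

# transform_data_dict reshapes each record the same way, e.g.
# {'id': 1, 'tenant_id': 7, '_sdc_received_at': '2024-01-01T00:00:00Z',
#  'name': 'x', 'amount': 2.5}
# becomes
# {'id': 1, 'tenant_id': 7, '_sdc_received_at': '2024-01-01T00:00:00Z',
#  'record': {'name': 'x', 'amount': 2.5}}
```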
def _format_datetime(value):
"""
Format a datetime value. This is only called from the
@@ -303,9 +368,9 @@ def write_batch(self, stream_buffer):

written_batches_details = self.write_batch_helper(cur,
root_table_name,
stream_buffer.schema,
transform_dict(stream_buffer.schema,str(stream_buffer.key_properties)),
stream_buffer.key_properties,
stream_buffer.get_batch(),
transform_data_dict(str(stream_buffer.get_batch()),str(stream_buffer.key_properties)),
{'version': target_table_version})

cur.execute('COMMIT;')
@@ -569,6 +634,7 @@ def persist_csv_rows(self,
sql.Identifier(temp_table_name),
sql.SQL(', ').join(map(sql.Identifier, columns)),
sql.Literal(RESERVED_NULL_DEFAULT))

cur.copy_expert(copy, csv_rows)

pattern = re.compile(singer.LEVEL_FMT.format('[0-9]+'))
@@ -582,6 +648,7 @@
canonicalized_key_properties,
columns,
subkeys)

cur.execute(update_sql)

def write_table_batch(self, cur, table_batch, metadata):
@@ -601,10 +668,19 @@ def write_table_batch(self, cur, table_batch, metadata):
csv_headers = list(remote_schema['schema']['properties'].keys())
rows_iter = iter(table_batch['records'])

def handle_decimal(obj):
if isinstance(obj, Decimal):
return float(obj)
raise TypeError(f"Object of type '{type(obj).__name__}' is not JSON serializable")

def transform():
try:
row = next(rows_iter)

for header in csv_headers:
if header in row and isinstance(row[header], (dict, list)):
row[header] = json.dumps(row[header], default=handle_decimal)

with io.StringIO() as out:
writer = csv.DictWriter(out, csv_headers)
writer.writerow(row)
@@ -623,8 +699,7 @@ def transform():

return len(table_batch['records'])
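In `write_table_batch`, the new `transform` logic serializes any `dict` or `list` cell to a JSON string before the row goes into the CSV buffer for `COPY`, with `handle_decimal` covering `decimal.Decimal` values that `json.dumps` cannot encode on its own. A standalone sketch of that conversion (column names are illustrative only):

```python
import csv
import io
import json
from decimal import Decimal

def handle_decimal(obj):
    # json.dumps raises TypeError for Decimal; fall back to float.
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type '{type(obj).__name__}' is not JSON serializable")

csv_headers = ['id', 'record']
row = {'id': 1, 'record': {'amount': Decimal('2.5'), 'tags': ['a', 'b']}}

# Nested values become JSON strings so they can be copied into a jsonb column.
for header in csv_headers:
    if header in row and isinstance(row[header], (dict, list)):
        row[header] = json.dumps(row[header], default=handle_decimal)

with io.StringIO() as out:
    writer = csv.DictWriter(out, csv_headers)
    writer.writerow(row)
    print(out.getvalue())  # 1,"{""amount"": 2.5, ""tags"": [""a"", ""b""]}"
```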

def add_column(self, cur, table_name, column_name, column_schema):

def add_column(self, cur, table_name, column_name, column_schema):
cur.execute(sql.SQL('''
ALTER TABLE {table_schema}.{table_name}
ADD COLUMN {column_name} {data_type};
@@ -818,6 +893,7 @@ def sql_type_to_json_schema(self, sql_type, is_nullable):
:return: JSONSchema
"""
_format = None

if sql_type == 'timestamp with time zone':
json_type = 'string'
_format = 'date-time'
@@ -829,6 +905,8 @@ def sql_type_to_json_schema(self, sql_type, is_nullable):
json_type = 'boolean'
elif sql_type == 'text':
json_type = 'string'
elif sql_type == 'jsonb':
json_type = 'json'
else:
raise PostgresError('Unsupported type `{}` in existing target table'.format(sql_type))

@@ -869,6 +947,10 @@ def json_schema_to_sql_type(self, schema):
sql_type = 'bigint'
elif _type == 'number':
sql_type = 'double precision'
elif _type == 'object':
sql_type = 'jsonb'
elif _type == 'array':
sql_type = 'jsonb'

if not_null:
sql_type += ' NOT NULL'
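Taken together, the two mapping changes above let object and array schemas round-trip through Postgres `jsonb` columns. A compact summary of the intended column-type mapping (a sketch distilled from these hunks, not an exhaustive copy of `json_schema_to_sql_type`; the `string`/`boolean` rows are inferred from the reverse mapping):

```python
# JSON Schema type -> Postgres column type after this PR.
JSON_SCHEMA_TYPE_TO_SQL_TYPE = {
    'boolean': 'boolean',
    'integer': 'bigint',
    'number': 'double precision',
    'string': 'text',
    'object': 'jsonb',   # new in this PR
    'array': 'jsonb',    # new in this PR
}
```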