From 89cc86157ee5405f473f6966c3f1f7b7e7b67e21 Mon Sep 17 00:00:00 2001 From: Charles Julian Knight Date: Mon, 16 Aug 2021 17:21:12 -0400 Subject: [PATCH 1/5] add test to demonstrate issue --- tests/unit/test_target_tools.py | 46 +++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/unit/test_target_tools.py b/tests/unit/test_target_tools.py index d12450a0..41d33e3d 100644 --- a/tests/unit/test_target_tools.py +++ b/tests/unit/test_target_tools.py @@ -154,6 +154,52 @@ class TestStream(ListStream): assert rows_persisted == expected_rows +def test_record_with_target_postgres_precision(): + values = [1, 1.0, 2, 2.0, 3, 7, 10.1] + records = [] + for value in values: + records.append({ + "type": "RECORD", + "stream": "test", + "record": {"multipleOfKey": value}, + }) + + class TestStream(ListStream): + stream = [ + { + "type": "SCHEMA", + "stream": "test", + "schema": { + "properties": { + "multipleOfKey": { + "type": [ + "null", + "number" + ], + "exclusiveMaximum": True, + "maximum": 100000000000000000000000000000000000000000000000000000000000000, + "multipleOf": 1e-38, + "exclusiveMinimum": True, + "minimum": -100000000000000000000000000000000000000000000000000000000000000 + } + } + }, + "key_properties": [] + } + ] + records + + target = Target() + + target_tools.stream_to_target(TestStream(), target, config=CONFIG.copy()) + + expected_rows = len(records) + rows_persisted = 0 + for call in target.calls['write_batch']: + rows_persisted += call['records_count'] + + assert rows_persisted == expected_rows + + def test_state__capture(capsys): stream = [ json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}}), From 6caf51cf4db62b0f809a79e02eef4407c1e3c5fc Mon Sep 17 00:00:00 2001 From: Charles Julian Knight Date: Mon, 16 Aug 2021 17:27:52 -0400 Subject: [PATCH 2/5] port solution --- target_postgres/json_schema.py | 34 ++++++++++++++++++++++++++++++++ target_postgres/singer_stream.py | 1 + 2 files changed, 35 insertions(+) diff --git 
a/target_postgres/json_schema.py b/target_postgres/json_schema.py index 8e971877..5718c7b1 100644 --- a/target_postgres/json_schema.py +++ b/target_postgres/json_schema.py @@ -1,6 +1,7 @@ from copy import deepcopy import decimal import json +import math import re from jsonschema import Draft4Validator @@ -587,3 +588,36 @@ def shorthand(schema): t.append('date-time') return _type_shorthand(t) + +def numeric_schema_with_precision(schema): + if 'type' not in schema: + return False + if isinstance(schema['type'], list): + if 'number' not in schema['type']: + return False + elif schema['type'] != 'number': + return False + if 'multipleOf' in schema: + return True + return 'minimum' in schema or 'maximum' in schema + + +def walk_schema_for_numeric_precision(schema): + if isinstance(schema, list): + for v in schema: + walk_schema_for_numeric_precision(v) + elif isinstance(schema, dict): + if numeric_schema_with_precision(schema): + def get_precision(key): + v = abs(decimal.Decimal(schema.get(key, 1))).log10() + if v < 0: + return round(math.floor(v)) + return round(math.ceil(v)) + scale = -1 * get_precision('multipleOf') + digits = max(get_precision('minimum'), get_precision('maximum')) + precision = digits + scale + if decimal.getcontext().prec < precision: + decimal.getcontext().prec = precision + else: + for v in schema.values(): + walk_schema_for_numeric_precision(v) diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py index 871d1c2c..abc9f20d 100644 --- a/target_postgres/singer_stream.py +++ b/target_postgres/singer_stream.py @@ -67,6 +67,7 @@ def __init__(self, def update_schema(self, schema, key_properties): # In order to determine whether a value _is in_ properties _or not_ we need to flatten `$ref`s etc. 
self.schema = json_schema.simplify(schema) + json_schema.walk_schema_for_numeric_precision(schema) self.key_properties = deepcopy(key_properties) # The validator can handle _many_ more things than our simplified schema, and is, in general handled by third party code From 68e3c3b2a424a338cc77ae71009f48c2e878569f Mon Sep 17 00:00:00 2001 From: Charles Julian Knight Date: Mon, 23 Aug 2021 12:37:17 -0400 Subject: [PATCH 3/5] Revert "port solution" This reverts commit 6caf51cf4db62b0f809a79e02eef4407c1e3c5fc. --- target_postgres/json_schema.py | 34 -------------------------------- target_postgres/singer_stream.py | 1 - 2 files changed, 35 deletions(-) diff --git a/target_postgres/json_schema.py b/target_postgres/json_schema.py index 5718c7b1..8e971877 100644 --- a/target_postgres/json_schema.py +++ b/target_postgres/json_schema.py @@ -1,7 +1,6 @@ from copy import deepcopy import decimal import json -import math import re from jsonschema import Draft4Validator @@ -588,36 +587,3 @@ def shorthand(schema): t.append('date-time') return _type_shorthand(t) - -def numeric_schema_with_precision(schema): - if 'type' not in schema: - return False - if isinstance(schema['type'], list): - if 'number' not in schema['type']: - return False - elif schema['type'] != 'number': - return False - if 'multipleOf' in schema: - return True - return 'minimum' in schema or 'maximum' in schema - - -def walk_schema_for_numeric_precision(schema): - if isinstance(schema, list): - for v in schema: - walk_schema_for_numeric_precision(v) - elif isinstance(schema, dict): - if numeric_schema_with_precision(schema): - def get_precision(key): - v = abs(decimal.Decimal(schema.get(key, 1))).log10() - if v < 0: - return round(math.floor(v)) - return round(math.ceil(v)) - scale = -1 * get_precision('multipleOf') - digits = max(get_precision('minimum'), get_precision('maximum')) - precision = digits + scale - if decimal.getcontext().prec < precision: - decimal.getcontext().prec = precision - else: - for v in 
schema.values(): - walk_schema_for_numeric_precision(v) diff --git a/target_postgres/singer_stream.py b/target_postgres/singer_stream.py index abc9f20d..871d1c2c 100644 --- a/target_postgres/singer_stream.py +++ b/target_postgres/singer_stream.py @@ -67,7 +67,6 @@ def __init__(self, def update_schema(self, schema, key_properties): # In order to determine whether a value _is in_ properties _or not_ we need to flatten `$ref`s etc. self.schema = json_schema.simplify(schema) - json_schema.walk_schema_for_numeric_precision(schema) self.key_properties = deepcopy(key_properties) # The validator can handle _many_ more things than our simplified schema, and is, in general handled by third party code From b04c11ab9c6b3a313ba1e2c0a0ac608ea43f234e Mon Sep 17 00:00:00 2001 From: Charles Julian Knight Date: Mon, 23 Aug 2021 12:48:23 -0400 Subject: [PATCH 4/5] set precision from config --- target_postgres/target_tools.py | 5 +++++ tests/unit/test_target_tools.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py index c01cc40b..82bdd447 100644 --- a/target_postgres/target_tools.py +++ b/target_postgres/target_tools.py @@ -1,3 +1,4 @@ +import decimal import http.client import io import json @@ -53,6 +54,10 @@ def stream_to_target(stream, target, config={}): max_batch_size = config.get('max_batch_size', 104857600) # 100MB batch_detection_threshold = config.get('batch_detection_threshold', max(max_batch_rows / 40, 50)) + prec = config.get('decimal_precision') + if prec: + decimal.getcontext().prec = prec + line_count = 0 for line in stream: _line_handler(state_tracker, diff --git a/tests/unit/test_target_tools.py b/tests/unit/test_target_tools.py index 41d33e3d..c387124d 100644 --- a/tests/unit/test_target_tools.py +++ b/tests/unit/test_target_tools.py @@ -190,7 +190,9 @@ class TestStream(ListStream): target = Target() - target_tools.stream_to_target(TestStream(), target, config=CONFIG.copy()) + 
config = CONFIG.copy() + config['decimal_precision'] = 100 + target_tools.stream_to_target(TestStream(), target, config=config) expected_rows = len(records) rows_persisted = 0 From 91933b0891eeab2a6438096362c44ff7320256fa Mon Sep 17 00:00:00 2001 From: Charles Julian Knight Date: Mon, 23 Aug 2021 12:57:24 -0400 Subject: [PATCH 5/5] document config option --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 6e344184..54fee129 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ here. | `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often. | | `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. | | `add_upsert_indexes` | `["boolean", "null"]` | `True` | Whether the Target should create column indexes on the important columns used during data loading. These indexes will make data loading slightly slower but the deduplication phase much faster. Defaults to on for better baseline performance. | +| `decimal_precision` | `["integer", "null"]` | `None`, the Python default (28) | Set the precision of decimal operations. This is required to avoid `decimal.DivisionImpossible` errors for some taps which encode data type precision using JSON schema `minimum`/`maximum`/`multipleOf` validations.
| | `before_run_sql` | `["string", "null"]` | `None` | Raw SQL statement(s) to execute as soon as the connection to Postgres is opened by the target. Useful for setup like `SET ROLE` or other connection state that is important. | | `after_run_sql` | `["string", "null"]` | `None` | Raw SQL statement(s) to execute as soon as the connection to Postgres is opened by the target. Useful for setup like `SET ROLE` or other connection state that is important. |