Fix decimal precision (decimal.InvalidOperation decimal.DivisionImpossible error) #207

Status: Open. Wants to merge 5 commits into `base: master`.
1 change: 1 addition & 0 deletions README.md
@@ -101,6 +101,7 @@ here.
| `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often. |
| `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
| `add_upsert_indexes` | `["boolean", "null"]` | `True` | Whether the Target should create column indexes on the important columns used during data loading. These indexes will make data loading slightly slower but the deduplication phase much faster. Defaults to on for better baseline performance. |
| `decimal_precision` | `["integer", "null"]` | `None`, the python default (28) | Set the precision of decimal operations. This is required to avoid `decimal.DivisionImpossible` errors for some taps which encode data type precision using JSON schema `minimum`/`maximum`/`multipleOf` validations. |
| `before_run_sql` | `["string", "null"]` | `None` | Raw SQL statement(s) to execute as soon as the connection to Postgres is opened by the target. Useful for setup like `SET ROLE` or other connection state that is important. |
| `after_run_sql` | `["string", "null"]` | `None` | Raw SQL statement(s) to execute right before the connection to Postgres is closed by the target. Useful for teardown like `RESET ROLE` or other cleanup of connection state. |

5 changes: 5 additions & 0 deletions target_postgres/target_tools.py
@@ -1,3 +1,4 @@
import decimal
import http.client
import io
import json
@@ -53,6 +54,10 @@ def stream_to_target(stream, target, config={}):
max_batch_size = config.get('max_batch_size', 104857600) # 100MB
batch_detection_threshold = config.get('batch_detection_threshold', max(max_batch_rows / 40, 50))

prec = config.get('decimal_precision')
if prec:
decimal.getcontext().prec = prec

line_count = 0
for line in stream:
_line_handler(state_tracker,
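One caveat worth noting about setting `decimal.getcontext().prec`: the decimal context is per-thread, so the precision applies to the thread that runs `stream_to_target`, not to any other threads the process may spawn. A sketch (not target-postgres code) of that behaviour, assuming a default-configured interpreter:

```python
import decimal
import threading

# Raise precision in the main thread, as stream_to_target would.
decimal.getcontext().prec = 100
seen = {}

def worker():
    # A fresh thread gets a new context derived from the defaults
    # (prec=28), not the precision set in the main thread.
    seen["prec"] = decimal.getcontext().prec

t = threading.Thread(target=worker)
t.start()
t.join()
print(seen["prec"], decimal.getcontext().prec)  # 28 100
```

Since the target processes the stream in the configuring thread, this is fine for the change above; it only matters if decimal arithmetic is ever moved onto worker threads.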
48 changes: 48 additions & 0 deletions tests/unit/test_target_tools.py
@@ -154,6 +154,54 @@ class TestStream(ListStream):
assert rows_persisted == expected_rows


def test_record_with_target_postgres_precision():
values = [1, 1.0, 2, 2.0, 3, 7, 10.1]
records = []
for value in values:
records.append({
"type": "RECORD",
"stream": "test",
"record": {"multipleOfKey": value},
})

class TestStream(ListStream):
stream = [
{
"type": "SCHEMA",
"stream": "test",
"schema": {
"properties": {
"multipleOfKey": {
"type": [
"null",
"number"
],
"exclusiveMaximum": True,
"maximum": 100000000000000000000000000000000000000000000000000000000000000,
"multipleOf": 1e-38,
"exclusiveMinimum": True,
"minimum": -100000000000000000000000000000000000000000000000000000000000000
}
}
},
"key_properties": []
}
] + records

target = Target()

config = CONFIG.copy()
config['decimal_precision'] = 100
target_tools.stream_to_target(TestStream(), target, config=config)

expected_rows = len(records)
rows_persisted = 0
for call in target.calls['write_batch']:
rows_persisted += call['records_count']

assert rows_persisted == expected_rows


def test_state__capture(capsys):
stream = [
json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}}),